fc2ブログ

TBBと並列ソート

 並列処理を学習していると、初期の段階で気になるのがソートです。整列処理を並列化する事は誰しも考えます。しかし、実際に自分で実装するとなると結構難しい課題です。
 幸いインテルTBBには、並列ソートを行うテンプレート「parallel_sort」が予め用意されています。parallel_sortの使用法は簡単で、std::sortとほぼ同様なので、サンプルプログラムを見れば直ぐに分かると思います。

#include <iostream>
#include <iterator>
#include <algorithm>
#include "tbb/task_scheduler_init.h"
#include "tbb/parallel_sort.h"
#include "tbb/concurrent_vector.h"
using namespace std;
using namespace tbb;

int main()
{
    //データを準備
    concurrent_vector< char > v;
    v.push_back( 'c' );
    v.push_back( 'b' );
    v.push_back( 'd' );
    v.push_back( 'a' );

    //初期データを表示
    cout << "整列前のデータ: ";
    copy( v.begin(), v.end(), ostream_iterator< char >( cout, " " ) );
    cout << endl;

    //昇順でソート
    parallel_sort( v.begin(), v.end() );
    cout << "昇順で整列後のデータ: ";
    copy( v.begin(), v.end(), ostream_iterator< char >( cout, " " ) );
    cout << endl;

    //降順でソート
    parallel_sort( v.begin(), v.end(), std::greater< char >() );
    cout << "降順で整列後のデータ: ";
    copy( v.begin(), v.end(), ostream_iterator< char >( cout, " " ) );
    cout << endl;

    cout << endl;
    return 0;
}

 parallel_sortテンプレートは、平均時間計算量0(n log n)の不安定なソートを行います。不安定というのは、同値の要素については相対的な順序を維持しないという事を意味します。同値があり得る場合は注意しましょう。並列処理では、基本的に順序が保障されないと考えるとよいでしょう。順序を保証するとパフォーマンスが下がりますので、基本的に並列処理では順序に依存しないアルゴリズムを考えます。
 parallel_sortテンプレートは、RandomAccessIteratorを使用します。STLに存在する他のイテレーターは、本質的に直列であり、並列に向いていないので使用されません。
 これは余談ですが、書籍・ インテル スレッディング・ビルディング・ブロック ―マルチコア時代のC++並列プログラミング によると、インテルTBBの初期段階の設計では、並列範囲を表現する為にRandomAccessIteratorを使用していたそうです。今では再帰的な範囲を使用しています。並列処理の範囲の表現に再帰が選ばれたのは、非常に興味深い事柄です。


【参考資料】
スポンサーサイト



テーマ : プログラミング
ジャンル : コンピュータ

強く型付けするパイプラインテンプレートparallel_pipeline

 今回は前回予告した通りに、parallel_pipelineの解説を行います。
 先ずは、前回掲載したシーザー暗号のサンプルプログラムを、parallel_pipelineテンプレートを使って書き変えたプログラムを見て下さい。

#include <iostream>
#include <fstream>
#include "tbb/pipeline.h"
using namespace std;
using namespace tbb;

/*-------------------------------------------------------------
    フィルタークラス間でやり取りをするデータを管理するクラス
---------------------------------------------------------------*/
class Buffer
{
private:
    static const size_t buffer_size = 100;
    char* end_ptr;
    char data[ buffer_size ];
public:
    char* begin() { return data; }
    const char* begin() const { return data; }
    char* end() const { return end_ptr; }
    void set_end(  char* new_ptr ) { end_ptr = new_ptr; }
    size_t max_size() const { return buffer_size; }
    size_t size() const { return end_ptr - begin(); }
};

/*------------------------------------------------------
    指定されたファイルからデータを読み取るクラス
--------------------------------------------------------*/
class InputFileFilter 
{
public:
    static const size_t buffer_Count = 4;
private:
    ifstream* file;
    size_t next_buffer;
    Buffer buffer[ buffer_Count ];
public:
    InputFileFilter( ifstream* name ) :
            next_buffer( 0 ), file( name ) { }
    InputFileFilter( const InputFileFilter& f ) : 
            next_buffer( 0 ), file( f.file ) { }

    Buffer* operator()( flow_control& fc )
    {
        //入力データをファイルから読みだす
        Buffer& b = buffer[ next_buffer] ;
        next_buffer = ( next_buffer + 1 ) % buffer_Count;
        file->read( b.begin(), b.max_size() );
        int count = file->gcount();

        //終了判定が重要
        if ( count == 0 ) {
            fc.stop();
            return NULL;
        } else {
            b.set_end( b.begin() + count );
            return &b;
        }
    }
};

/*---------------------------------------------------
    受け取った文字列をシーザー暗号化するクラス
-----------------------------------------------------*/
class CaesarTransformFilter 
{
private:
    int key;
public:
    CaesarTransformFilter( int key ) : 
      key( key ) { }
      
    Buffer* operator()( Buffer* item )
    {
        for( char* s = item->begin(); s != item->end(); ++s ) {
            char val = *s;
            if ( ( val + key ) > 90 ) {
                *s = 64 + ( val + key - 90 );
            } else {
                *s = val + key;
            }
        }
        return item;
    }
};

/*---------------------------------------------------------
    受け取ったバッファの内容をファイルへ書き込むクラス
-----------------------------------------------------------*/
class OutputFileFilter 
{
    ofstream* file;
public:
    OutputFileFilter( ofstream* name  ) : 
      file( name ) { }

    OutputFileFilter( const OutputFileFilter& f ) : 
      file( f.file ) { }

    void operator()( Buffer* item )
    {
        file->write( item->begin(), item->size() );
        file->flush();
    }
};

/*------------------------------------------------------
    パイプラインパターンでシーザー暗号処理を実行する
    ※基本的な書き方
--------------------------------------------------------*/
void PipelineCaesar( string inputFileName, int key, string outputFileName )
{
    //使用するファイルを準備
    ifstream inputFile(inputFileName);
    ofstream outputFile( outputFileName );

    //各種フィルターを用意
    filter_t< void, Buffer* > inputFilter( 
        filter::serial_in_order, 
        InputFileFilter( &inputFile ) );
    filter_t< Buffer*, Buffer* > transFilter( 
        filter::parallel,
        CaesarTransformFilter( key ) );
    filter_t< Buffer*, void > outputFilter( 
        filter::serial_in_order,
        OutputFileFilter( &outputFile ) );
    filter_t< void, void > all_filter( 
        inputFilter & transFilter & outputFilter );
    
    //パイプライン実行
    parallel_pipeline( 
        InputFileFilter::buffer_Count,
        all_filter );

    //念のためにファイルを閉じる
    inputFile.close();
    outputFile.close();
};

/*------------------------------------------------------
    パイプラインパターンでシーザー暗号処理を実行する
    ※make_filterを使用した場合
--------------------------------------------------------*/
void PipelineCaesar1( string inputFileName, int key, string outputFileName )
{
    //使用するファイルを準備
    ifstream inputFile(inputFileName);
    ofstream outputFile( outputFileName );

    //パイプライン実行
    parallel_pipeline( 
        InputFileFilter::buffer_Count,
        make_filter< void, Buffer* > ( 
            filter::serial_in_order, 
            InputFileFilter( &inputFile ) ) &
        make_filter< Buffer*, Buffer* > (
            filter::parallel,
            CaesarTransformFilter( key ) ) &
        make_filter< Buffer*, void > ( 
            filter::serial_in_order,
            OutputFileFilter( &outputFile ) )
    );

    //念のためにファイルを閉じる
    inputFile.close();
    outputFile.close();
};

/*------------------------------------------------------
    サンプルデータをシーザー暗号化する処理を行う
--------------------------------------------------------*/
int main()
{
    //サンプルファイルを作成
    string inputFileName = "TestData.txt";
    ofstream file( inputFileName ); 
    for ( char i = 65; i < 91; i++ ) {
        file << i;
    }
    file.close();

    //確認
    ifstream fout( inputFileName );
    char c;
    cout << "変換前の文字" << endl;
    while( !fout.eof() ) {
        fout >> c;
        if ( !fout.eof() ) cout << c << ' '; 
    } 
    cout << endl;
    fout.close();

    //パイプライン実行
    int key = 1;
    string outputFileName = "CaesarData.txt";
    PipelineCaesar( inputFileName, key, outputFileName );

    //確認
    cout << endl;
    ifstream fout1( outputFileName );
    cout << "変換後の文字" << endl;
    while( !fout1.eof() ) {
        fout1 >> c;
        if ( !fout1.eof() ) cout << c << ' '; 
    }
    cout << endl << endl;
    fout1.close();
}

 処理内容は前回と同じですが、関数呼び出し演算子が強く型付けされている点が違います。前回のサンプルは、型付けが不十分で、voidポインターを使用していましたが、今回はBuffer型を明示的に指定しています。これにより、つまらないエラーを減らす事が出来ます。その他にも差異がいくつかありますので、これから1つずつ解説していきます。
 関数呼び出し演算子以外にも、フィルタークラスの定義方法が違います。pipelineテンプレートを使用した前回のサンプルは、tbb::filterクラスから派生させて、親のコンストラクタにモードを指定する必要がありました。ですが、parallel_pipelineを使用する場合、使用するフィルタークラスはtbb::filterクラスから派生させる必要がありませんparallel_pipelineテンプレートを使用する場合は、filter_tテンプレートを使用します。
 parallel_pipelineテンプレートで使用する各フィルタークラスは、filter_tテンプレートのインスタンスを通して、間接的に指定します。filter_tテンプレートのインスタンスの生成法は2つあり、それがプログラムの違いを生みます。
 1つめは、コンストラクタを使って、filter_tテンプレートのインスタンスを生成する方法です。サンプルプログラムのPipelineCaesarがこの方法でプログラミングしたものです。この方法は見やすいという利点がありますが、いちいちフィルターオブジェクトに命名しなくてはならず、プログラムが冗長になってしまいます。これは個人的な感想ですが、動的にフィルタークラスを追加する必要がなければ、この方法はお勧めできません。
 2つめは、make_filter関数を使用して、filter_tテンプレートのインスタンスを生成する方法です。サンプルプログラムのPipelineCaesar1がこの方法でプログラミングしたものです。この方法を使うと、プログラムが簡潔になります。また、ラムダ式とも親和性が高い方法で、フィルターが固定化している場合はこちらをお勧めします。
 ラムダ式を使うと、プログラムが簡潔なものとなります。気になる方も多いと思いますので、ラムダ式を使用した場合のシーザー暗号プログラムを掲載します。

#include <iostream>
#include <fstream>
#include "tbb/pipeline.h"
using namespace std;
using namespace tbb;

/*-------------------------------------------------------------
    フィルタークラス間でやり取りをするデータを管理するクラス
---------------------------------------------------------------*/
class Buffer
{
private:
    static const size_t buffer_size = 100;
    char* end_ptr;
    char data[ buffer_size ];
public:
    char* begin() { return data; }
    const char* begin() const { return data; }
    char* end() const { return end_ptr; }
    void set_end(  char* new_ptr ) { end_ptr = new_ptr; }
    size_t max_size() const { return buffer_size; }
    size_t size() const { return end_ptr - begin(); }
};

/*------------------------------------------------------
    パイプラインパターンでシーザー暗号処理を実行する
    ※ラムダ式を使用した場合
--------------------------------------------------------*/
void PipelineCaesar( string inputFileName, int key, string outputFileName )
{
    //使用するファイルを準備
    ifstream inputFile( inputFileName );
    ofstream outputFile( outputFileName );

    //使用する変数を用意
    int buffer_Count = 4;
    size_t next_buffer = 0;
    Buffer* buffer = new Buffer[ buffer_Count ];

    //パイプライン実行
    Buffer* b;
    parallel_pipeline( 
        buffer_Count,
        make_filter< void,  Buffer* > ( 
            filter::serial_in_order, 
            //入力データをファイルから読みだす
            [ &buffer, &next_buffer, &buffer_Count, &inputFile] 
            ( flow_control& fc ) -> Buffer* 
            {
                Buffer& b = buffer[ next_buffer] ;
                next_buffer = ( next_buffer + 1 ) % buffer_Count;
                inputFile.read( b.begin(), b.max_size() );
                int count = inputFile.gcount();
                if ( count == 0 ) {
                    fc.stop();
                    return NULL;
                } else {
                    b.set_end( b.begin() + count );
                    return &b;
                }
            }
            ) &
        make_filter< Buffer*, Buffer* > (
            filter::parallel,
            //入力ストリームに対してシーザー暗号化を行う
            [ &key ] ( Buffer* item ) -> Buffer* 
            {
                for( char* s = item->begin(); s != item->end(); ++s ) {
                char val = *s;
                if ( ( val + key ) > 90 ) {
                    *s = 64 + ( val + key - 90 );
                } else {
                    *s = val + key;
                }
                }
                return item;
            }
            ) &
        make_filter< Buffer*, void > ( 
            filter::serial_in_order,
            //暗号化したデータをファイルへ出力
            [ &outputFile ] ( Buffer* item )
            {
                outputFile.write( item->begin(), item->size() );
                outputFile.flush();
            }
            )
    );

    //念のためにファイルを閉じる
    inputFile.close();
    outputFile.close();
};

/*------------------------------------------------------
    サンプルデータをシーザー暗号化する処理を行う
--------------------------------------------------------*/
int main()
{
    //サンプルファイルを作成
    string inputFileName = "TestData.txt";
    ofstream file( inputFileName ); 
    for ( char i = 65; i < 91; i++ ) {
        file << i;
    }
    file.close();

    //確認
    ifstream fout( inputFileName );
    char c;
    cout << "変換前の文字" << endl;
    while( !fout.eof() ) {
        fout >> c;
        if ( !fout.eof() ) cout << c << ' '; 
    } 
    cout << endl;
    fout.close();

    //パイプライン実行
    int key = 1;
    string outputFileName = "CaesarData.txt";
    PipelineCaesar( inputFileName, key, outputFileName );

    //確認
    cout << endl;
    ifstream fout1( outputFileName );
    cout << "変換後の文字" << endl;
    while( !fout1.eof() ) {
        fout1 >> c;
        if ( !fout1.eof() ) cout << c << ' '; 
    }
    cout << endl << endl;
    fout1.close();
}

 見ての通りで、ラムダ式を使うとプログラムが短くなります。ただし、ラムダ式は使えばいいというものではありません。よく考えて、適切な状況で使用しましょう。


【参考資料】

テーマ : プログラミング
ジャンル : コンピュータ

インテルTBBとパイプライン

 並行プログラミングモデルには、パイプラインモデルもしくはパイプラインパターンがあります。長いので、これ以降はパイプラインと記述します。
 簡潔に言いますと、パイプラインとは、ストリームをいくつかの工程で順番に処理する並行処理モデルです。
 例としてシーザー暗号を挙げます。シーザー暗号は、文字を任意の数シフト事により暗号化する暗号法です。文字「A」を1シフトすると文字「B」、文字「B」を1シフトすると文字「C」というふうに文字を暗号化します。
 サンプルプログラムを掲載しますので、試してみて下さい。

#include <iostream>
#include <vector>
#include <algorithm>
#include <iterator>
using namespace std;

int main(void) 
{
    //暗号対象を用意
    vector< char > strs;
    for (int i = 0; i < 26; i++ ) {
        strs.push_back( static_cast< char >( i + 65 ) );
    }

    //確認
    cout << "暗号化する前の文字" << endl;
    copy( strs.begin(), strs.end(), 
        ostream_iterator< char >( cout, " " ) );
    cout << endl << endl;

    //暗号化
    int key = 1;
    vector< char > outs;
    for ( int i = 0; i < 26; i++ ) {
        char val = strs[ i ]; 
        if ( ( val + key ) > 90 ) {
            outs.push_back( 64 + ( val + key - 90 ) ); 
        } else {
            outs.push_back( val + key );
        }
    }

    //確認
    cout << "暗号化後の文字" << endl;
    copy( outs.begin(), outs.end(), 
        ostream_iterator< char >( cout, " " ) );
    cout << endl << endl << endl;
    return 0;
}
※このサンプルはASCII文字コードを前提としています。


 この一連のシーザー暗号化処理を並列化するのは、一般的には困難です。全ての処理を並列処理化してしまうと、ファイルへ書き込む値が順不同になってしまい、データが壊れてしまうでしょうし、他にも色々な不都合が起こるでしょう。
 ですが、文字列を変換するという工程だけは、なんの処理にも依存していませんので並列化できます。ここでパイプラインの出番です。パイプラインで処理をすると、一部の処理を並列化出来ます。パイプラインを自分で実装するのは大変ですが、幸いインテルTBBには、pipelineテンプレートが用意されていますので、比較的容易に並列化出来ます。
 シーザー暗号処理をパイプラインで行うサンプルを掲載します。

#include <iostream>
#include <fstream>
#include "tbb/pipeline.h"
using namespace std;
using namespace tbb;

/*-------------------------------------------------------------
    フィルタークラス間でやり取りをするデータを管理するクラス
---------------------------------------------------------------*/
class Buffer
{
private:
    static const size_t buffer_size = 100;
    char* end_ptr;
    char data[ buffer_size ];
public:
    char* begin() { return data; }
    const char* begin() const { return data; }
    char* end() const { return end_ptr; }
    void set_end(  char* new_ptr ) { end_ptr = new_ptr; }
    size_t max_size() const { return buffer_size; }
    size_t size() const { return end_ptr - begin(); }
};

/*------------------------------------------------------
    指定されたファイルからデータを読み取るクラス
--------------------------------------------------------*/
class InputFileFilter : public tbb::filter
{
public:
    static const size_t buffer_Count = 4;
private:
    ifstream file;
    size_t next_buffer;
    Buffer buffer[ buffer_Count ];
public:
    InputFileFilter( string name ) :
      filter( serial_in_order ),
            next_buffer( 0 ),
            file( name ) 
      { cout << "@InputFileFilterのコンストラクタが呼び出されました" << endl; }

     ~InputFileFilter() { file.close(); }

     void* operator()( void* )
    {
        cout << "@InputFileFilterの関数呼び出し演算子が呼び出されました" << endl;

        //入力データをファイルから読みだす
        Buffer& b = buffer[ next_buffer] ;
        next_buffer = ( next_buffer + 1 ) % buffer_Count;
        file.read( b.begin(), b.max_size() );
        int count = file.gcount();

        //終了判定が重要
        if ( count == 0 ) {
            return NULL; 
        } else {
            b.set_end( b.begin() + count );
            return &b;
        }
    }
};

/*---------------------------------------------------
    受け取った文字列をシーザー暗号化するクラス
-----------------------------------------------------*/
class CaesarTransformFilter : public tbb::filter
{
private:
    int key;
public:
    CaesarTransformFilter( int key ) : 
      tbb::filter( parallel ), key( key ) 
    {
        cout << 
            "*CaesarTransformFilterのコンストラクタが呼び出されました" << endl;
    }
      
    void* operator()( void* item )
    {
        cout << 
           "*CaesarTransformFilterの関数呼び出し演算子が呼び出されました" << endl;
        Buffer& b = *static_cast< Buffer* >( item );
        for( char* s = b.begin(); s != b.end(); ++s ) {
            char val = *s;
            if ( ( val + key ) > 90 ) {
                *s = 64 + ( val + key - 90 );
            } else {
                *s = val + key;
            }
        }
        return &b;
    }
};

/*---------------------------------------------------------
    受け取ったバッファの内容をファイルへ書き込むクラス
-----------------------------------------------------------*/
class OutputFileFilter : public tbb::filter
{
    ofstream file;
public:
    OutputFileFilter( string name  )
        : tbb::filter( serial_in_order ), file( name )
    {
        cout << 
            "+OutputFileFilterのコンストラクタが呼び出されました" << endl;
    }

     ~OutputFileFilter() { file.close(); }

     void* operator()( void* item)
    {
        cout << 
            "+OutputFileFilterの関数呼び出し演算子が呼び出されました" << endl;
        Buffer& b = *static_cast<Buffer*>( item );
        file.write( b.begin(), b.size() );
        file.flush();
        return NULL;
    }
};

/*------------------------------------------------------
    サンプルデータをシーザー暗号化する処理を行う
--------------------------------------------------------*/
int main()
{
    //サンプルファイルを作成
    string inputFileName = "TestData.txt";
    ofstream file( inputFileName ); 
    for ( char i = 65; i < 91; i++ )
    {
        file << i;
    }
    file.close();

    //確認
    ifstream fout( inputFileName );
    char c;
    cout << "変換前の文字" << endl;
    while( !fout.eof() )
    {
        fout >> c;
        if ( !fout.eof() ) cout << c << ' '; 
    } 
    cout << endl;
    cout << endl;
    fout.close();
    cout << "これからパイプラインでシーザー暗号処理を行います・・・" << endl;

    //入力フィルタを用意
    pipeline pipeline;
    InputFileFilter inputFilter( inputFileName );
    pipeline.add_filter( inputFilter );

    //変換フィルタを用意
    int key = 1;
    CaesarTransformFilter transform( key );
    pipeline.add_filter( transform );

    //出力フィルタを用意
    string outputFileName = "CaesarData.txt";
    OutputFileFilter outputFilter( outputFileName );
    pipeline.add_filter( outputFilter );

    //パイプラインで実行
    pipeline.run( InputFileFilter::buffer_Count );
    pipeline.clear();

    //確認
    cout << endl;
    ifstream fout1( outputFileName );
    cout << "変換後の文字" << endl;
    while( !fout1.eof() )
    {
        fout1 >> c;
        if ( !fout1.eof() ) cout << c << ' '; 
    }
    cout << endl << endl;
    fout1.close();
}


 このサンプルは、パイプラインを使用した簡単なサンプルです。各フィルターの動きをみる為に、coutに文字列を出力していますが、実務では止めておいた方がよいでしょう。並列的にcoutする仕様は、多くの検討が必要となる上に、利益が殆どありません。
 パイプラインを実装した事のない人には、非常に冗長で難しく思えるコードですが、自分で実装する事を考えると非常にお手軽に実装できます。それに、見けかほど難しくありません。お約束に従って実装するだけです。
 pipelineテンプレートを使う為の準備として、ストリーム入力用フィルター、ストリーム変換用フィルター、ストリーム出力用フィルターの3個のクラスを用意します。各filterクラスは、filterクラスから継承して、親クラスのコンストラクタにモード(serial_in_order、parallelなど)を指定して呼び出します。ひとまず、入力/出力用フィルターはserial_in_order、変換用フィルターはparallelを使用するとよいでしょう。
※処理内容によっては、今回の様に、各ステージでやり取りする為のクラスが別に必要となります。
 フィルタークラスの定義が少し面倒ですが、パイプラインの使用法は簡単です。filterクラスを追加後、runメソッドとclearメソッドを順番に実行するだけです。それだけで、並列的に処理されるようになります。今まで自分でパイプラインを実装してきた事を考えれば、これは大きな進歩です。
 それに、パイプラインには柔軟性があります。例えば、今回のサンプルで、排他的論理和を使用した暗号化クラスを追加し、シーザー暗号化後のデータをさらに暗号化するなどの事が可能となります。他にも、このサンプルでは意味はありませんが、いくつも変換フィルターを追加して、ストリームを何重に変換することもできます。この様に、パイプライン処理をする事により、モジュール化が進められ、柔軟な処理が可能となります。
 察しがいい方は、多くの処理がパイプラインで処理できる事に気付き、片っぱしから既存のコードを並列化したくなったかもしれません。しかし、パイプラインも万能ではありません。パイプラインは、直列部分も存在するため、パフォーマンス向上に限界があります。また、変換ロジックが大量に存在する場合に、個々のスレッドを割り当てると逆にパフォーマンスが劣化します。それで、pipelineのrunメソッドに限界値を指定する事になっています。パイプラインに使用するリソースの限界値を割り出すのは困難ですので、設計にも負担がかかります。パイプラインを使用する際には、よく検討して下さい。
 最後に注意するべき事柄を書きます。pipelineテンプレートがサポートするのは、線形パイプラインです。バロック形式のパイプラインを直接処理できません。線形順へと変換する必要があります。あと、インテルTBB3.0には強く型付けできるparallel_pipelineが存在します。parallel_pipelineについてはまた今度解説します。


【参考資料】

テーマ : プログラミング
ジャンル : コンピュータ

parallel_for_eachテンプレート

 C++標準ライブラリに、コンテナ要素に任意の単項関数オブジェクトを適用するfor_eachテンプレートがあります。インテルTBBには、これと同じ働きをするparallel_for_eachテンプレートが用意されています。
 サンプルを見れば使い方はすぐ分かると思います。

#include <iostream>
#include <functional>
#include <algorithm>
#include "tbb/task_scheduler_init.h"
#include "tbb/concurrent_vector.h"
#include "tbb/parallel_for_each.h"
using namespace std;
using namespace tbb;

int main()
{
    //スケジューラを初期化(省略可能)
    task_scheduler_init init;

    //データを用意
    int count = 10;
    concurrent_vector<int> v;
    for ( int i = 0; i <  count; i++ )
        v.push_back( i );

    //並列的に合計値を算出
    unsigned int total = 0;
    TotalValueComputer obj;
    parallel_for_each( v.begin(), v.end(), 
        [ &total ] ( const int value ) { total += value;
    });
    cout << "並列的に0~9の数値の合計を算出します。" << endl;
    cout << "合計値 = " << total << endl << endl;

    //逐次的に合計値を算出
    total = 0;
    for_each( v.begin(), v.end(), 
        [ &total ] ( const int value ) { total += value;
    });
    cout << "逐次的に0~9の数値の合計を算出します。" << endl;
    cout << "合計値 = " << total << endl << endl;

    return 0;
}


 このサンプルは、0~9の数値を合計する単純なものです。for_eachとparallel_for_eachが似ている事が確認できると思います。画面のスペースを節約する為に、単項関数オブジェクトを定義せずに、ラムダ式を使用しました。
 ただし、全く同じではありません。for_eachテンプレートは、戻り値を使用できますが、現在のところparallel_for_eachは戻り値を使用できません。例えば、parallel_doの記事で書いた、TotalValueComputerオブジェクトを使用する場合、for_each( v.begin(), v.end(), TotalValueComputer() ).GetTotal()と出来ましたが、現時点parallel_for_eachテンプレートはこれを出来ません。そして、使用法や使用するべき状況が事なります。
 今回の様な単純な処理でparallel_for_eachを使用するとパフォーマンスが低下します。また、parallel_for_eachは並列処理を行うので、データの整合性に注意せねばなりません。
 parallel_for_eachの正体は、parallel_do_feederを使用しないparallel_doです。parallel_doテンプレートは、複雑なオブジェクトによる、複雑な処理に使用するべきものですから、for_eachと同じ感覚で使用しないでください。parallel_for_eachは、慎重に使用しましょう。
 なお、parallel_for_eachは、Microsoft Parallel Patterns Library (PPL)と互換性を持つ為のものの様です。

【参考資料】

テーマ : プログラミング
ジャンル : コンピュータ

インテルTBBにおける要素数が不定なループの並列化

 対象となる要素の数が不定な場合、一般的に並列化が困難です。何故ならば、単純に要素を分割して、複数のスレッドが個々に処理をするわけにはいかないからです。処理中に要素が増加した時、その要素をどのスレッドが処理をするのかなど、細かなルールを考える必要が生じます。
 幸いインテルTBBには、この状況に対応できるparallel_doテンプレートが用意されています。インテルTBB2.0には、parallel_whileテンプレートがありましたが、現在は推奨されていません。parallel_doテンプレートの方を使用しましょう。
 parallel_doテンプレートの使い方を示す、極力簡単なサンプルを用意しましたので、先ずはそれを見て下さい。


#include <iostream>
#include "tbb/task_scheduler_init.h"
#include "tbb/concurrent_vector.h"
#include "tbb/parallel_do.h"
using namespace std;
using namespace tbb;

/*
    合計値を算出するオブジェクト。
*/
class TotalValueComputer
{
    unsigned int total;
public:
    //コンストラクタ
    TotalValueComputer( ) : total( 0 ) {}

    //合計を取得
    unsigned int GetTotal(){ return this->total; }

    //合計値を加算
    void Add( int value ) { this->total += value; }

    //-----------------------parallel_do用の設定-----------------------

    //コピーコンストラクタ
    TotalValueComputer( const TotalValueComputer& obj ) : total( obj.total ){} 

    //デストラクタ
    ~TotalValueComputer(){}

    /*  
        並列処理用オペレーター
        保持している数値に加算します。
    */
    void operator() ( const int value, parallel_do_feeder< int >& feeder ) const
    {
        TotalValueComputer* tmp = const_cast< TotalValueComputer* >( this );
        tmp->Add( value );
        if ( value == 0 )
            feeder.add( 10 );
    }
    //--------------------------------------------------------------
};

int main()
{
    //スケジューラを初期化(省略可能)
    task_scheduler_init init;

    //データを用意
    int count = 10;
    concurrent_vector< int > v;
    for ( int i = 0; i <  count; i++ )
        v.push_back( i );

    //並列的に合計値を算出
    TotalValueComputer obj;
    parallel_do( v.begin(), v.end(), obj );
    cout << "並列的に1~10の数値の合計を算出します。" << endl;
    cout << "合計値 = " << obj.GetTotal() << endl;

    //ラムダ式を使用する場合
    unsigned int total = 0;
    parallel_do( v.begin(), v.end(), 
        [ &total ]( const int value, parallel_do_feeder< int >& feeder ) 
    {
        total += value;
        if ( value == 0 )
            feeder.add( 10 );
    } );
    cout << "ラムダ式を使用して1~10の数値の合計を算出します。" << endl;
    cout << "合計値 = " << obj.GetTotal() << endl;
    return 0;
}


 このサンプルは、1~10の数値を合計するものです。parallel_doテンプレートの特徴を示す為に、0はマジックナンバーとして扱い、0が渡されると10を追加するようにしています。
 parallel_do_feeder<T>オブジェクトを使用すると、安全に処理対象となる要素を追加する事が出来ます。
 parallel_doテンプレートを使用する為の要件は三つあります。インテルTBBのマニュアルでは、コピーコンストラクタ、デストラクタ、関数呼び出し演算子の三つが要件として定義されています。しかし、マニュアルの例とサンプルをみると、関数呼び出し演算子だけが定義されていますので、コピーコンストラクタとデストラクタは、状況に応じて省略可能なようです。
 parallel_doテンプレートが処理対象とするオブジェクトは、単項関数オブジェクトをイメージするとよいでしょう。
 このサンプルでは、説明のために単純なものを用意しましたが、実際にこの様な単純な処理でparallel_doテンプレートを使うのは止めましょう。parallel_doテンプレートは、複雑なオブジェクトによる、大量のデータが必要となる複雑な処理を行う時使用するべきです。
 またこのサンプルは、count変数に大きな数値を指定すると正しい結果を出しません。理由は、アトミックに数値を加算していないからです。正しい結果を出すためには、インテルTBBに用意されているatomicテンプレートを使うか、OSに用意されているアトミック関数を使用します。 atomicテンプレートについては、また改めて解説します。


【参考資料】

テーマ : プログラミング
ジャンル : コンピュータ

インテルTBBと並列プリフィクススキャン

 今回は並列化が困難な処理を、インテルTBBを使用して並列化する方法を解説します。
 要素数を変化させずに、配列の全ての合計値を求める処理をプリフィクススキャンと呼びます。プリフィクススキャンは、プリフィクスもしくはスキャンと呼ばれる場合もあります。
 文章では分かり難いですので、プリフィクススキャンのサンプル(C++言語を使用)を載せます。

#include <iostream> using namespace std; int main() { //配列を初期化 int count = 10; int* values = new int[ count ]; for ( int i = 0; i < count; i++ ) values[ i ] = i; cout << "処理前の状態\t"; for ( int i = 0; i < count; i++ ) cout << values[ i ] << " "; cout << endl; //プリフィクススキャン cout << "プリフィクススキャンを実行します・・・" << endl; for ( int i = 1; i < count; i++ ) values[ i ] += values[ i - 1 ]; //処理結果を表示 cout << "処理後の状態\t"; for ( int i = 0; i < count; i++ ) cout << values[ i ] << " "; cout << endl; cout << "合計値は【" << values[ count - 1 ] << "】です" << endl << endl; //後処理 delete[] values; return 0; }
 サンプルを読めば、プリフィクススキャンは理解できると思います。この様に、プリフィクススキャンそのものは難しくありません。しかし、各配列が前の結果に依存しているので、並列化が不可能だと感じるでしょう。
 並列化が不可能に思えるプリフィクスですが、次の様に考えると並列化が可能になります。

int tmp = values[ 0 ];
for ( int i = 1; i < count; i++ ) {
    tmp = tmp + values[ i ];
    results[ i ] = tmp;
}

 この様に、プリフィクススキャンが実現したい事を、分解して考える事により、並列化が可能となります。とはいえ、ネイティブスレッドを使用して、並列プリフィクススキャンを実装するのは、間違いやすく煩雑な作業です。

#include <iostream> #include "tbb/task_scheduler_init.h" #include "tbb/blocked_range.h" #include "tbb/concurrent_vector.h" #include "tbb/parallel_scan.h" using namespace std; using namespace tbb; #define DATA_COUNT 10 /* 並列的にスキャンして、合計値を求めるオブジェクト。 */ class ParallelScanSum { int sum; concurrent_vector< int > inputData; //入力データ static concurrent_vector< int > results; //計算結果を格納する領域 public: //コンストラクタ ParallelScanSum( concurrent_vector< int > inputData ) : inputData( inputData ), sum( 0 ) { } //合計値を取得する int get_sum() const { return this->sum; } //計算結果のベクタを取得する concurrent_vector< int > get_results() const { return this->results; } //並列的に合計を求める static ParallelScanSum pararell_scan( concurrent_vector< int > inputData ) { ParallelScanSum body( inputData ); parallel_scan( blocked_range< int >( 0, inputData.size() ), body, auto_partitioner() ); return body; } //-----------------------parallel_scan用の設定----------------------- //分割コンストラクタ ParallelScanSum( ParallelScanSum& obj, split ) : inputData( obj.inputData ), sum( 0 ) { } //分割して導出された結果をマージ void reverse_join( ParallelScanSum& obj ) { this->sum += obj.sum; } //結果を割り当てる void assign( ParallelScanSum& obj ) { this->sum = obj.sum; } //合計値を算出する template< typename Tag > void operator()( const blocked_range< int >& range, Tag ) { int tmp = this->sum; for ( int i = range.begin(); i != range.end(); i++ ) { tmp += inputData[ i ]; if ( Tag::is_final_scan() ) { results[ i ] = tmp; } } this->sum = tmp; } //------------------------------------------------------------------- }; concurrent_vector< int > ParallelScanSum::results( DATA_COUNT, 0 ); int main() { //スケジューラを初期化(省略可能) task_scheduler_init init; //入力データを用意 concurrent_vector< int > data; for ( int i = 0; i < DATA_COUNT; i++ ) data.push_back( i ); cout << "処理前の状態\t"; copy( data.begin(), data.end(), ostream_iterator< int >( cout, " " ) ); cout << endl; //並列スキャンを実行 cout << "並列スキャンを実行します・・・" << endl; ParallelScanSum result = ParallelScanSum::pararell_scan( data ); //処理結果を表示 concurrent_vector< int > output = result.get_results(); cout << "処理後の状態\t"; copy( output.begin(), output.end(), ostream_iterator< int >( cout, " " ) ); cout << endl; cout << "合計値は【" << result.get_sum() << "】です" << endl << endl; return 0; }
 幸いインテルTBBには、並列プリフィクススキャンが比較的容易に実装できる、parallel_scanテンプレートが用意されています。
 このサンプルは、先ほど掲載したスキャンのサンプルとほぼ同様で、0~9の合計を求めています。違いは、並列処理をするためにparallel_scanテンプレートを使っている点と、インテルTBBのコンテナクラスの使い方を復習するために、concurrent_vectorを使っている点です。
 parallel_scanテンプレートは、は5つの要件を満たす必要があります。
1つ目は、分割コンストラクタを実装する事です。parallel_scanテンプレートは、処理を分割して実行するので必要となります。
2つ目は、分割した処理の結果をマージするために、reverse_joinメソッドを実装しなくてはなりません。
3つ目は、引き渡されたオブジェクトを元に、値を割り当てるために、assignメソッドを実装しなくてはなりません。
4つ目は、pre_scan_tagを引数に取る関数呼び出し演算子を実装する事です。
5つ目は、final_scan_tagを引数に取る関数呼び出し演算子を実装する事です。pre_scan_tagとfinal_scan_tagは、parallel_scanの段階を識別するためのもので、is_final_scan()メソッドを使用する事により、最終段階であるか否かを判定できます。この2つ関数呼び出し演算子は、関数呼び出し演算子で、テンプレートを使用にする事により、1つの実装で済ませられます。
 ひとつ注意するべき点があります。それはparallel_scanテンプレートはマルチコアCPUでは、あまりパフォーマンスが向上しない点です。何故ならば、依存性がある処理を分解するので、必要なスレッド数や処理が増加するからです。parallel_scanテンプレートは、クアッドコア以上のCPUに適しています。
 parallel_scanテンプレートの存在価値は、プログラム依存性があり容易に並列化出来ないプリフィクススキャン処理を、並列化する点にあります。一見並列化出来ない処理も、見方を変える事により並列化出来ます。

【参考資料】

テーマ : プログラミング
ジャンル : コンピュータ

プロフィール

インドリ

Author:インドリ
みなさん、はじめまして、
コンニチハ。

ボクは、無限の夢(infinity dream)を持つネタ好きな虹色の鳥インドリ(in dre)です。
色々な情報処理技術を啄ばむから楽しみにしてね。

http://twitter.com/indori
は別人による嫌がらせ行為です。
私とは関係ないので注意して下さい。
次はなりすましブログなどをするかもしれませんが、ここ以外でブログをするつもりがないので、ここ以外にインドリのブログがあったとしても無視してください。


何度言っても分からない人がいるので、ここにコメント欄へ書き込むときの注意事項を書きます。


一、社会人としてのマナーをわきまえましょう。
一、妄想に基づく書き込みを止めてください。
一、暴言の類は書かないで下さい。
一、某誹謗中傷サイトの書き込みは彼らの妄想に基づく書き込みですから無視して、ここへ書き込まないで下さい。
一、コメント書く前に他のコメントよく読んでから行って下さい。
一、言いがかかり等の行為を禁止します。
一、その他常識的に考えて迷惑なコメントはしないで下さい。


以上のルールを守れない人のコメントは削除します。



利用上の注意
ここに紹介してある文章およびプログラムコードは正確であるように心がけておりますが、内容を保証するものではありません。当サイトの内容によって生じた損害については、一切の責任を負いませんので御了承ください。


執筆したCodeZineの記事


【VB.NETで仮想CPUを作ろう】

  1. VB.NETで仮想CPUを作ろう
  2. レジスタの実装
  3. 仮想CPUのGUI化
  4. テストドライバの改良
  5. CPUの基礎動作の実装
  6. MOV命令の実装
  7. ADD命令実装
  8. SUB命令実装
  9. INC命令&DEC命令の実装と命令長
  10. MLU命令の実装とModR/Mについて
  11. DIV命令の実装とイベント設計について
  12. 機械語駆動式 関数電卓を作ろう!
  13. 機械語駆動式 関数電卓を作ろう! 解答編(前半)
  14. 機械語駆動式 関数電卓を作ろう! 解答編(後半)


【仮想ネットワーク実装でTCP/IPを学ぼう】
  1. TCP/IPの基礎と勘所
  2. ネットワークアクセス層の勘所
  3. インターネット層の勘所
  4. トランスポート層の勘所
  5. アプリケーション層の勘所
  6. セキュリティの基礎と仮想ネットワークの仕様
  7. GDI+と独自プロトコルの定義



【並列化】
インテル Parallel Studioを使って並列化プログラミングを試してみた
並列プログラミングの効率的なデバッグを実現する「Parallel Inspector」


【TBBシリーズ】
  1. インテル スレッディング・ビルディング・ブロックの概要
  2. インテルTBBから学ぶループの並列化
  3. スレッドセーフとインテルTBBのコンテナ
  4. インテルTBBのスレッドクラス


【OpenMPシリーズ】
  1. OpenMPの基礎構文
  2. OpenMPの実行時ライブラリと並列ループ
  3. OpenMPのメモリモデルとfork- joinモデル

最近の記事
最近のコメント
月別アーカイブ
カテゴリ
Ada (9)
COBOL (5)
C (9)
C++ (11)
C# (370)
D (25)
Java (8)
Perl (1)
Ruby (14)
PHP (2)
Boo (2)
Cobra (2)
LISP (6)
F# (33)
HTML (0)
XHTML (0)
CSS (0)
XML (0)
XSLT (0)
Scala (4)
WPF (0)
WF (2)
WCF (0)
LINQ (4)
MONO (5)
Linux (0)
MySQL (0)
ブログ内検索
リンク
最近のトラックバック
RSSフィード
ブロとも申請フォーム

この人とブロともになる

QRコード
FC2カウンター