magistraleinformaticanetworking:spm:samplefastflowcode
Differenze
Queste sono le differenze tra la revisione selezionata e la versione attuale della pagina.
Entrambe le parti precedenti la revisioneRevisione precedenteProssima revisione | Revisione precedente | ||
magistraleinformaticanetworking:spm:samplefastflowcode [19/05/2011 alle 14:27 (14 anni fa)] – Marco Danelutto | magistraleinformaticanetworking:spm:samplefastflowcode [19/05/2011 alle 14:33 (14 anni fa)] (versione attuale) – Marco Danelutto | ||
---|---|---|---|
Linea 2: | Linea 2: | ||
This is the code used during the FastFlow lesson. | This is the code used during the FastFlow lesson. | ||
+ | All code should be compiled with a command such as | ||
+ | < | ||
<code c++ pipe.cpp> | <code c++ pipe.cpp> | ||
Linea 154: | Linea 156: | ||
} | } | ||
</ | </ | ||
+ | |||
+ | In case we want to parallelize the increase integers stage with a farm, we can keep the same code and use the following main code: | ||
+ | <code c++ altmain.cpp> | ||
+ | int main(int argc, char * argv[]) { | ||
+ | int tasks = 10; // dummy, should be taken from the input line | ||
+ | |||
+ | // allocator must be initialized before using it | ||
+ | allocator.init(); | ||
+ | |||
+ | // declare a pipeline object | ||
+ | ff_pipeline pipe_a; | ||
+ | // add stages in order: | ||
+ | // the first stage added is the first pipeline stage | ||
+ | // the i-th stage added is the i-th pipeline stage | ||
+ | pipe_a.add_stage(new GenerateStream(tasks)); | ||
+ | |||
+ | // we create a farm to process increments in the second stage | ||
+ | // declare a ff_farm object | ||
+ | ff_farm<> | ||
+ | // default collector | ||
+ | farm.add_collector(NULL); | ||
+ | // declare worker vector (string) | ||
+ | std:: | ||
+ | // two workers | ||
+ | workers.push_back(new IntIncStage); | ||
+ | workers.push_back(new IntIncStage); | ||
+ | // add workers to farm | ||
+ | farm.add_workers(workers); | ||
+ | // second pipeline stage is the farm | ||
+ | pipe_a.add_stage(& | ||
+ | // the third one is the print stream | ||
+ | pipe_a.add_stage(new DrainStage); | ||
+ | |||
+ | |||
+ | // now we ca run the application: | ||
+ | std::cout << " | ||
+ | pipe_a.run(); | ||
+ | std::cout << " | ||
+ | |||
+ | // here more (unrelated) work can be performed ... | ||
+ | // when results are needed we should wait for application termination | ||
+ | pipe_a.wait(); | ||
+ | // alternatively, | ||
+ | // call such as: | ||
+ | // in this case the pipeline is started and its termination awaited | ||
+ | // synchronously | ||
+ | std::cout << " | ||
+ | return(0); | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | Sample farm code (taken from FF tests/ directory): | ||
+ | |||
+ | <code c++ farm.cpp> | ||
+ | /* -*- Mode: C++; tab-width: 4; c-basic-offset: | ||
+ | /* *************************************************************************** | ||
+ | | ||
+ | | ||
+ | | ||
+ | * | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | * | ||
+ | | ||
+ | | ||
+ | | ||
+ | * | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | * | ||
+ | | ||
+ | */ | ||
+ | |||
+ | /** | ||
+ | |||
+ | Very basic test for the FastFlow farm. | ||
+ | |||
+ | */ | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | |||
+ | |||
+ | using namespace ff; | ||
+ | |||
+ | // generic worker | ||
+ | class Worker: public ff_node { | ||
+ | public: | ||
+ | void * svc(void * task) { | ||
+ | int * t = (int *)task; | ||
+ | std::cout << " | ||
+ | << " received task " << *t << " | ||
+ | return task; | ||
+ | } | ||
+ | // I don't need the following functions for this test | ||
+ | //int | ||
+ | // | ||
+ | |||
+ | }; | ||
+ | |||
+ | // the gatherer filter | ||
+ | class Collector: public ff_node { | ||
+ | public: | ||
+ | void * svc(void * task) { | ||
+ | int * t = (int *)task; | ||
+ | if (*t == -1) return NULL; | ||
+ | return task; | ||
+ | } | ||
+ | }; | ||
+ | |||
+ | // the load-balancer filter | ||
+ | class Emitter: public ff_node { | ||
+ | public: | ||
+ | Emitter(int max_task): | ||
+ | |||
+ | void * svc(void *) { | ||
+ | int * task = new int(ntask); | ||
+ | --ntask; | ||
+ | if (ntask< | ||
+ | return task; | ||
+ | } | ||
+ | private: | ||
+ | int ntask; | ||
+ | }; | ||
+ | |||
+ | |||
+ | int main(int argc, char * argv[]) { | ||
+ | |||
+ | if (argc<3) { | ||
+ | std::cerr << "use: " | ||
+ | << argv[0] | ||
+ | << " nworkers streamlen\n"; | ||
+ | return -1; | ||
+ | } | ||
+ | | ||
+ | int nworkers=atoi(argv[1]); | ||
+ | int streamlen=atoi(argv[2]); | ||
+ | |||
+ | if (!nworkers || !streamlen) { | ||
+ | std::cerr << "Wrong parameters values\n"; | ||
+ | return -1; | ||
+ | } | ||
+ | | ||
+ | ff_farm<> | ||
+ | | ||
+ | Emitter E(streamlen); | ||
+ | farm.add_emitter(& | ||
+ | |||
+ | std:: | ||
+ | for(int i=0; | ||
+ | farm.add_workers(w); | ||
+ | |||
+ | Collector C; | ||
+ | farm.add_collector(& | ||
+ | | ||
+ | if (farm.run_and_wait_end()< | ||
+ | error(" | ||
+ | return -1; | ||
+ | } | ||
+ | std::cerr << "DONE, time= " << farm.ffTime() << " (ms)\n"; | ||
+ | farm.ffStats(std:: | ||
+ | |||
+ | return 0; | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | |||
+ | Sample accelerator code: | ||
+ | <code c++ accelerator.cpp> | ||
+ | #include < | ||
+ | #include < | ||
+ | |||
+ | #include < | ||
+ | #include < | ||
+ | |||
+ | using namespace std; | ||
+ | using namespace ff; | ||
+ | |||
+ | |||
+ | // should be global to be accessible from workers | ||
+ | #define MAX 1024 | ||
+ | double x[MAX]; | ||
+ | double y[MAX]; | ||
+ | | ||
+ | #define ITERNO 100000 | ||
+ | int iterno = ITERNO; | ||
+ | |||
+ | class Worker: public ff_node { | ||
+ | int svc_init() { | ||
+ | cout << " | ||
+ | return 0; | ||
+ | } | ||
+ | |||
+ | void * svc(void * in) { | ||
+ | int i = * ((int *) in); | ||
+ | // cout << " | ||
+ | for(int j=0; j< | ||
+ | x[i] = sin(x[i]); | ||
+ | y[i] += x[i]; | ||
+ | return in; | ||
+ | } | ||
+ | }; | ||
+ | |||
+ | |||
+ | #define NW 2 | ||
+ | |||
+ | int main(int argc, char * argv[]) | ||
+ | { | ||
+ | ffTime(START_TIME); | ||
+ | |||
+ | cout << "init " << argc << endl; | ||
+ | int nw = (argc==1 ? NW : atoi(argv[1])); | ||
+ | iterno = (argc<=2 ? ITERNO : atoi(argv[2])); | ||
+ | |||
+ | cout << "using " << nw << " workers iterating " << iterno << " times "<< | ||
+ | |||
+ | // init input (fake) | ||
+ | for(int i=0; i<MAX; i++) { | ||
+ | x[i] = (double)(i*10); | ||
+ | y[i] = (double)(-i*13); | ||
+ | } | ||
+ | cout << " | ||
+ | ff_farm<> | ||
+ | |||
+ | std:: | ||
+ | for(int i=0; | ||
+ | w.push_back(new Worker); | ||
+ | farm.add_workers(w); | ||
+ | farm.run_then_freeze(); | ||
+ | | ||
+ | cout << " | ||
+ | int tasks[MAX]; | ||
+ | for(int i=0; i<MAX; i++) { | ||
+ | | ||
+ | | ||
+ | } | ||
+ | farm.offload((void *) FF_EOS); | ||
+ | |||
+ | cout << " | ||
+ | farm.wait(); | ||
+ | |||
+ | cout << "Farm terminated after computing for " << farm.ffTime() << endl; | ||
+ | |||
+ | ffTime(STOP_TIME); | ||
+ | cout << "Spent overall " << ffTime(GET_TIME) << endl; | ||
+ | |||
+ | } | ||
+ | </ | ||
+ | |||
magistraleinformaticanetworking/spm/samplefastflowcode.1305815247.txt.gz · Ultima modifica: 19/05/2011 alle 14:27 (14 anni fa) da Marco Danelutto