magistraleinformaticanetworking:spm:samplefastflowcode
Differenze
Queste sono le differenze tra la revisione selezionata e la versione attuale della pagina.
| Entrambe le parti precedenti la revisione Revisione precedente Prossima revisione | Revisione precedente | ||
| magistraleinformaticanetworking:spm:samplefastflowcode [19/05/2011 alle 14:27 (15 anni fa)] – Marco Danelutto | magistraleinformaticanetworking:spm:samplefastflowcode [19/05/2011 alle 14:33 (15 anni fa)] (versione attuale) – Marco Danelutto | ||
|---|---|---|---|
| Linea 2: | Linea 2: | ||
| This is the code used during the FastFlow lesson. | This is the code used during the FastFlow lesson. | ||
| + | All code should be compiled with a command such as | ||
| + | < | ||
| <code c++ pipe.cpp> | <code c++ pipe.cpp> | ||
| Linea 154: | Linea 156: | ||
| } | } | ||
| </ | </ | ||
| + | |||
| + | If we want to parallelize the integer-increment stage with a farm, we can keep the same stage code and use the following main function: | ||
| + | <code c++ altmain.cpp> | ||
| + | int main(int argc, char * argv[]) { | ||
| + | int tasks = 10; // placeholder value, should be taken from the command line | ||
| + | |||
| + | // allocator must be initialized before using it | ||
| + | allocator.init(); | ||
| + | |||
| + | // declare a pipeline object | ||
| + | ff_pipeline pipe_a; | ||
| + | // add stages in order: | ||
| + | // the first stage added is the first pipeline stage | ||
| + | // the i-th stage added is the i-th pipeline stage | ||
| + | pipe_a.add_stage(new GenerateStream(tasks)); | ||
| + | |||
| + | // we create a farm to process increments in the second stage | ||
| + | // declare a ff_farm object | ||
| + | ff_farm<> | ||
| + | // default collector | ||
| + | farm.add_collector(NULL); | ||
| + | // declare the vector holding the farm workers | ||
| + | std:: | ||
| + | // two workers | ||
| + | workers.push_back(new IntIncStage); | ||
| + | workers.push_back(new IntIncStage); | ||
| + | // add workers to farm | ||
| + | farm.add_workers(workers); | ||
| + | // second pipeline stage is the farm | ||
| + | pipe_a.add_stage(& | ||
| + | // the third one is the print stream | ||
| + | pipe_a.add_stage(new DrainStage); | ||
| + | |||
| + | |||
| + | // now we can run the application: | ||
| + | std::cout << " | ||
| + | pipe_a.run(); | ||
| + | std::cout << " | ||
| + | |||
| + | // here more (unrelated) work can be performed ... | ||
| + | // when results are needed we should wait for application termination | ||
| + | pipe_a.wait(); | ||
| + | // alternatively, the run() and wait() pair can be replaced by a single | ||
| + | // call such as: pipe_a.run_and_wait_end(); | ||
| + | // in this case the pipeline is started and its termination awaited | ||
| + | // synchronously | ||
| + | std::cout << " | ||
| + | return(0); | ||
| + | } | ||
| + | </ | ||
| + | |||
| + | Sample farm code (taken from the FastFlow tests/ directory): | ||
| + | |||
| + | <code c++ farm.cpp> | ||
| + | /* -*- Mode: C++; tab-width: 4; c-basic-offset: | ||
| + | /* *************************************************************************** | ||
| + | | ||
| + | | ||
| + | | ||
| + | * | ||
| + | | ||
| + | | ||
| + | | ||
| + | | ||
| + | * | ||
| + | | ||
| + | | ||
| + | | ||
| + | * | ||
| + | | ||
| + | | ||
| + | | ||
| + | | ||
| + | | ||
| + | | ||
| + | | ||
| + | | ||
| + | * | ||
| + | | ||
| + | */ | ||
| + | |||
| + | /** | ||
| + | |||
| + | Very basic test for the FastFlow farm. | ||
| + | |||
| + | */ | ||
| + | #include < | ||
| + | #include < | ||
| + | #include < | ||
| + | |||
| + | |||
| + | using namespace ff; | ||
| + | |||
| + | // generic worker | ||
| + | class Worker: public ff_node { | ||
| + | public: | ||
| + | void * svc(void * task) { | ||
| + | int * t = (int *)task; | ||
| + | std::cout << " | ||
| + | << " received task " << *t << " | ||
| + | return task; | ||
| + | } | ||
| + | // I don't need the following functions for this test | ||
| + | //int | ||
| + | // | ||
| + | |||
| + | }; | ||
| + | |||
| + | // the gatherer filter | ||
| + | class Collector: public ff_node { | ||
| + | public: | ||
| + | void * svc(void * task) { | ||
| + | int * t = (int *)task; | ||
| + | if (*t == -1) return NULL; | ||
| + | return task; | ||
| + | } | ||
| + | }; | ||
| + | |||
| + | // the load-balancer filter | ||
| + | class Emitter: public ff_node { | ||
| + | public: | ||
| + | Emitter(int max_task): | ||
| + | |||
| + | void * svc(void *) { | ||
| + | int * task = new int(ntask); | ||
| + | --ntask; | ||
| + | if (ntask< | ||
| + | return task; | ||
| + | } | ||
| + | private: | ||
| + | int ntask; | ||
| + | }; | ||
| + | |||
| + | |||
| + | int main(int argc, char * argv[]) { | ||
| + | |||
| + | if (argc<3) { | ||
| + | std::cerr << "use: " | ||
| + | << argv[0] | ||
| + | << " nworkers streamlen\n"; | ||
| + | return -1; | ||
| + | } | ||
| + | | ||
| + | int nworkers=atoi(argv[1]); | ||
| + | int streamlen=atoi(argv[2]); | ||
| + | |||
| + | if (!nworkers || !streamlen) { | ||
| + | std::cerr << "Wrong parameters values\n"; | ||
| + | return -1; | ||
| + | } | ||
| + | | ||
| + | ff_farm<> | ||
| + | | ||
| + | Emitter E(streamlen); | ||
| + | farm.add_emitter(& | ||
| + | |||
| + | std:: | ||
| + | for(int i=0; | ||
| + | farm.add_workers(w); | ||
| + | |||
| + | Collector C; | ||
| + | farm.add_collector(& | ||
| + | | ||
| + | if (farm.run_and_wait_end()< | ||
| + | error(" | ||
| + | return -1; | ||
| + | } | ||
| + | std::cerr << "DONE, time= " << farm.ffTime() << " (ms)\n"; | ||
| + | farm.ffStats(std:: | ||
| + | |||
| + | return 0; | ||
| + | } | ||
| + | </ | ||
| + | |||
| + | |||
| + | Sample accelerator code: | ||
| + | <code c++ accelerator.cpp> | ||
| + | #include < | ||
| + | #include < | ||
| + | |||
| + | #include < | ||
| + | #include < | ||
| + | |||
| + | using namespace std; | ||
| + | using namespace ff; | ||
| + | |||
| + | |||
| + | // should be global to be accessible from workers | ||
| + | #define MAX 1024 | ||
| + | double x[MAX]; | ||
| + | double y[MAX]; | ||
| + | | ||
| + | #define ITERNO 100000 | ||
| + | int iterno = ITERNO; | ||
| + | |||
| + | class Worker: public ff_node { | ||
| + | int svc_init() { | ||
| + | cout << " | ||
| + | return 0; | ||
| + | } | ||
| + | |||
| + | void * svc(void * in) { | ||
| + | int i = * ((int *) in); | ||
| + | // cout << " | ||
| + | for(int j=0; j< | ||
| + | x[i] = sin(x[i]); | ||
| + | y[i] += x[i]; | ||
| + | return in; | ||
| + | } | ||
| + | }; | ||
| + | |||
| + | |||
| + | #define NW 2 | ||
| + | |||
| + | int main(int argc, char * argv[]) | ||
| + | { | ||
| + | ffTime(START_TIME); | ||
| + | |||
| + | cout << "init " << argc << endl; | ||
| + | int nw = (argc==1 ? NW : atoi(argv[1])); | ||
| + | iterno = (argc<=2 ? ITERNO : atoi(argv[2])); | ||
| + | |||
| + | cout << "using " << nw << " workers iterating " << iterno << " times "<< | ||
| + | |||
| + | // init input (fake) | ||
| + | for(int i=0; i<MAX; i++) { | ||
| + | x[i] = (double)(i*10); | ||
| + | y[i] = (double)(-i*13); | ||
| + | } | ||
| + | cout << " | ||
| + | ff_farm<> | ||
| + | |||
| + | std:: | ||
| + | for(int i=0; | ||
| + | w.push_back(new Worker); | ||
| + | farm.add_workers(w); | ||
| + | farm.run_then_freeze(); | ||
| + | | ||
| + | cout << " | ||
| + | int tasks[MAX]; | ||
| + | for(int i=0; i<MAX; i++) { | ||
| + | | ||
| + | | ||
| + | } | ||
| + | farm.offload((void *) FF_EOS); | ||
| + | |||
| + | cout << " | ||
| + | farm.wait(); | ||
| + | |||
| + | cout << "Farm terminated after computing for " << farm.ffTime() << endl; | ||
| + | |||
| + | ffTime(STOP_TIME); | ||
| + | cout << "Spent overall " << ffTime(GET_TIME) << endl; | ||
| + | |||
| + | } | ||
| + | </ | ||
| + | |||
magistraleinformaticanetworking/spm/samplefastflowcode.1305815247.txt.gz · Ultima modifica: 19/05/2011 alle 14:27 (15 anni fa) da Marco Danelutto
