===== Sample FastFlow farms ===== These are the programs used in the course notes. Comments may be found in the course notes book. #include #include #include using namespace ff; class Worker: public ff_node { public: void * svc(void * task) { int * t = (int *)task; std::cout << "Worker got " << *t << std::endl; (*t)++; std::cout << "Worker emitting " << *t << std::endl; return t; } }; class Collector: public ff_node { public: void * svc(void * task) { int * t = (int *)task; std::cout << "Collector received " << *t << "\n"; if (*t == -1) return NULL; return task; } }; // the load-balancer filter class Emitter: public ff_node { public: Emitter(int max_task):ntask(max_task) {}; void * svc(void *) { int * task = new int(ntask); --ntask; if (ntask<0) return NULL; std::cout << "Emitting task " << ntask << std::endl;; return task; } private: int ntask; }; int main(int argc, char * argv[]) { if (argc<3) { std::cerr << "use: " << argv[0] << " nworkers streamlen\n"; return -1; } int nworkers=atoi(argv[1]); int streamlen=atoi(argv[2]); if (!nworkers || !streamlen) { std::cerr << "Wrong parameters values\n"; return -1; } ff_farm<> farm; Emitter E(streamlen); farm.add_emitter(&E); std::vector w; for(int i=0;i #include #include #include using namespace ff; class Worker: public ff_node { public: void * svc(void * task) { int * t = (int *)task; (*t)++; return task; } }; class Collector: public ff_node { public: void * svc(void * task) { int * t = (int *)task; std::cout << "Collector got " << *t << std::endl; return GO_ON; } }; class Emitter: public ff_node { public: Emitter(int n) { streamlen = n; task = 0; }; void * svc(void *) { sleep(1); task++; int * t = new int(task); if (task farm; // farm object Emitter E(streamlen); farm.add_emitter(&E); std::vector w; for(int i=0;i #include #include #include static int * results; typedef struct task_t { int i; int t; } TASK; using namespace ff; class Worker: public ff_node { public: void * svc(void * task) { TASK * t = (TASK *) task; results[t->i] = ++(t->t); return GO_ON; } }; class Emitter: public ff_node { public: Emitter(int n) { streamlen = n; task = 0; }; void * svc(void *) { task++; TASK * t = (TASK *) calloc(1,sizeof(TASK)); t->i = task; t->t = task*task; if (task farm; Emitter E(streamlen); farm.add_emitter(&E); std::vector w; for(int i=0;i