#include #include #include using namespace ff; class Worker: public ff_node { public: void * svc(void * task) { int * t = (int *)task; std::cout << "Worker got " << *t << std::endl; (*t)++; std::cout << "Worker emitting " << *t << std::endl; return t; } }; class Collector: public ff_node { public: void * svc(void * task) { int * t = (int *)task; std::cout << "Collector received " << *t << "\n"; if (*t == -1) return NULL; return task; } }; // the load-balancer filter class Emitter: public ff_node { public: Emitter(int max_task):ntask(max_task) {}; void * svc(void *) { int * task = new int(ntask); --ntask; if (ntask<0) return NULL; std::cout << "Emitting task " << ntask << std::endl;; return task; } private: int ntask; }; int main(int argc, char * argv[]) { if (argc<3) { std::cerr << "use: " << argv[0] << " nworkers streamlen\n"; return -1; } int nworkers=atoi(argv[1]); int streamlen=atoi(argv[2]); if (!nworkers || !streamlen) { std::cerr << "Wrong parameters values\n"; return -1; } ff_farm<> farm; Emitter E(streamlen); farm.add_emitter(&E); std::vector w; for(int i=0;i