mixskepuffmap.cpp
#include <iostream>
#include <ff/pipeline.hpp>
#include <ff/farm.hpp>
#include <math.h>
 
#include <skepu/vector.h>
#include <skepu/map.h>
 
 
 
using namespace ff;
using namespace std;
 
int N = 10; 
 
class Source: public ff_node {
public:
  Source(unsigned int streamlen):streamlen(streamlen)  {}
 
  void * svc(void * task) {
    if(streamlen != 0) {
      skepu::Vector<float> * v = new skepu::Vector<float>(N,(float)streamlen);
      streamlen--;
      task = (void *) v;
#ifdef DEBUG
      std::cout << "Source delivering:" << *v << std::endl;
#endif
    } else {
      task = NULL; 
    }
    return task;
  }
private:
  unsigned int streamlen;
};
 
class Drain: public ff_node {
  void * svc(void * task) {
    skepu::Vector<float> * v = (skepu::Vector<float> *) task;
#ifdef DEBUG
    std::cout << "Drain got " << *v << std::endl;
#endif
    return(GO_ON);
  }
}; 
 
#define ITERNO 800000
 
UNARY_FUNC(iters, float, a,
           for(int _i=0; _i<ITERNO; _i++) a = sin(a); return(a);
           )
 
float f_iters(float a) {
  for(int _i=0; _i<ITERNO; _i++) a = sin(a); return(a);
}
 
 
 
// UNARY_FUNC(f, float, a, return(a+1); )
 
class MapStage:public ff_node {
private: 
  int tasks; 
  int wno;
public: 
 
  MapStage(int wno):tasks(0),wno(wno) {}
 
  void svc_end() {
    cout << "SKEPU map stage " << wno << " computed " << tasks << " tasks " << e
ndl; 
  }
 
  void *svc(void * task) {
    tasks++;
    skepu::Vector<float> * v = (skepu::Vector<float> *) task;
 
#ifdef DEBUG
    std::cout << "MapStage got: " << *v << std::endl; 
#endif
 
    skepu::Vector<float> * r = new skepu::Vector<float>(v->size());
    skepu::Map<iters> skepumap(new iters);
 
    skepumap(*v, *r);
 
#ifdef DEBUG
    std::cout << "MapStage delivering: " << *r << std::endl; 
#endif
    return((void *) r);
  }
};
 
class SeqMapStage : public ff_node {
private: 
  int tasks; 
  int wno; 
 
public:
 
  SeqMapStage(int wno):tasks(0),wno(wno) {}
 
  void svc_end() {
    cout << "CPU worker " << wno << " computed " << tasks << " tasks " << endl; 
  }
 
  void * svc(void * task) {
    tasks++;
    skepu::Vector<float> * v = (skepu::Vector<float> *) task;
    skepu::Vector<float> * r = new skepu::Vector<float>(v->size());
    for(int i=0; i<v->size(); i++) 
      (*r)[i] = f_iters((*v)[i]); 
    return((void *) r); 
  }
};
 
class Emitter : public ff_node {
  void * svc (void * t) { return t; }
};
 
class Collector : public ff_node { 
  void * svc (void * t) { return t; }
};
 
 
int main(int argc, char * argv[]) {
  if (argc==1) {
    std::cerr << "use: "  << argv[0] << " streamlen veclen nw [skepuworkers]\n";
    return -1;
  }
  int m = atoi(argv[1]);
  N = atoi(argv[2]);
  int nw = atoi(argv[3]); 
 
  int skepuworker;
  if(argc>4) 
    skepuworker=atoi(argv[4]); 
  else 
    skepuworker = 0; 
 
  cout << "Using " << nw << " ff workers and " << skepuworker << " Skepu workers
" << endl; 
 
  // bild a 2-stage pipeline
  ff_pipeline pipe;
 
  pipe.add_stage(new Source(m));
 
  // pipe.add_stage(new MapStage());
  ff_farm<> farm; 
  std::vector<ff_node *> w; 
 
  if(skepuworker!=0) 
    for(int i=0; i<skepuworker; i++) 
      w.push_back(new MapStage(i));      // one stage on SKEPU
  for(int i=0; i<nw; i++) 
    w.push_back(new SeqMapStage(i));
 
  farm.set_scheduling_ondemand();
  farm.add_workers(w); 
  farm.add_collector(new Collector());
  farm.add_emitter(new Emitter());
  pipe.add_stage(&farm); 
 
  pipe.add_stage(new Drain());
 
 
  ffTime(START_TIME);
  if (pipe.run_and_wait_end()<0) {
    error("running pipeline\n");
    return -1;
  }
  ffTime(STOP_TIME);
 
  std::cerr << "DONE, pipe  time= " << pipe.ffTime() << " (ms)\n";
  std::cerr << "DONE, total time= " << ffTime(GET_TIME) << " (ms)\n";
  pipe.ffStats(std::cerr);
  return 0;
}