grain.h
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00043 #ifndef PAR_GRAIN_H
00044 #define PAR_GRAIN_H
00045 
00046 #include "fastlib/col/heap.h"
00047 
00048 #include "fastlib/par/thread.h"
00049 
00050 
00106 template<typename TGrain>
00107 class GrainQueue {
00108  public:
00109   typedef TGrain Grain;
00110 
00111  private:
00112   MinHeap<double, Grain*> queue_;
00113   Mutex mutex_;
00114   
00115  public:
00116   GrainQueue() {}
00117   ~GrainQueue() {}
00118   
00122   void Init() {
00123     queue_.Init();
00124   }
00125   
00131   void Put(double difficulty, Grain *grain) {
00132     mutex_.Lock();
00133     queue_.Put(-difficulty, grain);
00134     mutex_.Unlock();
00135   }
00136   
00142   Grain *Pop() {
00143     mutex_.Lock();
00144     Grain *result = likely(queue_.size() != 0) ? queue_.Pop() : NULL;
00145     mutex_.Unlock();
00146     return result;
00147   }
00148   
00152   index_t size() const {
00153     return queue_.size();
00154   }
00155 };
00156 
00157 template<typename TGrain, typename TContext = int>
00158 class ThreadedGrainRunner {
00159   FORBID_ACCIDENTAL_COPIES(ThreadedGrainRunner);
00160  public:
00161   typedef TGrain Grain;
00162   typedef TContext Context;
00163 
00164  private:
00165   struct ThreadTask : public Task {
00166     ThreadedGrainRunner *runner_;
00167     
00168     ThreadTask(ThreadedGrainRunner *runner_in) {
00169       runner_ = runner_in;
00170     }
00171     
00172     void Run() {
00173       while (runner_->RunOneGrain()) {}
00174       delete this;
00175     }
00176   };
00177   
00178  private:
00179   GrainQueue<Grain> *queue_;
00180   Context context_;
00181   
00182  public:
00183   ThreadedGrainRunner() {}
00184   ~ThreadedGrainRunner() {}
00185   
00186   void Init(GrainQueue<Grain> *queue_in, Context context_in) {
00187     queue_ = queue_in;
00188     context_ = context_in;
00189   }
00190     
00199   bool RunOneGrain() {
00200     Grain *grain = queue_->Pop();
00201     if (unlikely(!grain)) {
00202       return false;
00203     } else {
00204       grain->Run(context_);
00205       delete grain;
00206       return true;
00207     }
00208   }
00209   
00220   Thread *SpawnThread() {
00221     ThreadTask *task = new ThreadTask(this);
00222     Thread *thread = new Thread();
00223     thread->Init(task);
00224     thread->Start();
00225     return thread;
00226   }
00227 
00232   void RunThreads(int num_threads) {
00233     ArrayList<Thread*> threads;
00234     
00235     threads.Init(num_threads);
00236     
00237     for (int i = 0; i < num_threads; i++) {
00238       threads[i] = SpawnThread();
00239     }
00240     for (int i = 0; i < num_threads; i++) {
00241       threads[i]->WaitStop();
00242       delete threads[i];
00243     }
00244   }
00245 };
00246 
00247 #endif