grain.h
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00043 #ifndef PAR_GRAIN_H
00044 #define PAR_GRAIN_H
00045
00046 #include "fastlib/col/heap.h"
00047
00048 #include "fastlib/par/thread.h"
00049
00050
00106 template<typename TGrain>
00107 class GrainQueue {
00108 public:
00109 typedef TGrain Grain;
00110
00111 private:
00112 MinHeap<double, Grain*> queue_;
00113 Mutex mutex_;
00114
00115 public:
00116 GrainQueue() {}
00117 ~GrainQueue() {}
00118
00122 void Init() {
00123 queue_.Init();
00124 }
00125
00131 void Put(double difficulty, Grain *grain) {
00132 mutex_.Lock();
00133 queue_.Put(-difficulty, grain);
00134 mutex_.Unlock();
00135 }
00136
00142 Grain *Pop() {
00143 mutex_.Lock();
00144 Grain *result = likely(queue_.size() != 0) ? queue_.Pop() : NULL;
00145 mutex_.Unlock();
00146 return result;
00147 }
00148
00152 index_t size() const {
00153 return queue_.size();
00154 }
00155 };
00156
00157 template<typename TGrain, typename TContext = int>
00158 class ThreadedGrainRunner {
00159 FORBID_ACCIDENTAL_COPIES(ThreadedGrainRunner);
00160 public:
00161 typedef TGrain Grain;
00162 typedef TContext Context;
00163
00164 private:
00165 struct ThreadTask : public Task {
00166 ThreadedGrainRunner *runner_;
00167
00168 ThreadTask(ThreadedGrainRunner *runner_in) {
00169 runner_ = runner_in;
00170 }
00171
00172 void Run() {
00173 while (runner_->RunOneGrain()) {}
00174 delete this;
00175 }
00176 };
00177
00178 private:
00179 GrainQueue<Grain> *queue_;
00180 Context context_;
00181
00182 public:
00183 ThreadedGrainRunner() {}
00184 ~ThreadedGrainRunner() {}
00185
00186 void Init(GrainQueue<Grain> *queue_in, Context context_in) {
00187 queue_ = queue_in;
00188 context_ = context_in;
00189 }
00190
00199 bool RunOneGrain() {
00200 Grain *grain = queue_->Pop();
00201 if (unlikely(!grain)) {
00202 return false;
00203 } else {
00204 grain->Run(context_);
00205 delete grain;
00206 return true;
00207 }
00208 }
00209
00220 Thread *SpawnThread() {
00221 ThreadTask *task = new ThreadTask(this);
00222 Thread *thread = new Thread();
00223 thread->Init(task);
00224 thread->Start();
00225 return thread;
00226 }
00227
00232 void RunThreads(int num_threads) {
00233 ArrayList<Thread*> threads;
00234
00235 threads.Init(num_threads);
00236
00237 for (int i = 0; i < num_threads; i++) {
00238 threads[i] = SpawnThread();
00239 }
00240 for (int i = 0; i < num_threads; i++) {
00241 threads[i]->WaitStop();
00242 delete threads[i];
00243 }
00244 }
00245 };
00246
00247 #endif