grain.h

00001 /* MLPACK 0.2
00002  *
00003  * Copyright (c) 2008, 2009 Alexander Gray,
00004  *                          Garry Boyer,
00005  *                          Ryan Riegel,
00006  *                          Nikolaos Vasiloglou,
00007  *                          Dongryeol Lee,
00008  *                          Chip Mappus, 
00009  *                          Nishant Mehta,
00010  *                          Hua Ouyang,
00011  *                          Parikshit Ram,
00012  *                          Long Tran,
00013  *                          Wee Chin Wong
00014  *
00015  * Copyright (c) 2008, 2009 Georgia Institute of Technology
00016  *
00017  * This program is free software; you can redistribute it and/or
00018  * modify it under the terms of the GNU General Public License as
00019  * published by the Free Software Foundation; either version 2 of the
00020  * License, or (at your option) any later version.
00021  *
00022  * This program is distributed in the hope that it will be useful, but
00023  * WITHOUT ANY WARRANTY; without even the implied warranty of
00024  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00025  * General Public License for more details.
00026  *
00027  * You should have received a copy of the GNU General Public License
00028  * along with this program; if not, write to the Free Software
00029  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00030  * 02110-1301, USA.
00031  */
00043 #ifndef PAR_GRAIN_H
00044 #define PAR_GRAIN_H
00045 
00046 #include "fastlib/col/heap.h"
00047 
00048 #include "fastlib/par/thread.h"
00049 //#include "thread.h"
00050 
00106 template<typename TGrain>
00107 class GrainQueue {
00108  public:
00109   typedef TGrain Grain;
00110 
00111  private:
00112   MinHeap<double, Grain*> queue_;
00113   Mutex mutex_;
00114   
00115  public:
00116   GrainQueue() {}
00117   ~GrainQueue() {}
00118   
00122   void Init() {
00123     queue_.Init();
00124   }
00125   
00131   void Put(double difficulty, Grain *grain) {
00132     mutex_.Lock();
00133     queue_.Put(-difficulty, grain);
00134     mutex_.Unlock();
00135   }
00136   
00142   Grain *Pop() {
00143     mutex_.Lock();
00144     Grain *result = likely(queue_.size() != 0) ? queue_.Pop() : NULL;
00145     mutex_.Unlock();
00146     return result;
00147   }
00148   
00152   index_t size() const {
00153     return queue_.size();
00154   }
00155 };
00156 
00157 template<typename TGrain, typename TContext = int>
00158 class ThreadedGrainRunner {
00159   FORBID_ACCIDENTAL_COPIES(ThreadedGrainRunner);
00160  public:
00161   typedef TGrain Grain;
00162   typedef TContext Context;
00163 
00164  private:
00165   struct ThreadTask : public Task {
00166     ThreadedGrainRunner *runner_;
00167     
00168     ThreadTask(ThreadedGrainRunner *runner_in) {
00169       runner_ = runner_in;
00170     }
00171     
00172     void Run() {
00173       while (runner_->RunOneGrain()) {}
00174       delete this;
00175     }
00176   };
00177   
00178  private:
00179   GrainQueue<Grain> *queue_;
00180   Context context_;
00181   
00182  public:
00183   ThreadedGrainRunner() {}
00184   ~ThreadedGrainRunner() {}
00185   
00186   void Init(GrainQueue<Grain> *queue_in, Context context_in) {
00187     queue_ = queue_in;
00188     context_ = context_in;
00189   }
00190     
00199   bool RunOneGrain() {
00200     Grain *grain = queue_->Pop();
00201     if (unlikely(!grain)) {
00202       return false;
00203     } else {
00204       grain->Run(context_);
00205       delete grain;
00206       return true;
00207     }
00208   }
00209   
00220   Thread *SpawnThread() {
00221     ThreadTask *task = new ThreadTask(this);
00222     Thread *thread = new Thread();
00223     thread->Init(task);
00224     thread->Start();
00225     return thread;
00226   }
00227 
00232   void RunThreads(int num_threads) {
00233     ArrayList<Thread*> threads;
00234     
00235     threads.Init(num_threads);
00236     
00237     for (int i = 0; i < num_threads; i++) {
00238       threads[i] = SpawnThread();
00239     }
00240     for (int i = 0; i < num_threads; i++) {
00241       threads[i]->WaitStop();
00242       delete threads[i];
00243     }
00244   }
00245 };
00246 
00247 #endif
Generated on Mon Jan 24 12:04:37 2011 for FASTlib by  doxygen 1.6.3