thread.h

00001 /* MLPACK 0.2
00002  *
00003  * Copyright (c) 2008, 2009 Alexander Gray,
00004  *                          Garry Boyer,
00005  *                          Ryan Riegel,
00006  *                          Nikolaos Vasiloglou,
00007  *                          Dongryeol Lee,
00008  *                          Chip Mappus, 
00009  *                          Nishant Mehta,
00010  *                          Hua Ouyang,
00011  *                          Parikshit Ram,
00012  *                          Long Tran,
00013  *                          Wee Chin Wong
00014  *
00015  * Copyright (c) 2008, 2009 Georgia Institute of Technology
00016  *
00017  * This program is free software; you can redistribute it and/or
00018  * modify it under the terms of the GNU General Public License as
00019  * published by the Free Software Foundation; either version 2 of the
00020  * License, or (at your option) any later version.
00021  *
00022  * This program is distributed in the hope that it will be useful, but
00023  * WITHOUT ANY WARRANTY; without even the implied warranty of
00024  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00025  * General Public License for more details.
00026  *
00027  * You should have received a copy of the GNU General Public License
00028  * along with this program; if not, write to the Free Software
00029  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00030  * 02110-1301, USA.
00031  */
00037 #ifndef PAR_THREAD_H
00038 #define PAR_THREAD_H
00039 
#include "fastlib/par/task.h"
//#include "task.h"

#include "fastlib/base/base.h"

#include <pthread.h>
#include <time.h>
00046 
00053 class Thread {
00054   FORBID_ACCIDENTAL_COPIES(Thread);
00055   
00056  public:
00057   enum {
00058     LOW_PRIORITY = 20,
00059     NORMAL_PRIORITY = 0
00060   };
00061   
00062  private:
00063 #ifdef DEBUG
00064   enum {UNINIT, READY, ATTACHED, DETACHED, DONE} status_;
00065 #endif
00066   pthread_t thread_;
00067   Task *task_;
00068   
00069   static void *ThreadMain_(void *self) {
00070     Thread* thread = reinterpret_cast<Thread*>(self);
00071     
00072     thread->task_->Run();
00073     
00074     return NULL;
00075   }
00076 
00077   void Exit_() {
00078     pthread_exit(NULL);
00079   }
00080   
00081  public:
00082   Thread() {
00083 #ifdef DEBUG
00084     DEBUG_ONLY(status_ = UNINIT);
00085 #endif
00086   }
00087   ~Thread() {
00088 #ifdef DEBUG
00089     DEBUG_ASSERT(status_ == DETACHED || status_ == READY || status_ == DONE || status_ == UNINIT);
00090     DEBUG_ONLY(status_ = UNINIT);
00091 #endif
00092   }
00093   
00097   void Init(Task* task_in) {
00098 #ifdef DEBUG
00099     DEBUG_ASSERT(status_ == UNINIT);
00100 #endif
00101     task_ = task_in;
00102 #ifdef DEBUG
00103     DEBUG_ONLY(status_ = READY);
00104 #endif
00105   }
00106   
00110   void Start() {
00111 #ifdef DEBUG
00112     DEBUG_ASSERT(status_ == READY);
00113 #endif
00114     pthread_create(&thread_, NULL,
00115         ThreadMain_, reinterpret_cast<void*>(this));
00116 #ifdef DEBUG
00117     DEBUG_ONLY(status_ = ATTACHED);
00118 #endif
00119   }
00120   
00129   void Start(int prio) {
00130     pthread_attr_t tattr;
00131     sched_param param;
00132 
00133 #ifdef DEBUG
00134     DEBUG_ASSERT(status_ == READY);
00135 #endif
00136     pthread_attr_init(&tattr);
00137     pthread_attr_getschedparam(&tattr, &param);
00138     param.sched_priority = prio;
00139     pthread_attr_setschedparam(&tattr, &param);
00140     pthread_create(&thread_, &tattr,
00141         ThreadMain_, reinterpret_cast<void*>(this));
00142     pthread_attr_destroy(&tattr);
00143 #ifdef DEBUG
00144     DEBUG_ONLY(status_ = ATTACHED);
00145 #endif
00146   }
00147   
00152   void Detach() {
00153 #ifdef DEBUG
00154     DEBUG_ASSERT(status_ == ATTACHED);
00155 #endif
00156     pthread_detach(thread_);
00157 #ifdef DEBUG
00158     DEBUG_ONLY(status_ = DETACHED);
00159 #endif
00160   }
00161   
00167   void WaitStop() {
00168 #ifdef DEBUG
00169     DEBUG_ASSERT(status_ == ATTACHED);
00170 #endif
00171     pthread_join(thread_, NULL);
00172 #ifdef DEBUG
00173     DEBUG_ONLY(status_ = DONE);
00174 #endif
00175   }
00176   
00180   Task* task() const {
00181     return task_;
00182   }
00183 };
00184 
00188 class Mutex {
00189   FORBID_ACCIDENTAL_COPIES(Mutex);
00190   friend class WaitCondition;
00191  
00192  public:
00193   struct DummyRecursiveAttribute {};
00194 
00195  private:
00196   mutable pthread_mutex_t mutex_;
00197 
00198  public:
00199   static Mutex global;
00200 
00201  public:
00202   Mutex() {
00203 #if defined(DEBUG) && defined(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP)
00204     mutex_ = (pthread_mutex_t)PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
00205 #else
00206     mutex_ = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
00207 #endif
00208   }
00209   Mutex(DummyRecursiveAttribute v) {
00210 #ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
00211     mutex_ = (pthread_mutex_t)PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
00212 #else
00213     pthread_mutexattr_t attr;
00214     pthread_mutexattr_init(&attr);
00215     pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
00216     pthread_mutex_init(&mutex_, &attr);
00217     pthread_mutexattr_destroy(&attr);
00218 #endif
00219   }
00220 
00221   ~Mutex() {
00222     pthread_mutex_destroy(&mutex_);
00223   }
00224   
00226   void Lock() const {
00227     int t = pthread_mutex_lock(&mutex_);
00228     (void)t; // avoid an "unused variable" warning
00229     DEBUG_ASSERT_MSG(t == 0, "Error locking mutex -- relocking a non-recursive mutex?");
00230   }
00231   
00233   bool TryLock() const {
00234     return likely(pthread_mutex_trylock(&mutex_) == 0);
00235   }
00236   
00238   void Unlock() const {
00239     pthread_mutex_unlock(&mutex_);
00240   }
00241 };
00242 
00247 class RecursiveMutex : public Mutex {
00248   FORBID_ACCIDENTAL_COPIES(RecursiveMutex);
00249 
00250  public:
00251   RecursiveMutex() : Mutex(DummyRecursiveAttribute()) {}
00252 };
00253 
00257 class WaitCondition {
00258   FORBID_ACCIDENTAL_COPIES(WaitCondition);
00259   
00260  private:
00261   pthread_cond_t cond_;
00262  
00263  public:
00264   WaitCondition() {
00265     pthread_cond_init(&cond_, NULL);
00266   }
00267   
00268   ~WaitCondition() {
00269     pthread_cond_destroy(&cond_);
00270   }
00271   
00272   void Signal() {
00273     pthread_cond_signal(&cond_);
00274   }
00275   
00276   void Broadcast() {
00277     pthread_cond_broadcast(&cond_);
00278   }
00279   
00280   void Wait(Mutex* mutex_to_unlock) {
00281     pthread_cond_wait(&cond_, &mutex_to_unlock->mutex_);
00282   }
00283   
00284   void WaitMillis(Mutex& mutex_to_unlock, unsigned millis) {
00285     struct timespec ts;
00286     
00287     ts.tv_sec = millis / 1000;
00288     ts.tv_nsec = (millis % 1000) * 1000000;
00289     
00290     pthread_cond_timedwait(&cond_, &mutex_to_unlock.mutex_, &ts);
00291   }
00292   
00293   void WaitSec(Mutex& mutex_to_unlock, unsigned sec) {
00294     struct timespec ts;
00295     
00296     ts.tv_sec = sec;
00297     ts.tv_nsec = 0;
00298     
00299     pthread_cond_timedwait(&cond_, &mutex_to_unlock.mutex_, &ts);
00300   }
00301 };
00302 
00313 class DoneCondition {
00314   Mutex mutex_;
00315   WaitCondition cond_;
00316   bool done_;
00317 
00318  public:
00319   DoneCondition() { done_ = false; }
00320   ~DoneCondition() {}
00321 
00325   void Wait() {
00326     mutex_.Lock();
00327     while (!done_) {
00328       cond_.Wait(&mutex_);
00329     }
00330     done_ = false;
00331     mutex_.Unlock();
00332   }
00333 
00337   void Done() {
00338     mutex_.Lock();
00339     DEBUG_ASSERT_MSG(done_ == false, "Doesn't do a counter -- should it?");
00340     done_ = true;
00341     cond_.Signal();
00342     mutex_.Unlock();
00343   }
00344 };
00345 
00349 class ValueCondition {
00350   Mutex mutex_;
00351   WaitCondition cond_;
00352   int value_;
00353 
00354  public:
00355   ValueCondition() { value_ = 0; }
00356   ~ValueCondition() {}
00357 
00361   void Wait(int v) {
00362     mutex_.Lock();
00363     while (value_ != v) {
00364       cond_.Wait(&mutex_);
00365     }
00366     mutex_.Unlock();
00367   }
00368 
00369   void WaitNot(int v) {
00370     mutex_.Lock();
00371     while (value_ == v) {
00372       cond_.Wait(&mutex_);
00373     }
00374     mutex_.Unlock();
00375   }
00376 
00377   void Set(int v) {
00378     mutex_.Lock();
00379     if (value_ != v) {
00380       value_ = v;
00381       cond_.Broadcast();
00382     }
00383     mutex_.Unlock();
00384   }
00385 };
00386 
00393 template<class TContained>
00394 class Lockable : public TContained, public Mutex {
00395   FORBID_ACCIDENTAL_COPIES(Lockable);
00396 
00397   Lockable() {}
00398   ~Lockable() {}
00399 };
00400 
00401 #endif
Generated on Mon Jan 24 12:04:37 2011 for FASTlib by  doxygen 1.6.3