thread.h
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00037 #ifndef PAR_THREAD_H
00038 #define PAR_THREAD_H
00039 
00040 #include "fastlib/par/task.h"
00041 
00042 
00043 #include "fastlib/base/base.h"
00044 
00045 #include <pthread.h>
00046 
00053 class Thread {
00054   FORBID_ACCIDENTAL_COPIES(Thread);
00055   
00056  public:
00057   enum {
00058     LOW_PRIORITY = 20,
00059     NORMAL_PRIORITY = 0
00060   };
00061   
00062  private:
00063 #ifdef DEBUG
00064   enum {UNINIT, READY, ATTACHED, DETACHED, DONE} status_;
00065 #endif
00066   pthread_t thread_;
00067   Task *task_;
00068   
00069   static void *ThreadMain_(void *self) {
00070     Thread* thread = reinterpret_cast<Thread*>(self);
00071     
00072     thread->task_->Run();
00073     
00074     return NULL;
00075   }
00076 
00077   void Exit_() {
00078     pthread_exit(NULL);
00079   }
00080   
00081  public:
00082   Thread() {
00083 #ifdef DEBUG
00084     DEBUG_ONLY(status_ = UNINIT);
00085 #endif
00086   }
00087   ~Thread() {
00088 #ifdef DEBUG
00089     DEBUG_ASSERT(status_ == DETACHED || status_ == READY || status_ == DONE || status_ == UNINIT);
00090     DEBUG_ONLY(status_ = UNINIT);
00091 #endif
00092   }
00093   
00097   void Init(Task* task_in) {
00098 #ifdef DEBUG
00099     DEBUG_ASSERT(status_ == UNINIT);
00100 #endif
00101     task_ = task_in;
00102 #ifdef DEBUG
00103     DEBUG_ONLY(status_ = READY);
00104 #endif
00105   }
00106   
00110   void Start() {
00111 #ifdef DEBUG
00112     DEBUG_ASSERT(status_ == READY);
00113 #endif
00114     pthread_create(&thread_, NULL,
00115         ThreadMain_, reinterpret_cast<void*>(this));
00116 #ifdef DEBUG
00117     DEBUG_ONLY(status_ = ATTACHED);
00118 #endif
00119   }
00120   
00129   void Start(int prio) {
00130     pthread_attr_t tattr;
00131     sched_param param;
00132 
00133 #ifdef DEBUG
00134     DEBUG_ASSERT(status_ == READY);
00135 #endif
00136     pthread_attr_init(&tattr);
00137     pthread_attr_getschedparam(&tattr, ¶m);
00138     param.sched_priority = prio;
00139     pthread_attr_setschedparam(&tattr, ¶m);
00140     pthread_create(&thread_, &tattr,
00141         ThreadMain_, reinterpret_cast<void*>(this));
00142     pthread_attr_destroy(&tattr);
00143 #ifdef DEBUG
00144     DEBUG_ONLY(status_ = ATTACHED);
00145 #endif
00146   }
00147   
00152   void Detach() {
00153 #ifdef DEBUG
00154     DEBUG_ASSERT(status_ == ATTACHED);
00155 #endif
00156     pthread_detach(thread_);
00157 #ifdef DEBUG
00158     DEBUG_ONLY(status_ = DETACHED);
00159 #endif
00160   }
00161   
00167   void WaitStop() {
00168 #ifdef DEBUG
00169     DEBUG_ASSERT(status_ == ATTACHED);
00170 #endif
00171     pthread_join(thread_, NULL);
00172 #ifdef DEBUG
00173     DEBUG_ONLY(status_ = DONE);
00174 #endif
00175   }
00176   
00180   Task* task() const {
00181     return task_;
00182   }
00183 };
00184 
00188 class Mutex {
00189   FORBID_ACCIDENTAL_COPIES(Mutex);
00190   friend class WaitCondition;
00191  
00192  public:
00193   struct DummyRecursiveAttribute {};
00194 
00195  private:
00196   mutable pthread_mutex_t mutex_;
00197 
00198  public:
00199   static Mutex global;
00200 
00201  public:
00202   Mutex() {
00203 #if defined(DEBUG) && defined(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP)
00204     mutex_ = (pthread_mutex_t)PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
00205 #else
00206     mutex_ = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
00207 #endif
00208   }
00209   Mutex(DummyRecursiveAttribute v) {
00210 #ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
00211     mutex_ = (pthread_mutex_t)PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
00212 #else
00213     pthread_mutexattr_t attr;
00214     pthread_mutexattr_init(&attr);
00215     pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
00216     pthread_mutex_init(&mutex_, &attr);
00217     pthread_mutexattr_destroy(&attr);
00218 #endif
00219   }
00220 
00221   ~Mutex() {
00222     pthread_mutex_destroy(&mutex_);
00223   }
00224   
00226   void Lock() const {
00227     int t = pthread_mutex_lock(&mutex_);
00228     (void)t; 
00229     DEBUG_ASSERT_MSG(t == 0, "Error locking mutex -- relocking a non-recursive mutex?");
00230   }
00231   
00233   bool TryLock() const {
00234     return likely(pthread_mutex_trylock(&mutex_) == 0);
00235   }
00236   
00238   void Unlock() const {
00239     pthread_mutex_unlock(&mutex_);
00240   }
00241 };
00242 
00247 class RecursiveMutex : public Mutex {
00248   FORBID_ACCIDENTAL_COPIES(RecursiveMutex);
00249 
00250  public:
00251   RecursiveMutex() : Mutex(DummyRecursiveAttribute()) {}
00252 };
00253 
00257 class WaitCondition {
00258   FORBID_ACCIDENTAL_COPIES(WaitCondition);
00259   
00260  private:
00261   pthread_cond_t cond_;
00262  
00263  public:
00264   WaitCondition() {
00265     pthread_cond_init(&cond_, NULL);
00266   }
00267   
00268   ~WaitCondition() {
00269     pthread_cond_destroy(&cond_);
00270   }
00271   
00272   void Signal() {
00273     pthread_cond_signal(&cond_);
00274   }
00275   
00276   void Broadcast() {
00277     pthread_cond_broadcast(&cond_);
00278   }
00279   
00280   void Wait(Mutex* mutex_to_unlock) {
00281     pthread_cond_wait(&cond_, &mutex_to_unlock->mutex_);
00282   }
00283   
00284   void WaitMillis(Mutex& mutex_to_unlock, unsigned millis) {
00285     struct timespec ts;
00286     
00287     ts.tv_sec = millis / 1000;
00288     ts.tv_nsec = (millis % 1000) * 1000000;
00289     
00290     pthread_cond_timedwait(&cond_, &mutex_to_unlock.mutex_, &ts);
00291   }
00292   
00293   void WaitSec(Mutex& mutex_to_unlock, unsigned sec) {
00294     struct timespec ts;
00295     
00296     ts.tv_sec = sec;
00297     ts.tv_nsec = 0;
00298     
00299     pthread_cond_timedwait(&cond_, &mutex_to_unlock.mutex_, &ts);
00300   }
00301 };
00302 
00313 class DoneCondition {
00314   Mutex mutex_;
00315   WaitCondition cond_;
00316   bool done_;
00317 
00318  public:
00319   DoneCondition() { done_ = false; }
00320   ~DoneCondition() {}
00321 
00325   void Wait() {
00326     mutex_.Lock();
00327     while (!done_) {
00328       cond_.Wait(&mutex_);
00329     }
00330     done_ = false;
00331     mutex_.Unlock();
00332   }
00333 
00337   void Done() {
00338     mutex_.Lock();
00339     DEBUG_ASSERT_MSG(done_ == false, "Doesn't do a counter -- should it?");
00340     done_ = true;
00341     cond_.Signal();
00342     mutex_.Unlock();
00343   }
00344 };
00345 
00349 class ValueCondition {
00350   Mutex mutex_;
00351   WaitCondition cond_;
00352   int value_;
00353 
00354  public:
00355   ValueCondition() { value_ = 0; }
00356   ~ValueCondition() {}
00357 
00361   void Wait(int v) {
00362     mutex_.Lock();
00363     while (value_ != v) {
00364       cond_.Wait(&mutex_);
00365     }
00366     mutex_.Unlock();
00367   }
00368 
00369   void WaitNot(int v) {
00370     mutex_.Lock();
00371     while (value_ == v) {
00372       cond_.Wait(&mutex_);
00373     }
00374     mutex_.Unlock();
00375   }
00376 
00377   void Set(int v) {
00378     mutex_.Lock();
00379     if (value_ != v) {
00380       value_ = v;
00381       cond_.Broadcast();
00382     }
00383     mutex_.Unlock();
00384   }
00385 };
00386 
00393 template<class TContained>
00394 class Lockable : public TContained, public Mutex {
00395   FORBID_ACCIDENTAL_COPIES(Lockable);
00396 
00397   Lockable() {}
00398   ~Lockable() {}
00399 };
00400 
00401 #endif