thread.h
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00037 #ifndef PAR_THREAD_H
00038 #define PAR_THREAD_H
00039
00040 #include "fastlib/par/task.h"
00041
00042
00043 #include "fastlib/base/base.h"
00044
00045 #include <pthread.h>
00046
00053 class Thread {
00054 FORBID_ACCIDENTAL_COPIES(Thread);
00055
00056 public:
00057 enum {
00058 LOW_PRIORITY = 20,
00059 NORMAL_PRIORITY = 0
00060 };
00061
00062 private:
00063 #ifdef DEBUG
00064 enum {UNINIT, READY, ATTACHED, DETACHED, DONE} status_;
00065 #endif
00066 pthread_t thread_;
00067 Task *task_;
00068
00069 static void *ThreadMain_(void *self) {
00070 Thread* thread = reinterpret_cast<Thread*>(self);
00071
00072 thread->task_->Run();
00073
00074 return NULL;
00075 }
00076
00077 void Exit_() {
00078 pthread_exit(NULL);
00079 }
00080
00081 public:
00082 Thread() {
00083 #ifdef DEBUG
00084 DEBUG_ONLY(status_ = UNINIT);
00085 #endif
00086 }
00087 ~Thread() {
00088 #ifdef DEBUG
00089 DEBUG_ASSERT(status_ == DETACHED || status_ == READY || status_ == DONE || status_ == UNINIT);
00090 DEBUG_ONLY(status_ = UNINIT);
00091 #endif
00092 }
00093
00097 void Init(Task* task_in) {
00098 #ifdef DEBUG
00099 DEBUG_ASSERT(status_ == UNINIT);
00100 #endif
00101 task_ = task_in;
00102 #ifdef DEBUG
00103 DEBUG_ONLY(status_ = READY);
00104 #endif
00105 }
00106
00110 void Start() {
00111 #ifdef DEBUG
00112 DEBUG_ASSERT(status_ == READY);
00113 #endif
00114 pthread_create(&thread_, NULL,
00115 ThreadMain_, reinterpret_cast<void*>(this));
00116 #ifdef DEBUG
00117 DEBUG_ONLY(status_ = ATTACHED);
00118 #endif
00119 }
00120
00129 void Start(int prio) {
00130 pthread_attr_t tattr;
00131 sched_param param;
00132
00133 #ifdef DEBUG
00134 DEBUG_ASSERT(status_ == READY);
00135 #endif
00136 pthread_attr_init(&tattr);
00137 pthread_attr_getschedparam(&tattr, ¶m);
00138 param.sched_priority = prio;
00139 pthread_attr_setschedparam(&tattr, ¶m);
00140 pthread_create(&thread_, &tattr,
00141 ThreadMain_, reinterpret_cast<void*>(this));
00142 pthread_attr_destroy(&tattr);
00143 #ifdef DEBUG
00144 DEBUG_ONLY(status_ = ATTACHED);
00145 #endif
00146 }
00147
00152 void Detach() {
00153 #ifdef DEBUG
00154 DEBUG_ASSERT(status_ == ATTACHED);
00155 #endif
00156 pthread_detach(thread_);
00157 #ifdef DEBUG
00158 DEBUG_ONLY(status_ = DETACHED);
00159 #endif
00160 }
00161
00167 void WaitStop() {
00168 #ifdef DEBUG
00169 DEBUG_ASSERT(status_ == ATTACHED);
00170 #endif
00171 pthread_join(thread_, NULL);
00172 #ifdef DEBUG
00173 DEBUG_ONLY(status_ = DONE);
00174 #endif
00175 }
00176
00180 Task* task() const {
00181 return task_;
00182 }
00183 };
00184
00188 class Mutex {
00189 FORBID_ACCIDENTAL_COPIES(Mutex);
00190 friend class WaitCondition;
00191
00192 public:
00193 struct DummyRecursiveAttribute {};
00194
00195 private:
00196 mutable pthread_mutex_t mutex_;
00197
00198 public:
00199 static Mutex global;
00200
00201 public:
00202 Mutex() {
00203 #if defined(DEBUG) && defined(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP)
00204 mutex_ = (pthread_mutex_t)PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
00205 #else
00206 mutex_ = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
00207 #endif
00208 }
00209 Mutex(DummyRecursiveAttribute v) {
00210 #ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
00211 mutex_ = (pthread_mutex_t)PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
00212 #else
00213 pthread_mutexattr_t attr;
00214 pthread_mutexattr_init(&attr);
00215 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
00216 pthread_mutex_init(&mutex_, &attr);
00217 pthread_mutexattr_destroy(&attr);
00218 #endif
00219 }
00220
00221 ~Mutex() {
00222 pthread_mutex_destroy(&mutex_);
00223 }
00224
00226 void Lock() const {
00227 int t = pthread_mutex_lock(&mutex_);
00228 (void)t;
00229 DEBUG_ASSERT_MSG(t == 0, "Error locking mutex -- relocking a non-recursive mutex?");
00230 }
00231
00233 bool TryLock() const {
00234 return likely(pthread_mutex_trylock(&mutex_) == 0);
00235 }
00236
00238 void Unlock() const {
00239 pthread_mutex_unlock(&mutex_);
00240 }
00241 };
00242
00247 class RecursiveMutex : public Mutex {
00248 FORBID_ACCIDENTAL_COPIES(RecursiveMutex);
00249
00250 public:
00251 RecursiveMutex() : Mutex(DummyRecursiveAttribute()) {}
00252 };
00253
00257 class WaitCondition {
00258 FORBID_ACCIDENTAL_COPIES(WaitCondition);
00259
00260 private:
00261 pthread_cond_t cond_;
00262
00263 public:
00264 WaitCondition() {
00265 pthread_cond_init(&cond_, NULL);
00266 }
00267
00268 ~WaitCondition() {
00269 pthread_cond_destroy(&cond_);
00270 }
00271
00272 void Signal() {
00273 pthread_cond_signal(&cond_);
00274 }
00275
00276 void Broadcast() {
00277 pthread_cond_broadcast(&cond_);
00278 }
00279
00280 void Wait(Mutex* mutex_to_unlock) {
00281 pthread_cond_wait(&cond_, &mutex_to_unlock->mutex_);
00282 }
00283
00284 void WaitMillis(Mutex& mutex_to_unlock, unsigned millis) {
00285 struct timespec ts;
00286
00287 ts.tv_sec = millis / 1000;
00288 ts.tv_nsec = (millis % 1000) * 1000000;
00289
00290 pthread_cond_timedwait(&cond_, &mutex_to_unlock.mutex_, &ts);
00291 }
00292
00293 void WaitSec(Mutex& mutex_to_unlock, unsigned sec) {
00294 struct timespec ts;
00295
00296 ts.tv_sec = sec;
00297 ts.tv_nsec = 0;
00298
00299 pthread_cond_timedwait(&cond_, &mutex_to_unlock.mutex_, &ts);
00300 }
00301 };
00302
00313 class DoneCondition {
00314 Mutex mutex_;
00315 WaitCondition cond_;
00316 bool done_;
00317
00318 public:
00319 DoneCondition() { done_ = false; }
00320 ~DoneCondition() {}
00321
00325 void Wait() {
00326 mutex_.Lock();
00327 while (!done_) {
00328 cond_.Wait(&mutex_);
00329 }
00330 done_ = false;
00331 mutex_.Unlock();
00332 }
00333
00337 void Done() {
00338 mutex_.Lock();
00339 DEBUG_ASSERT_MSG(done_ == false, "Doesn't do a counter -- should it?");
00340 done_ = true;
00341 cond_.Signal();
00342 mutex_.Unlock();
00343 }
00344 };
00345
00349 class ValueCondition {
00350 Mutex mutex_;
00351 WaitCondition cond_;
00352 int value_;
00353
00354 public:
00355 ValueCondition() { value_ = 0; }
00356 ~ValueCondition() {}
00357
00361 void Wait(int v) {
00362 mutex_.Lock();
00363 while (value_ != v) {
00364 cond_.Wait(&mutex_);
00365 }
00366 mutex_.Unlock();
00367 }
00368
00369 void WaitNot(int v) {
00370 mutex_.Lock();
00371 while (value_ == v) {
00372 cond_.Wait(&mutex_);
00373 }
00374 mutex_.Unlock();
00375 }
00376
00377 void Set(int v) {
00378 mutex_.Lock();
00379 if (value_ != v) {
00380 value_ = v;
00381 cond_.Broadcast();
00382 }
00383 mutex_.Unlock();
00384 }
00385 };
00386
00393 template<class TContained>
00394 class Lockable : public TContained, public Mutex {
00395 FORBID_ACCIDENTAL_COPIES(Lockable);
00396
00397 Lockable() {}
00398 ~Lockable() {}
00399 };
00400
00401 #endif