00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054 #include "w_defines.h"
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077 #include <w.h>
00078
00079 #include <w_debug.h>
00080 #include <w_stream.h>
00081 #include <cstdlib>
00082 #include <sched.h>
00083 #include <cstring>
00084
00085 #ifdef __SUNPRO_CC
00086 #include <sys/time.h>
00087 #else
00088 #include <ctime>
00089 #endif
00090
00091 #include <sys/wait.h>
00092 #include <new>
00093
00094 #include <sys/stat.h>
00095 #include <w_rusage.h>
00096 #include "tls.h"
00097
00098 #ifdef __GNUC__
00099 #pragma implementation "sthread.h"
00100 #endif
00101
00102 #include "sthread.h"
00103 #include "rand48.h"
00104 #include "sthread_stats.h"
00105 #include "stcore_pthread.h"
00106
00107 #ifdef PURIFY
00108 #include <purify.h>
00109 #endif
00110
00111 #ifdef EXPLICIT_TEMPLATE
00112 template class w_list_t<sthread_t, queue_based_lock_t>;
00113 template class w_list_i<sthread_t, queue_based_lock_t>;
00114 template class w_descend_list_t<sthread_t, queue_based_lock_t, sthread_t::priority_t>;
00115 template class w_keyed_list_t<sthread_t, queue_based_lock_t, sthread_t::priority_t>;
00116 #endif
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126 static __thread rand48 tls_rng = RAND48_INITIALIZER;
00127
00128 int sthread_t::rand() { return tls_rng.rand(); }
00129 double sthread_t::drand() { return tls_rng.drand(); }
00130 int sthread_t::randn(int max) { return tls_rng.randn(max); }
00131
00132 class sthread_stats SthreadStats;
00133
00134 const
00135 #include "st_einfo_gen.h"
00136
00137 extern "C" void dumpthreads();
00138 extern "C" void threadstats();
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162 class sthread_init_t : public sthread_base_t {
00163 public:
00164 NORET sthread_init_t();
00165 static void do_init();
00166 NORET ~sthread_init_t();
00167 private:
00168 static uint4_t initialized;
00169 static w_pthread_lock_t init_mutex;
00170 };
00171
00172 static sthread_init_t sthread_init;
00173
00174 w_base_t::uint4_t sthread_init_t::initialized = 0;
00175 w_pthread_lock_t sthread_init_t::init_mutex;
00176 w_base_t::int8_t sthread_t::max_os_file_size;
00177
00178 bool sthread_t::isStackOK(const char * , int ) const
00179 {
00180
00181
00182
00183 return true;
00184 }
00185
00186
00187
00188 void sthread_t::check_all_stacks(const char *file, int line)
00189 {
00190 w_list_i<sthread_t, queue_based_lock_t> i(*_class_list);
00191 unsigned corrupt = 0;
00192
00193 while (i.next()) {
00194 if (! i.curr()->isStackOK(file, line))
00195 corrupt++;
00196 }
00197
00198 if (corrupt > 0) {
00199 cerr << "sthread_t::check_all: " << corrupt
00200 << " thread stacks, dieing" << endl;
00201 W_FATAL(fcINTERNAL);
00202 }
00203 }
00204
00205
00206
00207
00208
00209
00210 extern "C" void stackoverflowed() {}
00211
00212 bool sthread_t::isStackFrameOK(size_t size)
00213 {
00214 bool ok;
00215 void *stack_top = &ok;
00216 void *_stack_top = &ok - size;
00217
00218 w_assert1(this->_danger < this->_start_frame);
00219 void *absolute_bottom = (void *)((char *)_start_frame - _stack_size);
00220
00221 if( stack_top < _danger) {
00222 if( stack_top <= absolute_bottom) {
00223 fprintf(stderr,
00224
00225 "STACK OVERFLOW frame (offset -%ld) %p bottom %p danger %p top %p stack_size %ld \n",
00226
00227 (long int) size, _stack_top, absolute_bottom, _danger, _start_frame,
00228 (long int) _stack_size);
00229 } else {
00230 fprintf(stderr,
00231
00232 "STACK IN GUARD AREA bottom %p frame (offset -%ld) %p danger %p top %p stack_size %ld \n",
00233
00234 absolute_bottom, (long int) size, _stack_top, _danger, _start_frame,
00235 (long int) _stack_size);
00236 }
00237 return false;
00238 }
00239
00240 return true;
00241 }
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252 sthread_t* sthread_t::_main_thread = 0;
00253 const w_base_t::uint4_t MAIN_THREAD_ID(1);
00254 w_base_t::uint4_t sthread_t::_next_id = MAIN_THREAD_ID;
00255 sthread_list_t* sthread_t::_class_list = 0;
00256 queue_based_lock_t sthread_t::_class_list_lock;
00257
00258 stime_t sthread_t::boot_time = stime_t::now();
00259
00260
00261
00262
00263
00264
00265
00266 sthread_t* &sthread_t::me_lval() {
00267
00268
00269
00270
00271 static __thread sthread_t* _me(NULL);
00272 return _me;
00273 }
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283 w_rc_t sthread_t::cold_startup()
00284 {
00285
00286 _class_list = new sthread_list_t(W_LIST_ARG(sthread_t, _class_link),
00287 &_class_list_lock);
00288 if (_class_list == 0)
00289 W_FATAL(fcOUTOFMEMORY);
00290
00291
00292 struct timeval now;
00293 gettimeofday(&now, NULL);
00294
00295
00296 ::srand(now.tv_usec);
00297
00298
00299
00300
00301 sthread_main_t *main = new sthread_main_t;
00302 if (!main)
00303 W_FATAL(fcOUTOFMEMORY);
00304 me_lval() = _main_thread = main;
00305 W_COERCE( main->fork() );
00306
00307 if (me() != main)
00308 W_FATAL(stINTERNAL);
00309
00310 #if defined(PURIFY)
00311
00312 purify_name_thread(me()->name());
00313 #endif
00314
00315 return RCOK;
00316 }
00317
00318
00319
00320
00321
00322
00323
00324
00325 w_rc_t sthread_t::shutdown()
00326 {
00327 if (me() != _main_thread) {
00328 cerr << "sthread_t::shutdown(): not main thread!" << endl;
00329 return RC(stINTERNAL);
00330 }
00331
00332 return RCOK;
00333 }
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344 sthread_main_t::sthread_main_t()
00345 : sthread_t(t_regular, "main_thread", 0)
00346 {
00347
00348
00349
00350
00351 }
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361 void sthread_main_t::run()
00362 {
00363 }
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374 w_rc_t sthread_t::set_priority(priority_t priority)
00375 {
00376 CRITICAL_SECTION(cs, _wait_lock);
00377 _priority = priority;
00378
00379
00380 if (_priority <= min_priority) _priority = min_priority;
00381 if (_priority > max_priority) _priority = max_priority;
00382
00383 if (_status == t_ready) {
00384 cerr << "sthread_t::set_priority() :- "
00385 << "cannot change priority of ready thread" << endl;
00386 W_FATAL(stINTERNAL);
00387 }
00388
00389 return RCOK;
00390 }
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400 void sthread_t::sleep(timeout_in_ms timeout, const char *reason)
00401 {
00402 reason = (reason && *reason) ? reason : "sleep";
00403
00404
00405
00406
00407
00408
00409 CRITICAL_SECTION(cs, _wait_lock);
00410 _sleeping = true;
00411 (void) _block(timeout, reason, this);
00412 _sleeping = false;
00413 }
00414
00415
00416
00417
00418
00419
00420
00421 void sthread_t::wakeup()
00422 {
00423 CRITICAL_SECTION(cs, _wait_lock);
00424 if(_sleeping) _unblock(stOK);
00425 }
00426
00427
00428
00429
00430
00431
00432
00433 w_rc_t
00434 sthread_t::join(timeout_in_ms )
00435 {
00436 w_rc_t rc;
00437 {
00438 CRITICAL_SECTION(cs, _start_terminate_lock);
00439
00440
00441
00442
00443 if (!_forked) {
00444 rc = RC(stOS);
00445 } else
00446 {
00447 cs.exit();
00448
00449
00450
00451 #define TRACE_START_TERM 0
00452 #if TRACE_START_TERM
00453 { w_ostrstream o;
00454 o << *this << endl;
00455 fprintf(stderr,
00456 "me: %#lx Joining on (%s = %s) \n",
00457 (long) pthread_self() ,
00458 name(), o.c_str()
00459 );
00460 fflush(stderr);
00461 }
00462 #endif
00463 sthread_core_exit(_core, _terminated);
00464
00465 #if TRACE_START_TERM
00466 { w_ostrstream o;
00467 o << *this << endl;
00468 fprintf(stderr,
00469 "me: %#lx Join on (%s = %s) done\n",
00470 (long) pthread_self() ,
00471 name(), o.c_str()
00472 );
00473 fflush(stderr);
00474 }
00475 DBGTHRD(<< "Join on " << name() << " successful");
00476 #endif
00477 }
00478 }
00479
00480 return rc;
00481 }
00482
00483
00484
00485
00486
00487
00488
00489
00490 w_rc_t sthread_t::fork()
00491 {
00492 {
00493 sthread_init_t::do_init();
00494 CRITICAL_SECTION(cs, _start_terminate_lock);
00495
00496 if (_forked)
00497 return RC(stOS);
00498
00499
00500 if(this != _main_thread)
00501 {
00502 CRITICAL_SECTION(cs, _class_list_lock);
00503 _class_list->append(this);
00504 }
00505
00506
00507 _forked = true;
00508 if(this == _main_thread) {
00509
00510 CRITICAL_SECTION(cs_thread, _wait_lock);
00511 _status = t_running;
00512 } else {
00513
00514 DO_PTHREAD( pthread_cond_signal(_start_cond) );
00515 }
00516 }
00517 #if TRACE_START_TERM
00518 { w_ostrstream o;
00519 o << *this << endl;
00520 fprintf(stderr, "me: %#lx Forked: %s : %s\n",
00521 (long) pthread_self() ,
00522 (const char *)(name()?name():"anonymous"),
00523 o.c_str()
00524 );
00525 fflush(stderr);
00526 }
00527 #endif
00528
00529 return RCOK;
00530 }
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540 sthread_t::sthread_t(priority_t pr,
00541 const char *nm,
00542 unsigned stack_size)
00543 : sthread_named_base_t(nm),
00544 user(0),
00545 id(_next_id++),
00546 _start_terminate_lock(new pthread_mutex_t),
00547 _start_cond(new pthread_cond_t),
00548 _sleeping(false),
00549 _forked(false),
00550 _terminated(false),
00551 _unblock_flag(false),
00552 _core(0),
00553 _status(t_virgin),
00554 _priority(pr)
00555 {
00556 if(!_start_terminate_lock || !_start_cond )
00557 W_FATAL(fcOUTOFMEMORY);
00558
00559 DO_PTHREAD(pthread_cond_init(_start_cond, NULL));
00560 DO_PTHREAD(pthread_mutex_init(_start_terminate_lock, NULL));
00561
00562 _core = new sthread_core_t;
00563 if (!_core)
00564 W_FATAL(fcOUTOFMEMORY);
00565 _core->sthread = (void *)this;
00566
00567
00568
00569
00570
00571 if (_priority > max_priority)
00572 _priority = max_priority;
00573 else if (_priority <= min_priority)
00574 _priority = min_priority;
00575
00576
00577
00578
00579 DO_PTHREAD(pthread_mutex_init(&_wait_lock, NULL));
00580 DO_PTHREAD(pthread_cond_init(&_wait_cond, NULL));
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592 if (sthread_core_init(_core, __start, this, stack_size) == -1) {
00593 cerr << "sthread_t: cannot initialize thread core" << endl;
00594 W_FATAL(stINTERNAL);
00595 }
00596
00597 #if TRACE_START_TERM
00598 { w_ostrstream o;
00599 o << *this << endl;
00600 fprintf(stderr, "me: %#lx Constructed: %s : %s\n",
00601 (long) pthread_self() ,
00602 (const char *)(name()?name():"anonymous"),
00603 o.c_str()
00604 );
00605 fflush(stderr);
00606 }
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616 #endif
00617
00618 }
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629 sthread_t::~sthread_t()
00630 {
00631
00632
00633
00634
00635
00636 {
00637 CRITICAL_SECTION(cs, _wait_lock);
00638
00639
00640
00641
00642
00643
00644
00645
00646 w_assert1(_status == t_virgin
00647 || _status == t_defunct
00648 || _status == t_blocked
00649 );
00650
00651 if (_link.member_of()) {
00652 W_FORM2(cerr,("sthread_t(%#lx): \"%s\": destroying a thread on a list!",
00653 (long)this, name()));
00654 }
00655 }
00656 #if TRACE_START_TERM
00657 { w_ostrstream o;
00658 o << *this << endl;
00659 fprintf(stderr,
00660 "me: %#lx Destruction of (%s = %s) \n",
00661 (long) pthread_self() ,
00662 name(), o.c_str()
00663 );
00664 fflush(stderr);
00665 }
00666 #endif
00667 sthread_core_exit(_core, _terminated);
00668
00669 delete _core;
00670 _core = 0;
00671
00672 DO_PTHREAD(pthread_cond_destroy(_start_cond));
00673 delete _start_cond;
00674 _start_cond = 0;
00675
00676 DO_PTHREAD(pthread_mutex_destroy(_start_terminate_lock));
00677 delete _start_terminate_lock;
00678 _start_terminate_lock = 0;
00679
00680 }
00681
00682 #ifndef PTHREAD_STACK_MIN
00683
00684
00685
00686 size_t get_pthread_stack_min()
00687 {
00688 static size_t gotit(0);
00689 if(!gotit) {
00690 gotit = sysconf(_SC_THREAD_STACK_MIN);
00691 if(!gotit) {
00692 const char *errmsg =
00693 "Platform does not appear to conform to POSIX 1003.1c-1995 re: limits";
00694
00695 W_FATAL_MSG(fcINTERNAL, << errmsg);
00696 }
00697 w_assert1(gotit > 0);
00698 }
00699 return gotit;
00700 }
00701 #endif
00702
00703
00704
00705
00706 void sthread_t::__start(void *arg)
00707 {
00708 sthread_t* t = (sthread_t*) arg;
00709 me_lval() = t;
00710 t->_start_frame = &t;
00711
00712 #ifndef PTHREAD_STACK_MIN
00713 size_t PTHREAD_STACK_MIN = get_pthread_stack_min();
00714 #endif
00715
00716 #if HAVE_PTHREAD_ATTR_GETSTACKSIZE
00717 pthread_attr_t attr;
00718 size_t sz=0;
00719 int e = pthread_attr_init(&attr);
00720 if(e) {
00721 fprintf(stderr,"Cannot init pthread_attr e=%d\n", e);
00722 ::exit(1);
00723 }
00724 else
00725 {
00726 e = pthread_attr_getstacksize( &attr, &sz);
00727 if(e || sz==0) {
00728 #if HAVE_PTHREAD_ATTR_GETSTACK
00729 void *voidp(NULL);
00730 e = pthread_attr_getstack( &attr, &voidp, &sz);
00731 if(e || sz == 0)
00732 #endif
00733 {
00734 #if W_DEBUG_LEVEL > 2
00735 fprintf(stderr,"Cannot get pthread stack size e=%d, sz=%lld\n",
00736 e, (long long)sz);
00737 #endif
00738 sz = PTHREAD_STACK_MIN;
00739 }
00740 }
00741 }
00742 #define GUARD 8192*4
00743 if(sz < GUARD) {
00744
00745 #ifndef PTHREAD_STACK_MIN_SUBSTITUTE
00746
00747
00748
00749 #define PTHREAD_STACK_MIN_SUBSTITUTE 0x100000
00750 #endif
00751 sz = PTHREAD_STACK_MIN_SUBSTITUTE;
00752 #if W_DEBUG_LEVEL > 2
00753 fprintf(stderr,"using %lld temporarily\n", (long long)sz);
00754 #endif
00755 }
00756 t->_stack_size = sz;
00757
00758
00759
00760
00761
00762 sz -= GUARD;
00763 t->_danger = (void *)((char *)t->_start_frame - sz);
00764
00765 #endif
00766 w_assert1(t->_danger < t->_start_frame);
00767 w_assert1(t->_stack_size > 0);
00768
00769 t->_start();
00770 }
00771
00772
00773
00774
00775
00776
00777
00778 void sthread_t::_start()
00779 {
00780 tls_tricks::tls_manager::thread_init();
00781 w_assert1(me() == this);
00782
00783
00784 w_assert1(isStackFrameOK(0));
00785 {
00786 CRITICAL_SECTION(cs, _start_terminate_lock);
00787 if(_forked) {
00788
00789
00790
00791 CRITICAL_SECTION(cs_thread, _wait_lock);
00792 _status = t_running;
00793 } else {
00794 DO_PTHREAD(pthread_cond_wait(_start_cond, _start_terminate_lock));
00795 CRITICAL_SECTION(cs_thread, _wait_lock);
00796 _status = t_running;
00797 }
00798 }
00799
00800 #if defined(PURIFY)
00801
00802
00803 purify_name_thread(name());
00804 #endif
00805
00806 {
00807
00808
00809
00810
00811
00812
00813 static queue_based_lock_t rand_mutex;
00814
00815 long seed1, seed2;
00816 {
00817 CRITICAL_SECTION(cs, rand_mutex);
00818 seed1 = ::rand();
00819 seed2 = ::rand();
00820 }
00821 tls_rng.seed( (seed1 << 24) ^ seed2);
00822 }
00823
00824
00825 {
00826
00827 w_assert1(me() == this);
00828 #ifdef STHREAD_CXX_EXCEPTION
00829
00830
00831
00832 try {
00833 before_run();
00834 run();
00835 after_run();
00836 }
00837 catch (...) {
00838 cerr << endl
00839 << "sthread_t(id = " << id << " name = " << name()
00840 << "): run() threw an exception."
00841 << endl
00842 << endl;
00843 }
00844 #else
00845 before_run();
00846 run();
00847 after_run();
00848 #endif
00849 }
00850
00851
00852 {
00853 CRITICAL_SECTION(cs, _wait_lock);
00854 w_assert3(me() == this);
00855 _status = t_defunct;
00856 _link.detach();
00857 }
00858 {
00859 CRITICAL_SECTION(cs, _class_list_lock);
00860 _class_link.detach();
00861 }
00862
00863 w_assert3(this == me());
00864
00865 {
00866 w_assert1(_status == t_defunct);
00867
00868 DBGTHRD(<< name() << " terminating");
00869 tls_tricks::tls_manager::thread_fini();
00870 DBGTHRD(<< name() << " pthread_exiting");
00871 pthread_exit(0);
00872 }
00873
00874 W_FATAL(stINTERNAL);
00875 }
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888
00889 w_rc_t
00890 sthread_t::block(
00891 pthread_mutex_t &lock,
00892 timeout_in_ms timeout,
00893 sthread_list_t* list,
00894 const char* const caller,
00895 const void * id)
00896 {
00897 w_rc_t::errcode_t rce = _block(&lock, timeout, list, caller, id);
00898 if(rce) return RC(rce);
00899 return RCOK;
00900 }
00901
00902 w_rc_t::errcode_t
00903 sthread_t::block(int4_t timeout )
00904 {
00905 return _block(NULL, timeout);
00906 }
00907
00908 w_rc_t::errcode_t
00909 sthread_t::_block(
00910 pthread_mutex_t *lock,
00911 timeout_in_ms timeout,
00912 sthread_list_t* list,
00913 const char* const caller,
00914 const void * id)
00915 {
00916 w_rc_t::errcode_t rce(stOK);
00917 sthread_t* self = me();
00918 {
00919 CRITICAL_SECTION(cs, self->_wait_lock);
00920
00921
00922
00923
00924 w_assert3(self->_link.member_of() == 0);
00925 if (list) {
00926 list->put_in_order(self);
00927 }
00928
00929 if(lock) {
00930
00931 DO_PTHREAD(pthread_mutex_unlock(lock));
00932 }
00933 rce = _block(timeout, caller, id);
00934 }
00935 if(rce == stTIMEOUT) {
00936 if(lock) {
00937 CRITICAL_SECTION(outer_cs, &lock);
00938
00939 CRITICAL_SECTION(cs, self->_wait_lock);
00940 self->_link.detach();
00941 } else {
00942 CRITICAL_SECTION(cs, self->_wait_lock);
00943 self->_link.detach();
00944 }
00945 }
00946
00947 return rce;
00948 }
00949
00950 void sthread_t::timeout_to_timespec(timeout_in_ms timeout, struct timespec &when)
00951 {
00952 w_assert1(timeout != WAIT_IMMEDIATE);
00953 w_assert1(timeout != sthread_t::WAIT_FOREVER);
00954 if(timeout > 0) {
00955 struct timeval now;
00956
00957
00958 gettimeofday(&now, NULL);
00959 when.tv_sec = now.tv_sec + timeout/1000;
00960 when.tv_nsec = now.tv_usec*1000 + 100000*(timeout%1000);
00961 if(when.tv_nsec >= 1000*1000*1000) {
00962 when.tv_sec++;
00963 when.tv_nsec -= 1000*1000*1000;
00964 }
00965 }
00966 }
00967
00968 w_rc_t::errcode_t
00969 sthread_t::_block(
00970 timeout_in_ms timeout,
00971 const char* const
00972 ,
00973 const void *
00974 )
00975 {
00976
00977
00978
00979
00980
00981 sthread_t* self = me();
00982 w_assert1(timeout != WAIT_IMMEDIATE);
00983
00984
00985
00986
00987 status_t old_status = self->_status;
00988 self->_status = t_blocked;
00989
00990 int error = 0;
00991 self->_unblock_flag = false;
00992 if(timeout > 0) {
00993 timespec when;
00994 timeout_to_timespec(timeout, when);
00995
00996
00997
00998
00999 while(!error && !self->_unblock_flag) {
01000 error = pthread_cond_timedwait(&self->_wait_cond,
01001 &self->_wait_lock, &when);
01002 w_assert1(error == ETIMEDOUT || error == 0);
01003
01004 if(!error) break;
01005 }
01006 }
01007 else {
01008
01009 w_assert1(timeout == sthread_t::WAIT_FOREVER);
01010
01011
01012
01013 while(!error && !self->_unblock_flag)
01014
01015 error = pthread_cond_wait(&self->_wait_cond, &self->_wait_lock);
01016 }
01017
01018 switch(error) {
01019 case ETIMEDOUT:
01020
01021 W_COERCE(self->_unblock(stTIMEOUT));
01022
01023 case 0:
01024
01025
01026
01027
01028 self->_status = old_status;
01029 #ifdef SM_THREAD_SAFE_ERRORS
01030 if(self->_rc.is_error()) {
01031 self->_rc->claim();
01032 w_rc_t rc(self->_rce);
01033 self->_rc = RCOK;
01034 return rc;
01035 }
01036 return RCOK;
01037 #else
01038 return self->_rce;
01039 #endif
01040 default:
01041 self->_status = old_status;
01042 return sthread_t::stOS;
01043 }
01044 }
01045
01046
01047
01048
01049
01050
01051
01052
01053
01054
01055 w_rc_t
01056 sthread_t::unblock(w_rc_t::errcode_t e)
01057 {
01058 CRITICAL_SECTION(cs, _wait_lock);
01059
01060
01061
01062
01063
01064
01065 _link.detach();
01066 return _unblock(e);
01067
01068 }
01069
01070
01071 w_rc_t
01072 sthread_t::_unblock(w_rc_t::errcode_t e)
01073 {
01074 _status = t_ready;
01075
01076
01077
01078
01079 if (e)
01080 _rce = e;
01081 else
01082 _rce = stOK;
01083
01084
01085
01086
01087 _unblock_flag = true;
01088 membar_producer();
01089 DO_PTHREAD(pthread_cond_signal(&_wait_cond));
01090 _status = t_running;
01091
01092 return RCOK;
01093 }
01094
01095
01096
01097
01098
01099
01100
01101
01102
01103
01104
01105
01106 void sthread_t::yield()
01107 {
01108 #define USE_YIELD
01109 #ifdef USE_YIELD
01110 sthread_t* self = me();
01111 CRITICAL_SECTION(cs, self->_wait_lock);
01112 w_assert3(self->_status == t_running);
01113 self->_status = t_ready;
01114 cs.pause();
01115 sched_yield();
01116 cs.resume();
01117 self->_status = t_running;
01118 #endif
01119 }
01120
01121
01122 void sthread_t::dumpall(const char *str, ostream &o)
01123 {
01124 if (str)
01125 o << str << ": " << endl;
01126
01127 dumpall(o);
01128 }
01129
01130 void sthread_t::dumpall(ostream &o)
01131 {
01132
01133
01134
01135 CRITICAL_SECTION(cs, _class_list_lock);
01136 w_list_i<sthread_t, queue_based_lock_t> i(*_class_list);
01137
01138 while (i.next()) {
01139 o << "******* ";
01140 if (me() == i.curr())
01141 o << " --->ME<---- ";
01142 o << endl;
01143
01144 i.curr()->_dump(o);
01145 }
01146 }
01147
01148
01149
01150 void sthread_t::_dump(ostream &o) const
01151 {
01152 o << *this << endl;
01153 }
01154
01155
01156
01157
01158
01159
01160 static void print_time(ostream &o, const sinterval_t &real,
01161 const sinterval_t &user, const sinterval_t &kernel)
01162 {
01163 sinterval_t total(user + kernel);
01164 double pcpu = ((double)total / (double)real) * 100.0;
01165 double pcpu2 = ((double)user / (double)real) * 100.0;
01166
01167 o << "\t" << "real: " << real
01168 << endl;
01169 o << "\tcpu:"
01170 << " kernel: " << kernel
01171 << " user: " << user
01172 << " total: " << total
01173 << endl;
01174 o << "\t%CPU:"
01175 << " " << setprecision(3) << pcpu
01176 << " %user: " << setprecision(2) << pcpu2;
01177 o
01178 << endl;
01179 }
01180
01181 void sthread_t::dump_stats(ostream &o)
01182 {
01183 o << SthreadStats;
01184
01185
01186
01187
01188 struct rusage ru;
01189 int n;
01190
01191 stime_t now(stime_t::now());
01192 n = getrusage(RUSAGE_SELF, &ru);
01193 if (n == -1) {
01194 w_rc_t e = RC(fcOS);
01195 cerr << "getrusage() fails:" << endl << e << endl;
01196 return;
01197 }
01198
01199 sinterval_t real(now - boot_time);
01200 sinterval_t kernel(ru.ru_stime);
01201 sinterval_t user(ru.ru_utime);
01202
01203
01204
01205
01206 static sinterval_t last_real;
01207 static sinterval_t last_kernel;
01208 static sinterval_t last_user;
01209 static bool last_valid = false;
01210
01211 o << "TIME:" << endl;
01212 print_time(o, real, user, kernel);
01213 if (last_valid) {
01214 sinterval_t r(real - last_real);
01215 sinterval_t u(user - last_user);
01216 sinterval_t k(kernel - last_kernel);
01217 o << "RECENT:" << endl;
01218 print_time(o, r, u, k);
01219 }
01220 else
01221 last_valid = true;
01222
01223 last_kernel = kernel;
01224 last_user = user;
01225 last_real = real;
01226
01227 o << endl;
01228 }
01229
01230 void sthread_t::reset_stats()
01231 {
01232 SthreadStats.clear();
01233 }
01234
01235
01236 const char *sthread_t::status_strings[] = {
01237 "defunct",
01238 "virgin",
01239 "ready",
01240 "running",
01241 "blocked",
01242 "boot"
01243 };
01244
01245 const char *sthread_t::priority_strings[]= {
01246 "idle_time",
01247 "fixed_low",
01248 "regular",
01249 "time_critical"
01250 };
01251
01252
01253 ostream& operator<<(ostream &o, const sthread_t &t)
01254 {
01255 return t.print(o);
01256 }
01257
01258
01259
01260
01261
01262
01263
01264 ostream &sthread_t::print(ostream &o) const
01265 {
01266 o << "thread id = " << id ;
01267
01268 if (name()) {
01269 o << ", name = " << name() ? name() : "anonymous";
01270 };
01271
01272 o
01273 << ", addr = " << (void *) this
01274 << ", core = " << (void *) _core << endl;
01275 o
01276 << "priority = " << sthread_t::priority_strings[priority()]
01277 << ", status = " << sthread_t::status_strings[status()];
01278 o << endl;
01279
01280 if (user)
01281 o << "user = " << user << endl;
01282
01283 if ((status() != t_defunct) && !isStackOK(__FILE__,__LINE__))
01284 {
01285 cerr << "*** warning: Thread stack overflow ***" << endl;
01286 }
01287
01288 return o;
01289 }
01290
01291
01292
01293
01294
01295
01296
01297
01298
01299
01300 void sthread_t::for_each_thread(ThreadFunc& f)
01301 {
01302
01303
01304
01305 CRITICAL_SECTION(cs, _class_list_lock);
01306 w_list_i<sthread_t, queue_based_lock_t> i(*_class_list);
01307
01308 while (i.next()) {
01309 f(*i.curr());
01310 }
01311 }
01312
01313 void print_timeout(ostream& o, const sthread_base_t::timeout_in_ms timeout)
01314 {
01315 if (timeout > 0) {
01316 o << timeout;
01317 } else if (timeout >= -5) {
01318 static const char* names[] = {"WAIT_IMMEDIATE",
01319 "WAIT_FOREVER",
01320 "WAIT_ANY",
01321 "WAIT_ALL",
01322 "WAIT_SPECIFIED_BY_THREAD",
01323 "WAIT_SPECIFIED_BY_XCT"};
01324 o << names[-timeout];
01325 } else {
01326 o << "UNKNOWN_TIMEOUT_VALUE(" << timeout << ")";
01327 }
01328 }
01329
01330 occ_rwlock::occ_rwlock()
01331 : _active_count(0)
01332 {
01333 _write_lock._lock = _read_lock._lock = this;
01334 DO_PTHREAD(pthread_mutex_init(&_read_write_mutex, NULL));
01335 DO_PTHREAD(pthread_cond_init(&_read_cond, NULL));
01336 DO_PTHREAD(pthread_cond_init(&_write_cond, NULL));
01337 }
01338
01339 occ_rwlock::~occ_rwlock()
01340 {
01341 DO_PTHREAD(pthread_mutex_destroy(&_read_write_mutex));
01342 DO_PTHREAD(pthread_cond_destroy(&_read_cond));
01343 DO_PTHREAD(pthread_cond_destroy(&_write_cond));
01344 _write_lock._lock = _read_lock._lock = NULL;
01345 }
01346
01347 void occ_rwlock::release_read()
01348 {
01349 membar_exit();
01350 w_assert1(READER <= (int) _active_count);
01351 unsigned count = atomic_add_32_nv(&_active_count, -READER);
01352 if(count == WRITER) {
01353
01354 CRITICAL_SECTION(cs, _read_write_mutex);
01355 DO_PTHREAD(pthread_cond_signal(&_write_cond));
01356 }
01357 }
01358
01359 void occ_rwlock::acquire_read()
01360 {
01361 int count = atomic_add_32_nv(&_active_count, READER);
01362 while(count & WRITER) {
01363
01364 count = atomic_add_32_nv(&_active_count, -READER);
01365 {
01366 CRITICAL_SECTION(cs, _read_write_mutex);
01367
01368
01369 if(count == WRITER)
01370 DO_PTHREAD(pthread_cond_signal(&_write_cond));
01371
01372 while(*&_active_count & WRITER) {
01373 DO_PTHREAD(pthread_cond_wait(&_read_cond, &_read_write_mutex));
01374 }
01375 }
01376 count = atomic_add_32_nv(&_active_count, READER);
01377 }
01378 membar_enter();
01379 }
01380
01381 void occ_rwlock::release_write()
01382 {
01383 w_assert9(_active_count & WRITER);
01384 CRITICAL_SECTION(cs, _read_write_mutex);
01385 atomic_add_32(&_active_count, -WRITER);
01386 DO_PTHREAD(pthread_cond_broadcast(&_read_cond));
01387 }
01388
01389 void occ_rwlock::acquire_write()
01390 {
01391
01392 CRITICAL_SECTION(cs, _read_write_mutex);
01393 while(*&_active_count & WRITER) {
01394 DO_PTHREAD(pthread_cond_wait(&_read_cond, &_read_write_mutex));
01395 }
01396
01397
01398 int count = atomic_add_32_nv(&_active_count, WRITER);
01399 w_assert1(count & WRITER);
01400
01401
01402 while(count != WRITER) {
01403 DO_PTHREAD(pthread_cond_wait(&_write_cond, &_read_write_mutex));
01404 count = *&_active_count;
01405 }
01406 }
01407
01408
01409
01410 sthread_name_t::sthread_name_t()
01411 {
01412 memset(_name, '\0', sizeof(_name));
01413 }
01414
01415 sthread_name_t::~sthread_name_t()
01416 {
01417 }
01418
01419 void
01420 sthread_name_t::rename(
01421
01422
01423 const char* n1,
01424 const char* n2,
01425 const char* n3)
01426 {
01427 const int sz = sizeof(_name) - 1;
01428 size_t len = 0;
01429 _name[0] = '\0';
01430 if (n1) {
01431 #if W_DEBUG_LEVEL > 2
01432 len = strlen(n1);
01433 if(n2) len += strlen(n2);
01434 if(n3) len += strlen(n3);
01435 len++;
01436 if(len>sizeof(_name)) {
01437 cerr << "WARNING-- name too long for sthread_named_t: "
01438 << n1 << n2 << n3;
01439 }
01440 #endif
01441
01442
01443 strncpy(_name, n1, sz);
01444 len = strlen(_name);
01445 if (n2 && (int)len < sz) {
01446 strncat(_name, n2, sz - len);
01447 len = strlen(_name);
01448 if (n3 && (int)len < sz)
01449 strncat(_name, n3, sz - len);
01450 }
01451
01452 _name[sz] = '\0';
01453 }
01454 }
01455
01456 void
01457 sthread_named_base_t::unname()
01458 {
01459 rename(0,0,0);
01460 }
01461
01462 void
01463 sthread_named_base_t::rename(
01464
01465
01466 const char* n1,
01467 const char* n2,
01468 const char* n3)
01469 {
01470 _name.rename(n1,n2,n3);
01471 }
01472
01473 sthread_named_base_t::~sthread_named_base_t()
01474 {
01475 unname();
01476 }
01477
01478
01479
01480
01481
01482
01483 smthread_t* sthread_t::dynamic_cast_to_smthread()
01484 {
01485 return 0;
01486 }
01487
01488
01489 const smthread_t* sthread_t::dynamic_cast_to_const_smthread() const
01490 {
01491 return 0;
01492 }
01493
01494
01495
01496
01497
01498
01499
01500
01501
01502
01503 void dumpthreads()
01504 {
01505 sthread_t::dumpall("dumpthreads()", cerr);
01506 sthread_t::dump_io(cerr);
01507
01508 }
01509
01510 void threadstats()
01511 {
01512 sthread_t::dump_stats(cerr);
01513 }
01514
01515
01516
01517 static void get_large_file_size(w_base_t::int8_t &max_os_file_size)
01518 {
01519
01520
01521
01522
01523 os_rlimit_t r;
01524 int n;
01525
01526 n = os_getrlimit(RLIMIT_FSIZE, &r);
01527 if (n == -1) {
01528 w_rc_t e = RC(fcOS);
01529 cerr << "getrlimit(RLIMIT_FSIZE):" << endl << e << endl;
01530 W_COERCE(e);
01531 }
01532 if (r.rlim_cur < r.rlim_max) {
01533 r.rlim_cur = r.rlim_max;
01534 n = os_setrlimit(RLIMIT_FSIZE, &r);
01535 if (n == -1) {
01536 w_rc_t e = RC(fcOS);
01537 cerr << "setrlimit(RLIMIT_FSIZE, " << r.rlim_cur
01538 << "):" << endl << e << endl;
01539 cerr << e << endl;
01540 W_FATAL(fcINTERNAL);
01541 }
01542 }
01543 max_os_file_size = w_base_t::int8_t(r.rlim_cur);
01544
01545
01546
01547
01548
01549 if (max_os_file_size < 0) {
01550 max_os_file_size = w_base_t::uint8_t(r.rlim_cur) >> 1;
01551 w_assert1( max_os_file_size > 0);
01552 }
01553 }
01554
01555
01556
01557
01558
01559
01560
01561
01562
01563
01564
01565
01566
01567
01568 #include "sthread_vtable_enum.h"
01569
01570
01571
01572
01573 void sthread_t::initialize_sthreads_package()
01574 { sthread_init_t::do_init(); }
01575
01576 NORET
01577 sthread_init_t::sthread_init_t() { }
01578
01579 void
01580 sthread_init_t::do_init()
01581 {
01582
01583 if (sthread_init_t::initialized == 0)
01584 {
01585 CRITICAL_SECTION(cs, init_mutex);
01586
01587
01588 if (sthread_init_t::initialized == 0)
01589 {
01590 sthread_init_t::initialized ++;
01591
01592 get_large_file_size(sthread_t::max_os_file_size);
01593
01594
01595
01596
01597 if (! w_error_t::insert(
01598 "Threads Package",
01599 error_info,
01600 sizeof(error_info) / sizeof(error_info[0]))) {
01601
01602 cerr << "sthread_init_t::do_init: "
01603 << " cannot register error code" << endl;
01604
01605 W_FATAL(stINTERNAL);
01606 }
01607
01608 W_COERCE(sthread_t::cold_startup());
01609 }
01610 }
01611 }
01612
01613
01614
01615
01616
01617
01618
01619
01620
01621
01622 NORET
01623 sthread_init_t::~sthread_init_t()
01624 {
01625 CRITICAL_SECTION(cs, init_mutex);
01626
01627
01628
01629 w_assert1 (sthread_init_t::initialized <= 1) ;
01630 if (--sthread_init_t::initialized == 0)
01631 {
01632
01633 W_COERCE(sthread_t::shutdown());
01634
01635
01636
01637
01638
01639 sthread_t::_main_thread->_status = sthread_t::t_defunct;
01640
01641 delete sthread_t::_main_thread;
01642 sthread_t::_main_thread = 0;
01643
01644 delete sthread_t::_class_list;
01645 sthread_t::_class_list = 0;
01646 }
01647 }
01648
01649 pthread_t sthread_t::myself() { return _core->pthread; }
01650