sthread.h

Go to the documentation of this file.
00001 /* -*- mode:C++; c-basic-offset:4 -*-
00002      Shore-MT -- Multi-threaded port of the SHORE storage manager
00003    
00004                        Copyright (c) 2007-2009
00005       Data Intensive Applications and Systems Labaratory (DIAS)
00006                Ecole Polytechnique Federale de Lausanne
00007    
00008                          All Rights Reserved.
00009    
00010    Permission to use, copy, modify and distribute this software and
00011    its documentation is hereby granted, provided that both the
00012    copyright notice and this permission notice appear in all copies of
00013    the software, derivative works or modified versions, and any
00014    portions thereof, and that both notices appear in supporting
00015    documentation.
00016    
00017    This code is distributed in the hope that it will be useful, but
00018    WITHOUT ANY WARRANTY; without even the implied warranty of
00019    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. THE AUTHORS
00020    DISCLAIM ANY LIABILITY OF ANY KIND FOR ANY DAMAGES WHATSOEVER
00021    RESULTING FROM THE USE OF THIS SOFTWARE.
00022 */
00023 
00024 // -*- mode:c++; c-basic-offset:4 -*-
00025 /*<std-header orig-src='shore' incl-file-exclusion='STHREAD_H'>
00026 
00027  $Id: sthread.h,v 1.199 2010/07/07 21:43:47 nhall Exp $
00028 
00029 SHORE -- Scalable Heterogeneous Object REpository
00030 
00031 Copyright (c) 1994-99 Computer Sciences Department, University of
00032                       Wisconsin -- Madison
00033 All Rights Reserved.
00034 
00035 Permission to use, copy, modify and distribute this software and its
00036 documentation is hereby granted, provided that both the copyright
00037 notice and this permission notice appear in all copies of the
00038 software, derivative works or modified versions, and any portions
00039 thereof, and that both notices appear in supporting documentation.
00040 
00041 THE AUTHORS AND THE COMPUTER SCIENCES DEPARTMENT OF THE UNIVERSITY
00042 OF WISCONSIN - MADISON ALLOW FREE USE OF THIS SOFTWARE IN ITS
00043 "AS IS" CONDITION, AND THEY DISCLAIM ANY LIABILITY OF ANY KIND
00044 FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
00045 
00046 This software was developed with support by the Advanced Research
00047 Project Agency, ARPA order number 018 (formerly 8230), monitored by
00048 the U.S. Army Research Laboratory under contract DAAB07-91-C-Q518.
00049 Further funding for this work was provided by DARPA through
00050 Rome Research Laboratory Contract No. F30602-97-2-0247.
00051 
00052 */
00053 
00054 /*  -- do not edit anything above this line --   </std-header>*/
00055 
00056 /*
00057  * The SHORE threads layer has some historical roots in the
00058  * the NewThreads implementation wrapped up as c++ objects.
00059  *
00060  * With release 6.0 of the SHORE Storage Manager, the NewThreads
00061  * functionality was substantially obviated.  Some bits and pieces
00062  * of the SHORE threads layer remains in the synchronization variables
00063  * in the sthread_t API.
00064  *
00065  * To the extent that any NewThreads code remains here, 
00066  * the following copyright applies: 
00067  *
00068  *   NewThreads is Copyright 1992, 1993, 1994, 1995, 1996, 1997 by:
00069  *
00070  *    Josef Burger    <bolo@cs.wisc.edu>
00071  *    Dylan McNamee   <dylan@cse.ogi.edu>
00072  *    Ed Felten       <felten@cs.princeton.edu>
00073  *
00074  *   All Rights Reserved.
00075  *
00076  *   NewThreads may be freely used as long as credit is given
00077  *   to the above authors and the above copyright is maintained.
00078  */
00079 
00080 /**\file sthread.h
00081  *\ingroup MACROS
00082  *
00083  * This file contains the Shore Threads API.
00084  */
00085 
00086 #ifndef STHREAD_H
00087 #define STHREAD_H
00088 
00089 #include "w_defines.h"
00090 #include "w_rc.h"
00091 #include "atomic_templates.h"
00092 #include "stime.h"
00093 #include "gethrtime.h"
00094 #include <vtable.h>
00095 #include <w_list.h>
00096 
00097 // this #include reflects the fact that sthreads is now just a pthreads wrapper
00098 #include <pthread.h>
00099 
00100 class sthread_t;
00101 class smthread_t;
00102 
00103 
00104 #ifdef __GNUC__
00105 #pragma interface
00106 #endif
00107 
00108 #ifndef SDISK_H
00109 #include <sdisk.h>
00110 #endif
00111 
00112 class vtable_row_t;
00113 class vtable_t;
00114 
00115 struct sthread_core_t;
00116 
00117 extern "C" void         dumpthreads(); // for calling from debugger
00118 
00119 
00120 /**\brief Base class for sthreads.  See \ref timeout_in_ms, \ref timeout_t
00121  */
00122 class sthread_base_t : public w_base_t {
00123 public:
00124 /**\cond skip */
00125     typedef unsigned int w_thread_id_t; // TODO REMOVE
00126     typedef w_thread_id_t id_t;
00127 /**\endcond skip */
00128 
00129     /* XXX this is really something for the SM, not the threads package;
00130        only WAIT_IMMEDIATE should ever make it to the threads package. */
00131 
00132     /**\enum timeout_t
00133      * \brief Special values for timeout_in_ms.
00134      *
00135      * \details sthreads package recognizes 2 WAIT_* values:
00136      * == WAIT_IMMEDIATE
00137      * and != WAIT_IMMEDIATE.
00138      *
00139      * If it's not WAIT_IMMEDIATE, it's assumed to be
00140      * a positive integer (milliseconds) used for the
00141      * select timeout.
00142      * WAIT_IMMEDIATE: no wait
00143      * WAIT_FOREVER:   may block indefinitely
00144      * The user of the thread (e.g., sm) had better
00145      * convert timeout that are negative values (WAIT_* below)
00146      * to something >= 0 before calling block().
00147      *
00148      * All other WAIT_* values other than WAIT_IMMEDIATE
00149      * are handled by sm layer:
00150      * WAIT_SPECIFIED_BY_THREAD: pick up a timeout_in_ms from the smthread.
00151      * WAIT_SPECIFIED_BY_XCT: pick up a timeout_in_ms from the transaction.
00152      * Anything else: not legitimate.
00153      * 
00154      * \sa timeout_in_ms
00155      */
00156     enum timeout_t {
00157     WAIT_IMMEDIATE     = 0, 
00158     WAIT_FOREVER     = -1,
00159     WAIT_SPECIFIED_BY_THREAD     = -4, // used by lock manager
00160     WAIT_SPECIFIED_BY_XCT = -5, // used by lock manager
00161     WAIT_NOT_USED = -6 // indicates last negative number used by sthreads
00162     };
00163     /* XXX int would also work, sized type not necessary */
00164     /**\typedef int4_t timeout_in_ms;
00165      * \brief Timeout in milliseconds if > 0
00166      * \details
00167      * sthread_t blocking methods take a timeout in milliseconds.
00168      * If the value is < 0, then it's expected to be a member of the
00169      * enumeration type timeout_t.
00170      *
00171      * \sa timeout_t
00172      */
00173     typedef int4_t timeout_in_ms;
00174 
00175 /**\cond skip */
00176     static const w_error_t::info_t     error_info[];
00177     static void  init_errorcodes();
00178 
00179 #include "st_error_enum_gen.h"
00180 
00181     enum {
00182     stOS = fcOS,
00183     stINTERNAL = fcINTERNAL,
00184     stNOTIMPLEMENTED = fcNOTIMPLEMENTED 
00185     };
00186 
00187     /* import sdisk base */
00188     typedef sdisk_base_t::fileoff_t    fileoff_t;
00189     typedef sdisk_base_t::filestat_t   filestat_t;
00190     typedef sdisk_base_t::iovec_t      iovec_t;
00191 
00192 
00193     /* XXX magic number */
00194     enum { iovec_max = 8 };
00195 
00196     enum {
00197     OPEN_RDWR = sdisk_base_t::OPEN_RDWR,
00198     OPEN_RDONLY = sdisk_base_t::OPEN_RDONLY,
00199     OPEN_WRONLY = sdisk_base_t::OPEN_WRONLY,
00200 
00201     OPEN_SYNC = sdisk_base_t::OPEN_SYNC,
00202     OPEN_TRUNC = sdisk_base_t::OPEN_TRUNC,
00203     OPEN_CREATE = sdisk_base_t::OPEN_CREATE,
00204     OPEN_EXCL = sdisk_base_t::OPEN_EXCL,
00205     OPEN_APPEND = sdisk_base_t::OPEN_APPEND,
00206     OPEN_RAW = sdisk_base_t::OPEN_RAW
00207     };
00208     enum {
00209     SEEK_AT_SET = sdisk_base_t::SEEK_AT_SET,
00210     SEEK_AT_CUR = sdisk_base_t::SEEK_AT_CUR,
00211     SEEK_AT_END = sdisk_base_t::SEEK_AT_END
00212     };
00213 /**\endcond skip */
00214 };
00215 
00216 /**\cond skip */
00217 class sthread_name_t {
00218 public:
00219     enum { NAME_ARRAY = 64 };
00220 
00221     char        _name[NAME_ARRAY];
00222 
00223     sthread_name_t();
00224     ~sthread_name_t();
00225 
00226     void rename(const char *n1, const char *n2=0, const char *n3=0);
00227 };
00228 
00229 class sthread_named_base_t: public sthread_base_t
00230 {
00231 public:
00232     NORET            sthread_named_base_t(
00233     const char*            n1 = 0,
00234     const char*            n2 = 0,
00235     const char*            n3 = 0);
00236     NORET            ~sthread_named_base_t();
00237     
00238     void            rename(
00239     const char*            n1,
00240     const char*            n2 = 0,
00241     const char*            n3 = 0);
00242 
00243     const char*            name() const;
00244     void                   unname();
00245 
00246 private:
00247     sthread_name_t        _name;
00248 };
00249 
00250 inline NORET
00251 sthread_named_base_t::sthread_named_base_t(
00252     const char*        n1,
00253     const char*        n2,
00254     const char*        n3)
00255 {
00256     rename(n1, n2, n3);
00257 
00258 }
00259 
00260 inline const char*
00261 sthread_named_base_t::name() const
00262 {
00263     return _name._name;
00264 }
00265 
00266 class sthread_main_t;
00267 
00268 /**\endcond skip */
00269 
00270 /**\brief A callback class for traversing the list of all sthreads.
00271  * \details
00272  * Use with for_each_thread. Somewhat costly because it's thread-safe.
00273  */
00274 class ThreadFunc
00275 {
00276     public:
00277     virtual void operator()(const sthread_t& thread) = 0;
00278     virtual NORET ~ThreadFunc() {}
00279 };
00280 
00281 
00282 class sthread_init_t;
00283 class sthread_main_t;
00284 
00285 // these macros allow us to notify the SunStudio race detector about lock acquires/releases
00286 
00287 #include "os_interface.h"
00288 
00289 /**\brief A test-and-test-and-set spinlock. 
00290  *
00291  * This lock is good for short, uncontended critical sections. 
00292  * If contention is high, use an mcs_lock. 
00293  * Long critical sections should use pthread_mutex_t.
00294  *
00295  * Tradeoffs are:
00296  *  - test-and-test-and-set locks: low-overhead but not scalable
00297  *  - queue-based locks: higher overhead but scalable
00298  *  - pthread mutexes : very high overhead and blocks, but frees up 
00299  *  cpu for other threads when number of cpus is fewer than number of threads
00300  *
00301  *  \sa REFSYNC
00302  */
00303 struct tatas_lock {
00304     /**\cond skip */
00305     enum { NOBODY=0 };
00306     typedef union  {
00307         pthread_t         handle;
00308 #undef CASFUNC 
00309 #if SIZEOF_PTHREAD_T==4
00310 #define CASFUNC atomic_cas_32
00311         unsigned int       bits;
00312 #elif SIZEOF_PTHREAD_T==8
00313 # define CASFUNC atomic_cas_64
00314         uint64_t           bits;
00315 #elif SIZEOF_PTHREAD_T==0
00316 #error  Configuration could not determine size of pthread_t. Fix configure.ac.
00317 #else 
00318 #error  Configuration determined size of pthread_t is unexpected. Fix sthread.h.
00319 #endif
00320     } holder_type_t;
00321     volatile holder_type_t _holder;
00322     /**\endcond skip */
00323 
00324     tatas_lock() { _holder.bits=NOBODY; }
00325 
00326 private:
00327     // CC mangles this as __1cKtatas_lockEspin6M_v_
00328     /// spin until lock is free
00329     void spin() { while(*&(_holder.handle)) ; }
00330 
00331 public:
00332     /// Try to acquire the lock immediately.
00333     bool try_lock() 
00334     {
00335         holder_type_t tid = { pthread_self() };
00336         bool success = false;
00337         unsigned int old_holder = 
00338                         CASFUNC(&_holder.bits, NOBODY, tid.bits);
00339         if(old_holder == NOBODY) {
00340             membar_enter();
00341             success = true;
00342         }
00343         
00344         return success;
00345     }
00346 
00347     /// Acquire the lock, spinning as long as necessary. 
00348     void acquire() {
00349         w_assert1(!is_mine());
00350         holder_type_t tid = { pthread_self() };
00351         do {
00352             spin();
00353         }
00354         while(CASFUNC(&_holder.bits, NOBODY, tid.bits));
00355         membar_enter();
00356         w_assert1(is_mine());
00357     }
00358 
00359     /// Release the lock
00360     void release() {
00361         membar_exit();
00362         w_assert1(is_mine()); // moved after the membar
00363         _holder.bits= NOBODY;
00364         {
00365             membar_enter(); // needed for the assert?
00366             w_assert1(!is_mine());
00367         }
00368     }
00369 
00370     /// True if this thread is the lock holder
00371     bool is_mine() const { return 
00372         pthread_equal(_holder.handle, pthread_self()) ? true : false; }
00373 #undef CASFUNC 
00374 };
00375 
00376 /**\brief Wrapper for pthread mutexes, with a queue-based lock API.
00377  *
00378  * This lock uses a Pthreads mutex for the lock.
00379  *
00380  * This is not a true queue-based lock, since
00381  * release doesn't inform the next node in the queue, and in fact the
00382  * nodes aren't kept in a queue.
00383  * It just gives pthread mutexes the same API as the other
00384  * queue-based locks so that we use the same idioms for
00385  * critical sections based on different kinds of locks.
00386  *
00387  * The idiom for using these locks is
00388  * that the qnode is on a threads's stack, so the qnode
00389  * implicitly identifies the owning thread.
00390  *
00391  * This allows us to add an is_mine() capability that otherwise
00392  * the pthread mutexen don't have.
00393  *
00394  * Finally, using this class ensures that the pthread_mutex_init/destroy
00395  * is done.
00396  *
00397  *  See also: \ref REFSYNC
00398  *
00399  */
00400 struct w_pthread_lock_t 
00401 {
00402     /**\cond skip */
00403     struct ext_qnode {
00404         w_pthread_lock_t* _held;
00405     };
00406 #define EXT_QNODE_INITIALIZER { NULL }
00407 
00408     typedef ext_qnode volatile* ext_qnode_ptr;
00409     /**\endcond skip */
00410 
00411 private:
00412     pthread_mutex_t     _mutex; // w_pthread_lock_t blocks on this
00413     /// Holder is this struct if acquire is successful.
00414     w_pthread_lock_t *  _holder;
00415 
00416 public:
00417     w_pthread_lock_t() :_holder(0) { pthread_mutex_init(&_mutex, 0); }
00418 
00419     ~w_pthread_lock_t() { w_assert1(!_holder); pthread_mutex_destroy(&_mutex);}
00420     
00421     /// Returns true if success.
00422     bool attempt(ext_qnode* me) {
00423         if(attempt( *me)) {
00424             me->_held = this;
00425             _holder = this;
00426             return true;
00427         }
00428         return false;
00429     }
00430 
00431 private:
00432     /// Returns true if success. Helper for attempt(ext_qnode *).
00433     bool attempt(ext_qnode & me) {
00434         w_assert1(!is_mine(&me));
00435         w_assert0( me._held == 0 );  // had better not 
00436         // be using this qnode for another lock!
00437         return pthread_mutex_trylock(&_mutex) == 0;
00438     }
00439 
00440 public:
00441     /// Acquire the lock and set the qnode to refer to this lock.
00442     void* acquire(ext_qnode* me) {
00443         w_assert1(!is_mine(me));
00444         w_assert1( me->_held == 0 );  // had better not 
00445         // be using this qnode for another lock!
00446         pthread_mutex_lock(&_mutex);
00447         me->_held = this;
00448         _holder = this;
00449         {
00450             membar_enter(); // needed for the assert
00451             w_assert1(is_mine(me)); // TODO: change to assert2
00452         }
00453         return 0;
00454     }
00455 
00456     /// Release the lock and clear the qnode.
00457     void release(ext_qnode &me) { release(&me); }
00458 
00459     /// Release the lock and clear the qnode.
00460     void release(ext_qnode_ptr me) { 
00461         // assert is_mine:
00462         w_assert1( _holder == me->_held ); 
00463         w_assert1(me->_held == this); 
00464          me->_held = 0; 
00465         _holder = 0;
00466         pthread_mutex_unlock(&_mutex); 
00467 #if W_DEBUG_LEVEL > 10
00468         // This is racy since the containing structure could
00469         // have been freed by the time we do this check.  Thus,
00470         // we'll remove it.
00471         {
00472             membar_enter(); // needed for the assertions?
00473             w_pthread_lock_t *h =  _holder;
00474             w_pthread_lock_t *m =  me->_held;
00475             w_assert1( (h==NULL && m==NULL)
00476                 || (h  != m) );
00477         }
00478 #endif
00479     }
00480 
00481     /**\brief Return true if this thread holds the lock.
00482      *
00483      * This method doesn't actually check for this pthread
00484      * holding the lock, but it checks that the qnode reference
00485      * is to this lock.  
00486      * The idiom for using these locks is
00487      * that the qnode is on a threads's stack, so the qnode
00488      * implicitly identifies the owning thread.
00489      */
00490     
00491     bool is_mine(ext_qnode* me) const { 
00492        if( me->_held == this ) {
00493            // only valid if is_mine 
00494           w_assert1( _holder == me->_held ); 
00495           return true;
00496        }
00497        return false;
00498     }
00499 };
00500 
00501 /**\def USE_PTHREAD_MUTEX
00502  * \brief Determines that we use pthread-based mutex for queue_based_lock_t
00503  *
00504  * \details
00505  * The Shore-MT release contained alternatives for scalable locks in
00506  * certain places in the storage manager; it was released with
00507  * these locks replaced by pthreads-based mutexes.
00508  *
00509  * The alternatives were described in \ref JPHAF1, but have not been
00510  * tested for the Shore 6.0 release. The code for these
00511  * alternatives is distributed for further experimentation, but it is not
00512  * not compiled in.  
00513  *
00514  * If you wish to experiment with the mcs locks, you'll have to 
00515  * undefine this macro.
00516  */
00517 #define USE_PTHREAD_MUTEX 1
00518 
00519 /**\defgroup SYNCPRIM Synchronization Primitives
00520  *\ingroup UNUSED 
00521  *
00522  * sthread/sthread.h: As distributed, a queue-based lock 
00523  * is a w_pthread_lock_t,
00524  * which is a wrapper around a pthread lock to give it a queue-based-lock API.
00525  * True queue-based locks are not used, nor are time-published
00526  * locks.
00527  * Code for these implementations is included for future 
00528  * experimentation, along with typedefs that should allow
00529  * easy substitution, as they all should have the same API.
00530  *
00531  * We don't offer the spin implementations at the moment.
00532  */
00533 /*
00534  * These typedefs are included to allow substitution at some  point.
00535  * Where there is a preference, the code should use the appropriate typedef.
00536  */
00537 
00538 typedef w_pthread_lock_t queue_based_block_lock_t; // blocking impl always ok
00539 #ifdef USE_PTHREAD_MUTEX
00540 typedef w_pthread_lock_t queue_based_spin_lock_t; // spin impl preferred
00541 typedef w_pthread_lock_t queue_based_lock_t; // might want to use spin impl
00542 #else
00543 #include <mcs_lock.h>
00544 typedef mcs_lock queue_based_spin_lock_t; // spin preferred
00545 typedef mcs_lock queue_based_lock_t;
00546 #endif
00547 
00548 #ifndef SRWLOCK_H
00549 #include <srwlock.h>
00550 #endif
00551 
00552 /**\brief A multiple-reader/single-writer lock based on pthreads (blocking)
00553  *
00554  * Use this to protect data structures that get hammered by
00555  *  reads and where updates are very rare.
00556  * It is used in the storage manager by the histograms (histo.cpp), 
00557  * and in place of some mutexen, where strict exclusion isn't required.
00558  *
00559  * This lock is used in the storage manager by the checkpoint thread
00560  * (the only acquire-writer) and other threads to be sure they don't
00561  * do certain nasty things when a checkpoint is going on.
00562  *
00563  * The idiom for using these locks is
00564  * that the qnode is on a threads's stack, so the qnode
00565  * implicitly identifies the owning thread.
00566  *
00567  *  See also: \ref REFSYNC
00568  *
00569  */
00570 struct occ_rwlock {
00571     occ_rwlock();
00572     ~occ_rwlock();
00573     /// The normal way to acquire a read lock.
00574     void acquire_read();
00575     /// The normal way to release a read lock.
00576     void release_read();
00577     /// The normal way to acquire a write lock.
00578     void acquire_write();
00579     /// The normal way to release a write lock.
00580     void release_write();
00581 
00582     /**\cond skip */
00583     /// Exposed for critical_section<>. Do not use directly.
00584     struct occ_rlock {
00585         occ_rwlock* _lock;
00586         void acquire() { _lock->acquire_read(); }
00587         void release() { _lock->release_read(); }
00588     };
00589     /// Exposed for critical_section<>. Do not use directly.
00590     struct occ_wlock {
00591         occ_rwlock* _lock;
00592         void acquire() { _lock->acquire_write(); }
00593         void release() { _lock->release_write(); }
00594     };
00595 
00596     /// Exposed for the latch manager.. Do not use directly.
00597     occ_rlock *read_lock() { return &_read_lock; }
00598     /// Exposed for the latch manager.. Do not use directly.
00599     occ_wlock *write_lock() { return &_write_lock; }
00600     /**\endcond skip */
00601 private:
00602     enum { WRITER=1, READER=2 };
00603     unsigned int volatile _active_count;
00604     occ_rlock _read_lock;
00605     occ_wlock _write_lock;
00606 
00607     pthread_mutex_t _read_write_mutex; // paired w/ _read_cond, _write_cond
00608     pthread_cond_t _read_cond; // paired w/ _read_write_mutex
00609     pthread_cond_t _write_cond; // paired w/ _read_write_mutex
00610 };
00611 
00612 typedef w_list_t<sthread_t, queue_based_lock_t>        sthread_list_t;
00613 
00614 
00615 /**\brief Thread class for all threads that use the Shore Storage Manager.
00616  *  
00617  *  All threads that perform \b any work on behalf of the storage
00618  *  manager or call any storage manager API \b must be an sthread_t or
00619  *  a class derived from sthread_t.
00620  *
00621  *  Storage manager threads use block/unblock methods provided by
00622  *  sthread, and use thread-local storage (data attributes of
00623  *  sthread_t).
00624  *
00625  *  This class also provides an os-independent API for file-system
00626  *  calls (open, read, write, close, etc.) used by the storage manager.
00627  *
00628  *  This class is a fairly thin layer over pthreads.  Client threads
00629  *  may use pthread synchronization primitives. 
00630  */
00631 class sthread_t : public sthread_named_base_t  
00632 {
00633     friend class sthread_init_t;
00634     friend class sthread_main_t;
00635     /* For access to block() and unblock() */
00636     friend class latch_t;
00637     /* For access to I/O stats */
00638 
00639 public:
00640     static void  initialize_sthreads_package();
00641 
00642     enum status_t {
00643         t_defunct,    // thread has terminated
00644         t_virgin,    // thread hasn't started yet    
00645         t_ready,    // thread is ready to run
00646         t_running,    // when me() is this thread 
00647         t_blocked,      // thread is blocked on something
00648         t_boot        // system boot
00649     };
00650     static const char *status_strings[];
00651 
00652     enum priority_t {
00653         t_time_critical = 1,
00654         t_regular    = 0,
00655         max_priority    = t_time_critical,
00656         min_priority    = t_regular
00657     };
00658     static const char *priority_strings[];
00659 
00660     /* Default stack size for a thread */
00661     enum { default_stack = 64*1024 };
00662 
00663     /*
00664      *  Class member variables
00665      */
00666     void*             user;    // user can use this 
00667     const id_t        id;
00668 
00669     // max_os_file_size is used by the sm and set in
00670     // static initialization of sthreads (sthread_init_t in sthread.cpp)
00671     static w_base_t::int8_t     max_os_file_size;
00672 
00673 private:
00674 
00675     // ASSUMES WE ALREADY LOCKED self->_wait_lock
00676     static w_rc_t::errcode_t        _block(
00677                             timeout_in_ms          timeout = WAIT_FOREVER,
00678                             const char* const      caller = 0,
00679                             const void *           id = 0);
00680 
00681     static w_rc_t::errcode_t        _block(
00682                             pthread_mutex_t        *lock, 
00683                             timeout_in_ms          timeout = WAIT_FOREVER,
00684                             sthread_list_t*        list = 0,
00685                             const char* const      caller = 0,
00686                             const void *           id = 0);
00687 
00688     w_rc_t               _unblock(w_rc_t::errcode_t e);
00689 
00690 public:
00691     static void          timeout_to_timespec(timeout_in_ms timeout, 
00692                                              struct timespec &when);
00693     w_rc_t               unblock(w_rc_t::errcode_t e);
00694     static w_rc_t        block(
00695                             pthread_mutex_t        &lock,
00696                             timeout_in_ms          timeout = WAIT_FOREVER,
00697                             sthread_list_t*        list = 0,
00698                             const char* const      caller = 0,
00699                             const void *           id = 0);
00700     static w_rc_t::errcode_t       block(int4_t  timeout = WAIT_FOREVER);
00701 
00702     virtual void        _dump(ostream &) const; // to be over-ridden
00703 
00704     // these traverse all threads
00705     static void       dumpall(const char *, ostream &);
00706     static void       dumpall(ostream &);
00707 
00708     static void       dump_io(ostream &);
00709     static void       dump_event(ostream &);
00710 
00711     static void       dump_stats(ostream &);
00712     static void       reset_stats();
00713 
00714     /// Collect a row of a virtual table. One row per thread.
00715     /// Subclasses override this.
00716     virtual void      vtable_collect(vtable_row_t &); // to be over-ridden
00717     /// Stuff the attribute names in this row.
00718     static  void      vtable_collect_names(vtable_row_t &); // to be over-ridden
00719 
00720     /// Collect an entire table, one row per thread that the sthreads package
00721     /// knows about. If attr_names_too is true, the first row will be
00722     /// attribute names.
00723     static int        collect(vtable_t&v, bool attr_names_too=true); 
00724                         // in vtable_sthread.cpp
00725 
00726     static void      find_stack(void *address);
00727     static void      for_each_thread(ThreadFunc& f);
00728 
00729     /* request stack overflow check, die on error. */
00730     static void      check_all_stacks(const char *file = "",
00731                              int line = 0);
00732     bool             isStackOK(const char *file = "", int line = 0) const;
00733 
00734     /* Recursion, etc stack depth estimator */
00735     bool             isStackFrameOK(size_t size = 0);
00736 
00737     w_rc_t           set_priority(priority_t priority);
00738     priority_t       priority() const;
00739     status_t         status() const;
00740 
00741 private:
00742 
00743 #ifdef WITHOUT_MMAP
00744     static w_rc_t     set_bufsize_memalign(size_t size, 
00745                         char *&buf_start /* in/out*/, long system_page_size);
00746 #endif
00747 #ifdef HAVE_HUGETLBFS
00748 public:
00749     // Must be called if we are configured with  hugetlbfs
00750     static w_rc_t     set_hugetlbfs_path(const char *path);
00751 private:
00752     static w_rc_t     set_bufsize_huge(size_t size, 
00753                         char *&buf_start /* in/out*/, long system_page_size);
00754 #endif
00755     static w_rc_t     set_bufsize_normal(size_t size, 
00756                         char *&buf_start /* in/out*/, long system_page_size);
00757     static void       align_bufsize(size_t size, long system_page_size,
00758                                                 long max_page_size);
00759     static long       get_max_page_size(long system_page_size);
00760     static void       align_for_sm(size_t requested_size);
00761 
00762 public:
00763     static int          do_unmap(); 
00764     /*
00765      *  Concurrent I/O ops
00766      */
00767     static char*        set_bufsize(size_t size);
00768     static w_rc_t       set_bufsize(size_t size, char *&buf_start /* in/out*/,
00769                                     bool use_normal_if_huge_fails=false);
00770 
00771     static w_rc_t        open(
00772                             const char*            path,
00773                             int                flags,
00774                             int                mode,
00775                             int&                fd);
00776     static w_rc_t        close(int fd);
00777     static w_rc_t        read(
00778                             int                 fd,
00779                             void*                 buf,
00780                             int                 n);
00781     static w_rc_t        write(
00782                             int                 fd, 
00783                             const void*             buf, 
00784                             int                 n);
00785     static w_rc_t        readv(
00786                             int                 fd, 
00787                             const iovec_t*             iov,
00788                             size_t                iovcnt);
00789     static w_rc_t        writev(
00790                             int                 fd,
00791                             const iovec_t*                iov,
00792                             size_t                 iovcnt);
00793 
00794     static w_rc_t        pread(int fd, void *buf, int n, fileoff_t pos);
00795     static w_rc_t        pwrite(int fd, const void *buf, int n,
00796                            fileoff_t pos);
00797     static w_rc_t        lseek(
00798                             int                fd,
00799                             fileoff_t            offset,
00800                             int                whence,
00801                             fileoff_t&            ret);
00802     /* returns an error if the seek doesn't match its destination */
00803     static w_rc_t        lseek(
00804                             int                fd,
00805                             fileoff_t                offset,
00806                             int                whence);
00807     static w_rc_t        fsync(int fd);
00808     static w_rc_t        ftruncate(int fd, fileoff_t sz);
00809     static w_rc_t        fstat(int fd, filestat_t &sb);
00810     static w_rc_t        fisraw(int fd, bool &raw);
00811 
00812 
00813     /*
00814      *  Misc
00815      */
00816     // NOTE: this returns a REFERENCE
00817     static sthread_t*    &me_lval() ;
00818     // NOTE: this returns a POINTER
00819     static sthread_t*    me() { return me_lval(); }
00820                          // for debugging:
00821     pthread_t            myself(); // pthread_t associated with this 
00822     static int           rand(); // returns an int in [0, 2**31)
00823     static double        drand(); // returns a double in [0.0, 1)
00824     static int           randn(int max); // returns an int in [0, max)
00825 
00826     /* XXX  sleep, fork, and wait exit overlap the unix version. */
00827 
00828     // sleep for timeout milliseconds
00829     void                 sleep(timeout_in_ms timeout = WAIT_IMMEDIATE,
00830                          const char *reason = 0);
00831     void                 wakeup();
00832 
00833     // wait for a thread to finish running
00834     w_rc_t            join(timeout_in_ms timeout = WAIT_FOREVER);
00835 
00836     // start a thread
00837     w_rc_t            fork();
00838 
00839     // give up the processor
00840     static void        yield();
00841     ostream            &print(ostream &) const;
00842 
00843     // anyone can wait and delete a thread
00844     virtual            ~sthread_t();
00845 
00846     // function to do runtime up-cast to smthread_t
00847     // return 0 if the sthread is not derrived from sm_thread_t.
00848     // should be removed when RTTI is supported
00849     virtual smthread_t*        dynamic_cast_to_smthread();
00850     virtual const smthread_t*  dynamic_cast_to_const_smthread() const;
00851 
00852 protected:
00853     sthread_t(
00854           priority_t    priority = t_regular,
00855           const char    *name = 0,
00856           unsigned        stack_size = default_stack);
00857 
00858     virtual void        before_run() { }
00859     virtual void        run() = 0;
00860     virtual void        after_run() { }
00861 
00862 private:
00863 
00864     /* start offset of sthread FDs, to differentiate from system FDs */
00865     enum { fd_base = 4000 };
00866     void *                      _start_frame;
00867     void *                      _danger;
00868     size_t                      _stack_size;
00869 
00870     pthread_mutex_t             _wait_lock; // paired with _wait_cond, also
00871                                 // protects _link
00872     pthread_cond_t              _wait_cond; // posted when thread should unblock
00873 
00874     pthread_mutex_t*            _start_terminate_lock; // _start_cond, _terminate_cond, _forked
00875     pthread_cond_t *            _start_cond; // paired w/ _start_terminate_lock
00876 
00877     volatile bool               _sleeping;
00878     volatile bool               _forked;
00879     bool                        _terminated; // protects against double calls
00880                                 // to sthread_core_exit
00881     volatile bool               _unblock_flag; // used internally by _block()
00882 
00883     fill4                       _dummy4valgrind;
00884     
00885     sthread_core_t *            _core;        // registers, stack, etc
00886     volatile status_t           _status;    // thread status
00887     priority_t                  _priority;     // thread priority
00888     w_rc_t::errcode_t           _rce;        // used in block/unblock
00889 
00890     w_link_t                    _link;        // protected by _wait_lock
00891 
00892     w_link_t                    _class_link;    // used in _class_list,
00893                                  // protected by _class_list_lock
00894     static sthread_list_t*      _class_list;
00895     static queue_based_lock_t   _class_list_lock; // for protecting _class_list
00896 
00897 
00898     /* XXX alignment probs in derived thread classes.  Sigh */
00899     // fill4                       _ex_fill;
00900 
00901     /* I/O subsystem */
00902     static    sdisk_t        **_disks;
00903     static    unsigned       open_max;
00904     static    unsigned       open_count;
00905 
00906     /* in-thread startup and shutdown */ 
00907     static void            __start(void *arg_thread);
00908     void                   _start();
00909 
00910 
00911     /* system initialization and shutdown */
00912     static w_rc_t        cold_startup();
00913     static w_rc_t        shutdown();
00914     static stime_t        boot_time;
00915     static sthread_t*    _main_thread; 
00916     static uint4_t        _next_id;    // unique id generator
00917 
00918 private:
00919     static int           _disk_buffer_disalignment;
00920     static size_t        _disk_buffer_size;
00921     static char *        _disk_buffer;
00922 };
00923 
00924 extern ostream &operator<<(ostream &o, const sthread_t &t);
00925 
00926 void print_timeout(ostream& o, const sthread_base_t::timeout_in_ms timeout);
00927 
00928 
00929 /**\cond skip */
00930 /**\brief The main thread. 
00931 *
00932 * Called from sthread_t::cold_startup(), which is
00933 * called from sthread_init_t::do_init(), which is 
00934 * called from sthread_t::initialize_sthreads_package(), which is called 
00935 * when the storage manager sets up options, among other places.
00936 */
00937 class sthread_main_t : public sthread_t  {
00938     friend class sthread_t;
00939     
00940 protected:
00941     NORET            sthread_main_t();
00942     virtual void        run();
00943 };
00944 /**\endcond skip */
00945 
00946 
00947 /**\cond skip */
00948 
00949 #define MUTEX_ACQUIRE(mutex)    W_COERCE((mutex).acquire());
00950 #define MUTEX_RELEASE(mutex)    (mutex).release();
00951 #define MUTEX_IS_MINE(mutex)    (mutex).is_mine()
00952 
00953 
00954 /**\def CRITICAL_SECTION(name, lock)
00955  *
00956  * This macro starts a critical section protected by the given lock
00957  * (2nd argument).  The critical_section structure it creates is
00958  * named by the 1st argument.
00959  * The rest of the scope (in which this macro is used) becomes the
00960  * scope of the critical section, since it is the destruction of this
00961  * critical_section structure that releases the lock.
00962  *
00963  * The programmer can release the lock early by calling <name>.pause()
00964  * or <name>.exit().
00965  * The programmer can reacquire the lock by calling <name>.resume() if
00966  * <name>.pause() was called, but not after <name>.exit().
00967  *
00968  * \sa critical_section
00969  */
00970 #define CRITICAL_SECTION(name, lock) critical_section<__typeof__(lock)&> name(lock)
00971 
00972 template<class Lock>
00973 struct critical_section;
00974 
00975 /**\brief Helper class for CRITICAL_SECTION idiom (macro).
00976  *
00977  * This templated class does nothing; its various specializations 
00978  * do the work of acquiring the given lock upon construction and
00979  * releasing it upon destruction. 
00980  * See the macros:
00981  * - SPECIALIZE_CS(Lock, Extra, ExtraInit, Acquire, Release)  
00982  * - CRITICAL_SECTION(name, lock) 
00983  */
00984 template<class Lock>
00985 struct critical_section<Lock*&> : critical_section<Lock&> {
00986     critical_section<Lock*&>(Lock* mutex) : critical_section<Lock&>(*mutex) { }
00987 };
00988 
00989 /*
00990  * NOTE: I added ExtraInit to make the initialization happen so that
00991  * assertions about holding the mutex don't fail.
00992  * At the same time, I added a holder to the w_pthread_lock_t
00993  * implementation so I could make assertions about the holder outside
00994  * the lock implementation itself.  This might seem like doubly
00995  * asserting, but in the cases where the critical section isn't
00996  * based on a pthread mutex, we really should have this clean
00997  * initialization and the check the assertions.
00998  */
00999 
01000 /**\def SPECIALIZE_CS(Lock, Extra, ExtraInit, Acquire, Release) 
01001  * \brief Macro that enables use of CRITICAL_SECTION(name,lock)
01002  *\addindex SPECIALIZE_CS
01003  * 
01004  * \details
01005  * Create a templated class that holds 
01006  *   - a reference to the given lock and
01007  *   - the Extra (2nd macro argument)
01008  *
01009  *  and it
01010  *   - applies the ExtraInit and Acquire commands upon construction,
01011  *   - applies the Release command upon destruction.
01012  *
01013  */
01014 #define SPECIALIZE_CS(Lock,Extra,ExtraInit,Acquire,Release) \
01015 template<>  struct critical_section<Lock&> { \
01016 critical_section(Lock &mutex) \
01017     : _mutex(&mutex)          \
01018     {   ExtraInit; Acquire; } \
01019     ~critical_section() {     \
01020         if(_mutex)            \
01021             Release;          \
01022             _mutex = NULL;    \
01023         }                     \
01024     void pause() { Release; } \
01025     void resume() { Acquire; }\
01026     void exit() { Release; _mutex = NULL; } \
01027     Lock &hand_off() {        \
01028         Lock* rval = _mutex;  \
01029         _mutex = NULL;        \
01030         return *rval;         \
01031     }                         \
01032 private:                      \
01033     Lock* _mutex;             \
01034     Extra;                    \
01035     void operator=(critical_section const &);   \
01036     critical_section(critical_section const &); \
01037 }
01038 
01039 
01040 // I undef-ed this and found all occurrances of CRITICAL_SECTION with this.
01041 // and hand-checked them.
01042 SPECIALIZE_CS(pthread_mutex_t, int _dummy,  (_dummy=0), 
01043     pthread_mutex_lock(_mutex), pthread_mutex_unlock(_mutex));
01044 
01045 // tatas_lock doesn't have is_mine, but I changed its release()
01046 // to Release and through compiling saw everywhere that uses release,
01047 // and fixed those places
01048 SPECIALIZE_CS(tatas_lock, int _dummy, (_dummy=0), 
01049     _mutex->acquire(), _mutex->release());
01050 
01051 // queue_based_lock_t asserts is_mine() in release()
01052 SPECIALIZE_CS(queue_based_lock_t, queue_based_lock_t::ext_qnode _me, (_me._held=0), 
01053     _mutex->acquire(&_me), _mutex->release(&_me));
01054 
01055 SPECIALIZE_CS(occ_rwlock::occ_rlock, int _dummy, (_dummy=0), 
01056     _mutex->acquire(), _mutex->release());
01057 
01058 SPECIALIZE_CS(occ_rwlock::occ_wlock, int _dummy, (_dummy=0), 
01059     _mutex->acquire(), _mutex->release());
01060 
01061 
01062 inline sthread_t::priority_t
01063 sthread_t::priority() const
01064 {
01065     return _priority;
01066 }
01067 
01068 inline sthread_t::status_t
01069 sthread_t::status() const
01070 {
01071     return _status;
01072 }
01073 
01074 #include <w_strstream.h>
01075 // Need string.h to get strerror_r 
01076 #include <string.h>
01077 
01078 #define DO_PTHREAD_BARRIER(x) \
01079 {   int res = x; \
01080     if(res && res != PTHREAD_BARRIER_SERIAL_THREAD) { \
01081        w_ostrstream S; \
01082        S << "Unexpected result from " << #x << " " << res << " "; \
01083        char buf[100]; \
01084        (void) strerror_r(res, &buf[0], sizeof(buf)); \
01085        S << buf << ends; \
01086        W_FATAL_MSG(fcINTERNAL, << S.c_str()); \
01087     }  \
01088 }
01089 #define DO_PTHREAD(x) \
01090 {   int res = x; \
01091     if(res) { \
01092        w_ostrstream S; \
01093        S << "Unexpected result from " << #x << " " << res << " "; \
01094        char buf[100]; \
01095        (void) strerror_r(res, &buf[0], sizeof(buf)); \
01096        S << buf << ends; \
01097        W_FATAL_MSG(fcINTERNAL, << S.c_str()); \
01098     }  \
01099 }
01100 #define DO_PTHREAD_TIMED(x) \
01101 {   int res = x; \
01102     if(res && res != ETIMEDOUT) { \
01103         W_FATAL_MSG(fcINTERNAL, \
01104                 <<"Unexpected result from " << #x << " " << res); \
01105     } \
01106 }
01107 
01108 /**\endcond skip */
01109 /*<std-footer incl-file-exclusion='STHREAD_H'>  -- do not edit anything below this line -- */
01110 
01111 #endif          /*</std-footer>*/

Generated on Wed Jul 7 17:22:32 2010 for Shore Storage Manager by  doxygen 1.4.7