00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 
00041 
00042 
00043 
00044 
00045 
00046 
00047 
00048 
00049 
00050 
00051 
00052 
00053 #include "w_defines.h"
00054 
00055 
00056 
00057 
00058 
00059 
00060 
00061 
00062 
00063 
00064 
00065 
00066 
00067 
00068 
00069 
00070 
00071 
00072 #define    IO_C
00073 
00074 #include <w.h>
00075 #include <w_debug.h>
00076 #include <w_stream.h>
00077 #include <cstdlib>
00078 #include <cstring>
00079 #include <sys/time.h>
00080 #include <sys/wait.h>
00081 #include <new>
00082 #include <sys/stat.h>
00083 #include <sys/mman.h>
00084 #include <w_rusage.h>
00085 #if HAVE_HUGETLBFS
00086 #include <fcntl.h>
00087 #endif
00088 #include "sthread.h"
00089 #include "sthread_stats.h"
00090 #include <sdisk.h>
00091 #include <sdisk_unix.h>
00092 
00093 #if defined(HUGEPAGESIZE) && (HUGEPAGESIZE == 0)
00094 #undef HUGEPAGESIZE
00095 #endif
00096 
00097 extern class sthread_stats SthreadStats;
00098 
00099 sdisk_t         **sthread_t::_disks = 0;
00100 unsigned        sthread_t::open_max = 0;
00101 unsigned        sthread_t::open_count = 0;
00102 
00103 static          queue_based_lock_t    protectFDs;
00104 
00105 int       sthread_t:: _disk_buffer_disalignment(0); 
00106 size_t    sthread_t:: _disk_buffer_size(0); 
00107 char *    sthread_t:: _disk_buffer (NULL);
00108 
00109 int sthread_t::do_unmap()
00110 {
00111     
00112     
00113     
00114 
00115 #ifdef WITHOUT_MMAP
00116     ::free( _disk_buffer - _disk_buffer_disalignment ); 
00117     _disk_buffer = NULL;
00118     _disk_buffer_disalignment = 0;
00119     return 0;
00120 #endif
00121 
00122 #if 0
00123     fprintf(stderr, "%d: munmap disalignment %d addr %p, size %lu\n",
00124             __LINE__, 
00125             _disk_buffer_disalignment,  
00126             ( _disk_buffer -  _disk_buffer_disalignment),  
00127             _disk_buffer_size);
00128 #endif
00129 
00130     int err =
00131         munmap( _disk_buffer -  _disk_buffer_disalignment,  _disk_buffer_size);
00132 
00133     if(err) {
00134         cerr << "munmap returns " << err 
00135             << " errno is " <<  errno  << " " << strerror(errno)
00136             << endl;
00137         w_assert1(!err);
00138     }
00139 
00140      _disk_buffer = NULL;
00141      _disk_buffer_size = 0;
00142      _disk_buffer_disalignment = 0;
00143 
00144     return err;
00145 }
00146 
00147 void sthread_t::align_for_sm(size_t W_IFDEBUG1(requested_size))
00148 {
00149     char * _disk_buffer2  = (char *)alignon( _disk_buffer, SM_PAGESIZE);
00150     if( _disk_buffer2 !=  _disk_buffer) 
00151     {
00152         
00153         _disk_buffer_disalignment = ( _disk_buffer2 -  _disk_buffer);
00154         w_assert1( _disk_buffer_disalignment < SM_PAGESIZE);
00155         w_assert1( _disk_buffer_size -  _disk_buffer_disalignment 
00156             >= requested_size);
00157 
00158          _disk_buffer =  _disk_buffer2;
00159 
00160     }
00161 }
00162 
00163 long sthread_t::get_max_page_size(long system_page_size)
00164 {
00165     long max_page_size = 0;
00166 #ifdef HAVE_GETPAGESIZES
00167     {
00168         int nelem = getpagesizes(NULL, 0);
00169         if(nelem >= 0) {
00170             size_t *pagesize = new size_t[nelem];
00171             int err = getpagesizes (pagesize, nelem);
00172             if(err >= 0) {
00173                 for(int i=0; i < nelem; i++) {
00174                    if ( pagesize[i] > max_page_size) { 
00175                        max_page_size = pagesize[i];
00176            }
00177                 }
00178             } else {
00179            cerr << "getpagesizes(pagesize, " << nelem << ") failed. "
00180             << " errno is " <<  errno  << " " << strerror(errno)
00181             << endl;
00182             }
00183             delete[] pagesize;
00184         } else {
00185            cerr << "getpagesizes(NULL,0) failed. "
00186             << " errno is " <<  errno  << " " << strerror(errno)
00187            << endl;
00188         }
00189     }
00190 #else
00191     max_page_size = system_page_size;
00192 #endif
00193     
00194 
00195 
00196 
00197 
00198 
00199     return max_page_size;
00200 }
00201 
00202 void sthread_t::align_bufsize(size_t size, long system_page_size,
00203                                                 long max_page_size)
00204 {
00205     
00206     
00207     
00208     
00209     
00210     
00211 
00212     w_assert0(alignon(max_page_size, system_page_size) == max_page_size);
00213     
00214     
00215     
00216     
00217     
00218     
00219     
00220     _disk_buffer_size  = alignon(size, max_page_size);
00221     w_assert1(_disk_buffer_size >= size); 
00222 
00223     
00224     w_assert1(size_t(alignon(_disk_buffer_size, max_page_size)) 
00225         == _disk_buffer_size);
00226     w_assert1(size_t(alignon(_disk_buffer_size, system_page_size)) 
00227         == _disk_buffer_size);
00228 }
00229 
00230 #if defined(HUGEPAGESIZE) && (HUGEPAGESIZE > 0)
00231 void clear(char *buf_start, size_t requested_size)
00232 {
00233     
00234     size_t requested_huge_pages = requested_size / (HUGEPAGESIZE*1024);
00235     size_t requested_pages = requested_size / SM_PAGESIZE;
00236     for(size_t j=0; j < requested_pages; j++) {
00237         for(size_t i=0; i < SM_PAGESIZE; i++) {
00238             size_t offset = j*SM_PAGESIZE + i;
00239             size_t hugepagenum =  offset / (HUGEPAGESIZE*1024);
00240             size_t hugepageoffset =  offset - (hugepagenum * 
00241                     (HUGEPAGESIZE*1024));
00242             char x = buf_start[offset];
00243             
00244             
00245             if(int(i) < 0) fprintf(stderr, "0x%d 0x%x, 0x%x, 0x%x", x,
00246                     int(hugepagenum), int(hugepageoffset), int(requested_huge_pages));    
00247         }
00248     }
00249     
00250 #if W_DEBUG_LEVEL > 4
00251     fprintf(stderr, "clearing %ld bytes starting at %p\n", 
00252             requested_size, buf_start); 
00253 #endif
00254     memset(buf_start, 0, requested_size);
00255 }
00256 #else
00257 void clear(char *, size_t )
00258 {
00259 }
00260 #endif
00261 
00262 
00263 w_rc_t sthread_t::set_bufsize_normal(
00264     size_t size, char *&buf_start , long system_page_size)
00265 {
00266     size_t requested_size = size; 
00267 
00268     
00269     
00270     
00271     
00272     
00273     long max_page_size = get_max_page_size(system_page_size);
00274     w_assert1(system_page_size <= max_page_size); 
00275 
00276     
00277     
00278     
00279     
00280     
00281     int fd(-1); 
00282 
00283     
00284     
00285     
00286     
00287     
00288     
00289     
00290     
00291     
00292     
00293     
00294     
00295     
00296     
00297     
00298     int flags1 = MAP_PRIVATE;
00299     int flags2 = MAP_PRIVATE;
00300 
00301 #if HAVE_DECL_MAP_ANONYMOUS==1
00302     flags1  |= MAP_ANONYMOUS;
00303     flags2  |= MAP_ANONYMOUS;
00304 #elif HAVE_DECL_MAP_ANON==1
00305     flags1  |= MAP_ANON;
00306     flags2  |= MAP_ANON;
00307 #else
00308 #endif
00309 
00310 #if HAVE_DECL_MAP_NORESERVE==1
00311     flags1  |= MAP_NORESERVE;
00312 #endif
00313 #if HAVE_DECL_MAP_FIXED==1
00314     flags2  |= MAP_FIXED;
00315 #endif
00316 
00317 #if HAVE_DECL_MAP_ALIGN==1
00318     flags1 |= MAP_ALIGN;
00319 #endif
00320     
00321     
00322     
00323     
00324     
00325     size += SM_PAGESIZE;
00326     align_bufsize(size, system_page_size, max_page_size);
00327 
00328     
00329     
00330     
00331     
00332     
00333     
00334     
00335     
00336     
00337 
00338     errno = 0;
00339     _disk_buffer = (char*) mmap(0, _disk_buffer_size,
00340                PROT_NONE,
00341                flags1,
00342                fd,   
00343                0     
00344                );
00345 
00346     if (_disk_buffer == MAP_FAILED) {
00347         cerr 
00348             << __LINE__ << " " 
00349             << "mmap (size=" << _disk_buffer_size 
00350             << " = " << int(_disk_buffer_size/1024)
00351             << " KB ) returns " << long(_disk_buffer)
00352             << " errno is " <<  errno  << " " << strerror(errno)
00353             << " flags " <<  flags1  
00354             << " fd " <<  fd  
00355             << endl;
00356         return RC(fcMMAPFAILED);
00357     }
00358 #if W_DEBUG_LEVEL > 4
00359     else
00360     {
00361         cerr 
00362             << __LINE__ << " " 
00363             << "mmap SUCCESS! (size=" << _disk_buffer_size 
00364             << " = " << int(_disk_buffer_size/1024)
00365             << " KB ) returns " << long(_disk_buffer)
00366             << " errno is " <<  errno  << " " << strerror(errno)
00367             << " flags " <<  flags1  
00368             << " fd " <<  fd  
00369             << endl;
00370     }
00371 #endif
00372 
00373 
00374     
00375     
00376     
00377     
00378     
00379     
00380     int nchunks = _disk_buffer_size / max_page_size;
00381     w_assert1(size_t(nchunks * max_page_size) == _disk_buffer_size);
00382 
00383 
00384     for(int i=0; i < nchunks; i++)
00385     {
00386         char *addr = _disk_buffer + (i * max_page_size); 
00387         char *sub_buffer = (char*) mmap(addr, 
00388                max_page_size,
00389                        PROT_READ | PROT_WRITE, 
00390                        flags2,
00391                        fd,   
00392                        0     
00393                        );
00394 
00395         if (sub_buffer == MAP_FAILED) {
00396             cerr 
00397                 << __LINE__ << " " 
00398                 << "mmap (addr=" << long(addr )
00399                 << ", size=" << max_page_size << ") returns -1;"
00400                 << " errno is " <<  errno  << " " << strerror(errno)
00401                 << " flags " <<  flags2  
00402                 << " fd " <<  fd  
00403                 << endl;
00404             do_unmap();
00405             return RC(fcMMAPFAILED);
00406         }
00407         w_assert1(sub_buffer == addr);
00408 #ifdef HAVE_MEMCNTL
00409         struct memcntl_mha info;
00410         info.mha_cmd = MHA_MAPSIZE_VA;
00411         info.mha_flags = 0;
00412         info.mha_pagesize = max_page_size;
00413         
00414         if(memcntl(sub_buffer, max_page_size, MC_HAT_ADVISE, (char *)&info, 0, 0) < 0)
00415        
00416         {
00417             cerr << "memcntl (chunk " << i << ") returns -1;"
00418                 << " errno is " <<  errno  << " " << strerror(errno)
00419                 << " requested size " <<  max_page_size  << endl;
00420             do_unmap();
00421             return RC(fcMMAPFAILED);
00422         }
00423 #endif
00424     }
00425 
00426     align_for_sm(requested_size);
00427     buf_start = _disk_buffer;
00428     clear(buf_start, requested_size);
00429     return RCOK;
00430 }
00431 
00432 #ifdef WITHOUT_MMAP
00433 w_rc_t 
00434 sthread_t::set_bufsize_memalign(size_t size, char *&buf_start ,
00435     long system_page_size)
00436 {
00437     size_t requested_size = size; 
00438 
00439     
00440     
00441     
00442     
00443     
00444 
00445     long max_page_size = system_page_size;
00446 
00447     align_bufsize(size, system_page_size, max_page_size);
00448 
00449     w_assert1(_disk_buffer == NULL);
00450 
00451 #ifdef HAVE_POSIX_MEMALIGN
00452     void *addr;
00453     int e = posix_memalign(&addr, SM_PAGESIZE, size);
00454     if (e == 0) {
00455         _disk_buffer = (char *)addr;
00456     } else {
00457         _disk_buffer = 0;
00458     }
00459 #elif  HAVE_MEMALIGN
00460     _disk_buffer =  (char *)memalign(SM_PAGESIZE, size);
00461 #elif  HAVE_VALLOC
00462     size += SM_PAGESIZE; 
00463     _disk_buffer =  valloc(size);
00464 #else
00465     size += SM_PAGESIZE; 
00466     _disk_buffer =  malloc(size);
00467 #endif
00468     if (_disk_buffer == 0) {
00469         cerr 
00470             << __LINE__ << " " 
00471             << "could not allocate memory (alignment=" << SM_PAGESIZE 
00472         << "," << size << ") returns -error;"
00473             << " errno is " << strerror(errno)
00474             << endl;
00475         return RC(fcINTERNAL);
00476     }
00477     align_for_sm(requested_size);
00478     buf_start = _disk_buffer;
00479     clear(buf_start, requested_size);
00480     return RCOK;
00481 }
00482 #endif
00483 
00484 #ifdef HAVE_HUGETLBFS
00485 
00486 #if HUGEPAGESIZE>0
00487 #else
00488 #   error You have configured to use hugetlbfs but you have no hugepagesize
00489 #   error Look for Hugepagesize in /proc/meminfo
00490 #endif
00491 
00492 static const char *hugefs_path(NULL);
00493 w_rc_t 
00494 sthread_t::set_hugetlbfs_path(const char *what) 
00495 { 
00496     if(strcmp(what, "NULL")==0) {
00497         
00498         hugefs_path = NULL;
00499         return RCOK;
00500     }
00501 
00502     
00503     
00504     struct stat statbuf;
00505     int e=stat(what, &statbuf);
00506     if(e) {
00507         fprintf(stderr, "Could not stat \"%s\"\n", what);
00508         int fd = ::open(what, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
00509         if (fd < 0) {
00510             fprintf(stderr, "Could not create \"%s\"\n", what);
00511             return RC(stBADPATH);
00512         } else {
00513             cerr << " created " << what << endl;
00514         }
00515     }
00516     hugefs_path = what; 
00517     
00518     return RCOK;
00519 }
00520 
00521 w_rc_t 
00522 sthread_t::set_bufsize_huge(
00523     size_t size, 
00524     char *&buf_start ,
00525     long system_page_size)
00526 {
00527     size_t requested_size = size; 
00528 
00529     
00530     
00531     
00532     
00533     
00534 
00535     long max_page_size = 1024 * HUGEPAGESIZE; 
00536     
00537 
00538     w_assert1(system_page_size <= max_page_size); 
00539 
00540     
00541     
00542     
00543     
00544     
00545     
00546     
00547 
00548 
00549     if(hugefs_path == NULL)
00550     {
00551         fprintf(stderr, "path is %s\n", hugefs_path);
00552         fprintf(stderr, 
00553             "Need path to huge fs. Use ::set_hugetlbfs_path(path)\n");
00554         return RC(fcMMAPFAILED);
00555     }
00556     int fd = ::open(hugefs_path, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
00557     if (fd < 0) {
00558         cerr << " could not open " << hugefs_path << endl;
00559         return RC(fcMMAPFAILED);
00560     }
00561 
00562     
00563     
00564     
00565     
00566     
00567     
00568     
00569     
00570     
00571     
00572     
00573     
00574     
00575     
00576     
00577     int flags = 
00578         MAP_PRIVATE;
00579 
00580     
00581 
00582 #if HAVE_DECL_MAP_ALIGN==1
00583     flags |=  MAP_ALIGN;
00584     fprintf(stderr, "%d: adding flag 0x%x %s\n", __LINE__,
00585             MAP_ALIGN, "MAP_ALIGN");
00586 #endif
00587     
00588     
00589     
00590     
00591     
00592     size += SM_PAGESIZE;
00593 
00594     align_bufsize(size, system_page_size, max_page_size);
00595 
00596     
00597     
00598     
00599     
00600     
00601 
00602     w_assert1(_disk_buffer == NULL);
00603 
00604     errno = 0;
00605     
00606     _disk_buffer = (char*) mmap(0, _disk_buffer_size,
00607                (PROT_READ | PROT_WRITE), 
00608                flags,
00609                fd,   
00610                0     
00611                );
00612 
00613     if (_disk_buffer == MAP_FAILED) {
00614         cerr 
00615             << __LINE__ << " " 
00616             << "mmap (size=" << _disk_buffer_size << ") returns "
00617             <<  long(_disk_buffer)
00618             << " errno is " <<  errno  << " " << strerror(errno)
00619             << " prot " <<  (PROT_READ | PROT_WRITE)
00620             << " flags " <<  flags
00621             << " fd " <<  fd  
00622             << endl;
00623         close(fd); 
00624         return RC(fcMMAPFAILED);
00625     }
00626 #if W_DEBUG_LEVEL > 4
00627     else
00628     {
00629         fprintf(stderr, 
00630     "%d mmap SUCCESS! (size= %lu, %lu KB) returns %p errno %d/%s prot 0x%x flags 0x%x fd %d\n",
00631         __LINE__, 
00632         _disk_buffer_size,
00633         _disk_buffer_size/1024,
00634         _disk_buffer, errno, strerror(errno), 
00635         (PROT_READ | PROT_WRITE),
00636         flags, fd);
00637         fprintf(stderr, 
00638     "%d mmap (size= %lu, %lu KB) (requested_size %d, %d KB) buf-requested is %d\n",
00639         __LINE__,
00640         _disk_buffer_size,
00641         _disk_buffer_size/1024,
00642         int(requested_size),
00643         int(requested_size/1024),
00644         int(_disk_buffer_size-requested_size) );
00645 
00646 
00647     }
00648 #endif
00649 
00650     align_for_sm(requested_size);
00651     buf_start = _disk_buffer;
00652     clear(buf_start, requested_size);
00653     return RCOK;
00654 }
00655 #endif
00656 
00657 
00658 
00659 
00660 
00661 
00662 
00663 
00664 
00665 
00666 
00667 
00668 
00669 
00670 
00671 
00672 
00673 
00674 
00675 
00676 
00677 
00678 
00679 
00680 
00681 
00682 
00683 
00684 
00685 
00686 
00687 
00688 
00689 
00690 
00691 
00692 
00693 
00694 
00695 
00696 
00697 
00698 
00699 
00700 
00701 
00702 
00703 
00704 w_rc_t 
00705 sthread_t::set_bufsize(size_t size, char *&buf_start ,
00706     bool 
00707 #ifdef HAVE_HUGETLBFS
00708     
00709     use_normal_if_huge_fails 
00710 #endif
00711     )
00712 {
00713     if (_disk_buffer && size == 0) {
00714         do_unmap();
00715         return RCOK;
00716     }
00717 
00718     if (_disk_buffer) {
00719         cerr << "Can't re-allocate disk buffer without disabling"
00720             << endl;
00721         return RC(fcINTERNAL);
00722     }
00723 
00724     buf_start = 0;
00725 
00726     long system_page_size = sysconf(_SC_PAGESIZE);
00727 
00728 #ifdef WITHOUT_MMAP
00729     
00730     
00731     return set_bufsize_memalign(size, buf_start, system_page_size);
00732 #endif
00733 
00734 #ifdef HAVE_HUGETLBFS
00735     
00736     
00737     
00738     
00739     
00740     
00741     if(hugefs_path != NULL) {
00742         w_rc_t rc =  set_bufsize_huge(size, buf_start, system_page_size);
00743         if( !rc.is_error() ) {
00744 #if W_DEBUG_LEVEL > 10
00745             cout << "Using hugetlbfs size " << size
00746                 << " system_page_size " << system_page_size
00747                 << " path " << hugefs_path << ". " << endl;
00748 #endif
00749             return rc;
00750         }
00751         if(!use_normal_if_huge_fails)
00752         {
00753             return rc;
00754         }
00755         
00756         cerr << "Skipping hugetlbfs sue to mmap failure: " << rc << endl;
00757     } else {
00758         cout << "Skipping hugetlbfs based on user option. " << endl;
00759     }
00760 #endif
00761     return set_bufsize_normal(size, buf_start, system_page_size);
00762 }
00763 
00764 
00765 char  *
00766 sthread_t::set_bufsize(size_t size)
00767 {
00768     w_rc_t    e;
00769     char    *start;
00770 
00771     if(size==0) { do_unmap(); return NULL; }
00772 
00773     e = set_bufsize(size, start);
00774 
00775     if (e.is_error()) {
00776         cerr << "Hidden Failure: set_bufsize(" << size << "):"
00777             << endl << e << endl;
00778         return 0;
00779     }
00780 
00781     
00782     if (size == 0)
00783         start = 0;
00784 
00785     return start;
00786 }
00787 
00788 
00789 w_rc_t
00790 sthread_t::open(const char* path, int flags, int mode, int &ret_fd)
00791 {
00792     w_rc_t    e;
00793     sdisk_t    *dp;
00794 
00795     
00796     ret_fd = -1;
00797 
00798     bool    open_local = true;
00799 
00800     CRITICAL_SECTION(cs, protectFDs);
00801 
00802     if (open_count >= open_max) {
00803         
00804         
00805         
00806         
00807         
00808         
00809         
00810         
00811         
00812         
00813         unsigned    new_max = open_max + 64;
00814         sdisk_t    **new_disks = new sdisk_t *[new_max];
00815         
00816         if (!new_disks) {
00817             return RC(fcOUTOFMEMORY);
00818         }
00819         unsigned    disk;
00820         for (disk = 0; disk < open_count; disk++)
00821             new_disks[disk] = _disks[disk];
00822         for (; disk < new_max; disk++)
00823             new_disks[disk] = 0;
00824         sdisk_t    **tmp = _disks;
00825         _disks = new_disks;
00826         open_max = new_max;
00827         delete [] tmp;
00828     }
00829 
00830     
00831     unsigned    disk;
00832     for (disk = 0; disk < open_max; disk++)
00833         if (!_disks[disk])
00834             break;
00835     if (disk == open_max) {
00836         return RC(stINTERNAL);    
00837     }
00838 
00839     
00840 
00841 
00842     if (open_local) {
00843         e = sdisk_unix_t::make(path, flags, mode, dp);
00844     }
00845 
00846 
00847     if (e.is_error()) {
00848         return e;
00849     }
00850 
00851     _disks[disk] = dp;
00852     open_count++;
00853 
00854     ret_fd = fd_base + disk;
00855     
00856     return RCOK;
00857 }
00858 
00859 
00860 
00861 
00862 
00863 
00864 
00865 
00866 
00867 w_rc_t sthread_t::close(int fd)
00868 {
00869     fd -= fd_base;
00870     if (fd < 0 || fd >= (int)open_max || !_disks[fd])
00871         return RC(stBADFD);
00872 
00873     w_rc_t    e;
00874 
00875     
00876     e = _disks[fd]->sync();
00877     if (e.is_error())
00878         return e;
00879 
00880     e = _disks[fd]->close();
00881     if (e.is_error())
00882         return e;
00883 
00884     sdisk_t    *togo;
00885     {
00886         CRITICAL_SECTION(cs, protectFDs);
00887         togo = _disks[fd];
00888         _disks[fd] = 0;
00889         open_count--;
00890     }
00891     delete togo;
00892     
00893     return e;
00894 }
00895 
00896 
00897 
00898 
00899 
00900 
00901 
00902 
00903 
00904 
00905 
00906 
00907 
00908 
00909 
00910 
00911 
00912 
00913 
00914 
00915 
00916 
00917 
00918 
00919 
00920 w_rc_t    sthread_t::read(int fd, void* buf, int n)
00921 {
00922     fd -= fd_base;
00923     if (fd < 0 || fd >= (int)open_max || !_disks[fd]) 
00924         return RC(stBADFD);
00925 
00926     INC_STH_STATS(read);
00927 
00928     int    done = 0;
00929     w_rc_t    e;
00930 
00931     e = _disks[fd]->read(buf, n, done);
00932     if (!e.is_error() && done != n)
00933         e = RC(stSHORTIO);
00934 
00935     return e;
00936 }
00937 
00938 
00939 w_rc_t    sthread_t::write(int fd, const void* buf, int n)
00940 {
00941     fd -= fd_base;
00942     if (fd < 0 || fd >= (int)open_max || !_disks[fd]) 
00943         return RC(stBADFD);
00944 
00945     INC_STH_STATS(write);
00946 
00947     int    done = 0;
00948     w_rc_t    e;
00949 
00950     e = _disks[fd]->write(buf, n, done);
00951     if (!e.is_error() && done != n)
00952         e = RC(stSHORTIO);
00953 
00954     return e;
00955 }
00956 
00957 
00958 w_rc_t    sthread_t::readv(int fd, const iovec_t *iov, size_t iovcnt)
00959 {
00960     fd -= fd_base;
00961     if (fd < 0 || fd >= (int)open_max || !_disks[fd]) 
00962         return RC(stBADFD);
00963 
00964     INC_STH_STATS(readv);
00965 
00966     int    done = 0;
00967     int    total = 0;
00968     w_rc_t    e;
00969 
00970     total = sdisk_t::vsize(iov, iovcnt);
00971 
00972     e = _disks[fd]->readv(iov, iovcnt, done);
00973     if (!e.is_error() && done != total)
00974         e = RC(stSHORTIO);
00975 
00976     return e;
00977 }
00978 
00979 
00980 w_rc_t    sthread_t::writev(int fd, const iovec_t *iov, size_t iovcnt)
00981 {
00982     fd -= fd_base;
00983     if (fd < 0 || fd >= (int)open_max || !_disks[fd]) 
00984         return RC(stBADFD);
00985 
00986     INC_STH_STATS(writev);
00987 
00988     int    done = 0;
00989     int    total = 0;
00990     w_rc_t    e;
00991 
00992     total = sdisk_t::vsize(iov, iovcnt);
00993 
00994     e = _disks[fd]->writev(iov, iovcnt, done);
00995     if (!e.is_error() && done != total)
00996         e = RC(stSHORTIO);
00997 
00998     return e;
00999 }
01000 
01001 
01002 w_rc_t    sthread_t::pread(int fd, void *buf, int n, fileoff_t pos)
01003 {
01004     fd -= fd_base;
01005     if (fd < 0 || fd >= (int)open_max || !_disks[fd]) 
01006         return RC(stBADFD);
01007 
01008     INC_STH_STATS(read);
01009 
01010     int    done = 0;
01011     w_rc_t    e;
01012 
01013     errno = 0;
01014     e = _disks[fd]->pread(buf, n, pos, done);
01015     if (!e.is_error() && done != n) {
01016         e = RC2(stSHORTIO, done);
01017     }
01018 
01019     return e;
01020 }
01021 
01022 
01023 w_rc_t    sthread_t::pwrite(int fd, const void *buf, int n, fileoff_t pos)
01024 {
01025     fd -= fd_base;
01026     if (fd < 0 || fd >= (int)open_max || !_disks[fd]) 
01027         return RC(stBADFD);
01028 
01029     INC_STH_STATS(write);
01030 
01031     int    done = 0;
01032     w_rc_t    e;
01033 
01034     e = _disks[fd]->pwrite(buf, n, pos, done);
01035     if (!e.is_error() && done != n)
01036         e = RC(stSHORTIO);
01037 
01038     return e;
01039 }
01040 
01041 
01042 w_rc_t    sthread_t::fsync(int fd)
01043 {
01044     fd -= fd_base;
01045     if (fd < 0 || fd >= (int)open_max || !_disks[fd]) 
01046         return RC(stBADFD);
01047 
01048     w_rc_t        e;
01049     INC_STH_STATS(sync);
01050 
01051     e = _disks[fd]->sync();
01052 
01053     return e;
01054 }
01055 
01056 w_rc_t    sthread_t::ftruncate(int fd, fileoff_t n)
01057 {
01058     fd -= fd_base;
01059     if (fd < 0 || fd >= (int)open_max || !_disks[fd]) 
01060         return RC(stBADFD);
01061 
01062     w_rc_t        e;
01063     INC_STH_STATS(truncate);
01064 
01065     e =  _disks[fd]->truncate(n);
01066 
01067     return e;
01068 }
01069 
01070 
01071 w_rc_t sthread_t::lseek(int fd, fileoff_t pos, int whence, fileoff_t& ret)
01072 {
01073     fd -= fd_base;
01074     if (fd < 0 || fd >= (int)open_max || !_disks[fd]) 
01075         return RC(stBADFD);
01076 
01077     w_rc_t    e;
01078 
01079     e = _disks[fd]->seek(pos, whence, ret);
01080 
01081     return e;
01082 }
01083 
01084 
01085 w_rc_t sthread_t::lseek(int fd, fileoff_t offset, int whence)
01086 {
01087     fileoff_t    dest;
01088     w_rc_t        e;
01089 
01090     e = sthread_t::lseek(fd, offset, whence, dest);
01091     if (!e.is_error() && whence == SEEK_AT_SET && dest != offset)
01092         e = RC(stSHORTSEEK);
01093 
01094     return e;
01095 }
01096 
01097 
01098 w_rc_t    sthread_t::fstat(int fd, filestat_t &st)
01099 {
01100     fd -= fd_base;
01101     if (fd < 0 || fd >= (int)open_max || !_disks[fd]) 
01102         return RC(stBADFD);
01103 
01104     w_rc_t    e;
01105 
01106     e = _disks[fd]->stat(st);
01107 
01108     return e;
01109 }
01110 
01111 w_rc_t    sthread_t::fisraw(int fd, bool &isRaw)
01112 {
01113     filestat_t    st;
01114 
01115     isRaw = false;        
01116 
01117     W_DO(fstat(fd, st));    
01118 
01119     isRaw = st.is_device ;
01120     return RCOK;
01121 }
01122 
01123 
01124 void    sthread_t::dump_io(ostream &s)
01125 {
01126     s << "I/O:";
01127     s << " open_max=" << int(open_max);
01128     s << " open_count=" << open_count;
01129     s << endl;
01130 }
01131 
01132 extern "C" void dump_io() 
01133 {
01134     sthread_t::dump_io(cout);
01135     cout << flush;
01136 }
01137