00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #include "w_defines.h"
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 #define SM_LEVEL 1
00041 #define SM_SOURCE
00042 #define XCT_C
00043 #include "sm_int_1.h"
00044 #include "sm_vas.h"
00045 
00046 ss_m* ssm = 0;
00047 
00048 extern w_rc_t out_of_log_space (xct_i*, xct_t*&, ss_m::fileoff_t, 
00049                                  ss_m::fileoff_t, 
00050                                  const char *logfile
00051                                  );
00052 
00053 extern w_rc_t get_archived_log_file (const char *logfile, ss_m::partition_number_t);
00054 
00055 
00056 typedef w_rc_t rc_t;
00057 
00058 
00059 w_rc_t init_config_options(option_group_t& options,
00060                         const char* prog_type,
00061                         int& argc, char** argv);
00062 
00063 
00064 struct file_info_t {
00065     static const char* key;
00066     stid_t         fid;
00067     rid_t       first_rid;
00068     int         num_rec;
00069     int         rec_size;
00070 };
00071 const char* file_info_t::key = "SCANFILE";
00072 
00073 ostream &
00074 operator << (ostream &o, const file_info_t &info)
00075 {
00076     o << "key " << info.key
00077     << " fid " << info.fid
00078     << " first_rid " << info.first_rid
00079     << " num_rec " << info.num_rec
00080     << " rec_size " << info.rec_size ;
00081     return o;
00082 }
00083 
00084 
00085 typedef        smlevel_0::smksize_t        smksize_t;
00086 
00087 
00088 
00089 void
00090 usage(option_group_t& options)
00091 {
00092     cerr << "Usage: log_exceed [-h] [options]" << endl;
00093     cerr << "Valid options are: " << endl;
00094     options.print_usage(true, cerr);
00095 }
00096 
00097 
00098 class smthread_user_t : public smthread_t 
00099 {
00100     int        _argc;
00101     char        **_argv;
00102 
00103     const char *_device_name;
00104     smsize_t    _quota;
00105     int         _num_rec;
00106     smsize_t    _rec_size;
00107     lvid_t      _lvid;  
00108     rid_t       _start_rid;
00109     stid_t      _fid;
00110     bool        _initialize_device;
00111     option_group_t* _options;
00112     vid_t       _vid;
00113 public:
00114     int         retval;
00115 
00116     smthread_user_t(int ac, char **av) 
00117             : smthread_t(t_regular, "smthread_user_t"),
00118             _argc(ac), _argv(av), 
00119             _device_name(NULL),
00120             _quota(0),
00121             _num_rec(0),
00122             _rec_size(0),
00123             _initialize_device(true), 
00124             _options(NULL),
00125             _vid(1),
00126             retval(0) { }
00127 
00128     ~smthread_user_t()  { if(_options) delete _options; }
00129 
00130     void run();
00131 
00132     
00133     w_rc_t handle_options();
00134     w_rc_t find_file_info();
00135     w_rc_t create_the_file();
00136     w_rc_t scan_the_file();
00137     w_rc_t scan_the_root_index();
00138     w_rc_t do_work();
00139     w_rc_t do_init();
00140     w_rc_t no_init();
00141 
00142 };
00143 
00144 
00145 
00146 
00147 w_rc_t
00148 smthread_user_t::find_file_info()
00149 {
00150     file_info_t  info;
00151     W_DO(ssm->begin_xct());
00152 
00153     bool        found;
00154     stid_t      _root_iid;
00155     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00156 
00157     smsize_t    info_len = sizeof(info);
00158     const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00159     W_DO(ss_m::find_assoc(_root_iid,
00160                           key_vec_tmp,
00161                           &info, info_len, found));
00162     if (!found) {
00163         cerr << "No file information found" <<endl;
00164         return RC(fcASSERT);
00165     } else {
00166        cerr << " found assoc "
00167                 << file_info_t::key << " --> " << info << endl;
00168     }
00169 
00170     W_DO(ssm->commit_xct());
00171 
00172     _start_rid = info.first_rid;
00173     _fid = info.fid;
00174     _rec_size = info.rec_size;
00175     _num_rec = info.num_rec;
00176     return RCOK;
00177 }
00178 
00179 
00180 
00181 
00182 
00183 
00184 
00185 
00186 
00187 
00188 rc_t
00189 smthread_user_t::create_the_file() 
00190 {
00191     file_info_t info;  
00192     
00193 
00194     
00195     cout << "Creating a file with " << _num_rec 
00196         << " records of size " << _rec_size << endl;
00197     W_DO(ssm->begin_xct());
00198 
00199     
00200     W_DO(ssm->create_file(_vid, info.fid, smlevel_3::t_regular));
00201     rid_t rid;
00202 
00203     _rec_size -= align(sizeof(int));
00204 
00205 
00206 
00207 
00208     char* dummy = new char[_rec_size];
00209     char* dummy2 = new char[_rec_size];
00210     char* dummy3 = new char[_rec_size];
00211     char* dummy4 = new char[_rec_size];
00212     memset(dummy, '\1', _rec_size); 
00213     memset(dummy2, '\2', _rec_size); 
00214     memset(dummy3, '\3', _rec_size); 
00215     memset(dummy4, '\4', _rec_size); 
00216     for(smsize_t i=0; i < _rec_size; i++) {
00217         dummy2[i] = (char)(i%8);
00218         dummy3[i] = (dummy2[i] + 1)%8; 
00219         dummy4[i] = (dummy2[i] - 1)%8; 
00220     }
00221 
00222     vec_t data(dummy, _rec_size);
00223     vec_t data2(dummy2, _rec_size);
00224     vec_t data3(dummy3, _rec_size);
00225     vec_t data4(dummy4, _rec_size);
00226 
00227     for(int j=0; j < _num_rec; j++)
00228     {
00229         {
00230             w_ostrstream o(dummy, _rec_size);
00231             o << "Record number " << j << ends;
00232             w_assert1(o.c_str() == dummy);
00233         }
00234         
00235         int i = j;
00236         const vec_t hdr(&i, sizeof(i));
00237         W_COERCE(ssm->create_rec(info.fid, hdr,
00238                                 _rec_size, data, rid));
00239         cout << "Creating rec " << j << endl;
00240 
00241         
00242         
00243         for(int i=0; i < 200; i++) {
00244             W_DO(ssm->update_rec(rid, 0 , data2));
00245             W_DO(ssm->update_rec(rid, 0 , data3));
00246             W_DO(ssm->update_rec(rid, 0 , data4));
00247         }
00248         if (j == 0) {
00249             info.first_rid = rid;
00250         }        
00251     }
00252     cout << "Created all. First rid " << info.first_rid << endl;
00253     delete [] dummy;
00254     delete [] dummy2;
00255     delete [] dummy3;
00256     delete [] dummy4;
00257     info.num_rec = _num_rec;
00258     info.rec_size = _rec_size;
00259 
00260     
00261     
00262     stid_t      _root_iid;
00263     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00264 
00265     const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00266     const vec_t info_vec_tmp(&info, sizeof(info));
00267     W_DO(ss_m::create_assoc(_root_iid,
00268                             key_vec_tmp,
00269                             info_vec_tmp));
00270     W_DO(ssm->commit_xct());
00271     return RCOK;
00272 }
00273 
00274 rc_t
00275 smthread_user_t::scan_the_root_index() 
00276 {
00277     W_DO(ssm->begin_xct());
00278     stid_t _root_iid;
00279     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00280     cout << "Scanning index " << _root_iid << endl;
00281     scan_index_i scan(_root_iid, 
00282             scan_index_i::ge, vec_t::neg_inf,
00283             scan_index_i::le, vec_t::pos_inf, false,
00284             ss_m::t_cc_kvl);
00285     bool        eof(false);
00286     int         i(0);
00287     smsize_t    klen(0);
00288     smsize_t    elen(0);
00289 #define MAXKEYSIZE 100
00290     char *      keybuf[MAXKEYSIZE];
00291     file_info_t info;
00292 
00293     do {
00294         w_rc_t rc = scan.next(eof);
00295         if(rc.is_error()) {
00296             cerr << "Error getting next: " << rc << endl;
00297             retval = rc.err_num();
00298             return rc;
00299         }
00300         if(eof) break;
00301 
00302         
00303         W_DO(scan.curr(NULL, klen, NULL, elen));
00304         
00305         vec_t key(keybuf, klen);
00306         vec_t elem(&info, elen);
00307         
00308         W_DO(scan.curr(&key, klen, &elem, elen));
00309 
00310         cout << "Key " << keybuf << endl;
00311         cout << "Value " 
00312         << " { fid " << info.fid 
00313         << " first_rid " << info.first_rid
00314         << " #rec " << info.num_rec
00315         << " rec size " << info.rec_size << " }"
00316         << endl;
00317         i++;
00318     } while (!eof);
00319     W_DO(ssm->commit_xct());
00320     return RCOK;
00321 }
00322 
00323 rc_t
00324 smthread_user_t::scan_the_file() 
00325 {
00326     cout << "Scanning file " << _fid << endl;
00327     W_DO(ssm->begin_xct());
00328 
00329     scan_file_i scan(_fid);
00330     pin_i*      cursor(NULL);
00331     bool        eof(false);
00332     int         i(0);
00333 
00334     do {
00335         w_rc_t rc = scan.next(cursor, 0, eof);
00336         if(rc.is_error()) {
00337             cerr << "Error getting next: " << rc << endl;
00338             retval = rc.err_num();
00339             return rc;
00340         }
00341         if(eof) break;
00342 
00343         cout << "Record " << i << "/" << _num_rec
00344             << " Rid "  << cursor->rid() << endl;
00345         vec_t       header (cursor->hdr(), cursor->hdr_size());
00346         int         hdrcontents;
00347         header.copy_to(&hdrcontents, sizeof(hdrcontents));
00348         cout << "Record hdr "  << hdrcontents << endl;
00349 
00350         const char *body = cursor->body();
00351         w_assert1(cursor->body_size() == _rec_size);
00352         cout << "Record body "  << body << endl;
00353         i++;
00354     } while (!eof);
00355     w_assert1(i == _num_rec);
00356 
00357     W_DO(ssm->commit_xct());
00358     return RCOK;
00359 }
00360 
00361 rc_t
00362 smthread_user_t::do_init()
00363 {
00364     {
00365         devid_t        devid;
00366         cout << "Formatting device: " << _device_name 
00367              << " with a " << _quota << "KB quota ..." << endl;
00368         W_DO(ssm->format_dev(_device_name, _quota, true));
00369 
00370         cout << "Mounting device: " << _device_name  << endl;
00371         
00372         u_int        vol_cnt;
00373         W_DO(ssm->mount_dev(_device_name, vol_cnt, devid));
00374 
00375         cout << "Mounted device: " << _device_name  
00376              << " volume count " << vol_cnt
00377              << " device " << devid
00378              << endl;
00379 
00380         
00381         
00382         cout << "Generating new lvid: " << endl;
00383         W_DO(ssm->generate_new_lvid(_lvid));
00384         cout << "Generated lvid " << _lvid <<  endl;
00385 
00386         
00387         cout << "Creating a new volume on the device" << endl;
00388         cout << "    with a " << _quota << "KB quota ..." << endl;
00389 
00390         W_DO(ssm->create_vol(_device_name, _lvid, _quota, false, _vid));
00391         cout << "    with local handle(phys volid) " << _vid << endl;
00392 
00393     } 
00394 
00395     W_DO(create_the_file());
00396     return RCOK;
00397 }
00398 
00399 rc_t
00400 smthread_user_t::no_init()
00401 {
00402     cout << "Using already-existing device: " << _device_name << endl;
00403     
00404     devid_t      devid;
00405     u_int        vol_cnt;
00406     w_rc_t rc = ssm->mount_dev(_device_name, vol_cnt, devid);
00407     if (rc.is_error()) {
00408         cerr << "Error: could not mount device: " 
00409             << _device_name << endl;
00410         return rc;
00411     }
00412     
00413     
00414     lvid_t* lvid_list;
00415     u_int   lvid_cnt;
00416     W_DO(ssm->list_volumes(_device_name, lvid_list, lvid_cnt));
00417     if (lvid_cnt == 0) {
00418         cerr << "Error, device has no volumes" << endl;
00419         exit(1);
00420     }
00421     _lvid = lvid_list[0];
00422     delete [] lvid_list;
00423 
00424     W_DO(find_file_info());
00425     W_DO(scan_the_root_index());
00426     W_DO(scan_the_file());
00427     return RCOK;
00428 }
00429 
00430 rc_t
00431 smthread_user_t::do_work()
00432 {
00433     if (_initialize_device) W_DO(do_init());
00434     else  W_DO(no_init());
00435     return RCOK;
00436 }
00437 
00438 
00439 
00440 
00441 
00442 static option_t *logdir(NULL);
00443 w_rc_t smthread_user_t::handle_options()
00444 {
00445     option_t* opt_device_name = 0;
00446     option_t* opt_device_quota = 0;
00447     option_t* opt_num_rec = 0;
00448 
00449     cout << "Processing configuration options ..." << endl;
00450 
00451     
00452     
00453     
00454     
00455     
00456     
00457     
00458     
00459     const int option_level_cnt = 3; 
00460 
00461     _options = new option_group_t (option_level_cnt);
00462     if(!_options) {
00463         cerr << "Out of memory: could not allocate from heap." <<
00464             endl;
00465         retval = 1;
00466         return RC(fcINTERNAL);
00467     }
00468     option_group_t &options(*_options);
00469 
00470     W_COERCE(options.add_option("device_name", "device/file name",
00471                          NULL, "device containg volume holding file to scan",
00472                          true, option_t::set_value_charstr,
00473                          opt_device_name));
00474 
00475     W_COERCE(options.add_option("device_quota", "# > 1000",
00476                          "2000", "quota for device",
00477                          false, option_t::set_value_long,
00478                          opt_device_quota));
00479 
00480     
00481     W_COERCE(options.add_option("num_rec", "# > 0",
00482                          "1", "number of records in file",
00483                          true, option_t::set_value_long,
00484                          opt_num_rec));
00485 
00486     
00487     W_COERCE(ss_m::setup_options(&options));
00488 
00489     cout << "Finding configuration option settings." << endl;
00490 
00491     w_rc_t rc = init_config_options(options, "server", _argc, _argv);
00492     if (rc.is_error()) {
00493         usage(options);
00494         retval = 1;
00495         return rc;
00496     }
00497     cout << "Processing command line." << endl;
00498 
00499     
00500     int option;
00501     while ((option = getopt(_argc, _argv, "hi")) != -1) {
00502         switch (option) {
00503         case 'h' :
00504             usage(options);
00505             break;
00506 
00507         default:
00508             usage(options);
00509             retval = 1;
00510             return RC(fcNOTIMPLEMENTED);
00511             break;
00512         }
00513     }
00514     {
00515         cout << "Checking for required options...";
00516         
00517         w_ostrstream      err_stream;
00518         w_rc_t rc = options.check_required(&err_stream);
00519         if (rc.is_error()) {
00520             cerr << "These required options are not set:" << endl;
00521             cerr << err_stream.c_str() << endl;
00522             return rc;
00523         }
00524         cout << "OK " << endl;
00525     }
00526 
00527     
00528     _device_name = opt_device_name->value();
00529     _quota = strtol(opt_device_quota->value(), 0, 0);
00530     _num_rec = strtol(opt_num_rec->value(), 0, 0);
00531 
00532     if(logdir==NULL) {
00533         W_DO(options.lookup("sm_logdir", false, logdir));
00534         fprintf(stdout, "Found logdir %s\n", logdir->value());
00535     }
00536 
00537     return RCOK;
00538 }
00539 
00540 void smthread_user_t::run()
00541 {
00542     w_rc_t rc = handle_options();
00543     if(rc.is_error()) {
00544         retval = 1;
00545         return;
00546     }
00547 
00548     
00549     cout << "Starting SSM and performing recovery ..." << endl;
00550     ssm = new ss_m(out_of_log_space, get_archived_log_file);
00551     if (!ssm) {
00552         cerr << "Error: Out of memory for ss_m" << endl;
00553         retval = 1;
00554         return;
00555     }
00556 
00557     cout << "Getting SSM config info for record size ..." << endl;
00558 
00559     sm_config_info_t config_info;
00560     W_COERCE(ss_m::config_info(config_info));
00561     _rec_size = config_info.max_small_rec; 
00562 
00563     
00564     
00565     rc = do_work();
00566 
00567     if (rc.is_error()) {
00568         cerr << "Could not set up device/volume due to: " << endl;
00569         cerr << rc << endl;
00570         {
00571             w_ostrstream o;
00572             static sm_stats_info_t curr;
00573 
00574             rc = ssm->gather_stats(curr);
00575 
00576             o << curr.sm.abort_xct_cnt<< ends;
00577             fprintf(stdout, "log_exceed stats: abort_xct_cnt %s\n" , o.c_str()); 
00578         }
00579         delete ssm;
00580         rc = RCOK;   
00581                      
00582         retval = 0;
00583         if(rc.is_error()) 
00584             W_COERCE(rc); 
00585         return;
00586     }
00587 
00588 
00589     
00590     cout << "\nShutting down SSM ..." << endl;
00591     delete ssm;
00592 
00593     cout << "Finished!" << endl;
00594 
00595     return;
00596 }
00597 
00598 
00599 int
00600 main(int argc, char* argv[])
00601 {
00602     smthread_user_t *smtu = new smthread_user_t(argc, argv);
00603     if (!smtu)
00604             W_FATAL(fcOUTOFMEMORY);
00605 
00606     w_rc_t e = smtu->fork();
00607     if(e.is_error()) {
00608         cerr << "error forking thread: " << e <<endl;
00609         return 1;
00610     }
00611     e = smtu->join();
00612     if(e.is_error()) {
00613         cerr << "error forking thread: " << e <<endl;
00614         return 1;
00615     }
00616 
00617     int        rv = smtu->retval;
00618     delete smtu;
00619 
00620     return rv;
00621 }
00622 
00623 #include <os_interface.h>
00624 
00625 void dump()
00626 {
00627 
00628     os_dirent_t *dd = NULL;
00629     os_dir_t ldir = os_opendir(logdir->value());
00630     if(!ldir) {
00631         fprintf(stdout, "Could not open directory %s\n", logdir->value());
00632         return;
00633     }
00634     fprintf(stdout, "---------------------- %s {\n", logdir->value());
00635     while ((dd = os_readdir(ldir)))
00636     {
00637         fprintf(stdout, "%s\n", dd->d_name);
00638     }
00639     os_closedir(ldir);
00640     fprintf(stdout, "---------------------- %s }\n", logdir->value());
00641 }
00642 
00643 
00644 w_rc_t get_archived_log_file (
00645         const char *filename, 
00646         ss_m::partition_number_t num)
00647 {
00648     fprintf(stdout, 
00649             "\n**************************************** RECOVER %s : %d\n",
00650             filename, num
00651             );
00652     dump();
00653 
00654     w_rc_t rc;
00655 
00656     static char O[smlevel_0::max_devname<<1];
00657     strcat(O, filename);
00658     strcat(O, ".bak");
00659 
00660     static char N[smlevel_0::max_devname<<1];
00661     strcat(N, filename);
00662 
00663     int e = ::rename(O, N);
00664     if(e != 0) 
00665     {
00666         fprintf(stdout, "Could not move %s to %s: error : %d %s\n",
00667                 O, N, e, strerror(errno));
00668         rc = RC2(smlevel_0::eOUTOFLOGSPACE, errno); 
00669     }
00670     dump();
00671     fprintf(stdout, "recovered ... OK!\n\n");
00672     fprintf(stdout, 
00673         "This recovery of the log file will enable us to finish the abort.\n");
00674     fprintf(stdout, 
00675         "It will not continue the device/volume set up.\n");
00676     fprintf(stdout, 
00677         "Expect an error message and stack trace about that:\n\n");
00678     return rc;
00679 }
00680 w_rc_t out_of_log_space (
00681     xct_i* iter, 
00682     xct_t *& xd,
00683     smlevel_0::fileoff_t curr,
00684     smlevel_0::fileoff_t thresh,
00685     const char *filename
00686 )
00687 {
00688     static int calls=0;
00689 
00690     calls++;
00691 
00692     w_rc_t rc;
00693     fprintf(stdout, "\n**************************************** %d\n", calls);
00694     dump();
00695 
00696     fprintf(stdout, 
00697             "Called out_of_log_space with curr %lld thresh %lld, file %s\n",
00698             (long long) curr, (long long) thresh, filename);
00699     {
00700         w_ostrstream o;
00701         o << xd->tid() << endl;
00702         fprintf(stdout, "called with xct %s\n" , o.c_str()); 
00703     }
00704 
00705     xd->log_warn_disable();
00706     iter->never_mind(); 
00707 
00708     {
00709         w_ostrstream o;
00710         static sm_stats_info_t curr;
00711 
00712         W_DO( ssm->gather_stats(curr));
00713 
00714         o << curr.sm.log_bytes_generated << ends;
00715         fprintf(stdout, "stats: log_bytes_generated %s\n" , o.c_str()); 
00716     }
00717     xct_t *oldxct(NULL);
00718     lsn_t target;
00719     {
00720         w_ostrstream o;
00721         o << "Active xcts: " << xct_t::num_active_xcts() << " ";
00722 
00723         tid_t old = xct_t::oldest_tid();
00724         o << "Oldest transaction: " << old;
00725 
00726         xct_t *x = xct_t::look_up(old);
00727         if(x==NULL) {
00728             fprintf(stdout, "Could not find %s\n", o.c_str());
00729             W_FATAL(fcINTERNAL);
00730         }
00731 
00732         target = x->first_lsn();
00733         o << "   First lsn: " << x->first_lsn();
00734         o << "   Last lsn: " << x->last_lsn();
00735 
00736         fprintf(stdout, "%s\n" , o.c_str()); 
00737 
00738         oldxct = x;
00739     }
00740 
00741     if(calls > 3) {
00742         
00743         static tid_t aborted_tid;
00744         if(aborted_tid == xd->tid()) {
00745             w_ostrstream o;
00746             o << aborted_tid << endl;
00747             fprintf(stdout, 
00748                     "Multiple calls with same victim! : %s total calls %d\n",
00749                     o.c_str(), calls);
00750             W_FATAL(smlevel_0::eINTERNAL);
00751         }
00752         aborted_tid = xd->tid();
00753         fprintf(stdout, "\n\n******************************** ABORTING\n\n");
00754         return RC(smlevel_0::eUSERABORT); 
00755     }
00756 
00757     fprintf(stdout, "\n\n******************************** ARCHIVING \n\n");
00758     fprintf(stdout, "Move aside log file log.%d to log.%d.bak\n", 
00759             target.file(), target.file());
00760     static char O[smlevel_0::max_devname<<1];
00761     strcat(O, filename);
00762     static char N[smlevel_0::max_devname<<1];
00763     strcat(N, filename);
00764     strcat(N, ".bak");
00765 
00766     int e = ::rename(O, N);
00767     if(e != 0) {
00768         fprintf(stdout, "Could not move %s to %s: error : %d %s\n",
00769                 O, N, e, strerror(errno));
00770         if(errno == ENOENT) {
00771             fprintf(stdout, "Ignored error.\n\n");
00772             return RCOK; 
00773         }
00774         fprintf(stdout, "Returning eOUTOFLOGSPACE.\n\n");
00775         rc = RC2(smlevel_0::eOUTOFLOGSPACE, errno); 
00776     } else {
00777         dump();
00778         fprintf(stdout, "archived ... OK\n\n");
00779         W_COERCE(ss_m::log_file_was_archived(filename));
00780     }
00781     return rc;
00782 }