00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #include "w_defines.h"
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 #include "sm_vas.h"
00040 ss_m* ssm = 0;
00041 
00042 
00043 typedef w_rc_t rc_t;
00044 
00045 
00046 w_rc_t init_config_options(option_group_t& options,
00047                         const char* prog_type,
00048                         int& argc, char** argv);
00049 
00050 
00051 struct file_info_t {
00052     static const char* key;
00053     stid_t         fid;
00054     rid_t       first_rid;
00055     int         num_rec;
00056     int         rec_size;
00057 };
00058 const char* file_info_t::key = "SCANFILE";
00059 
00060 ostream &
00061 operator << (ostream &o, const file_info_t &info)
00062 {
00063     o << "key " << info.key
00064     << " fid " << info.fid
00065     << " first_rid " << info.first_rid
00066     << " num_rec " << info.num_rec
00067     << " rec_size " << info.rec_size ;
00068     return o;
00069 }
00070 
00071 
00072 typedef        smlevel_0::smksize_t        smksize_t;
00073 
00074 
00075 
00076 void
00077 usage(option_group_t& options)
00078 {
00079     cerr << "Usage: create_rec [-h] [-i] [options]" << endl;
00080     cerr << "       -i initialize device/volume and create file of records" << endl;
00081     cerr << "Valid options are: " << endl;
00082     options.print_usage(true, cerr);
00083 }
00084 
00085 
00086 class smthread_user_t : public smthread_t {
00087         int        _argc;
00088         char        **_argv;
00089 
00090         const char *_device_name;
00091         smsize_t    _quota;
00092         int         _num_rec;
00093         smsize_t    _rec_size;
00094         lvid_t      _lvid;  
00095         rid_t       _start_rid;
00096         stid_t      _fid;
00097         bool        _initialize_device;
00098         option_group_t* _options;
00099         vid_t       _vid;
00100 public:
00101         int         retval;
00102 
00103         smthread_user_t(int ac, char **av) 
00104                 : smthread_t(t_regular, "smthread_user_t"),
00105                 _argc(ac), _argv(av), 
00106                 _device_name(NULL),
00107                 _quota(0),
00108                 _num_rec(0),
00109                 _rec_size(0),
00110                 _initialize_device(false),
00111                 _options(NULL),
00112                 _vid(1),
00113                 retval(0) { }
00114 
00115         ~smthread_user_t()  { if(_options) delete _options; }
00116 
00117         void run();
00118 
00119         
00120         w_rc_t handle_options();
00121         w_rc_t find_file_info();
00122         w_rc_t create_the_file();
00123         w_rc_t scan_the_file();
00124         w_rc_t scan_the_root_index();
00125         w_rc_t do_work();
00126         w_rc_t do_init();
00127         w_rc_t no_init();
00128         w_rc_t vtable_locks();
00129         w_rc_t vtable_threads();
00130         w_rc_t vtable_xcts();
00131 
00132 };
00133 
00134 
00135 
00136 
00137 w_rc_t
00138 smthread_user_t::find_file_info()
00139 {
00140     file_info_t  info;
00141     W_DO(ssm->begin_xct());
00142 
00143     bool        found;
00144     stid_t      _root_iid;
00145     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00146 
00147     smsize_t    info_len = sizeof(info);
00148     const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00149     W_DO(ss_m::find_assoc(_root_iid,
00150                           key_vec_tmp,
00151                           &info, info_len, found));
00152     if (!found) {
00153         cerr << "No file information found" <<endl;
00154         return RC(fcASSERT);
00155     } else {
00156        cerr << " found assoc "
00157                 << file_info_t::key << " --> " << info << endl;
00158     }
00159 
00160     W_DO(ssm->commit_xct());
00161 
00162     _start_rid = info.first_rid;
00163     _fid = info.fid;
00164     _rec_size = info.rec_size;
00165     _num_rec = info.num_rec;
00166     return RCOK;
00167 }
00168 
00169 
00170 
00171 
00172 
00173 
00174 
00175 
00176 
00177 
00178 rc_t
00179 smthread_user_t::create_the_file() 
00180 {
00181     file_info_t info;  
00182     
00183 
00184     
00185     cout << "Creating a file with " << _num_rec 
00186         << " records of size " << _rec_size << endl;
00187     W_DO(ssm->begin_xct());
00188 
00189     
00190     W_DO(ssm->create_file(_vid, info.fid, smlevel_3::t_regular));
00191     rid_t rid;
00192 
00193     _rec_size -= align(sizeof(int));
00194 
00195 
00196 
00197 
00198     char* dummy = new char[_rec_size];
00199     memset(dummy, '\0', _rec_size);
00200     vec_t data(dummy, _rec_size);
00201 
00202     for(int j=0; j < _num_rec; j++)
00203     {
00204         {
00205             w_ostrstream o(dummy, _rec_size);
00206             o << "Record number " << j << ends;
00207             w_assert1(o.c_str() == dummy);
00208         }
00209         
00210         int i = j;
00211         const vec_t hdr(&i, sizeof(i));
00212         W_COERCE(ssm->create_rec(info.fid, hdr,
00213                                 _rec_size, data, rid));
00214         if (j == 0) {
00215             info.first_rid = rid;
00216         }        
00217     }
00218     cout << "Created all. First rid " << info.first_rid << endl;
00219     delete [] dummy;
00220     info.num_rec = _num_rec;
00221     info.rec_size = _rec_size;
00222 
00223     
00224     
00225     stid_t      _root_iid;
00226     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00227 
00228     const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00229     const vec_t info_vec_tmp(&info, sizeof(info));
00230     W_DO(ss_m::create_assoc(_root_iid,
00231                             key_vec_tmp,
00232                             info_vec_tmp));
00233     W_DO(vtable_locks());
00234     W_DO(vtable_xcts());
00235     W_DO(vtable_threads());
00236 
00237     W_DO(ssm->commit_xct());
00238     return RCOK;
00239 }
00240 
00241 rc_t
00242 smthread_user_t::scan_the_root_index() 
00243 {
00244     W_DO(ssm->begin_xct());
00245     stid_t _root_iid;
00246     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00247     cout << "Scanning index " << _root_iid << endl;
00248     scan_index_i scan(_root_iid, 
00249             scan_index_i::ge, vec_t::neg_inf,
00250             scan_index_i::le, vec_t::pos_inf, false,
00251             ss_m::t_cc_kvl);
00252     bool        eof(false);
00253     int         i(0);
00254     smsize_t    klen(0);
00255     smsize_t    elen(0);
00256 #define MAXKEYSIZE 100
00257     char *      keybuf[MAXKEYSIZE];
00258     file_info_t info;
00259 
00260     do {
00261         w_rc_t rc = scan.next(eof);
00262         if(rc.is_error()) {
00263             cerr << "Error getting next: " << rc << endl;
00264             retval = rc.err_num();
00265             return rc;
00266         }
00267         if(eof) break;
00268 
00269         
00270         W_DO(scan.curr(NULL, klen, NULL, elen));
00271         
00272         vec_t key(keybuf, klen);
00273         vec_t elem(&info, elen);
00274         
00275         W_DO(scan.curr(&key, klen, &elem, elen));
00276 
00277         cout << "Key " << keybuf << endl;
00278         cout << "Value " 
00279         << " { fid " << info.fid 
00280         << " first_rid " << info.first_rid
00281         << " #rec " << info.num_rec
00282         << " rec size " << info.rec_size << " }"
00283         << endl;
00284         i++;
00285     } while (!eof);
00286     W_DO(ssm->commit_xct());
00287     return RCOK;
00288 }
00289 
00290 rc_t
00291 smthread_user_t::scan_the_file() 
00292 {
00293     cout << "Scanning file " << _fid << endl;
00294     W_DO(ssm->begin_xct());
00295 
00296     scan_file_i scan(_fid);
00297     pin_i*      cursor(NULL);
00298     bool        eof(false);
00299     int         i(0);
00300 
00301     do {
00302         w_rc_t rc = scan.next(cursor, 0, eof);
00303         if(rc.is_error()) {
00304             cerr << "Error getting next: " << rc << endl;
00305             retval = rc.err_num();
00306             return rc;
00307         }
00308         if(eof) break;
00309 
00310         vec_t       header (cursor->hdr(), cursor->hdr_size());
00311         int         hdrcontents;
00312         header.copy_to(&hdrcontents, sizeof(hdrcontents));
00313 
00314         w_assert1(cursor->body_size() == _rec_size);
00315         i++;
00316     } while (!eof);
00317     w_assert1(i == _num_rec);
00318 
00319     W_DO(vtable_locks());
00320     W_DO(vtable_xcts());
00321     W_DO(vtable_threads());
00322 
00323     W_DO(ssm->commit_xct());
00324     return RCOK;
00325 }
00326 
00327 rc_t
00328 smthread_user_t::do_init()
00329 {
00330     cout << "-i: Initialize " << endl;
00331 
00332     {
00333         devid_t        devid;
00334         cout << "Formatting device: " << _device_name 
00335              << " with a " << _quota << "KB quota ..." << endl;
00336         W_DO(ssm->format_dev(_device_name, _quota, true));
00337 
00338         cout << "Mounting device: " << _device_name  << endl;
00339         
00340         u_int        vol_cnt;
00341         W_DO(ssm->mount_dev(_device_name, vol_cnt, devid));
00342 
00343         cout << "Mounted device: " << _device_name  
00344              << " volume count " << vol_cnt
00345              << " device " << devid
00346              << endl;
00347 
00348         
00349         
00350         cout << "Generating new lvid: " << endl;
00351         W_DO(ssm->generate_new_lvid(_lvid));
00352         cout << "Generated lvid " << _lvid <<  endl;
00353 
00354         
00355         cout << "Creating a new volume on the device" << endl;
00356         cout << "    with a " << _quota << "KB quota ..." << endl;
00357 
00358         W_DO(ssm->create_vol(_device_name, _lvid, _quota, false, _vid));
00359         cout << "    with local handle(phys volid) " << _vid << endl;
00360 
00361     } 
00362 
00363     W_DO(create_the_file());
00364     return RCOK;
00365 }
00366 
00367 rc_t
00368 smthread_user_t::no_init()
00369 {
00370     cout << "Using already-existing device: " << _device_name << endl;
00371     
00372     devid_t      devid;
00373     u_int        vol_cnt;
00374     w_rc_t rc = ssm->mount_dev(_device_name, vol_cnt, devid);
00375     if (rc.is_error()) {
00376         cerr << "Error: could not mount device: " 
00377             << _device_name << endl;
00378         cerr << "   Did you forget to run the server with -i?" 
00379             << endl;
00380         return rc;
00381     }
00382     
00383     
00384     lvid_t* lvid_list;
00385     u_int   lvid_cnt;
00386     W_DO(ssm->list_volumes(_device_name, lvid_list, lvid_cnt));
00387     if (lvid_cnt == 0) {
00388         cerr << "Error, device has no volumes" << endl;
00389         exit(1);
00390     }
00391     _lvid = lvid_list[0];
00392     delete [] lvid_list;
00393 
00394     W_COERCE(find_file_info());
00395     W_COERCE(scan_the_root_index());
00396     W_DO(scan_the_file());
00397     return RCOK;
00398 }
00399 
00400 rc_t
00401 smthread_user_t::do_work()
00402 {
00403     if (_initialize_device) W_DO(do_init());
00404     else  W_DO(no_init());
00405 
00406     return RCOK;
00407 }
00408 
00409 
00410 
00411 
00412 
00413 w_rc_t smthread_user_t::handle_options()
00414 {
00415     option_t* opt_device_name = 0;
00416     option_t* opt_device_quota = 0;
00417     option_t* opt_num_rec = 0;
00418 
00419     cout << "Processing configuration options ..." << endl;
00420 
00421     
00422     
00423     
00424     
00425     
00426     
00427     
00428     
00429     const int option_level_cnt = 3; 
00430 
00431     _options = new option_group_t (option_level_cnt);
00432     if(!_options) {
00433         cerr << "Out of memory: could not allocate from heap." <<
00434             endl;
00435         retval = 1;
00436         return RC(fcINTERNAL);
00437     }
00438     option_group_t &options(*_options);
00439 
00440     W_COERCE(options.add_option("device_name", "device/file name",
00441                          NULL, "device containg volume holding file to scan",
00442                          true, option_t::set_value_charstr,
00443                          opt_device_name));
00444 
00445     W_COERCE(options.add_option("device_quota", "# > 1000",
00446                          "2000", "quota for device",
00447                          false, option_t::set_value_long,
00448                          opt_device_quota));
00449 
00450     
00451     W_COERCE(options.add_option("num_rec", "# > 0",
00452                          "1", "number of records in file",
00453                          true, option_t::set_value_long,
00454                          opt_num_rec));
00455 
00456     
00457     W_COERCE(ss_m::setup_options(&options));
00458 
00459     cout << "Finding configuration option settings." << endl;
00460 
00461     w_rc_t rc = init_config_options(options, "server", _argc, _argv);
00462     if (rc.is_error()) {
00463         usage(options);
00464         retval = 1;
00465         return rc;
00466     }
00467     cout << "Processing command line." << endl;
00468 
00469     
00470     int option;
00471     while ((option = getopt(_argc, _argv, "hi")) != -1) {
00472         switch (option) {
00473         case 'i' :
00474             _initialize_device = true;
00475             break;
00476 
00477         case 'h' :
00478             usage(options);
00479             break;
00480 
00481         default:
00482             usage(options);
00483             retval = 1;
00484             return RC(fcNOTIMPLEMENTED);
00485             break;
00486         }
00487     }
00488     {
00489         cout << "Checking for required options...";
00490         
00491         w_ostrstream      err_stream;
00492         w_rc_t rc = options.check_required(&err_stream);
00493         if (rc.is_error()) {
00494             cerr << "These required options are not set:" << endl;
00495             cerr << err_stream.c_str() << endl;
00496             return rc;
00497         }
00498         cout << "OK " << endl;
00499     }
00500 
00501     
00502     _device_name = opt_device_name->value();
00503     _quota = strtol(opt_device_quota->value(), 0, 0);
00504     _num_rec = strtol(opt_num_rec->value(), 0, 0);
00505 
00506     return RCOK;
00507 }
00508 
00509 void smthread_user_t::run()
00510 {
00511     w_rc_t rc = handle_options();
00512     if(rc.is_error()) {
00513         retval = 1;
00514         return;
00515     }
00516 
00517     
00518     cout << "Starting SSM and performing recovery ..." << endl;
00519     ssm = new ss_m();
00520     if (!ssm) {
00521         cerr << "Error: Out of memory for ss_m" << endl;
00522         retval = 1;
00523         return;
00524     }
00525 
00526     cout << "Getting SSM config info for record size ..." << endl;
00527 
00528     sm_config_info_t config_info;
00529     W_COERCE(ss_m::config_info(config_info));
00530     _rec_size = config_info.max_small_rec; 
00531 
00532     
00533     
00534     rc = do_work();
00535 
00536     if (rc.is_error()) {
00537         cerr << "Could not set up device/volume due to: " << endl;
00538         cerr << rc << endl;
00539         delete ssm;
00540         rc = RCOK;   
00541                      
00542         retval = 1;
00543         if(rc.is_error()) 
00544             W_COERCE(rc); 
00545         return;
00546     }
00547 
00548 
00549     
00550     cout << "\nShutting down SSM ..." << endl;
00551     delete ssm;
00552 
00553     cout << "Finished!" << endl;
00554 
00555     return;
00556 }
00557 
00558 w_rc_t
00559 smthread_user_t::vtable_locks() 
00560 {
00561     vtable_t vt;
00562     W_DO(ss_m::lock_collect(vt));
00563     w_ostrstream o;
00564     vt.operator<<(o);
00565     fprintf(stderr, "Locks %s\n", o.c_str());
00566     return RCOK;
00567 }
00568 
00569 w_rc_t
00570 smthread_user_t::vtable_xcts() 
00571 {
00572     vtable_t vt;
00573     W_DO(ss_m::xct_collect(vt));
00574     w_ostrstream o;
00575     vt.operator<<(o);
00576     fprintf(stderr, "Transactions %s\n", o.c_str());
00577     return RCOK;
00578 }
00579 
00580 w_rc_t
00581 smthread_user_t::vtable_threads() 
00582 {
00583     vtable_t vt;
00584     W_DO(ss_m::thread_collect(vt));
00585     w_ostrstream o;
00586     vt.operator<<(o);
00587     fprintf(stderr, "Transactions %s\n", o.c_str());
00588     return RCOK;
00589 }
00590 
00591 
00592 
00593 int
00594 main(int argc, char* argv[])
00595 {
00596         smthread_user_t *smtu = new smthread_user_t(argc, argv);
00597         if (!smtu)
00598                 W_FATAL(fcOUTOFMEMORY);
00599 
00600         w_rc_t e = smtu->fork();
00601         if(e.is_error()) {
00602             cerr << "error forking thread: " << e <<endl;
00603             return 1;
00604         }
00605         e = smtu->join();
00606         if(e.is_error()) {
00607             cerr << "error forking thread: " << e <<endl;
00608             return 1;
00609         }
00610 
00611         int        rv = smtu->retval;
00612         delete smtu;
00613 
00614         return rv;
00615 }
00616