00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #include "w_defines.h"
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 #include "sm_vas.h"
00040 ss_m* ssm = 0;
00041 
00042 
00043 typedef w_rc_t rc_t;
00044 
00045 
00046 w_rc_t init_config_options(option_group_t& options,
00047                         const char* prog_type,
00048                         int& argc, char** argv);
00049 
00050 
00051 struct file_info_t {
00052     static const char* key;
00053     stid_t         fid;
00054     rid_t       first_rid;
00055     int         num_rec;
00056     int         rec_size;
00057 };
00058 const char* file_info_t::key = "SCANFILE";
00059 
00060 ostream &
00061 operator << (ostream &o, const file_info_t &info)
00062 {
00063     o << "key " << info.key
00064     << " fid " << info.fid
00065     << " first_rid " << info.first_rid
00066     << " num_rec " << info.num_rec
00067     << " rec_size " << info.rec_size ;
00068     return o;
00069 }
00070 
00071 
00072 typedef        smlevel_0::smksize_t        smksize_t;
00073 
00074 
00075 
00076 void
00077 usage(option_group_t& options)
00078 {
00079     cerr << "Usage: sort_stream [-h] [-i] [options]" << endl;
00080     cerr << "       -i initialize device/volume and create file of records" << endl;
00081     cerr << "Valid options are: " << endl;
00082     options.print_usage(true, cerr);
00083 }
00084 
00085 
00086 class smthread_user_t : public smthread_t {
00087         int        _argc;
00088         char        **_argv;
00089 
00090         const char *_device_name;
00091         smsize_t    _quota;
00092         int         _num_rec;
00093         smsize_t    _rec_size;
00094         lvid_t      _lvid;  
00095         rid_t       _start_rid;
00096         stid_t      _fid;
00097         bool        _initialize_device;
00098         option_group_t* _options;
00099         vid_t       _vid;
00100 public:
00101         int         retval;
00102 
00103         smthread_user_t(int ac, char **av) 
00104                 : smthread_t(t_regular, "smthread_user_t"),
00105                 _argc(ac), _argv(av), 
00106                 _device_name(NULL),
00107                 _quota(0),
00108                 _num_rec(0),
00109                 _rec_size(0),
00110                 _initialize_device(false),
00111                 _options(NULL),
00112                 _vid(1),
00113                 retval(0) { }
00114 
00115         ~smthread_user_t()  { if(_options) delete _options; }
00116 
00117         void run();
00118 
00119         
00120         w_rc_t handle_options();
00121         w_rc_t find_file_info();
00122         w_rc_t create_the_file();
00123         w_rc_t demo_sort_stream();
00124         w_rc_t scan_the_root_index();
00125         w_rc_t do_work();
00126         w_rc_t do_init();
00127         w_rc_t no_init();
00128 
00129 };
00130 
00131 
00132 
00133 
00134 w_rc_t
00135 smthread_user_t::find_file_info()
00136 {
00137     file_info_t  info;
00138     W_DO(ssm->begin_xct());
00139 
00140     bool        found;
00141     stid_t      _root_iid;
00142     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00143 
00144     smsize_t    info_len = sizeof(info);
00145     const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00146     W_DO(ss_m::find_assoc(_root_iid,
00147                           key_vec_tmp,
00148                           &info, info_len, found));
00149     if (!found) {
00150         cerr << "No file information found" <<endl;
00151         return RC(fcASSERT);
00152     } else {
00153        cerr << " found assoc "
00154                 << file_info_t::key << " --> " << info << endl;
00155     }
00156 
00157     W_DO(ssm->commit_xct());
00158 
00159     _start_rid = info.first_rid;
00160     _fid = info.fid;
00161     _rec_size = info.rec_size;
00162     _num_rec = info.num_rec;
00163     return RCOK;
00164 }
00165 
00166 
00167 
00168 
00169 
00170 
00171 
00172 
00173 
00174 
00175 rc_t
00176 smthread_user_t::create_the_file() 
00177 {
00178     file_info_t info;  
00179     
00180 
00181     
00182     cout << "Creating a file with " << _num_rec 
00183         << " records of size " << _rec_size << endl;
00184     W_DO(ssm->begin_xct());
00185 
00186     
00187     W_DO(ssm->create_file(_vid, info.fid, smlevel_3::t_regular));
00188     rid_t rid;
00189 
00190     _rec_size -= align(sizeof(int));
00191 
00192 
00193 
00194 
00195     char* dummy = new char[_rec_size];
00196     memset(dummy, '\0', _rec_size);
00197     vec_t data(dummy, _rec_size);
00198 
00199     for(int j=0; j < _num_rec; j++)
00200     {
00201         {
00202             w_ostrstream o(dummy, _rec_size);
00203             o << "Record number " << j << ends;
00204             w_assert1(o.c_str() == dummy);
00205         }
00206 
00207         
00208         
00209         w_base_t::int4_t i = j;
00210         const vec_t hdr(&i, sizeof(i));
00211 
00212         W_COERCE(ssm->create_rec(info.fid, hdr,
00213                                 _rec_size, data, rid));
00214         cout << "Creating rec " << j << endl;
00215         if (j == 0) {
00216             info.first_rid = rid;
00217         }        
00218     }
00219     cout << "Created all. First rid " << info.first_rid << endl;
00220     delete [] dummy;
00221     info.num_rec = _num_rec;
00222     info.rec_size = _rec_size;
00223 
00224     
00225     
00226     stid_t      _root_iid;
00227     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00228 
00229     const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00230     const vec_t info_vec_tmp(&info, sizeof(info));
00231     W_DO(ss_m::create_assoc(_root_iid,
00232                             key_vec_tmp,
00233                             info_vec_tmp));
00234     cerr << "Creating assoc "
00235             << file_info_t::key << " --> " << info << endl;
00236     W_DO(ssm->commit_xct());
00237     return RCOK;
00238 }
00239 
00240 rc_t
00241 smthread_user_t::scan_the_root_index() 
00242 {
00243     W_DO(ssm->begin_xct());
00244     stid_t _root_iid;
00245     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00246     cout << "Scanning index " << _root_iid << endl;
00247     scan_index_i scan(_root_iid, 
00248             scan_index_i::ge, vec_t::neg_inf,
00249             scan_index_i::le, vec_t::pos_inf, false,
00250             ss_m::t_cc_kvl);
00251     bool        eof(false);
00252     int         i(0);
00253     smsize_t    klen(0);
00254     smsize_t    elen(0);
00255 #define MAXKEYSIZE 100
00256     char *      keybuf[MAXKEYSIZE];
00257     file_info_t info;
00258 
00259     do {
00260         w_rc_t rc = scan.next(eof);
00261         if(rc.is_error()) {
00262             cerr << "Error getting next: " << rc << endl;
00263             retval = rc.err_num();
00264             return rc;
00265         }
00266         if(eof) break;
00267 
00268         
00269         W_DO(scan.curr(NULL, klen, NULL, elen));
00270         
00271         vec_t key(keybuf, klen);
00272         vec_t elem(&info, elen);
00273         
00274         W_DO(scan.curr(&key, klen, &elem, elen));
00275 
00276         cout << "Key " << keybuf << endl;
00277         cout << "Value " 
00278         << " { fid " << info.fid 
00279         << " first_rid " << info.first_rid
00280         << " #rec " << info.num_rec
00281         << " rec size " << info.rec_size << " }"
00282         << endl;
00283         i++;
00284     } while (!eof);
00285     W_DO(ssm->commit_xct());
00286     return RCOK;
00287 }
00288 
00289 w_rc_t 
00290 smthread_user_t::demo_sort_stream()
00291 {
00292     
00293     
00294     
00295     w_base_t::int4_t hdrcontents;
00296 
00297     
00298     using ssm_sort::key_info_t ;
00299 
00300     key_info_t     kinfo;
00301     kinfo.type = sortorder::kt_i4;
00302     kinfo.derived = false;
00303     kinfo.where = key_info_t::t_hdr;
00304     kinfo.offset = 0;
00305     kinfo.len = sizeof(hdrcontents);
00306     kinfo.est_reclen = _rec_size;
00307 
00308     
00309     using ssm_sort::sort_parm_t ;
00310 
00311     sort_parm_t   behav;
00312     behav.run_size = 10; 
00313     behav.vol = _vid;
00314     behav.unique = false; 
00315     behav.ascending = false; 
00316     behav.destructive = false; 
00317     
00318     behav.property = ss_m::t_temporary; 
00319 
00320     sort_stream_i  stream(kinfo, behav, _rec_size);
00321     w_assert1(stream.is_sorted()==false);
00322 
00323     
00324     W_DO(ssm->begin_xct());
00325 
00326     scan_file_i scan(_fid);
00327     pin_i*      cursor(NULL);
00328     bool        eof(false);
00329     int         i(0);
00330 
00331     do {
00332         w_rc_t rc = scan.next(cursor, 0, eof);
00333         if(rc.is_error()) {
00334             cerr << "Error getting next: " << rc << endl;
00335             retval = rc.err_num();
00336             return rc;
00337         }
00338         if(eof) break;
00339 
00340         vec_t       header (cursor->hdr(), cursor->hdr_size());
00341         header.copy_to(&hdrcontents, sizeof(hdrcontents));
00342         cout << "Record hdr "  << hdrcontents << endl;
00343 
00344         rid_t       rid = cursor->rid();
00345         vec_t       elem (&rid, sizeof(rid));
00346 
00347         stream.put(header, elem);
00348         i++;
00349     } while (!eof);
00350     w_assert1(i == _num_rec);
00351     W_DO(ssm->commit_xct());
00352 
00353     
00354     w_assert1(stream.is_sorted()==false);
00355 
00356     
00357     W_DO(ssm->begin_xct());
00358     i--;
00359     eof = false;
00360     do {
00361         vec_t       header; 
00362         vec_t       elem; 
00363         W_DO(stream.get_next(header, elem, eof));
00364         if(eof) break;
00365 
00366         w_assert1(stream.is_sorted()==true);
00367         w_assert1(stream.is_empty()==false);
00368         header.copy_to(&hdrcontents, sizeof(hdrcontents));
00369 
00370         rid_t       rid; 
00371         elem.copy_to(&rid, sizeof(rid));
00372 
00373         w_assert1(i == hdrcontents);
00374 
00375         cout << "i " << hdrcontents << " rid " << rid << endl;
00376         i--;
00377     } while (!eof);
00378     W_DO(ssm->commit_xct());
00379     w_assert1(stream.is_empty()==false);
00380     w_assert1(stream.is_sorted()==true);
00381     return RCOK;
00382 }
00383 
00384 rc_t
00385 smthread_user_t::do_init()
00386 {
00387     cout << "-i: Initialize " << endl;
00388 
00389     {
00390         devid_t        devid;
00391         cout << "Formatting device: " << _device_name 
00392              << " with a " << _quota << "KB quota ..." << endl;
00393         W_DO(ssm->format_dev(_device_name, _quota, true));
00394 
00395         cout << "Mounting device: " << _device_name  << endl;
00396         
00397         u_int        vol_cnt;
00398         W_DO(ssm->mount_dev(_device_name, vol_cnt, devid));
00399 
00400         cout << "Mounted device: " << _device_name  
00401              << " volume count " << vol_cnt
00402              << " device " << devid
00403              << endl;
00404 
00405         
00406         
00407         cout << "Generating new lvid: " << endl;
00408         W_DO(ssm->generate_new_lvid(_lvid));
00409         cout << "Generated lvid " << _lvid <<  endl;
00410 
00411         
00412         cout << "Creating a new volume on the device" << endl;
00413         cout << "    with a " << _quota << "KB quota ..." << endl;
00414 
00415         W_DO(ssm->create_vol(_device_name, _lvid, _quota, false, _vid));
00416         cout << "    with local handle(phys volid) " << _vid << endl;
00417 
00418     } 
00419 
00420     W_DO(create_the_file());
00421     W_DO(find_file_info());
00422     W_DO(demo_sort_stream());
00423     return RCOK;
00424 }
00425 
00426 rc_t
00427 smthread_user_t::no_init()
00428 {
00429     cout << "Using already-existing device: " << _device_name << endl;
00430     
00431     devid_t      devid;
00432     u_int        vol_cnt;
00433     w_rc_t rc = ssm->mount_dev(_device_name, vol_cnt, devid);
00434     if (rc.is_error()) {
00435         cerr << "Error: could not mount device: " 
00436             << _device_name << endl;
00437         cerr << "   Did you forget to run the server with -i?" 
00438             << endl;
00439         return rc;
00440     }
00441     
00442     
00443     lvid_t* lvid_list;
00444     u_int   lvid_cnt;
00445     W_DO(ssm->list_volumes(_device_name, lvid_list, lvid_cnt));
00446     if (lvid_cnt == 0) {
00447         cerr << "Error, device has no volumes" << endl;
00448         exit(1);
00449     }
00450     _lvid = lvid_list[0];
00451     delete [] lvid_list;
00452 
00453     W_COERCE(find_file_info());
00454     W_COERCE(scan_the_root_index());
00455     W_DO(demo_sort_stream());
00456     return RCOK;
00457 }
00458 
00459 rc_t
00460 smthread_user_t::do_work()
00461 {
00462     if (_initialize_device) W_DO(do_init());
00463     else  W_DO(no_init());
00464     return RCOK;
00465 }
00466 
00467 
00468 
00469 
00470 
00471 w_rc_t smthread_user_t::handle_options()
00472 {
00473     option_t* opt_device_name = 0;
00474     option_t* opt_device_quota = 0;
00475     option_t* opt_num_rec = 0;
00476 
00477     cout << "Processing configuration options ..." << endl;
00478 
00479     
00480     
00481     
00482     
00483     
00484     
00485     
00486     
00487     const int option_level_cnt = 3; 
00488 
00489     _options = new option_group_t (option_level_cnt);
00490     if(!_options) {
00491         cerr << "Out of memory: could not allocate from heap." <<
00492             endl;
00493         retval = 1;
00494         return RC(fcINTERNAL);
00495     }
00496     option_group_t &options(*_options);
00497 
00498     W_COERCE(options.add_option("device_name", "device/file name",
00499                          NULL, "device containg volume holding file to scan",
00500                          true, option_t::set_value_charstr,
00501                          opt_device_name));
00502 
00503     W_COERCE(options.add_option("device_quota", "# > 1000",
00504                          "2000", "quota for device",
00505                          false, option_t::set_value_long,
00506                          opt_device_quota));
00507 
00508     
00509     W_COERCE(options.add_option("num_rec", "# > 0",
00510                          "1", "number of records in file",
00511                          true, option_t::set_value_long,
00512                          opt_num_rec));
00513 
00514     
00515     W_COERCE(ss_m::setup_options(&options));
00516 
00517     cout << "Finding configuration option settings." << endl;
00518 
00519     w_rc_t rc = init_config_options(options, "server", _argc, _argv);
00520     if (rc.is_error()) {
00521         usage(options);
00522         retval = 1;
00523         return rc;
00524     }
00525     cout << "Processing command line." << endl;
00526 
00527     
00528     int option;
00529     while ((option = getopt(_argc, _argv, "hi")) != -1) {
00530         switch (option) {
00531         case 'i' :
00532             _initialize_device = true;
00533             break;
00534 
00535         case 'h' :
00536             usage(options);
00537             break;
00538 
00539         default:
00540             usage(options);
00541             retval = 1;
00542             return RC(fcNOTIMPLEMENTED);
00543             break;
00544         }
00545     }
00546     {
00547         cout << "Checking for required options...";
00548         
00549         w_ostrstream      err_stream;
00550         w_rc_t rc = options.check_required(&err_stream);
00551         if (rc.is_error()) {
00552             cerr << "These required options are not set:" << endl;
00553             cerr << err_stream.c_str() << endl;
00554             return rc;
00555         }
00556         cout << "OK " << endl;
00557     }
00558 
00559     
00560     _device_name = opt_device_name->value();
00561     _quota = strtol(opt_device_quota->value(), 0, 0);
00562     _num_rec = strtol(opt_num_rec->value(), 0, 0);
00563 
00564     return RCOK;
00565 }
00566 
00567 void smthread_user_t::run()
00568 {
00569     w_rc_t rc = handle_options();
00570     if(rc.is_error()) {
00571         retval = 1;
00572         return;
00573     }
00574 
00575     
00576     cout << "Starting SSM and performing recovery ..." << endl;
00577     ssm = new ss_m();
00578     if (!ssm) {
00579         cerr << "Error: Out of memory for ss_m" << endl;
00580         retval = 1;
00581         return;
00582     }
00583 
00584     cout << "Getting SSM config info for record size ..." << endl;
00585 
00586     sm_config_info_t config_info;
00587     W_COERCE(ss_m::config_info(config_info));
00588     _rec_size = config_info.max_small_rec; 
00589 
00590     
00591     
00592     rc = do_work();
00593 
00594     if (rc.is_error()) {
00595         cerr << "Could not set up device/volume due to: " << endl;
00596         cerr << rc << endl;
00597         delete ssm;
00598         rc = RCOK;   
00599                      
00600         retval = 1;
00601         if(rc.is_error()) 
00602             W_COERCE(rc); 
00603         return;
00604     }
00605 
00606 
00607     
00608     cout << "\nShutting down SSM ..." << endl;
00609     delete ssm;
00610 
00611     cout << "Finished!" << endl;
00612 
00613     return;
00614 }
00615 
00616 
00617 int
00618 main(int argc, char* argv[])
00619 {
00620         smthread_user_t *smtu = new smthread_user_t(argc, argv);
00621         if (!smtu)
00622                 W_FATAL(fcOUTOFMEMORY);
00623 
00624         w_rc_t e = smtu->fork();
00625         if(e.is_error()) {
00626             cerr << "error forking thread: " << e <<endl;
00627             return 1;
00628         }
00629         e = smtu->join();
00630         if(e.is_error()) {
00631             cerr << "error forking thread: " << e <<endl;
00632             return 1;
00633         }
00634 
00635         int        rv = smtu->retval;
00636         delete smtu;
00637 
00638         return rv;
00639 }
00640