sort_stream.cpp

00001 /*<std-header orig-src='shore'>
00002 
00003  $Id: sort_stream.cpp,v 1.2 2010/06/24 17:08:14 nhall Exp $
00004 
00005 SHORE -- Scalable Heterogeneous Object REpository
00006 
00007 Copyright (c) 1994-99 Computer Sciences Department, University of
00008                       Wisconsin -- Madison
00009 All Rights Reserved.
00010 
00011 Permission to use, copy, modify and distribute this software and its
00012 documentation is hereby granted, provided that both the copyright
00013 notice and this permission notice appear in all copies of the
00014 software, derivative works or modified versions, and any portions
00015 thereof, and that both notices appear in supporting documentation.
00016 
00017 THE AUTHORS AND THE COMPUTER SCIENCES DEPARTMENT OF THE UNIVERSITY
00018 OF WISCONSIN - MADISON ALLOW FREE USE OF THIS SOFTWARE IN ITS
00019 "AS IS" CONDITION, AND THEY DISCLAIM ANY LIABILITY OF ANY KIND
00020 FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
00021 
00022 This software was developed with support by the Advanced Research
00023 Project Agency, ARPA order number 018 (formerly 8230), monitored by
00024 the U.S. Army Research Laboratory under contract DAAB07-91-C-Q518.
00025 Further funding for this work was provided by DARPA through
00026 Rome Research Laboratory Contract No. F30602-97-2-0247.
00027 
00028 */
00029 
00030 #include "w_defines.h"
00031 
00032 /*  -- do not edit anything above this line --   </std-header>*/
00033 
00034 /**\anchor sort_stream_i_example */
00035 /*
00036  * This program is a test of sorting with sort_stream
00037  */
00038 
00039 #include "sm_vas.h"
// Global handle to the storage manager instance. It is created and
// destroyed in smthread_user_t::run() and used by all the helpers.
ss_m* ssm = 0;

// shorten error code type name
typedef w_rc_t rc_t;

// Defined in options.cpp, not in this file.  handle_options() calls it
// with the option group, the program type string and the command line.
w_rc_t init_config_options(option_group_t& options,
                        const char* prog_type,
                        int& argc, char** argv);
00049 
00050 
// Description of the record file.  create_the_file() stores one of
// these in the volume's root index under file_info_t::key, and
// find_file_info() reads it back.
// NOTE(review): the struct is written and read as raw bytes
// (sizeof(file_info_t)), so changing its members changes what is stored.
struct file_info_t {
    static const char* key;     // root-index key the struct is stored under
    stid_t         fid;         // id of the file holding the records
    rid_t       first_rid;      // rid of the first record created
    int         num_rec;        // number of records created
    int         rec_size;       // record body size used at creation time
};
const char* file_info_t::key = "SCANFILE";
00059 
00060 ostream &
00061 operator << (ostream &o, const file_info_t &info)
00062 {
00063     o << "key " << info.key
00064     << " fid " << info.fid
00065     << " first_rid " << info.first_rid
00066     << " num_rec " << info.num_rec
00067     << " rec_size " << info.rec_size ;
00068     return o;
00069 }
00070 
00071 
// Lets smlevel_0::smksize_t be written without the class qualifier.
typedef        smlevel_0::smksize_t        smksize_t;
00073 
00074 
00075 
00076 void
00077 usage(option_group_t& options)
00078 {
00079     cerr << "Usage: sort_stream [-h] [-i] [options]" << endl;
00080     cerr << "       -i initialize device/volume and create file of records" << endl;
00081     cerr << "Valid options are: " << endl;
00082     options.print_usage(true, cerr);
00083 }
00084 
00085 /* create an smthread based class for all sm-related work */
00086 class smthread_user_t : public smthread_t {
00087         int        _argc;
00088         char        **_argv;
00089 
00090         const char *_device_name;
00091         smsize_t    _quota;
00092         int         _num_rec;
00093         smsize_t    _rec_size;
00094         lvid_t      _lvid;  
00095         rid_t       _start_rid;
00096         stid_t      _fid;
00097         bool        _initialize_device;
00098         option_group_t* _options;
00099         vid_t       _vid;
00100 public:
00101         int         retval;
00102 
00103         smthread_user_t(int ac, char **av) 
00104                 : smthread_t(t_regular, "smthread_user_t"),
00105                 _argc(ac), _argv(av), 
00106                 _device_name(NULL),
00107                 _quota(0),
00108                 _num_rec(0),
00109                 _rec_size(0),
00110                 _initialize_device(false),
00111                 _options(NULL),
00112                 _vid(1),
00113                 retval(0) { }
00114 
00115         ~smthread_user_t()  { if(_options) delete _options; }
00116 
00117         void run();
00118 
00119         // helpers for run()
00120         w_rc_t handle_options();
00121         w_rc_t find_file_info();
00122         w_rc_t create_the_file();
00123         w_rc_t demo_sort_stream();
00124         w_rc_t scan_the_root_index();
00125         w_rc_t do_work();
00126         w_rc_t do_init();
00127         w_rc_t no_init();
00128 
00129 };
00130 
00131 /*
00132  * looks up file info in the root index
00133 */
00134 w_rc_t
00135 smthread_user_t::find_file_info()
00136 {
00137     file_info_t  info;
00138     W_DO(ssm->begin_xct());
00139 
00140     bool        found;
00141     stid_t      _root_iid;
00142     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00143 
00144     smsize_t    info_len = sizeof(info);
00145     const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00146     W_DO(ss_m::find_assoc(_root_iid,
00147                           key_vec_tmp,
00148                           &info, info_len, found));
00149     if (!found) {
00150         cerr << "No file information found" <<endl;
00151         return RC(fcASSERT);
00152     } else {
00153        cerr << " found assoc "
00154                 << file_info_t::key << " --> " << info << endl;
00155     }
00156 
00157     W_DO(ssm->commit_xct());
00158 
00159     _start_rid = info.first_rid;
00160     _fid = info.fid;
00161     _rec_size = info.rec_size;
00162     _num_rec = info.num_rec;
00163     return RCOK;
00164 }
00165 
00166 /*
00167  * This function either formats a new device and creates a
00168  * volume on it, or mounts an already existing device and
00169  * returns the ID of the volume on it.
00170  *
00171  * It's borrowed from elsewhere; it can handle mounting
00172  * an already existing device, even though in this main program
00173  * we don't ever do that.
00174  */
00175 rc_t
00176 smthread_user_t::create_the_file() 
00177 {
00178     file_info_t info;  // will be made persistent in the
00179     // volume root index.
00180 
00181     // create and fill file to scan
00182     cout << "Creating a file with " << _num_rec 
00183         << " records of size " << _rec_size << endl;
00184     W_DO(ssm->begin_xct());
00185 
00186     // Create the file. Stuff its fid in the persistent file_info
00187     W_DO(ssm->create_file(_vid, info.fid, smlevel_3::t_regular));
00188     rid_t rid;
00189 
00190     _rec_size -= align(sizeof(int));
00191 
00192 /// each record will have its ordinal number in the header
00193 /// and zeros for data 
00194 
00195     char* dummy = new char[_rec_size];
00196     memset(dummy, '\0', _rec_size);
00197     vec_t data(dummy, _rec_size);
00198 
00199     for(int j=0; j < _num_rec; j++)
00200     {
00201         {
00202             w_ostrstream o(dummy, _rec_size);
00203             o << "Record number " << j << ends;
00204             w_assert1(o.c_str() == dummy);
00205         }
00206 
00207         // header contains record #
00208         // body contains zeroes of size _rec_size
00209         w_base_t::int4_t i = j;
00210         const vec_t hdr(&i, sizeof(i));
00211 
00212         W_COERCE(ssm->create_rec(info.fid, hdr,
00213                                 _rec_size, data, rid));
00214         cout << "Creating rec " << j << endl;
00215         if (j == 0) {
00216             info.first_rid = rid;
00217         }        
00218     }
00219     cout << "Created all. First rid " << info.first_rid << endl;
00220     delete [] dummy;
00221     info.num_rec = _num_rec;
00222     info.rec_size = _rec_size;
00223 
00224     // record file info in the root index : this stores some
00225     // attributes of the file in general
00226     stid_t      _root_iid;
00227     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00228 
00229     const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00230     const vec_t info_vec_tmp(&info, sizeof(info));
00231     W_DO(ss_m::create_assoc(_root_iid,
00232                             key_vec_tmp,
00233                             info_vec_tmp));
00234     cerr << "Creating assoc "
00235             << file_info_t::key << " --> " << info << endl;
00236     W_DO(ssm->commit_xct());
00237     return RCOK;
00238 }
00239 
00240 rc_t
00241 smthread_user_t::scan_the_root_index() 
00242 {
00243     W_DO(ssm->begin_xct());
00244     stid_t _root_iid;
00245     W_DO(ss_m::vol_root_index(_vid, _root_iid));
00246     cout << "Scanning index " << _root_iid << endl;
00247     scan_index_i scan(_root_iid, 
00248             scan_index_i::ge, vec_t::neg_inf,
00249             scan_index_i::le, vec_t::pos_inf, false,
00250             ss_m::t_cc_kvl);
00251     bool        eof(false);
00252     int         i(0);
00253     smsize_t    klen(0);
00254     smsize_t    elen(0);
00255 #define MAXKEYSIZE 100
00256     char *      keybuf[MAXKEYSIZE];
00257     file_info_t info;
00258 
00259     do {
00260         w_rc_t rc = scan.next(eof);
00261         if(rc.is_error()) {
00262             cerr << "Error getting next: " << rc << endl;
00263             retval = rc.err_num();
00264             return rc;
00265         }
00266         if(eof) break;
00267 
00268         // get the key len and element len
00269         W_DO(scan.curr(NULL, klen, NULL, elen));
00270         // Create vectors for the given lengths.
00271         vec_t key(keybuf, klen);
00272         vec_t elem(&info, elen);
00273         // Get the key and element value
00274         W_DO(scan.curr(&key, klen, &elem, elen));
00275 
00276         cout << "Key " << keybuf << endl;
00277         cout << "Value " 
00278         << " { fid " << info.fid 
00279         << " first_rid " << info.first_rid
00280         << " #rec " << info.num_rec
00281         << " rec size " << info.rec_size << " }"
00282         << endl;
00283         i++;
00284     } while (!eof);
00285     W_DO(ssm->commit_xct());
00286     return RCOK;
00287 }
00288 
00289 w_rc_t 
00290 smthread_user_t::demo_sort_stream()
00291 {
00292     // Recall that the records contain a header that indicates the
00293     // order in which the records were created.  Let's sort in
00294     // reverse order.
00295     w_base_t::int4_t hdrcontents;
00296 
00297     // Key descriptor
00298     using ssm_sort::key_info_t ;
00299 
00300     key_info_t     kinfo;
00301     kinfo.type = sortorder::kt_i4;
00302     kinfo.derived = false;
00303     kinfo.where = key_info_t::t_hdr;
00304     kinfo.offset = 0;
00305     kinfo.len = sizeof(hdrcontents);
00306     kinfo.est_reclen = _rec_size;
00307 
00308     // Behavioral options
00309     using ssm_sort::sort_parm_t ;
00310 
00311     sort_parm_t   behav;
00312     behav.run_size = 10; // pages
00313     behav.vol = _vid;
00314     behav.unique = false; // there should be no duplicates
00315     behav.ascending = false; // reverse the order
00316     behav.destructive = false; // don't blow away the original file --
00317     // immaterial for sort_stream_i
00318     behav.property = ss_m::t_temporary; // don't log the scratch files used
00319 
00320     sort_stream_i  stream(kinfo, behav, _rec_size);
00321     w_assert1(stream.is_sorted()==false);
00322 
00323     // Scan the file, inserting key,rid pairs into a sort stream
00324     W_DO(ssm->begin_xct());
00325 
00326     scan_file_i scan(_fid);
00327     pin_i*      cursor(NULL);
00328     bool        eof(false);
00329     int         i(0);
00330 
00331     do {
00332         w_rc_t rc = scan.next(cursor, 0, eof);
00333         if(rc.is_error()) {
00334             cerr << "Error getting next: " << rc << endl;
00335             retval = rc.err_num();
00336             return rc;
00337         }
00338         if(eof) break;
00339 
00340         vec_t       header (cursor->hdr(), cursor->hdr_size());
00341         header.copy_to(&hdrcontents, sizeof(hdrcontents));
00342         cout << "Record hdr "  << hdrcontents << endl;
00343 
00344         rid_t       rid = cursor->rid();
00345         vec_t       elem (&rid, sizeof(rid));
00346 
00347         stream.put(header, elem);
00348         i++;
00349     } while (!eof);
00350     w_assert1(i == _num_rec);
00351     W_DO(ssm->commit_xct());
00352 
00353     // is not sorted until first get_next call
00354     w_assert1(stream.is_sorted()==false);
00355 
00356     // Iterate over the sort_stream_i 
00357     W_DO(ssm->begin_xct());
00358     i--;
00359     eof = false;
00360     do {
00361         vec_t       header; 
00362         vec_t       elem; 
00363         W_DO(stream.get_next(header, elem, eof));
00364         if(eof) break;
00365 
00366         w_assert1(stream.is_sorted()==true);
00367         w_assert1(stream.is_empty()==false);
00368         header.copy_to(&hdrcontents, sizeof(hdrcontents));
00369 
00370         rid_t       rid; 
00371         elem.copy_to(&rid, sizeof(rid));
00372 
00373         w_assert1(i == hdrcontents);
00374 
00375         cout << "i " << hdrcontents << " rid " << rid << endl;
00376         i--;
00377     } while (!eof);
00378     W_DO(ssm->commit_xct());
00379     w_assert1(stream.is_empty()==false);
00380     w_assert1(stream.is_sorted()==true);
00381     return RCOK;
00382 }
00383 
00384 rc_t
00385 smthread_user_t::do_init()
00386 {
00387     cout << "-i: Initialize " << endl;
00388 
00389     {
00390         devid_t        devid;
00391         cout << "Formatting device: " << _device_name 
00392              << " with a " << _quota << "KB quota ..." << endl;
00393         W_DO(ssm->format_dev(_device_name, _quota, true));
00394 
00395         cout << "Mounting device: " << _device_name  << endl;
00396         // mount the new device
00397         u_int        vol_cnt;
00398         W_DO(ssm->mount_dev(_device_name, vol_cnt, devid));
00399 
00400         cout << "Mounted device: " << _device_name  
00401              << " volume count " << vol_cnt
00402              << " device " << devid
00403              << endl;
00404 
00405         // generate a volume ID for the new volume we are about to
00406         // create on the device
00407         cout << "Generating new lvid: " << endl;
00408         W_DO(ssm->generate_new_lvid(_lvid));
00409         cout << "Generated lvid " << _lvid <<  endl;
00410 
00411         // create the new volume 
00412         cout << "Creating a new volume on the device" << endl;
00413         cout << "    with a " << _quota << "KB quota ..." << endl;
00414 
00415         W_DO(ssm->create_vol(_device_name, _lvid, _quota, false, _vid));
00416         cout << "    with local handle(phys volid) " << _vid << endl;
00417 
00418     } 
00419 
00420     W_DO(create_the_file());
00421     W_DO(find_file_info());
00422     W_DO(demo_sort_stream());
00423     return RCOK;
00424 }
00425 
00426 rc_t
00427 smthread_user_t::no_init()
00428 {
00429     cout << "Using already-existing device: " << _device_name << endl;
00430     // mount already existing device
00431     devid_t      devid;
00432     u_int        vol_cnt;
00433     w_rc_t rc = ssm->mount_dev(_device_name, vol_cnt, devid);
00434     if (rc.is_error()) {
00435         cerr << "Error: could not mount device: " 
00436             << _device_name << endl;
00437         cerr << "   Did you forget to run the server with -i?" 
00438             << endl;
00439         return rc;
00440     }
00441     
00442     // find ID of the volume on the device
00443     lvid_t* lvid_list;
00444     u_int   lvid_cnt;
00445     W_DO(ssm->list_volumes(_device_name, lvid_list, lvid_cnt));
00446     if (lvid_cnt == 0) {
00447         cerr << "Error, device has no volumes" << endl;
00448         exit(1);
00449     }
00450     _lvid = lvid_list[0];
00451     delete [] lvid_list;
00452 
00453     W_COERCE(find_file_info());
00454     W_COERCE(scan_the_root_index());
00455     W_DO(demo_sort_stream());
00456     return RCOK;
00457 }
00458 
00459 rc_t
00460 smthread_user_t::do_work()
00461 {
00462     if (_initialize_device) W_DO(do_init());
00463     else  W_DO(no_init());
00464     return RCOK;
00465 }
00466 
00467 /**\defgroup EGOPTIONS Example of setting up options.
00468  * This method creates configuration options, starts up
00469  * the storage manager,
00470  */
00471 w_rc_t smthread_user_t::handle_options()
00472 {
00473     option_t* opt_device_name = 0;
00474     option_t* opt_device_quota = 0;
00475     option_t* opt_num_rec = 0;
00476 
00477     cout << "Processing configuration options ..." << endl;
00478 
00479     // Create an option group for my options.
00480     // I use a 3-level naming scheme:
00481     // executable-name.server.option-name
00482     // Thus, the file will contain lines like this:
00483     // sort_stream.server.device_name : /tmp/example/device
00484     // *.server.device_name : /tmp/example/device
00485     // sort_stream.*.device_name : /tmp/example/device
00486     //
00487     const int option_level_cnt = 3; 
00488 
00489     _options = new option_group_t (option_level_cnt);
00490     if(!_options) {
00491         cerr << "Out of memory: could not allocate from heap." <<
00492             endl;
00493         retval = 1;
00494         return RC(fcINTERNAL);
00495     }
00496     option_group_t &options(*_options);
00497 
00498     W_COERCE(options.add_option("device_name", "device/file name",
00499                          NULL, "device containg volume holding file to scan",
00500                          true, option_t::set_value_charstr,
00501                          opt_device_name));
00502 
00503     W_COERCE(options.add_option("device_quota", "# > 1000",
00504                          "2000", "quota for device",
00505                          false, option_t::set_value_long,
00506                          opt_device_quota));
00507 
00508     // Default number of records to create is 1.
00509     W_COERCE(options.add_option("num_rec", "# > 0",
00510                          "1", "number of records in file",
00511                          true, option_t::set_value_long,
00512                          opt_num_rec));
00513 
00514     // Have the SSM add its options to my group.
00515     W_COERCE(ss_m::setup_options(&options));
00516 
00517     cout << "Finding configuration option settings." << endl;
00518 
00519     w_rc_t rc = init_config_options(options, "server", _argc, _argv);
00520     if (rc.is_error()) {
00521         usage(options);
00522         retval = 1;
00523         return rc;
00524     }
00525     cout << "Processing command line." << endl;
00526 
00527     // Process the command line: looking for the "-h" flag
00528     int option;
00529     while ((option = getopt(_argc, _argv, "hi")) != -1) {
00530         switch (option) {
00531         case 'i' :
00532             _initialize_device = true;
00533             break;
00534 
00535         case 'h' :
00536             usage(options);
00537             break;
00538 
00539         default:
00540             usage(options);
00541             retval = 1;
00542             return RC(fcNOTIMPLEMENTED);
00543             break;
00544         }
00545     }
00546     {
00547         cout << "Checking for required options...";
00548         /* check that all required options have been set */
00549         w_ostrstream      err_stream;
00550         w_rc_t rc = options.check_required(&err_stream);
00551         if (rc.is_error()) {
00552             cerr << "These required options are not set:" << endl;
00553             cerr << err_stream.c_str() << endl;
00554             return rc;
00555         }
00556         cout << "OK " << endl;
00557     }
00558 
00559     // Grab the options values for later use by run()
00560     _device_name = opt_device_name->value();
00561     _quota = strtol(opt_device_quota->value(), 0, 0);
00562     _num_rec = strtol(opt_num_rec->value(), 0, 0);
00563 
00564     return RCOK;
00565 }
00566 
00567 void smthread_user_t::run()
00568 {
00569     w_rc_t rc = handle_options();
00570     if(rc.is_error()) {
00571         retval = 1;
00572         return;
00573     }
00574 
00575     // Now start a storage manager.
00576     cout << "Starting SSM and performing recovery ..." << endl;
00577     ssm = new ss_m();
00578     if (!ssm) {
00579         cerr << "Error: Out of memory for ss_m" << endl;
00580         retval = 1;
00581         return;
00582     }
00583 
00584     cout << "Getting SSM config info for record size ..." << endl;
00585 
00586     sm_config_info_t config_info;
00587     W_COERCE(ss_m::config_info(config_info));
00588     _rec_size = config_info.max_small_rec; // minus a header
00589 
00590     // Subroutine to set up the device and volume and
00591     // create the num_rec records of rec_size.
00592     rc = do_work();
00593 
00594     if (rc.is_error()) {
00595         cerr << "Could not set up device/volume due to: " << endl;
00596         cerr << rc << endl;
00597         delete ssm;
00598         rc = RCOK;   // force deletion of w_error_t info hanging off rc
00599                      // otherwise a leak for w_error_t will be reported
00600         retval = 1;
00601         if(rc.is_error()) 
00602             W_COERCE(rc); // avoid error not checked.
00603         return;
00604     }
00605 
00606 
00607     // Clean up and shut down
00608     cout << "\nShutting down SSM ..." << endl;
00609     delete ssm;
00610 
00611     cout << "Finished!" << endl;
00612 
00613     return;
00614 }
00615 
00616 // This was copied from file_scan so it has lots of extra junk
00617 int
00618 main(int argc, char* argv[])
00619 {
00620         smthread_user_t *smtu = new smthread_user_t(argc, argv);
00621         if (!smtu)
00622                 W_FATAL(fcOUTOFMEMORY);
00623 
00624         w_rc_t e = smtu->fork();
00625         if(e.is_error()) {
00626             cerr << "error forking thread: " << e <<endl;
00627             return 1;
00628         }
00629         e = smtu->join();
00630         if(e.is_error()) {
00631             cerr << "error forking thread: " << e <<endl;
00632             return 1;
00633         }
00634 
00635         int        rv = smtu->retval;
00636         delete smtu;
00637 
00638         return rv;
00639 }
00640 

Generated on Wed Jul 7 17:22:31 2010 for Shore Storage Manager by  doxygen 1.4.7