00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "w_defines.h"
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "sm_vas.h"
00040 ss_m* ssm = 0;
00041
00042
00043 typedef w_rc_t rc_t;
00044
00045
00046 w_rc_t init_config_options(option_group_t& options,
00047 const char* prog_type,
00048 int& argc, char** argv);
00049
00050
00051 struct file_info_t {
00052 static const char* key;
00053 stid_t fid;
00054 rid_t first_rid;
00055 int num_rec;
00056 int rec_size;
00057 };
00058 const char* file_info_t::key = "SCANFILE";
00059
00060 ostream &
00061 operator << (ostream &o, const file_info_t &info)
00062 {
00063 o << "key " << info.key
00064 << " fid " << info.fid
00065 << " first_rid " << info.first_rid
00066 << " num_rec " << info.num_rec
00067 << " rec_size " << info.rec_size ;
00068 return o;
00069 }
00070
00071
00072 typedef smlevel_0::smksize_t smksize_t;
00073
00074
00075
00076 void
00077 usage(option_group_t& options)
00078 {
00079 cerr << "Usage: sort_stream [-h] [-i] [options]" << endl;
00080 cerr << " -i initialize device/volume and create file of records" << endl;
00081 cerr << "Valid options are: " << endl;
00082 options.print_usage(true, cerr);
00083 }
00084
00085
00086 class smthread_user_t : public smthread_t {
00087 int _argc;
00088 char **_argv;
00089
00090 const char *_device_name;
00091 smsize_t _quota;
00092 int _num_rec;
00093 smsize_t _rec_size;
00094 lvid_t _lvid;
00095 rid_t _start_rid;
00096 stid_t _fid;
00097 bool _initialize_device;
00098 option_group_t* _options;
00099 vid_t _vid;
00100 public:
00101 int retval;
00102
00103 smthread_user_t(int ac, char **av)
00104 : smthread_t(t_regular, "smthread_user_t"),
00105 _argc(ac), _argv(av),
00106 _device_name(NULL),
00107 _quota(0),
00108 _num_rec(0),
00109 _rec_size(0),
00110 _initialize_device(false),
00111 _options(NULL),
00112 _vid(1),
00113 retval(0) { }
00114
00115 ~smthread_user_t() { if(_options) delete _options; }
00116
00117 void run();
00118
00119
00120 w_rc_t handle_options();
00121 w_rc_t find_file_info();
00122 w_rc_t create_the_file();
00123 w_rc_t demo_sort_stream();
00124 w_rc_t scan_the_root_index();
00125 w_rc_t do_work();
00126 w_rc_t do_init();
00127 w_rc_t no_init();
00128
00129 };
00130
00131
00132
00133
00134 w_rc_t
00135 smthread_user_t::find_file_info()
00136 {
00137 file_info_t info;
00138 W_DO(ssm->begin_xct());
00139
00140 bool found;
00141 stid_t _root_iid;
00142 W_DO(ss_m::vol_root_index(_vid, _root_iid));
00143
00144 smsize_t info_len = sizeof(info);
00145 const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00146 W_DO(ss_m::find_assoc(_root_iid,
00147 key_vec_tmp,
00148 &info, info_len, found));
00149 if (!found) {
00150 cerr << "No file information found" <<endl;
00151 return RC(fcASSERT);
00152 } else {
00153 cerr << " found assoc "
00154 << file_info_t::key << " --> " << info << endl;
00155 }
00156
00157 W_DO(ssm->commit_xct());
00158
00159 _start_rid = info.first_rid;
00160 _fid = info.fid;
00161 _rec_size = info.rec_size;
00162 _num_rec = info.num_rec;
00163 return RCOK;
00164 }
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175 rc_t
00176 smthread_user_t::create_the_file()
00177 {
00178 file_info_t info;
00179
00180
00181
00182 cout << "Creating a file with " << _num_rec
00183 << " records of size " << _rec_size << endl;
00184 W_DO(ssm->begin_xct());
00185
00186
00187 W_DO(ssm->create_file(_vid, info.fid, smlevel_3::t_regular));
00188 rid_t rid;
00189
00190 _rec_size -= align(sizeof(int));
00191
00192
00193
00194
00195 char* dummy = new char[_rec_size];
00196 memset(dummy, '\0', _rec_size);
00197 vec_t data(dummy, _rec_size);
00198
00199 for(int j=0; j < _num_rec; j++)
00200 {
00201 {
00202 w_ostrstream o(dummy, _rec_size);
00203 o << "Record number " << j << ends;
00204 w_assert1(o.c_str() == dummy);
00205 }
00206
00207
00208
00209 w_base_t::int4_t i = j;
00210 const vec_t hdr(&i, sizeof(i));
00211
00212 W_COERCE(ssm->create_rec(info.fid, hdr,
00213 _rec_size, data, rid));
00214 cout << "Creating rec " << j << endl;
00215 if (j == 0) {
00216 info.first_rid = rid;
00217 }
00218 }
00219 cout << "Created all. First rid " << info.first_rid << endl;
00220 delete [] dummy;
00221 info.num_rec = _num_rec;
00222 info.rec_size = _rec_size;
00223
00224
00225
00226 stid_t _root_iid;
00227 W_DO(ss_m::vol_root_index(_vid, _root_iid));
00228
00229 const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00230 const vec_t info_vec_tmp(&info, sizeof(info));
00231 W_DO(ss_m::create_assoc(_root_iid,
00232 key_vec_tmp,
00233 info_vec_tmp));
00234 cerr << "Creating assoc "
00235 << file_info_t::key << " --> " << info << endl;
00236 W_DO(ssm->commit_xct());
00237 return RCOK;
00238 }
00239
00240 rc_t
00241 smthread_user_t::scan_the_root_index()
00242 {
00243 W_DO(ssm->begin_xct());
00244 stid_t _root_iid;
00245 W_DO(ss_m::vol_root_index(_vid, _root_iid));
00246 cout << "Scanning index " << _root_iid << endl;
00247 scan_index_i scan(_root_iid,
00248 scan_index_i::ge, vec_t::neg_inf,
00249 scan_index_i::le, vec_t::pos_inf, false,
00250 ss_m::t_cc_kvl);
00251 bool eof(false);
00252 int i(0);
00253 smsize_t klen(0);
00254 smsize_t elen(0);
00255 #define MAXKEYSIZE 100
00256 char * keybuf[MAXKEYSIZE];
00257 file_info_t info;
00258
00259 do {
00260 w_rc_t rc = scan.next(eof);
00261 if(rc.is_error()) {
00262 cerr << "Error getting next: " << rc << endl;
00263 retval = rc.err_num();
00264 return rc;
00265 }
00266 if(eof) break;
00267
00268
00269 W_DO(scan.curr(NULL, klen, NULL, elen));
00270
00271 vec_t key(keybuf, klen);
00272 vec_t elem(&info, elen);
00273
00274 W_DO(scan.curr(&key, klen, &elem, elen));
00275
00276 cout << "Key " << keybuf << endl;
00277 cout << "Value "
00278 << " { fid " << info.fid
00279 << " first_rid " << info.first_rid
00280 << " #rec " << info.num_rec
00281 << " rec size " << info.rec_size << " }"
00282 << endl;
00283 i++;
00284 } while (!eof);
00285 W_DO(ssm->commit_xct());
00286 return RCOK;
00287 }
00288
00289 w_rc_t
00290 smthread_user_t::demo_sort_stream()
00291 {
00292
00293
00294
00295 w_base_t::int4_t hdrcontents;
00296
00297
00298 using ssm_sort::key_info_t ;
00299
00300 key_info_t kinfo;
00301 kinfo.type = sortorder::kt_i4;
00302 kinfo.derived = false;
00303 kinfo.where = key_info_t::t_hdr;
00304 kinfo.offset = 0;
00305 kinfo.len = sizeof(hdrcontents);
00306 kinfo.est_reclen = _rec_size;
00307
00308
00309 using ssm_sort::sort_parm_t ;
00310
00311 sort_parm_t behav;
00312 behav.run_size = 10;
00313 behav.vol = _vid;
00314 behav.unique = false;
00315 behav.ascending = false;
00316 behav.destructive = false;
00317
00318 behav.property = ss_m::t_temporary;
00319
00320 sort_stream_i stream(kinfo, behav, _rec_size);
00321 w_assert1(stream.is_sorted()==false);
00322
00323
00324 W_DO(ssm->begin_xct());
00325
00326 scan_file_i scan(_fid);
00327 pin_i* cursor(NULL);
00328 bool eof(false);
00329 int i(0);
00330
00331 do {
00332 w_rc_t rc = scan.next(cursor, 0, eof);
00333 if(rc.is_error()) {
00334 cerr << "Error getting next: " << rc << endl;
00335 retval = rc.err_num();
00336 return rc;
00337 }
00338 if(eof) break;
00339
00340 vec_t header (cursor->hdr(), cursor->hdr_size());
00341 header.copy_to(&hdrcontents, sizeof(hdrcontents));
00342 cout << "Record hdr " << hdrcontents << endl;
00343
00344 rid_t rid = cursor->rid();
00345 vec_t elem (&rid, sizeof(rid));
00346
00347 stream.put(header, elem);
00348 i++;
00349 } while (!eof);
00350 w_assert1(i == _num_rec);
00351 W_DO(ssm->commit_xct());
00352
00353
00354 w_assert1(stream.is_sorted()==false);
00355
00356
00357 W_DO(ssm->begin_xct());
00358 i--;
00359 eof = false;
00360 do {
00361 vec_t header;
00362 vec_t elem;
00363 W_DO(stream.get_next(header, elem, eof));
00364 if(eof) break;
00365
00366 w_assert1(stream.is_sorted()==true);
00367 w_assert1(stream.is_empty()==false);
00368 header.copy_to(&hdrcontents, sizeof(hdrcontents));
00369
00370 rid_t rid;
00371 elem.copy_to(&rid, sizeof(rid));
00372
00373 w_assert1(i == hdrcontents);
00374
00375 cout << "i " << hdrcontents << " rid " << rid << endl;
00376 i--;
00377 } while (!eof);
00378 W_DO(ssm->commit_xct());
00379 w_assert1(stream.is_empty()==false);
00380 w_assert1(stream.is_sorted()==true);
00381 return RCOK;
00382 }
00383
00384 rc_t
00385 smthread_user_t::do_init()
00386 {
00387 cout << "-i: Initialize " << endl;
00388
00389 {
00390 devid_t devid;
00391 cout << "Formatting device: " << _device_name
00392 << " with a " << _quota << "KB quota ..." << endl;
00393 W_DO(ssm->format_dev(_device_name, _quota, true));
00394
00395 cout << "Mounting device: " << _device_name << endl;
00396
00397 u_int vol_cnt;
00398 W_DO(ssm->mount_dev(_device_name, vol_cnt, devid));
00399
00400 cout << "Mounted device: " << _device_name
00401 << " volume count " << vol_cnt
00402 << " device " << devid
00403 << endl;
00404
00405
00406
00407 cout << "Generating new lvid: " << endl;
00408 W_DO(ssm->generate_new_lvid(_lvid));
00409 cout << "Generated lvid " << _lvid << endl;
00410
00411
00412 cout << "Creating a new volume on the device" << endl;
00413 cout << " with a " << _quota << "KB quota ..." << endl;
00414
00415 W_DO(ssm->create_vol(_device_name, _lvid, _quota, false, _vid));
00416 cout << " with local handle(phys volid) " << _vid << endl;
00417
00418 }
00419
00420 W_DO(create_the_file());
00421 W_DO(find_file_info());
00422 W_DO(demo_sort_stream());
00423 return RCOK;
00424 }
00425
00426 rc_t
00427 smthread_user_t::no_init()
00428 {
00429 cout << "Using already-existing device: " << _device_name << endl;
00430
00431 devid_t devid;
00432 u_int vol_cnt;
00433 w_rc_t rc = ssm->mount_dev(_device_name, vol_cnt, devid);
00434 if (rc.is_error()) {
00435 cerr << "Error: could not mount device: "
00436 << _device_name << endl;
00437 cerr << " Did you forget to run the server with -i?"
00438 << endl;
00439 return rc;
00440 }
00441
00442
00443 lvid_t* lvid_list;
00444 u_int lvid_cnt;
00445 W_DO(ssm->list_volumes(_device_name, lvid_list, lvid_cnt));
00446 if (lvid_cnt == 0) {
00447 cerr << "Error, device has no volumes" << endl;
00448 exit(1);
00449 }
00450 _lvid = lvid_list[0];
00451 delete [] lvid_list;
00452
00453 W_COERCE(find_file_info());
00454 W_COERCE(scan_the_root_index());
00455 W_DO(demo_sort_stream());
00456 return RCOK;
00457 }
00458
00459 rc_t
00460 smthread_user_t::do_work()
00461 {
00462 if (_initialize_device) W_DO(do_init());
00463 else W_DO(no_init());
00464 return RCOK;
00465 }
00466
00467
00468
00469
00470
00471 w_rc_t smthread_user_t::handle_options()
00472 {
00473 option_t* opt_device_name = 0;
00474 option_t* opt_device_quota = 0;
00475 option_t* opt_num_rec = 0;
00476
00477 cout << "Processing configuration options ..." << endl;
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487 const int option_level_cnt = 3;
00488
00489 _options = new option_group_t (option_level_cnt);
00490 if(!_options) {
00491 cerr << "Out of memory: could not allocate from heap." <<
00492 endl;
00493 retval = 1;
00494 return RC(fcINTERNAL);
00495 }
00496 option_group_t &options(*_options);
00497
00498 W_COERCE(options.add_option("device_name", "device/file name",
00499 NULL, "device containg volume holding file to scan",
00500 true, option_t::set_value_charstr,
00501 opt_device_name));
00502
00503 W_COERCE(options.add_option("device_quota", "# > 1000",
00504 "2000", "quota for device",
00505 false, option_t::set_value_long,
00506 opt_device_quota));
00507
00508
00509 W_COERCE(options.add_option("num_rec", "# > 0",
00510 "1", "number of records in file",
00511 true, option_t::set_value_long,
00512 opt_num_rec));
00513
00514
00515 W_COERCE(ss_m::setup_options(&options));
00516
00517 cout << "Finding configuration option settings." << endl;
00518
00519 w_rc_t rc = init_config_options(options, "server", _argc, _argv);
00520 if (rc.is_error()) {
00521 usage(options);
00522 retval = 1;
00523 return rc;
00524 }
00525 cout << "Processing command line." << endl;
00526
00527
00528 int option;
00529 while ((option = getopt(_argc, _argv, "hi")) != -1) {
00530 switch (option) {
00531 case 'i' :
00532 _initialize_device = true;
00533 break;
00534
00535 case 'h' :
00536 usage(options);
00537 break;
00538
00539 default:
00540 usage(options);
00541 retval = 1;
00542 return RC(fcNOTIMPLEMENTED);
00543 break;
00544 }
00545 }
00546 {
00547 cout << "Checking for required options...";
00548
00549 w_ostrstream err_stream;
00550 w_rc_t rc = options.check_required(&err_stream);
00551 if (rc.is_error()) {
00552 cerr << "These required options are not set:" << endl;
00553 cerr << err_stream.c_str() << endl;
00554 return rc;
00555 }
00556 cout << "OK " << endl;
00557 }
00558
00559
00560 _device_name = opt_device_name->value();
00561 _quota = strtol(opt_device_quota->value(), 0, 0);
00562 _num_rec = strtol(opt_num_rec->value(), 0, 0);
00563
00564 return RCOK;
00565 }
00566
00567 void smthread_user_t::run()
00568 {
00569 w_rc_t rc = handle_options();
00570 if(rc.is_error()) {
00571 retval = 1;
00572 return;
00573 }
00574
00575
00576 cout << "Starting SSM and performing recovery ..." << endl;
00577 ssm = new ss_m();
00578 if (!ssm) {
00579 cerr << "Error: Out of memory for ss_m" << endl;
00580 retval = 1;
00581 return;
00582 }
00583
00584 cout << "Getting SSM config info for record size ..." << endl;
00585
00586 sm_config_info_t config_info;
00587 W_COERCE(ss_m::config_info(config_info));
00588 _rec_size = config_info.max_small_rec;
00589
00590
00591
00592 rc = do_work();
00593
00594 if (rc.is_error()) {
00595 cerr << "Could not set up device/volume due to: " << endl;
00596 cerr << rc << endl;
00597 delete ssm;
00598 rc = RCOK;
00599
00600 retval = 1;
00601 if(rc.is_error())
00602 W_COERCE(rc);
00603 return;
00604 }
00605
00606
00607
00608 cout << "\nShutting down SSM ..." << endl;
00609 delete ssm;
00610
00611 cout << "Finished!" << endl;
00612
00613 return;
00614 }
00615
00616
00617 int
00618 main(int argc, char* argv[])
00619 {
00620 smthread_user_t *smtu = new smthread_user_t(argc, argv);
00621 if (!smtu)
00622 W_FATAL(fcOUTOFMEMORY);
00623
00624 w_rc_t e = smtu->fork();
00625 if(e.is_error()) {
00626 cerr << "error forking thread: " << e <<endl;
00627 return 1;
00628 }
00629 e = smtu->join();
00630 if(e.is_error()) {
00631 cerr << "error forking thread: " << e <<endl;
00632 return 1;
00633 }
00634
00635 int rv = smtu->retval;
00636 delete smtu;
00637
00638 return rv;
00639 }
00640