00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "w_defines.h"
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #define SM_LEVEL 1
00041 #define SM_SOURCE
00042 #define XCT_C
00043 #include "sm_int_1.h"
00044 #include "sm_vas.h"
00045
00046 ss_m* ssm = 0;
00047
00048 extern w_rc_t out_of_log_space (xct_i*, xct_t*&, ss_m::fileoff_t,
00049 ss_m::fileoff_t,
00050 const char *logfile
00051 );
00052
00053 extern w_rc_t get_archived_log_file (const char *logfile, ss_m::partition_number_t);
00054
00055
00056 typedef w_rc_t rc_t;
00057
00058
00059 w_rc_t init_config_options(option_group_t& options,
00060 const char* prog_type,
00061 int& argc, char** argv);
00062
00063
00064 struct file_info_t {
00065 static const char* key;
00066 stid_t fid;
00067 rid_t first_rid;
00068 int num_rec;
00069 int rec_size;
00070 };
00071 const char* file_info_t::key = "SCANFILE";
00072
00073 ostream &
00074 operator << (ostream &o, const file_info_t &info)
00075 {
00076 o << "key " << info.key
00077 << " fid " << info.fid
00078 << " first_rid " << info.first_rid
00079 << " num_rec " << info.num_rec
00080 << " rec_size " << info.rec_size ;
00081 return o;
00082 }
00083
00084
00085 typedef smlevel_0::smksize_t smksize_t;
00086
00087
00088
00089 void
00090 usage(option_group_t& options)
00091 {
00092 cerr << "Usage: log_exceed [-h] [options]" << endl;
00093 cerr << "Valid options are: " << endl;
00094 options.print_usage(true, cerr);
00095 }
00096
00097
00098 class smthread_user_t : public smthread_t
00099 {
00100 int _argc;
00101 char **_argv;
00102
00103 const char *_device_name;
00104 smsize_t _quota;
00105 int _num_rec;
00106 smsize_t _rec_size;
00107 lvid_t _lvid;
00108 rid_t _start_rid;
00109 stid_t _fid;
00110 bool _initialize_device;
00111 option_group_t* _options;
00112 vid_t _vid;
00113 public:
00114 int retval;
00115
00116 smthread_user_t(int ac, char **av)
00117 : smthread_t(t_regular, "smthread_user_t"),
00118 _argc(ac), _argv(av),
00119 _device_name(NULL),
00120 _quota(0),
00121 _num_rec(0),
00122 _rec_size(0),
00123 _initialize_device(true),
00124 _options(NULL),
00125 _vid(1),
00126 retval(0) { }
00127
00128 ~smthread_user_t() { if(_options) delete _options; }
00129
00130 void run();
00131
00132
00133 w_rc_t handle_options();
00134 w_rc_t find_file_info();
00135 w_rc_t create_the_file();
00136 w_rc_t scan_the_file();
00137 w_rc_t scan_the_root_index();
00138 w_rc_t do_work();
00139 w_rc_t do_init();
00140 w_rc_t no_init();
00141
00142 };
00143
00144
00145
00146
00147 w_rc_t
00148 smthread_user_t::find_file_info()
00149 {
00150 file_info_t info;
00151 W_DO(ssm->begin_xct());
00152
00153 bool found;
00154 stid_t _root_iid;
00155 W_DO(ss_m::vol_root_index(_vid, _root_iid));
00156
00157 smsize_t info_len = sizeof(info);
00158 const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00159 W_DO(ss_m::find_assoc(_root_iid,
00160 key_vec_tmp,
00161 &info, info_len, found));
00162 if (!found) {
00163 cerr << "No file information found" <<endl;
00164 return RC(fcASSERT);
00165 } else {
00166 cerr << " found assoc "
00167 << file_info_t::key << " --> " << info << endl;
00168 }
00169
00170 W_DO(ssm->commit_xct());
00171
00172 _start_rid = info.first_rid;
00173 _fid = info.fid;
00174 _rec_size = info.rec_size;
00175 _num_rec = info.num_rec;
00176 return RCOK;
00177 }
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188 rc_t
00189 smthread_user_t::create_the_file()
00190 {
00191 file_info_t info;
00192
00193
00194
00195 cout << "Creating a file with " << _num_rec
00196 << " records of size " << _rec_size << endl;
00197 W_DO(ssm->begin_xct());
00198
00199
00200 W_DO(ssm->create_file(_vid, info.fid, smlevel_3::t_regular));
00201 rid_t rid;
00202
00203 _rec_size -= align(sizeof(int));
00204
00205
00206
00207
00208 char* dummy = new char[_rec_size];
00209 char* dummy2 = new char[_rec_size];
00210 char* dummy3 = new char[_rec_size];
00211 char* dummy4 = new char[_rec_size];
00212 memset(dummy, '\1', _rec_size);
00213 memset(dummy2, '\2', _rec_size);
00214 memset(dummy3, '\3', _rec_size);
00215 memset(dummy4, '\4', _rec_size);
00216 for(smsize_t i=0; i < _rec_size; i++) {
00217 dummy2[i] = (char)(i%8);
00218 dummy3[i] = (dummy2[i] + 1)%8;
00219 dummy4[i] = (dummy2[i] - 1)%8;
00220 }
00221
00222 vec_t data(dummy, _rec_size);
00223 vec_t data2(dummy2, _rec_size);
00224 vec_t data3(dummy3, _rec_size);
00225 vec_t data4(dummy4, _rec_size);
00226
00227 for(int j=0; j < _num_rec; j++)
00228 {
00229 {
00230 w_ostrstream o(dummy, _rec_size);
00231 o << "Record number " << j << ends;
00232 w_assert1(o.c_str() == dummy);
00233 }
00234
00235 int i = j;
00236 const vec_t hdr(&i, sizeof(i));
00237 W_COERCE(ssm->create_rec(info.fid, hdr,
00238 _rec_size, data, rid));
00239 cout << "Creating rec " << j << endl;
00240
00241
00242
00243 for(int i=0; i < 200; i++) {
00244 W_DO(ssm->update_rec(rid, 0 , data2));
00245 W_DO(ssm->update_rec(rid, 0 , data3));
00246 W_DO(ssm->update_rec(rid, 0 , data4));
00247 }
00248 if (j == 0) {
00249 info.first_rid = rid;
00250 }
00251 }
00252 cout << "Created all. First rid " << info.first_rid << endl;
00253 delete [] dummy;
00254 delete [] dummy2;
00255 delete [] dummy3;
00256 delete [] dummy4;
00257 info.num_rec = _num_rec;
00258 info.rec_size = _rec_size;
00259
00260
00261
00262 stid_t _root_iid;
00263 W_DO(ss_m::vol_root_index(_vid, _root_iid));
00264
00265 const vec_t key_vec_tmp(file_info_t::key, strlen(file_info_t::key));
00266 const vec_t info_vec_tmp(&info, sizeof(info));
00267 W_DO(ss_m::create_assoc(_root_iid,
00268 key_vec_tmp,
00269 info_vec_tmp));
00270 W_DO(ssm->commit_xct());
00271 return RCOK;
00272 }
00273
00274 rc_t
00275 smthread_user_t::scan_the_root_index()
00276 {
00277 W_DO(ssm->begin_xct());
00278 stid_t _root_iid;
00279 W_DO(ss_m::vol_root_index(_vid, _root_iid));
00280 cout << "Scanning index " << _root_iid << endl;
00281 scan_index_i scan(_root_iid,
00282 scan_index_i::ge, vec_t::neg_inf,
00283 scan_index_i::le, vec_t::pos_inf, false,
00284 ss_m::t_cc_kvl);
00285 bool eof(false);
00286 int i(0);
00287 smsize_t klen(0);
00288 smsize_t elen(0);
00289 #define MAXKEYSIZE 100
00290 char * keybuf[MAXKEYSIZE];
00291 file_info_t info;
00292
00293 do {
00294 w_rc_t rc = scan.next(eof);
00295 if(rc.is_error()) {
00296 cerr << "Error getting next: " << rc << endl;
00297 retval = rc.err_num();
00298 return rc;
00299 }
00300 if(eof) break;
00301
00302
00303 W_DO(scan.curr(NULL, klen, NULL, elen));
00304
00305 vec_t key(keybuf, klen);
00306 vec_t elem(&info, elen);
00307
00308 W_DO(scan.curr(&key, klen, &elem, elen));
00309
00310 cout << "Key " << keybuf << endl;
00311 cout << "Value "
00312 << " { fid " << info.fid
00313 << " first_rid " << info.first_rid
00314 << " #rec " << info.num_rec
00315 << " rec size " << info.rec_size << " }"
00316 << endl;
00317 i++;
00318 } while (!eof);
00319 W_DO(ssm->commit_xct());
00320 return RCOK;
00321 }
00322
00323 rc_t
00324 smthread_user_t::scan_the_file()
00325 {
00326 cout << "Scanning file " << _fid << endl;
00327 W_DO(ssm->begin_xct());
00328
00329 scan_file_i scan(_fid);
00330 pin_i* cursor(NULL);
00331 bool eof(false);
00332 int i(0);
00333
00334 do {
00335 w_rc_t rc = scan.next(cursor, 0, eof);
00336 if(rc.is_error()) {
00337 cerr << "Error getting next: " << rc << endl;
00338 retval = rc.err_num();
00339 return rc;
00340 }
00341 if(eof) break;
00342
00343 cout << "Record " << i << "/" << _num_rec
00344 << " Rid " << cursor->rid() << endl;
00345 vec_t header (cursor->hdr(), cursor->hdr_size());
00346 int hdrcontents;
00347 header.copy_to(&hdrcontents, sizeof(hdrcontents));
00348 cout << "Record hdr " << hdrcontents << endl;
00349
00350 const char *body = cursor->body();
00351 w_assert1(cursor->body_size() == _rec_size);
00352 cout << "Record body " << body << endl;
00353 i++;
00354 } while (!eof);
00355 w_assert1(i == _num_rec);
00356
00357 W_DO(ssm->commit_xct());
00358 return RCOK;
00359 }
00360
00361 rc_t
00362 smthread_user_t::do_init()
00363 {
00364 {
00365 devid_t devid;
00366 cout << "Formatting device: " << _device_name
00367 << " with a " << _quota << "KB quota ..." << endl;
00368 W_DO(ssm->format_dev(_device_name, _quota, true));
00369
00370 cout << "Mounting device: " << _device_name << endl;
00371
00372 u_int vol_cnt;
00373 W_DO(ssm->mount_dev(_device_name, vol_cnt, devid));
00374
00375 cout << "Mounted device: " << _device_name
00376 << " volume count " << vol_cnt
00377 << " device " << devid
00378 << endl;
00379
00380
00381
00382 cout << "Generating new lvid: " << endl;
00383 W_DO(ssm->generate_new_lvid(_lvid));
00384 cout << "Generated lvid " << _lvid << endl;
00385
00386
00387 cout << "Creating a new volume on the device" << endl;
00388 cout << " with a " << _quota << "KB quota ..." << endl;
00389
00390 W_DO(ssm->create_vol(_device_name, _lvid, _quota, false, _vid));
00391 cout << " with local handle(phys volid) " << _vid << endl;
00392
00393 }
00394
00395 W_DO(create_the_file());
00396 return RCOK;
00397 }
00398
00399 rc_t
00400 smthread_user_t::no_init()
00401 {
00402 cout << "Using already-existing device: " << _device_name << endl;
00403
00404 devid_t devid;
00405 u_int vol_cnt;
00406 w_rc_t rc = ssm->mount_dev(_device_name, vol_cnt, devid);
00407 if (rc.is_error()) {
00408 cerr << "Error: could not mount device: "
00409 << _device_name << endl;
00410 return rc;
00411 }
00412
00413
00414 lvid_t* lvid_list;
00415 u_int lvid_cnt;
00416 W_DO(ssm->list_volumes(_device_name, lvid_list, lvid_cnt));
00417 if (lvid_cnt == 0) {
00418 cerr << "Error, device has no volumes" << endl;
00419 exit(1);
00420 }
00421 _lvid = lvid_list[0];
00422 delete [] lvid_list;
00423
00424 W_DO(find_file_info());
00425 W_DO(scan_the_root_index());
00426 W_DO(scan_the_file());
00427 return RCOK;
00428 }
00429
00430 rc_t
00431 smthread_user_t::do_work()
00432 {
00433 if (_initialize_device) W_DO(do_init());
00434 else W_DO(no_init());
00435 return RCOK;
00436 }
00437
00438
00439
00440
00441
00442 static option_t *logdir(NULL);
00443 w_rc_t smthread_user_t::handle_options()
00444 {
00445 option_t* opt_device_name = 0;
00446 option_t* opt_device_quota = 0;
00447 option_t* opt_num_rec = 0;
00448
00449 cout << "Processing configuration options ..." << endl;
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459 const int option_level_cnt = 3;
00460
00461 _options = new option_group_t (option_level_cnt);
00462 if(!_options) {
00463 cerr << "Out of memory: could not allocate from heap." <<
00464 endl;
00465 retval = 1;
00466 return RC(fcINTERNAL);
00467 }
00468 option_group_t &options(*_options);
00469
00470 W_COERCE(options.add_option("device_name", "device/file name",
00471 NULL, "device containg volume holding file to scan",
00472 true, option_t::set_value_charstr,
00473 opt_device_name));
00474
00475 W_COERCE(options.add_option("device_quota", "# > 1000",
00476 "2000", "quota for device",
00477 false, option_t::set_value_long,
00478 opt_device_quota));
00479
00480
00481 W_COERCE(options.add_option("num_rec", "# > 0",
00482 "1", "number of records in file",
00483 true, option_t::set_value_long,
00484 opt_num_rec));
00485
00486
00487 W_COERCE(ss_m::setup_options(&options));
00488
00489 cout << "Finding configuration option settings." << endl;
00490
00491 w_rc_t rc = init_config_options(options, "server", _argc, _argv);
00492 if (rc.is_error()) {
00493 usage(options);
00494 retval = 1;
00495 return rc;
00496 }
00497 cout << "Processing command line." << endl;
00498
00499
00500 int option;
00501 while ((option = getopt(_argc, _argv, "hi")) != -1) {
00502 switch (option) {
00503 case 'h' :
00504 usage(options);
00505 break;
00506
00507 default:
00508 usage(options);
00509 retval = 1;
00510 return RC(fcNOTIMPLEMENTED);
00511 break;
00512 }
00513 }
00514 {
00515 cout << "Checking for required options...";
00516
00517 w_ostrstream err_stream;
00518 w_rc_t rc = options.check_required(&err_stream);
00519 if (rc.is_error()) {
00520 cerr << "These required options are not set:" << endl;
00521 cerr << err_stream.c_str() << endl;
00522 return rc;
00523 }
00524 cout << "OK " << endl;
00525 }
00526
00527
00528 _device_name = opt_device_name->value();
00529 _quota = strtol(opt_device_quota->value(), 0, 0);
00530 _num_rec = strtol(opt_num_rec->value(), 0, 0);
00531
00532 if(logdir==NULL) {
00533 W_DO(options.lookup("sm_logdir", false, logdir));
00534 fprintf(stdout, "Found logdir %s\n", logdir->value());
00535 }
00536
00537 return RCOK;
00538 }
00539
00540 void smthread_user_t::run()
00541 {
00542 w_rc_t rc = handle_options();
00543 if(rc.is_error()) {
00544 retval = 1;
00545 return;
00546 }
00547
00548
00549 cout << "Starting SSM and performing recovery ..." << endl;
00550 ssm = new ss_m(out_of_log_space, get_archived_log_file);
00551 if (!ssm) {
00552 cerr << "Error: Out of memory for ss_m" << endl;
00553 retval = 1;
00554 return;
00555 }
00556
00557 cout << "Getting SSM config info for record size ..." << endl;
00558
00559 sm_config_info_t config_info;
00560 W_COERCE(ss_m::config_info(config_info));
00561 _rec_size = config_info.max_small_rec;
00562
00563
00564
00565 rc = do_work();
00566
00567 if (rc.is_error()) {
00568 cerr << "Could not set up device/volume due to: " << endl;
00569 cerr << rc << endl;
00570 {
00571 w_ostrstream o;
00572 static sm_stats_info_t curr;
00573
00574 rc = ssm->gather_stats(curr);
00575
00576 o << curr.sm.abort_xct_cnt<< ends;
00577 fprintf(stdout, "log_exceed stats: abort_xct_cnt %s\n" , o.c_str());
00578 }
00579 delete ssm;
00580 rc = RCOK;
00581
00582 retval = 0;
00583 if(rc.is_error())
00584 W_COERCE(rc);
00585 return;
00586 }
00587
00588
00589
00590 cout << "\nShutting down SSM ..." << endl;
00591 delete ssm;
00592
00593 cout << "Finished!" << endl;
00594
00595 return;
00596 }
00597
00598
00599 int
00600 main(int argc, char* argv[])
00601 {
00602 smthread_user_t *smtu = new smthread_user_t(argc, argv);
00603 if (!smtu)
00604 W_FATAL(fcOUTOFMEMORY);
00605
00606 w_rc_t e = smtu->fork();
00607 if(e.is_error()) {
00608 cerr << "error forking thread: " << e <<endl;
00609 return 1;
00610 }
00611 e = smtu->join();
00612 if(e.is_error()) {
00613 cerr << "error forking thread: " << e <<endl;
00614 return 1;
00615 }
00616
00617 int rv = smtu->retval;
00618 delete smtu;
00619
00620 return rv;
00621 }
00622
00623 #include <os_interface.h>
00624
00625 void dump()
00626 {
00627
00628 os_dirent_t *dd = NULL;
00629 os_dir_t ldir = os_opendir(logdir->value());
00630 if(!ldir) {
00631 fprintf(stdout, "Could not open directory %s\n", logdir->value());
00632 return;
00633 }
00634 fprintf(stdout, "---------------------- %s {\n", logdir->value());
00635 while ((dd = os_readdir(ldir)))
00636 {
00637 fprintf(stdout, "%s\n", dd->d_name);
00638 }
00639 os_closedir(ldir);
00640 fprintf(stdout, "---------------------- %s }\n", logdir->value());
00641 }
00642
00643
00644 w_rc_t get_archived_log_file (
00645 const char *filename,
00646 ss_m::partition_number_t num)
00647 {
00648 fprintf(stdout,
00649 "\n**************************************** RECOVER %s : %d\n",
00650 filename, num
00651 );
00652 dump();
00653
00654 w_rc_t rc;
00655
00656 static char O[smlevel_0::max_devname<<1];
00657 strcat(O, filename);
00658 strcat(O, ".bak");
00659
00660 static char N[smlevel_0::max_devname<<1];
00661 strcat(N, filename);
00662
00663 int e = ::rename(O, N);
00664 if(e != 0)
00665 {
00666 fprintf(stdout, "Could not move %s to %s: error : %d %s\n",
00667 O, N, e, strerror(errno));
00668 rc = RC2(smlevel_0::eOUTOFLOGSPACE, errno);
00669 }
00670 dump();
00671 fprintf(stdout, "recovered ... OK!\n\n");
00672 fprintf(stdout,
00673 "This recovery of the log file will enable us to finish the abort.\n");
00674 fprintf(stdout,
00675 "It will not continue the device/volume set up.\n");
00676 fprintf(stdout,
00677 "Expect an error message and stack trace about that:\n\n");
00678 return rc;
00679 }
00680 w_rc_t out_of_log_space (
00681 xct_i* iter,
00682 xct_t *& xd,
00683 smlevel_0::fileoff_t curr,
00684 smlevel_0::fileoff_t thresh,
00685 const char *filename
00686 )
00687 {
00688 static int calls=0;
00689
00690 calls++;
00691
00692 w_rc_t rc;
00693 fprintf(stdout, "\n**************************************** %d\n", calls);
00694 dump();
00695
00696 fprintf(stdout,
00697 "Called out_of_log_space with curr %lld thresh %lld, file %s\n",
00698 (long long) curr, (long long) thresh, filename);
00699 {
00700 w_ostrstream o;
00701 o << xd->tid() << endl;
00702 fprintf(stdout, "called with xct %s\n" , o.c_str());
00703 }
00704
00705 xd->log_warn_disable();
00706 iter->never_mind();
00707
00708 {
00709 w_ostrstream o;
00710 static sm_stats_info_t curr;
00711
00712 W_DO( ssm->gather_stats(curr));
00713
00714 o << curr.sm.log_bytes_generated << ends;
00715 fprintf(stdout, "stats: log_bytes_generated %s\n" , o.c_str());
00716 }
00717 xct_t *oldxct(NULL);
00718 lsn_t target;
00719 {
00720 w_ostrstream o;
00721 o << "Active xcts: " << xct_t::num_active_xcts() << " ";
00722
00723 tid_t old = xct_t::oldest_tid();
00724 o << "Oldest transaction: " << old;
00725
00726 xct_t *x = xct_t::look_up(old);
00727 if(x==NULL) {
00728 fprintf(stdout, "Could not find %s\n", o.c_str());
00729 W_FATAL(fcINTERNAL);
00730 }
00731
00732 target = x->first_lsn();
00733 o << " First lsn: " << x->first_lsn();
00734 o << " Last lsn: " << x->last_lsn();
00735
00736 fprintf(stdout, "%s\n" , o.c_str());
00737
00738 oldxct = x;
00739 }
00740
00741 if(calls > 3) {
00742
00743 static tid_t aborted_tid;
00744 if(aborted_tid == xd->tid()) {
00745 w_ostrstream o;
00746 o << aborted_tid << endl;
00747 fprintf(stdout,
00748 "Multiple calls with same victim! : %s total calls %d\n",
00749 o.c_str(), calls);
00750 W_FATAL(smlevel_0::eINTERNAL);
00751 }
00752 aborted_tid = xd->tid();
00753 fprintf(stdout, "\n\n******************************** ABORTING\n\n");
00754 return RC(smlevel_0::eUSERABORT);
00755 }
00756
00757 fprintf(stdout, "\n\n******************************** ARCHIVING \n\n");
00758 fprintf(stdout, "Move aside log file log.%d to log.%d.bak\n",
00759 target.file(), target.file());
00760 static char O[smlevel_0::max_devname<<1];
00761 strcat(O, filename);
00762 static char N[smlevel_0::max_devname<<1];
00763 strcat(N, filename);
00764 strcat(N, ".bak");
00765
00766 int e = ::rename(O, N);
00767 if(e != 0) {
00768 fprintf(stdout, "Could not move %s to %s: error : %d %s\n",
00769 O, N, e, strerror(errno));
00770 if(errno == ENOENT) {
00771 fprintf(stdout, "Ignored error.\n\n");
00772 return RCOK;
00773 }
00774 fprintf(stdout, "Returning eOUTOFLOGSPACE.\n\n");
00775 rc = RC2(smlevel_0::eOUTOFLOGSPACE, errno);
00776 } else {
00777 dump();
00778 fprintf(stdout, "archived ... OK\n\n");
00779 W_COERCE(ss_m::log_file_was_archived(filename));
00780 }
00781 return rc;
00782 }