BadgerDB
|
00001 00008 #include <iostream> 00009 #include <cstdlib> 00010 #include <sstream> 00011 #include <cstring> 00012 #include <vector> 00013 00014 #include "exceptions/end_of_file_exception.h" 00015 #include "exceptions/bad_param_exception.h" 00016 #include "exceptions/file_exists_exception.h" 00017 00018 #include "sort.h" 00019 #include "buffer.h" 00020 00021 #define MIN(a,b) ((a) < (b) ? (a) : (b)) 00022 00023 00024 // These comparison functions are visible only within this 00025 // source file. reccmp is the comparison routine (much like 00026 // strcmp or memcmp) that accepts integers, floats, and strings. 00027 // It returns -1 if p1 is less than p2, +1 if p1 is greater 00028 // than p2, or zero otherwise. 00029 00030 extern badgerdb::BufMgr *bufMgr; // pointer to the attribute catalogs 00031 00032 namespace badgerdb 00033 { 00034 00035 static int reccmp(char* p1, char* p2, int p1Len, int p2Len, Datatype type) 00036 { 00037 float diff = 0.0; 00038 00039 switch(type) { 00040 case INTEGER: 00041 int iattr, ifltr; // word-alignment problem possible 00042 memcpy(&iattr, p1, sizeof(int)); 00043 memcpy(&ifltr, p2, sizeof(int)); 00044 diff = iattr - ifltr; 00045 break; 00046 case DOUBLE: 00047 double fattr, ffltr; // word-alignment problem possible 00048 memcpy(&fattr, p1, sizeof(double)); 00049 memcpy(&ffltr, p2, sizeof(double)); 00050 diff = fattr - ffltr; 00051 break; 00052 case STRING: 00053 diff = memcmp(p1, p2, MIN(p1Len, p2Len)); 00054 break; 00055 default: 00056 break; 00057 } 00058 00059 if(diff < 0) 00060 diff = -1; 00061 else if (diff > 0) 00062 diff = 1; 00063 00064 return (int)diff; 00065 } 00066 00067 00068 // These three comparison routines are jacketed versions of 00069 // reccmp. This is because qsort(3) takes only a function pointer 00070 // but no additional parameters. The objects pointed to by p1 00071 // and p2 are of type SORTREC which has a pointer to the field 00072 // to be compared as well as its length (used for strings). 00073 00074 #define SR(p) ((SORTREC*)p) 00075 00076 static int intcmp(const void* p1, const void* p2) 00077 { 00078 p1 = (char*) p1; 00079 p2 = (char*) p2; 00080 return reccmp(SR(p1)->field, SR(p2)->field, SR(p1)->length, SR(p2)->length, INTEGER); 00081 } 00082 00083 static int floatcmp(const void* p1, const void* p2) 00084 { 00085 p1 = (char*) p1; 00086 p2 = (char*) p2; 00087 return reccmp(SR(p1)->field, SR(p2)->field, SR(p1)->length, SR(p2)->length, DOUBLE); 00088 } 00089 00090 00091 static int stringcmp(const void* p1, const void* p2) 00092 { 00093 p1 = (char*) p1; 00094 p2 = (char*) p2; 00095 return reccmp(SR(p1)->field, SR(p2)->field, SR(p1)->length, SR(p2)->length, STRING); 00096 } 00097 00098 00099 // Create a sorted temporary file of the source file (fileName). 00100 // Sorting is based on attribute that is defined by offset, len, 00101 // and type. maxItems is the maximum number of items that a sorted 00102 // sub-run can hold (usually derived from amount of memory available). 00103 // Status code is returned in variable status. 00104 00105 SortedFile::SortedFile(const std::string &fileName, int offset, int len, Datatype type, int maxItems) 00106 : fileName(fileName), type(type), offset(offset), length(len), maxItems(maxItems) 00107 { 00108 if(offset < 0 || len < 1) 00109 throw BadParamException(); 00110 00111 if(type != STRING && type != INTEGER && type != DOUBLE) 00112 throw BadParamException(); 00113 00114 if((type == INTEGER && len != sizeof(int)) || (type == DOUBLE && len != sizeof(double))) 00115 throw BadParamException(); 00116 00117 // Must have space for at least 2 items (records) because otherwise 00118 // items cannot be swapped and sorted! 00119 00120 if(maxItems < 2) 00121 throw BadParamException(); 00122 00123 buffer = new SORTREC [maxItems]; 00124 00125 sortFile(); 00126 } 00127 00128 00129 // Sort file into sub-runs. The source file is split into runs 00130 // which have at most maxItems records each. That many records 00131 // are read into memory, sorted using qsort(3), and then written 00132 // to a temporary file. 00133 00134 void SortedFile::sortFile() 00135 { 00136 std::string rec; 00137 00138 // Open source file. 00139 00140 // Start an unfiltered sequential scan. 00141 00142 file = new FileScan(fileName, bufMgr); 00143 file->startScan(0, 0, STRING, std::string(""), EQ); 00144 00145 // As long as the source file has more records, collect up to 00146 // maxItems records into buffer and then dump records into 00147 // temporary file. 00148 00149 do 00150 { 00151 for(numItems = 0; numItems < maxItems; numItems++) 00152 { 00153 // Fetch next record from source file, check if end of file. 00154 00155 try 00156 { 00157 file->scanNext(buffer[numItems].rid); 00158 } 00159 catch(EndOfFileException &e) 00160 { 00161 break; 00162 } 00163 00164 rec = file->getRecord(); 00165 00166 // Create space for holding a copy of the sorting attribute 00167 // only (rest of record is read when temporary file is 00168 // written). Copy sorting attribute from source record and 00169 // store the length of the attribute (reccmp is general- 00170 // purpose and can be shared by multiple instances of 00171 // SortedFile!). 00172 00173 buffer[numItems].field = new char [length]; 00174 00175 memcpy(buffer[numItems].field, (char *)rec.c_str() + offset, length); 00176 buffer[numItems].length = length; 00177 } 00178 00179 // If at least 1 record in sub-run, sort records and write out 00180 // to temporary file. 00181 00182 if (numItems > 0) 00183 { 00184 generateRun(numItems); 00185 00186 for(int i = 0; i < numItems; i++) 00187 delete [] buffer[i].field; 00188 } 00189 }while(numItems > 0); 00190 00191 // Terminate sequential scan on source file and close file. 00192 00193 delete file; 00194 00195 // Prepare a sequential scan on each sub-run so that next() 00196 // can fetch next record from each run. 00197 00198 startScans(); 00199 } 00200 00201 00202 // Sort the records in buffer[] (actually, the sorting attribute 00203 // plus the associated RecordId) and then dump records into temporary 00204 // file. 00205 00206 void SortedFile::generateRun(int items) 00207 { 00208 // Sort buffer using library function qsort (quick sort). Use 00209 // the appropriate comparison function for integers, floats, 00210 // or strings (qsort can't take type as a parameter). 00211 00212 if (type == INTEGER) 00213 qsort(buffer, items, sizeof(SORTREC), intcmp); 00214 else if (type == DOUBLE) 00215 qsort(buffer, items, sizeof(SORTREC), floatcmp); 00216 else 00217 qsort(buffer, items, sizeof(SORTREC), stringcmp); 00218 00219 // If this is the first sub-run, malloc space for a RUN object, 00220 // otherwise realloc more space. Note that on most systems 00221 // realloc(NULL) could be used even when runs == NULL, but 00222 // this doesn't work on all systems. 00223 00224 RUN newRun; 00225 runs.push_back(newRun); 00226 00227 RUN &run = runs.back(); 00228 00229 //Generate file name for temporary file. 00230 std::ostringstream outputString; 00231 outputString << fileName << ".sort." << runs.size(); 00232 run.name = outputString.str(); 00233 00234 #ifdef DEBUGSORT 00235 std::cout << "%% Writing " << items << " tuples to file " << run->name 00236 << std::endl; 00237 #endif 00238 00239 // Make sure temporary file does not exist already. We don't 00240 // want to corrupt somebody else's sorted files (on another 00241 // attribute, for example). 00242 00243 if(File::exists(run.name)) 00244 throw FileExistsException(run.name); 00245 00246 // Open a heap file. This will also create the temporary file. 00247 00248 run.file = new FileScan(run.name, bufMgr); 00249 //run.file->startScan(0, 0, STRING, std::string(""), EQ); 00250 00251 // Open an unfiltered sequential scan on the source file. The scan 00252 // is not actually needed for anything else than just getting 00253 // a scanId which getRandomRecord requires. 00254 00255 FileScan scan(fileName, bufMgr); 00256 //scan.startScan(0, 0, STRING, std::string(""), EQ); 00257 00258 // For each sort record (attribute plus RecordId) in the buffer, fetch 00259 // the whole record from the source file and then insert it into 00260 // the temporary file. 00261 00262 for(int i = 0; i < items; i++) 00263 { 00264 SORTREC* rec = &buffer[i]; 00265 RecordId rid; 00266 std::string record; 00267 00268 record = scan.getRandomRecord(rec->rid); 00269 00270 run.file->insertRecord(record, rid); 00271 } 00272 00273 delete run.file; 00274 } 00275 00276 00277 // Prepare a sequential scan on each sub-run so that next() 00278 // can fetch the next record from each run. The valid bit of 00279 // each run is marked false to indicate that the (first) 00280 // record has not been fetched yet. next() must therefore 00281 // fetch it. 00282 00283 void SortedFile::startScans() 00284 { 00285 std::vector<RUN>::iterator run; 00286 00287 #ifdef DEBUGSORT 00288 std::cout << "Start scan on : "; 00289 #endif 00290 for(run = runs.begin(); run != runs.end(); run++) 00291 { 00292 run->file = new FileScan(run->name, bufMgr); 00293 //run->file->startScan(0, 0, STRING, std::string(""), EQ); 00294 #ifdef DEBUGSORT 00295 std::cout << run->name << ", " << std::endl; 00296 #endif 00297 00298 run->valid = false; 00299 run->rid.page_number = 0; 00300 run->rid.slot_number = 0; 00301 run->mark.page_number = 0; 00302 run->mark.slot_number = 0; 00303 } 00304 #ifdef DEBUGSORT 00305 std::cout << std::endl; 00306 #endif 00307 } 00308 00309 00310 // Retrieve the next smallest record from the set of sorted sub-runs. 00311 // The next record of each sub-run is peeked to find out the 00312 // smallest of all. The pointer in the chosen sub-run is then 00313 // advanced. 00314 00315 std::string SortedFile::next() 00316 { 00317 // Empty source file has zero sub-runs and causes 00318 // end of file to be returned. 00319 00320 if(runs.size() <= 0) 00321 throw EndOfFileException(); 00322 00323 // Find the run which has the smallest next record. If a run 00324 // has false valid bit, it doesn't have the next record in memory 00325 // yet. 00326 00327 int smallest = -1; 00328 std::vector<RUN>::iterator run; 00329 00330 for(run = runs.begin(); run != runs.end(); run++) 00331 { 00332 if (run->valid == false) 00333 { // no record fetched yet for this run? 00334 try 00335 { 00336 run->file->scanNext(run->rid); 00337 run->rec = run->file->getRecord(); 00338 } 00339 catch(EndOfFileException &e) 00340 { 00341 run->rid.page_number = 0; // mark end of file 00342 } 00343 run->valid = true; // a record is now in memory 00344 } 00345 00346 if(run->rid.page_number == 0) // end of run already? 00347 continue; 00348 00349 if (smallest == -1) // select first one as smallest 00350 smallest = run - runs.begin(); 00351 else if (reccmp((char *)runs[smallest].rec.c_str() + offset, (char *)run->rec.c_str() + offset, length, length, type) > 0) 00352 smallest = run- runs.begin(); // current run has smaller next 00353 } 00354 00355 if (smallest == -1) // no next record found? 00356 throw EndOfFileException(); 00357 00358 #ifdef DEBUGSORT 00359 std::cout << "%% Retrieved smallest from " << runs[smallest].name << std::endl; 00360 #endif 00361 00362 runs[smallest].valid = false; // must fetch new record next time 00363 00364 return runs[smallest].rec; // give record pointers to caller 00365 } 00366 00367 00368 // Remember a position in the sorted output so that the caller 00369 // can later return to this spot. The current RecordId of each sub-run 00370 // is recorded in a separate field. 00371 00372 void SortedFile::setMark() 00373 { 00374 #ifdef DEBUGSORT 00375 std::cout << "%% Setting mark in file" << std::endl; 00376 #endif 00377 00378 std::vector<RUN>::iterator run; 00379 00380 for(run = runs.begin(); run != runs.end(); run++) 00381 { 00382 run->mark.page_number = run->rid.page_number; 00383 run->mark.slot_number = run->rid.slot_number; 00384 #ifdef DEBUGSORT 00385 std::cout << "%% Run " << i << " is at page " << run->mark.page_number << ", slot " << run->mark.slot_number << std::endl; 00386 #endif 00387 } 00388 } 00389 00390 00391 // Restore sort position by fetching the last marked record 00392 // of each sub-run using getRandomRecord. This allows the caller 00393 // to back up in the sorted sequence (used in sort-merge join 00394 // in case of duplicates). 00395 00396 void SortedFile::gotoMark() 00397 { 00398 #ifdef DEBUGSORT 00399 std::cout << "%% Going to a mark in file" << std::endl; 00400 #endif 00401 00402 std::vector<RUN>::iterator run; 00403 00404 for(run = runs.begin(); run != runs.end(); run++) 00405 { 00406 run->rid.page_number = run->mark.page_number; 00407 run->rid.slot_number = run->mark.slot_number; 00408 00409 #ifdef DEBUGSORT 00410 if(run->rid.page_number > 0) 00411 std::cout << "%% Run " << i << " going to page " << run->rid.page_number << ", slot " << run->rid.slot_number << std::endl; 00412 else 00413 std::cout << "%% Run " << i << " is already at eof" << std::endl; 00414 #endif 00415 00416 // Restore file position only if last marked position is 00417 // something else than end of file. 00418 00419 if(run->rid.page_number > 0) 00420 { 00421 run->rec = run->file->getRandomRecord(run->rid); 00422 run->file->gotoMark(run->rid); 00423 } 00424 00425 // Current record is already in memory so next() must not 00426 // advance in the temporary file. 00427 00428 run->valid = true; 00429 } 00430 } 00431 00432 00433 // Deallocate all space allocated for this sorted file and 00434 // delete temporary files. 00435 00436 SortedFile::~SortedFile() 00437 { 00438 for(unsigned int i = 0; i < runs.size(); i++) 00439 { 00440 delete runs[i].file; 00441 File::remove(runs[i].name); 00442 } 00443 00444 delete [] buffer; 00445 } 00446 00447 }