BadgerDB
/afs/cs.wisc.edu/u/n/w/nwilliam/private/workspace/Quut/src/sort.cpp
00001 
00008 #include <iostream>
00009 #include <cstdlib>
00010 #include <sstream>
00011 #include <cstring>
00012 #include <vector>
00013 
00014 #include "exceptions/end_of_file_exception.h"
00015 #include "exceptions/bad_param_exception.h"
00016 #include "exceptions/file_exists_exception.h"
00017 
00018 #include "sort.h"
00019 #include "buffer.h"
00020 
00021 #define MIN(a,b)   ((a) < (b) ? (a) : (b))
00022 
00023 
00024 // These comparison functions are visible only within this
00025 // source file. reccmp is the comparison routine (much like
00026 // strcmp or memcmp) that accepts integers, floats, and strings.
00027 // It returns -1 if p1 is less than p2, +1 if p1 is greater
00028 // than p2, or zero otherwise.
00029 
00030 extern badgerdb::BufMgr *bufMgr;  // pointer to the attribute catalogs
00031 
00032 namespace badgerdb
00033 {
00034 
00035 static int reccmp(char* p1, char* p2, int p1Len, int p2Len, Datatype type)
00036 {
00037   float diff = 0.0;
00038 
00039   switch(type) {
00040   case INTEGER:
00041     int iattr, ifltr;                   // word-alignment problem possible
00042     memcpy(&iattr, p1, sizeof(int));
00043     memcpy(&ifltr, p2, sizeof(int));
00044     diff = iattr - ifltr;
00045     break;
00046   case DOUBLE:
00047     double fattr, ffltr;                 // word-alignment problem possible
00048     memcpy(&fattr, p1, sizeof(double));
00049     memcpy(&ffltr, p2, sizeof(double));
00050     diff = fattr - ffltr;
00051     break;
00052   case STRING:
00053     diff = memcmp(p1, p2, MIN(p1Len, p2Len));
00054     break;
00055   default:
00056     break;
00057   }
00058 
00059   if(diff < 0)
00060     diff = -1;
00061   else if (diff > 0)
00062     diff = 1;
00063 
00064   return (int)diff;
00065 }
00066 
00067 
00068 // These three comparison routines are jacketed versions of
00069 // reccmp. This is because qsort(3) takes only a function pointer
00070 // but no additional parameters. The objects pointed to by p1
00071 // and p2 are of type SORTREC which has a pointer to the field
00072 // to be compared as well as its length (used for strings).
00073 
00074 #define SR(p)  ((SORTREC*)p)
00075 
00076 static int intcmp(const void* p1, const void* p2)
00077 {
00078  p1 = (char*) p1;
00079  p2 = (char*) p2;
00080   return reccmp(SR(p1)->field, SR(p2)->field, SR(p1)->length, SR(p2)->length, INTEGER);
00081 }
00082 
00083 static int floatcmp(const void* p1, const void* p2)
00084 {
00085  p1 = (char*) p1;
00086  p2 = (char*) p2;
00087   return reccmp(SR(p1)->field, SR(p2)->field, SR(p1)->length, SR(p2)->length, DOUBLE);
00088 }
00089 
00090 
00091 static int stringcmp(const void* p1, const void* p2)
00092 {
00093  p1 = (char*) p1;
00094  p2 = (char*) p2;
00095   return reccmp(SR(p1)->field, SR(p2)->field, SR(p1)->length, SR(p2)->length, STRING);
00096 }
00097 
00098 
00099 // Create a sorted temporary file of the source file (fileName).
00100 // Sorting is based on attribute that is defined by offset, len,
00101 // and type. maxItems is the maximum number of items that a sorted
00102 // sub-run can hold (usually derived from amount of memory available).
00103 // Status code is returned in variable status.
00104 
00105 SortedFile::SortedFile(const std::string &fileName, int offset, int len, Datatype type, int maxItems)
00106  : fileName(fileName), type(type), offset(offset), length(len), maxItems(maxItems)
00107 {
00108   if(offset < 0 || len < 1)
00109     throw BadParamException();
00110 
00111   if(type != STRING && type != INTEGER && type != DOUBLE)
00112     throw BadParamException();
00113 
00114   if((type == INTEGER && len != sizeof(int)) || (type == DOUBLE && len != sizeof(double)))
00115     throw BadParamException();
00116 
00117   // Must have space for at least 2 items (records) because otherwise
00118   // items cannot be swapped and sorted!
00119 
00120   if(maxItems < 2)
00121     throw BadParamException();
00122 
00123   buffer = new SORTREC [maxItems];
00124     
00125   sortFile();
00126 }
00127 
00128 
00129 // Sort file into sub-runs. The source file is split into runs
00130 // which have at most maxItems records each. That many records
00131 // are read into memory, sorted using qsort(3), and then written
00132 // to a temporary file.
00133 
00134 void SortedFile::sortFile()
00135 {
00136   std::string rec;
00137 
00138   // Open source file.
00139 
00140   // Start an unfiltered sequential scan.
00141 
00142   file = new FileScan(fileName, bufMgr);
00143   file->startScan(0, 0, STRING, std::string(""), EQ);
00144 
00145   // As long as the source file has more records, collect up to
00146   // maxItems records into buffer and then dump records into
00147   // temporary file.
00148 
00149   do
00150   {
00151     for(numItems = 0; numItems < maxItems; numItems++)
00152     {
00153       // Fetch next record from source file, check if end of file.
00154 
00155       try
00156       {
00157         file->scanNext(buffer[numItems].rid);
00158       }
00159       catch(EndOfFileException &e)
00160       {
00161         break;
00162       }
00163 
00164       rec = file->getRecord();
00165 
00166       // Create space for holding a copy of the sorting attribute
00167       // only (rest of record is read when temporary file is
00168       // written). Copy sorting attribute from source record and
00169       // store the length of the attribute (reccmp is general-
00170       // purpose and can be shared by multiple instances of
00171       // SortedFile!).
00172 
00173       buffer[numItems].field = new char [length];
00174 
00175       memcpy(buffer[numItems].field, (char *)rec.c_str() + offset, length);
00176       buffer[numItems].length = length;
00177     }
00178 
00179     // If at least 1 record in sub-run, sort records and write out
00180     // to temporary file.
00181 
00182     if (numItems > 0)
00183     {
00184       generateRun(numItems);
00185 
00186       for(int i = 0; i < numItems; i++)
00187         delete [] buffer[i].field;
00188     }
00189   }while(numItems > 0);
00190 
00191   // Terminate sequential scan on source file and close file.
00192 
00193   delete file;
00194 
00195   // Prepare a sequential scan on each sub-run so that next()
00196   // can fetch next record from each run.
00197 
00198   startScans();
00199 }
00200 
00201 
00202 // Sort the records in buffer[] (actually, the sorting attribute
00203 // plus the associated RecordId) and then dump records into temporary
00204 // file.
00205 
00206 void SortedFile::generateRun(int items)
00207 {
00208   // Sort buffer using library function qsort (quick sort). Use
00209   // the appropriate comparison function for integers, floats,
00210   // or strings (qsort can't take type as a parameter).
00211 
00212   if (type == INTEGER)
00213     qsort(buffer, items, sizeof(SORTREC), intcmp);
00214   else if (type == DOUBLE)
00215     qsort(buffer, items, sizeof(SORTREC), floatcmp);
00216   else
00217     qsort(buffer, items, sizeof(SORTREC), stringcmp);
00218 
00219   // If this is the first sub-run, malloc space for a RUN object,
00220   // otherwise realloc more space. Note that on most systems
00221   // realloc(NULL) could be used even when runs == NULL, but
00222   // this doesn't work on all systems.
00223 
00224   RUN newRun;
00225   runs.push_back(newRun);
00226 
00227   RUN &run = runs.back();
00228 
00229   //Generate file name for temporary file.
00230   std::ostringstream outputString;
00231   outputString << fileName << ".sort." << runs.size();
00232   run.name = outputString.str();
00233 
00234 #ifdef DEBUGSORT
00235   std::cout << "%%  Writing " << items << " tuples to file " << run->name
00236        << std::endl;
00237 #endif
00238 
00239   // Make sure temporary file does not exist already. We don't
00240   // want to corrupt somebody else's sorted files (on another
00241   // attribute, for example).
00242 
00243   if(File::exists(run.name))
00244     throw FileExistsException(run.name);
00245 
00246   // Open a heap file. This will also create the temporary file.
00247 
00248   run.file = new FileScan(run.name, bufMgr);
00249   //run.file->startScan(0, 0, STRING, std::string(""), EQ);
00250 
00251   // Open an unfiltered sequential scan on the source file. The scan
00252   // is not actually needed for anything else than just getting
00253   // a scanId which getRandomRecord requires.
00254 
00255   FileScan scan(fileName, bufMgr);
00256   //scan.startScan(0, 0, STRING, std::string(""), EQ);
00257 
00258   // For each sort record (attribute plus RecordId) in the buffer, fetch
00259   // the whole record from the source file and then insert it into
00260   // the temporary file.
00261 
00262   for(int i = 0; i < items; i++)
00263   {
00264     SORTREC* rec = &buffer[i];
00265     RecordId rid;
00266     std::string record;
00267 
00268     record = scan.getRandomRecord(rec->rid);
00269 
00270     run.file->insertRecord(record, rid);
00271   }
00272 
00273   delete run.file;
00274 }
00275 
00276 
00277 // Prepare a sequential scan on each sub-run so that next()
00278 // can fetch the next record from each run. The valid bit of
00279 // each run is marked false to indicate that the (first)
00280 // record has not been fetched yet. next() must therefore
00281 // fetch it.
00282 
00283 void SortedFile::startScans()
00284 {
00285   std::vector<RUN>::iterator run;
00286 
00287 #ifdef DEBUGSORT
00288   std::cout << "Start scan on : ";
00289 #endif
00290   for(run = runs.begin(); run != runs.end(); run++)
00291   {
00292     run->file = new FileScan(run->name, bufMgr);
00293     //run->file->startScan(0, 0, STRING, std::string(""), EQ);
00294 #ifdef DEBUGSORT
00295     std::cout << run->name << ", " << std::endl;
00296 #endif
00297 
00298     run->valid = false;
00299     run->rid.page_number = 0;
00300     run->rid.slot_number = 0;
00301     run->mark.page_number = 0;
00302     run->mark.slot_number = 0;
00303   }
00304 #ifdef DEBUGSORT
00305   std::cout << std::endl;
00306 #endif
00307 }
00308 
00309 
00310 // Retrieve the next smallest record from the set of sorted sub-runs.
00311 // The next record of each sub-run is peeked to find out the
00312 // smallest of all. The pointer in the chosen sub-run is then
00313 // advanced.
00314 
00315 std::string SortedFile::next()
00316 {
00317   // Empty source file has zero sub-runs and causes
00318   // end of file to be returned.
00319 
00320   if(runs.size() <= 0)
00321     throw EndOfFileException();
00322 
00323   // Find the run which has the smallest next record. If a run
00324   // has false valid bit, it doesn't have the next record in memory
00325   // yet.
00326 
00327   int smallest = -1;
00328   std::vector<RUN>::iterator run;
00329 
00330   for(run = runs.begin(); run != runs.end(); run++)
00331   {
00332     if (run->valid == false)
00333     {          // no record fetched yet for this run?
00334       try
00335       {
00336         run->file->scanNext(run->rid);
00337         run->rec = run->file->getRecord();
00338       }
00339       catch(EndOfFileException &e)
00340       {
00341         run->rid.page_number = 0;           // mark end of file
00342       }
00343       run->valid = true;                // a record is now in memory
00344     }
00345 
00346     if(run->rid.page_number == 0)            // end of run already?
00347       continue;
00348 
00349     if (smallest == -1)                     // select first one as smallest
00350       smallest = run - runs.begin();
00351     else if (reccmp((char *)runs[smallest].rec.c_str() + offset, (char *)run->rec.c_str() + offset, length, length, type) > 0)
00352       smallest = run- runs.begin();                   // current run has smaller next
00353   }
00354 
00355   if (smallest == -1)                        // no next record found?
00356     throw EndOfFileException();
00357 
00358 #ifdef DEBUGSORT
00359   std::cout << "%%  Retrieved smallest from " << runs[smallest].name << std::endl;
00360 #endif
00361 
00362   runs[smallest].valid = false;              // must fetch new record next time
00363 
00364   return runs[smallest].rec;               // give record pointers to caller
00365 }
00366 
00367 
00368 // Remember a position in the sorted output so that the caller
00369 // can later return to this spot. The current RecordId of each sub-run
00370 // is recorded in a separate field.
00371 
00372 void SortedFile::setMark()
00373 {
00374 #ifdef DEBUGSORT
00375   std::cout << "%%  Setting mark in file" << std::endl;
00376 #endif
00377 
00378   std::vector<RUN>::iterator run;
00379 
00380   for(run = runs.begin(); run != runs.end(); run++)
00381   {
00382     run->mark.page_number = run->rid.page_number;
00383     run->mark.slot_number = run->rid.slot_number;
00384 #ifdef DEBUGSORT
00385     std::cout << "%%  Run " << i << " is at page " << run->mark.page_number << ", slot " << run->mark.slot_number << std::endl;
00386 #endif
00387   }
00388 }
00389 
00390 
00391 // Restore sort position by fetching the last marked record
00392 // of each sub-run using getRandomRecord. This allows the caller
00393 // to back up in the sorted sequence (used in sort-merge join
00394 // in case of duplicates).
00395 
00396 void SortedFile::gotoMark()
00397 {
00398 #ifdef DEBUGSORT
00399   std::cout << "%%  Going to a mark in file" << std::endl;
00400 #endif
00401 
00402   std::vector<RUN>::iterator run;
00403 
00404   for(run = runs.begin(); run != runs.end(); run++)
00405   {
00406     run->rid.page_number = run->mark.page_number;
00407     run->rid.slot_number = run->mark.slot_number;
00408 
00409 #ifdef DEBUGSORT
00410     if(run->rid.page_number > 0)
00411       std::cout << "%%  Run " << i << " going to page " << run->rid.page_number << ", slot " << run->rid.slot_number << std::endl;
00412     else
00413       std::cout << "%%  Run " << i << " is already at eof" << std::endl;
00414 #endif
00415 
00416     // Restore file position only if last marked position is
00417     // something else than end of file.
00418 
00419     if(run->rid.page_number > 0)
00420     {
00421       run->rec = run->file->getRandomRecord(run->rid);
00422       run->file->gotoMark(run->rid);
00423     }
00424 
00425     // Current record is already in memory so next() must not
00426     // advance in the temporary file.
00427 
00428     run->valid = true;
00429   }
00430 }
00431 
00432 
00433 // Deallocate all space allocated for this sorted file and
00434 // delete temporary files.
00435 
00436 SortedFile::~SortedFile()
00437 {
00438   for(unsigned int i = 0; i < runs.size(); i++)
00439   {
00440     delete runs[i].file;
00441     File::remove(runs[i].name);
00442   }
00443 
00444   delete [] buffer;
00445 }
00446 
00447 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Friends