#include <iostream>
#include <iomanip>
#include <cstdlib>      /* atoi */
#include <unistd.h>
#include <sys/time.h>   /* gethrtime(), hrtime_t */
#include "fatals.h"
#include "MapReduceScheduler.h"

#ifndef sparc
/* The only SPARC/Solaris-specific code is the gethrtime() call */
#error "Only SPARC architectures are supported."
#endif

using namespace std;

/* Key Comparison Function -- Keys are int* */
int intcmp( const void *v1, const void *v2 ) {
    const int * i1 = (const int *) v1;
    const int * i2 = (const int *) v2;

    if( *i1 < *i2 ) return -1;
    else if( *i1 > *i2 ) return 1;
    else return 0;
}

/* Map function */
void sumarray_map( map_args_t * args ) {
    int nChunkSize = args->length;
    // cout << "Map chunk size is " << nChunkSize << endl;
    int * miniArray = (int*) args->data;

    // cout << "Calculating intermediate sum over " << nChunkSize << " elements: ";

    /* Sum this chunk of the input array */
    int intermediate_sum = 0;
    for(int i=0;i<nChunkSize;i++) {
        intermediate_sum += miniArray[i];
    }

    // cout << intermediate_sum << endl;

    /* Emit the partial sum under a single shared key */
    int * key = new int;
    *key = 1;

    int * val = new int;
    *val = intermediate_sum;

    // cout << "Emitting intermediate <" << *key << "," << *val << ">" << endl;
    emit_intermediate( key, val, sizeof( int* ) );
}

/* Reduce Function */
void sumarray_reduce( void * key_in, void ** vals_in, int vals_len ) {
    int nElements = vals_len;
    int ** p_array = (int**) vals_in;
    int * p_key = (int*) key_in;

    delete p_key;

    int sum = 0;
    for(int i=0;i<nElements;i++) {
        sum += p_array[i][0];
        delete p_array[i];
    }

    int * key = new int;
    *key = 0;

    int * val = new int;
    *val = sum;

    emit( key, val );
}

int main( int argc, char * argv[] ) {

    scheduler_args_t sched_args;
    final_data_t result;
    int * array;
    int nElements;
    hrtime_t starttime, endtime, linearsumtime, mr_sumtime;

    /* Parameter parsing and checking */
    if( argc != 2 ) {
        fatal("Usage:\n%s <arraySize>\n", argv[0]);
    }

    nElements = atoi( argv[1] );
    if( nElements <= 0 ) {
        fatal("%s is not a valid array size.\n", argv[1]);
    }

    array = new int[nElements];

    /* Initialize the array */
    for(int i=0;i<nElements;i++) {
        array[i] = i;
    }

    /* Time a simple serial sum for comparison */
    starttime = gethrtime();
    int true_sum = 0;
    for(int i=0;i<nElements;i++) {
        true_sum += i;
    }
    endtime = gethrtime();
    linearsumtime = endtime-starttime;

    /*---------------------------------------------------------+
    | Set up the arguments to the scheduler                     |
    +---------------------------------------------------------*/

    /* A pointer to whatever the splitter will split */
    sched_args.task_data = array;

    /* Total number of bytes of data */
    sched_args.data_size = nElements*sizeof(int);

    /* Pointer to the map function (REQUIRED) */
    sched_args.map = sumarray_map;

    /* Pointer to the reduce function ( NULL => Identity function ) */
    sched_args.reduce = sumarray_reduce;

    /* Pointer to the splitter function ( NULL => Array Splitter ) */
    sched_args.splitter = NULL;

    /* Pointer to the Key Comparison function (REQUIRED) */
    sched_args.key_cmp = intcmp;

    /* Pointer to the final output data (allocated by USER, NOT by runtime) */
    sched_args.result = &result;

    /* Pointer to the partition function ( default is a hash ) */
    sched_args.partition = NULL;

    /*-----------------------------------------------------------+
    | Stuff below this point is SUPPOSEDLY only for performance   |
    | tuning... in my experience, these can screw things up, so   |
    | be wary.                                                     |
    +-----------------------------------------------------------*/

    /* Number of bytes per element (on average, if necessary) */
    sched_args.unit_size = sizeof(int);

    /* If nonzero, creates one emit queue for each reduce task
     * instead of one per reduce thread. This improves emit time
     * if data is emitted in order, but can increase merge time. */
    sched_args.use_one_queue_per_task = 0;

    /* L1 data cache size, in bytes */
    sched_args.L1_cache_size = 8192;

    /* Number of threads on which to execute map tasks (Default = 1 per processor) */
    sched_args.num_map_threads = -1;

    /* Number of threads on which to execute reduce tasks (Default = 1 per processor) */
    sched_args.num_reduce_threads = -1;

    /* Number of threads on which to execute merge tasks (Default = 1 per processor) */
    sched_args.num_merge_threads = -1;

    /* Maximum number of processors to use */
    sched_args.num_procs = 32;

    /* Ratio of input data size to output data size */
    sched_args.key_match_factor = (float) (nElements); // reducing an array to a sum

    /*
     * RUN THE ALGORITHM
     */
    starttime = gethrtime();
    if( map_reduce_scheduler( &sched_args ) < 0 ) {
        fatal("Scheduler had an error. Bailing out.\n");
    }
    endtime = gethrtime();

    mr_sumtime = endtime-starttime;

    /* The reduce phase emitted a single <key, sum> pair */
    keyval_t * p_pair = result.data;
    int * p_mr_key = (int*) p_pair[0].key;
    int * p_mr_sum = (int*) p_pair[0].val;
    int mrsum = *p_mr_sum;

    delete p_mr_key;
    delete p_mr_sum;

    cout << "MapReduce Sum: " << mrsum << endl;
    cout << "True Sum: " << true_sum << endl;
    cout << "MapReduce Time: " << mr_sumtime << " ns" << endl;
    cout << "Linear Scan Time: " << linearsumtime << " ns" << endl;
    cout << "Speedup: " << ((double) linearsumtime) / ((double) mr_sumtime) << "x" << endl;
    cout << endl;

    if( mrsum == true_sum ) {
        cout << "Correct Result." << endl;
    } else {
        cout << "+--------------------------------------+" << endl;
        cout << "| INCORRECT RESULT THE WORLD IS ENDING |" << endl;
        cout << "+--------------------------------------+" << endl;
    }

    delete [] array;
    array = NULL;

    return 0;
}
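
/*
 * Portability note (not part of the original example): the #ifndef sparc
 * guard above exists only because gethrtime() is Solaris-specific. A
 * minimal sketch of a possible replacement for other POSIX systems,
 * assuming clock_gettime(CLOCK_MONOTONIC) is available, is shown below.
 * The names portable_hrtime_t and portable_gethrtime are illustrative
 * only and are not part of the MapReduceScheduler API; the sketch is
 * excluded from compilation.
 */
#if 0   /* sketch only -- not compiled */
#include <time.h>

typedef long long portable_hrtime_t;        /* nanoseconds, like Solaris hrtime_t */

static portable_hrtime_t portable_gethrtime( void ) {
    struct timespec ts;
    clock_gettime( CLOCK_MONOTONIC, &ts );  /* monotonic clock, nanosecond fields */
    return (portable_hrtime_t) ts.tv_sec * 1000000000LL + ts.tv_nsec;
}
#endif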