#include <unistd.h>
#include <sys/time.h>

#include <cerrno>
#include <climits>
#include <cstdlib>
#include <iomanip>
#include <iostream>

#include "fatals.h"
#include "MapReduceScheduler.h"
  7 
  8 #ifndef sparc
  9 /* The only SPARC/Solaris-specific code is the gethrtime() call */
 10 #error "Only SPARC architectures are supported."
 11 #endif
 12 
 13 using namespace std;
 14 
 /* Key comparison callback installed as sched_args.key_cmp.
 * Both arguments point to int keys. Returns -1 when *v1 < *v2, 1 when
 * *v1 > *v2, and 0 when they are equal. */
int intcmp( const void *v1, const void * v2 ) {
  const int lhs = *static_cast<const int *>( v1 );
  const int rhs = *static_cast<const int *>( v2 );

  if( lhs == rhs ) {
    return 0;
  }
  return ( lhs < rhs ) ? -1 : 1;
}
 24 
 25 /* Map function */
 26 void sumarray_map( map_args_t * args ) {
 27   int nChunkSize  = args->length;
 28   // cout << "Map chunk size is " << nChunkSize << endl;
 29   int * miniArray = (int*) args->data;
 30 
 31   // cout << "Calculating intermediate sum over " << nChunkSize << " elements: " ;
 32 
 33   int intermediate_sum = 0;
 34   for(int i=0;i<nChunkSize;i++) {
 35     intermediate_sum += miniArray[i];
 36   }
 37 
 38   // cout << intermediate_sum << endl;
 39 
 40   int * key = new int;
 41   *key = 1;
 42 
 43   int * val = new int;
 44   *val = intermediate_sum;
 45 
 46   // cout << "Emitting intermediate <" << *key << "," << *val << ">" << endl;
 47   emit_intermediate( key, val, sizeof( int* ) );
 48 }
 49 
 50 /* Reduce Function */
 51 void sumarray_reduce( void * key_in, void ** vals_in, int vals_len ) {
 52   int nElements = vals_len;
 53   int ** p_array = (int**) vals_in;
 54   int * p_key = (int*) key_in;
 55 
 56   delete p_key;
 57 
 58   int sum = 0;
 59   for(int i=0;i<nElements;i++) {
 60     sum += p_array[i][0];
 61     delete p_array[i];
 62   }
 63 
 64   int * key = new int;
 65   *key = 0;
 66 
 67   int * val = new int;
 68   *val = sum;
 69 
 70   emit( key, val );
 71 }
 72 
 73 int main( int argc, char * argv[] ) {
 74 
 75   scheduler_args_t sched_args;
 76   final_data_t result;
 77   int * array;
 78   int nElements;
 79   hrtime_t starttime, endtime, linearsumtime, mr_sumtime;
 80 
 81   /* Parameter parsing and checking */
 82   if( argc != 2 ) {
 83     fatal("Usage:\n%s <arraySize>\n", argv[0]);
 84   }
 85 
 86   nElements = atoi( argv[1] );
 87   if( nElements <= 0 ) {
 88     fatal("%s is not a valid array size.\n", argv[1]);
 89   }
 90 
 91   array = new int[nElements];
 92 
 93   /* Initialize the array */
 94   for(int i=0;i<nElements;i++) {
 95     array[i] = i;
 96   }
 97 
 98   starttime = gethrtime();
 99   int true_sum = 0;
100   for(int i=0;i<nElements;i++) {
101     true_sum += i;
102   }
103   endtime = gethrtime();
104   linearsumtime = endtime-starttime;
105 
106   /*---------------------------------------------------------+
107    | Set up the arguments to the scheduler                   |
108    +---------------------------------------------------------*/
109 
110   /* A pointer to whatever the splitter will split */
111   sched_args.task_data = array;
112 
113   /* Total number of bytes of data */
114   sched_args.data_size = nElements*sizeof(int);
115 
116   /* Pointer to the map function (REQUIRED) */
117   sched_args.map = sumarray_map;
118 
119   /* Pointer to the reduce function ( NULL => Identity function ) */
120   sched_args.reduce = sumarray_reduce;
121 
122   /* Pointer to the splitter function ( NULL => Array Splitter ) */
123   sched_args.splitter = NULL;
124 
125   /* Pointer to the Key Comparison function (REQUIRED) */
126   sched_args.key_cmp = intcmp;
127 
128   /* Pointer to the final output data (allocated by USER, NOT by runtime) */
129   sched_args.result = &result;
130 
131   /* Pointer to the partition function ( default is a hash ) */
132   sched_args.partition = NULL;
133 
134   /*-----------------------------------------------------------+
135    | Stuff below this point is SUPPOSEDLY only for performance |
136    | tuning... in my experience, these can screw things up, so |
137    | be wary.                                                  |
138    +-----------------------------------------------------------*/
139 
140   /* Number of bytes per element (on average, if necessary) */
141   sched_args.unit_size = sizeof(int);
142 
143   /* Iff nonzero, Creates one emit queue for each reduce task,
144    * instead of per reduce thread. This improves time to emit 
145    * if data is emitted in order, but can increase merge time. */
146   sched_args.use_one_queue_per_task = 0;
147 
148   /* L1D cache size, in bytes */
149   sched_args.L1_cache_size = 8192;
150 
151   /* Number of threads on which to execute map tasks (Default = 1 per processor) */
152   sched_args.num_map_threads = -1;
153 
154   /* Number of threads on which to execute reduce tasks ((Default = 1 per processor) */
155   sched_args.num_reduce_threads = -1;
156 
157   /* Number of threads on which to execute merge tasks ((Default = 1 per processor) */
158   sched_args.num_merge_threads = -1;
159 
160   /* Maximum number of processors to use */
161   sched_args.num_procs = 32;
162 
163   /* Ratio of input data size to output data size */
164   sched_args.key_match_factor = (float) (nElements); // reducing an array to a sum
165 
166   /*
167    * RUN THE ALGORITHM
168    */
169   starttime = gethrtime();
170   if( map_reduce_scheduler( &sched_args ) < 0 ) {
171     fatal("Scheduler had an error. Bailing out.\n");
172   }
173   endtime = gethrtime();
174 
175   mr_sumtime = endtime-starttime;
176 
177   keyval_t * p_pair = result.data;
178   int * p_mr_key = (int*) p_pair[0].key;
179   int * p_mr_sum = (int*) p_pair[0].val;
180   int mrsum = *p_mr_sum;
181 
182   delete p_mr_key;
183   delete p_mr_sum;
184 
185   cout << "MapReduce Sum: " << mrsum << endl;
186   cout << "True Sum:      " << true_sum << endl;
187   cout << "MapReduce Time:   " << mr_sumtime << endl;
188   cout << "Linear Scan Time: " << linearsumtime << endl;
189   cout << "Speedup: " << ((double) linearsumtime) / ((double) mr_sumtime) << "x" << endl;
190   cout << endl;
191 
192   if( mrsum == true_sum ) {
193     cout << "Correct Result." << endl;
194   } else {
195     cout << "+--------------------------------------+" << endl;
196     cout << "| INCORRECT RESULT THE WORLD IS ENDING |" << endl;
197     cout << "+--------------------------------------+" << endl;
198   }
199 
200   delete [] array;
201   array = NULL;
202 
203   return 0;
204 }