#include <iostream>
#include <iomanip>
#include <cstdlib>
#include <unistd.h>
#include <sys/time.h>
#include "fatals.h"
#include "MapReduceScheduler.h"

/* The only SPARC/Solaris-specific code is the gethrtime() call */
#ifndef sparc
#error "Only SPARC architectures are supported."
#endif

using namespace std;
/* Key Comparison Function -- Keys are int* */
int intcmp( const void *v1, const void * v2 ) {
    const int * i1 = (const int *) v1;
    const int * i2 = (const int *) v2;

    if( *i1 < *i2 ) return -1;
    else if( *i1 > *i2 ) return 1;
    else return 0;
}

/* Map function */
void sumarray_map( map_args_t * args ) {
    int nChunkSize = args->length;
    // cout << "Map chunk size is " << nChunkSize << endl;
    int * miniArray = (int*) args->data;

    // cout << "Calculating intermediate sum over " << nChunkSize << " elements: " ;

    int intermediate_sum = 0;
    for(int i=0;i<nChunkSize;i++) {
        intermediate_sum += miniArray[i];
    }

    // cout << intermediate_sum << endl;

    int * key = new int;
    *key = 1;

    int * val = new int;
    *val = intermediate_sum;

    // cout << "Emitting intermediate <" << *key << "," << *val << ">" << endl;
    /* The key size is the size of the int the key points to */
    emit_intermediate( key, val, sizeof( int ) );
}
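
/* Note on the structure above: every map task emits its partial sum under
 * the same constant key (1), so the runtime groups all partial sums into a
 * single value list and calls the reduce function exactly once. The key and
 * value are heap-allocated because the runtime keeps the emitted pointers
 * (not copies) around until the reduce and merge phases run. */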

/* Reduce Function */
void sumarray_reduce( void * key_in, void ** vals_in, int vals_len ) {
    int nElements = vals_len;
    int ** p_array = (int**) vals_in;
    int * p_key = (int*) key_in;

    delete p_key;

    int sum = 0;
    for(int i=0;i<nElements;i++) {
        sum += p_array[i][0];
        delete p_array[i];
    }

    int * key = new int;
    *key = 0;

    int * val = new int;
    *val = sum;

    emit( key, val );
}
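
/* Memory note: only the representative key pointer passed into this reduce
 * call is deleted. If other map tasks allocated separate copies of the same
 * key value, those copies are not reclaimed here (a small, bounded leak
 * that this benchmark tolerates). */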

int main( int argc, char * argv[] ) {

    scheduler_args_t sched_args;
    final_data_t result;
    int * array;
    int nElements;
    hrtime_t starttime, endtime, linearsumtime, mr_sumtime;

    /* Parameter parsing and checking */
    if( argc != 2 ) {
        fatal("Usage:\n%s <arraySize>\n", argv[0]);
    }

    nElements = atoi( argv[1] );
    if( nElements <= 0 ) {
        fatal("%s is not a valid array size.\n", argv[1]);
    }

    array = new int[nElements];

    /* Initialize the array */
    for(int i=0;i<nElements;i++) {
        array[i] = i;
    }

    /* Time a simple serial scan of the array as the baseline.
     * gethrtime() returns a monotonic timestamp in nanoseconds. */
    starttime = gethrtime();
    int true_sum = 0;
    for(int i=0;i<nElements;i++) {
        true_sum += array[i];
    }
    endtime = gethrtime();
    linearsumtime = endtime-starttime;

    /*---------------------------------------------------------+
     | Set up the arguments to the scheduler                    |
     +---------------------------------------------------------*/

    /* A pointer to whatever the splitter will split */
    sched_args.task_data = array;

    /* Total number of bytes of data */
    sched_args.data_size = nElements*sizeof(int);

    /* Pointer to the map function (REQUIRED) */
    sched_args.map = sumarray_map;

    /* Pointer to the reduce function ( NULL => Identity function ) */
    sched_args.reduce = sumarray_reduce;

    /* Pointer to the splitter function ( NULL => Array Splitter ) */
    sched_args.splitter = NULL;

    /* Pointer to the Key Comparison function (REQUIRED) */
    sched_args.key_cmp = intcmp;

    /* Pointer to the final output data (allocated by USER, NOT by runtime) */
    sched_args.result = &result;

    /* Pointer to the partition function ( default is a hash ) */
    sched_args.partition = NULL;

    /*-----------------------------------------------------------+
     | Stuff below this point is SUPPOSEDLY only for performance |
     | tuning... in my experience, these can screw things up, so |
     | be wary.                                                   |
     +-----------------------------------------------------------*/

    /* Number of bytes per element (on average, if necessary) */
    sched_args.unit_size = sizeof(int);

    /* If nonzero, creates one emit queue for each reduce task,
     * instead of one per reduce thread. This improves time to emit
     * if data is emitted in order, but can increase merge time. */
    sched_args.use_one_queue_per_task = 0;

    /* L1D cache size, in bytes */
    sched_args.L1_cache_size = 8192;

    /* Number of threads on which to execute map tasks (Default = 1 per processor) */
    sched_args.num_map_threads = -1;

    /* Number of threads on which to execute reduce tasks (Default = 1 per processor) */
    sched_args.num_reduce_threads = -1;

    /* Number of threads on which to execute merge tasks (Default = 1 per processor) */
    sched_args.num_merge_threads = -1;

    /* Maximum number of processors to use */
    sched_args.num_procs = 32;

    /* Ratio of input data size to output data size */
    sched_args.key_match_factor = (float) (nElements); // reducing an array to a sum

    /*
     * RUN THE ALGORITHM
     */
    starttime = gethrtime();
    if( map_reduce_scheduler( &sched_args ) < 0 ) {
        fatal("Scheduler had an error. Bailing out.\n");
    }
    endtime = gethrtime();

    mr_sumtime = endtime-starttime;

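    /* The scheduler hands back the final <key, value> pairs in result.data,
     * one per distinct key. Since sumarray_reduce emits a single <0, sum>
     * pair, only the first pair is examined below. Note: the result.data
     * array itself is presumably allocated by the runtime and is not freed
     * here. */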
    keyval_t * p_pair = result.data;
    int * p_mr_key = (int*) p_pair[0].key;
    int * p_mr_sum = (int*) p_pair[0].val;
    int mrsum = *p_mr_sum;

    delete p_mr_key;
    delete p_mr_sum;

    cout << "MapReduce Sum: " << mrsum << endl;
    cout << "True Sum: " << true_sum << endl;
    cout << "MapReduce Time: " << mr_sumtime << " ns" << endl;
    cout << "Linear Scan Time: " << linearsumtime << " ns" << endl;
    cout << "Speedup: " << ((double) linearsumtime) / ((double) mr_sumtime) << "x" << endl;
    cout << endl;

    if( mrsum == true_sum ) {
        cout << "Correct Result." << endl;
    } else {
        cout << "+--------------------------------------+" << endl;
        cout << "| INCORRECT RESULT THE WORLD IS ENDING |" << endl;
        cout << "+--------------------------------------+" << endl;
    }

    delete [] array;
    array = NULL;

    return 0;
}