This document shows how to use the MapReduce implementation described in [1] by walking through a simple example. The example is available as a tarball here (updated 31 July 2010). Please note that the MapReduceScheduler.c file differs slightly from the version released by the original authors: the version in the tarball has been modified to compile cleanly with GCC 4.0.2. The files are also available as syntax-highlighted HTML here (the MapReduce implementation is not shown, and fatals.* are elided). This document assumes the reader is following along using the syntax-highlighted Makefile and main.C.
Updated 31 July 2010: Since this tutorial has gained some popularity with non-Wisconsin users, I've modified the files in the tarball to build by default on x86/Linux instead of SPARC/Solaris. I've also removed the remote build commands from the Makefile, as those were fairly Wisconsin-specific. Some of the line numbers mentioned in the prose below may be incorrect, and the syntax-highlighted HTML differs from the source in the tarball.
Caveat: the MapReduce implementation seems to break when a very large number of keys is emitted (e.g., 2 billion or more). FYI.
farfo(1)% cd ~/
farfo(2)% mkdir mapreduce_example
farfo(3)% cd mapreduce_example
farfo(4)% cp $DOWNLOADS/mapreduce.tar.gz .
farfo(5)% gzcat mapreduce.tar.gz | tar -xf -
farfo(6)% ls
fatals.cpp main.C MapReduceScheduler.c mapreduce.tar.gz
fatals.h Makefile MapReduceScheduler.h
The authors released their implementation, which comes with a couple of examples. You can find the authors' site here. Read on for a trivial home-grown example.
The age-old problem: given an array of numbers, find their sum. Admittedly, the problem is not terribly interesting, but it can still benefit from parallelism, provided the array is reasonably large. Follow along with main.C.
Strategy: split the array into segments and let worker threads sum each segment in parallel. Then sum the intermediate sums to produce the final sum.
The Map function in the example, void sumarray_map( map_args_t * args ) (line 26), determines the intermediate sum of a subset of the large array, called miniArray. It then emits this sum, tagged with a constant key (1 in the example). Note that sumarray_map() determines the length and location of its input directly from the map_args_t struct passed to it. The MapReduce runtime is responsible for splitting the array, determining the length of each slice, and calling sumarray_map() the appropriate number of times with the correct arguments.
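For reference, here is a minimal sketch of what such a Map function can look like. This is a sketch, not the code from main.C: the map_args_t field names (data, length), the emit_intermediate() signature, and the long value type are assumptions based on the headers shipped with the released implementation. Check MapReduceScheduler.h in the tarball for the exact definitions.

#include <stdlib.h>
#include "MapReduceScheduler.h"

/* Sketch of a Map function: sum one slice of the big array, then emit
 * the partial sum under the constant key 1. Field names and the
 * emit_intermediate() signature are assumptions based on the released
 * headers. */
void sumarray_map( map_args_t * args )
{
    int * miniArray = (int *) args->data;   /* our slice of the array */
    long sum = 0;
    int i;
    for ( i = 0; i < args->length; i++ )
        sum += miniArray[i];

    int * key = (int *) malloc( sizeof(int) );
    long * val = (long *) malloc( sizeof(long) );
    *key = 1;                               /* the constant key */
    *val = sum;
    emit_intermediate( key, val, sizeof(int) );
}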
The various data structures used in MapReduce can be found in MapReduceScheduler.h.
If we had been inclined to do so, we could have used our own data partition strategy by defining a splitter function. The default splitter, which simply splits an array into segments, suffices for this example.
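For illustration, a custom splitter might look something like the sketch below, handing out at most req_units elements per call. The splitter signature, the convention that it returns nonzero while data remains, and the split_state_t bookkeeping struct are all assumptions based on the typedefs in MapReduceScheduler.h; verify against your copy before relying on any of it.

/* Hypothetical custom splitter. Signature and return convention are
 * assumptions based on the splitter_t typedef in MapReduceScheduler.h. */
typedef struct {
    int * array;   /* the full input array */
    int   total;   /* total number of elements */
    int   next;    /* index of the first unassigned element */
} split_state_t;

int sumarray_splitter( void * data_in, int req_units, map_args_t * out )
{
    split_state_t * s = (split_state_t *) data_in;
    if ( s->next >= s->total )
        return 0;                             /* no data left to map */
    out->data = &s->array[ s->next ];
    out->length = ( s->total - s->next < req_units )
                  ? ( s->total - s->next ) : req_units;
    s->next += out->length;
    return 1;                                 /* more slices remain */
}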
The Reduce function, void sumarray_reduce( void * key_in, void ** vals_in, int vals_len ), is executed by the runtime after all invocations of Map have completed. sumarray_reduce() sums all the intermediate values (those with key = 1) and emits the final result. In general, Reduce is called once for each unique key emitted by the Map stage, and those calls may run in parallel. Since our Map emitted only one unique key, only one Reduce is invoked.
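Again for reference, a minimal sketch of such a Reduce function follows, assuming the same headers as the Map sketch above. The emit() signature is an assumption based on the released headers, and the value type must match whatever the Map function emitted.

/* Sketch of a Reduce function: add up every intermediate value that
 * arrived under this key and emit a single final <key,value> pair.
 * The emit() signature is an assumption based on the released headers. */
void sumarray_reduce( void * key_in, void ** vals_in, int vals_len )
{
    long * total = (long *) malloc( sizeof(long) );
    int i;
    *total = 0;
    for ( i = 0; i < vals_len; i++ )
        *total += *( (long *) vals_in[i] );
    emit( key_in, total );
}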
If we had been inclined to do so, we could have specified how reduce tasks are assigned to the available worker threads by defining a partition function. By default, our MapReduce runtime will hash keys to perform reduce assignment. Any assignment is acceptable in our case, since we only have one reduce invocation anyway.
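A trivial partition function could look like the sketch below; the signature is an assumption based on the partition_t typedef in MapReduceScheduler.h.

/* Hypothetical partition function: send every key to reduce task 0.
 * Signature is an assumption based on MapReduceScheduler.h. */
int sumarray_partition( int reduce_tasks, void * key, int key_size )
{
    (void) reduce_tasks;   /* unused; we always choose task 0 */
    (void) key;
    (void) key_size;
    return 0;              /* fine here: only one key ever exists */
}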
The MapReduce runtime requires the user to define a key comparison function, somewhat analogous to C++'s ability to overload comparison operators. Since our keys are just pointers to integers, our key comparison function (int intcmp( const void * v1, const void * v2 )) is simple. Key comparison is used to sort the final emitted outputs of Reduce before the list of result keys is returned.
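Since the keys are pointers to int, a comparator along these lines suffices (a sketch, not necessarily the exact code in main.C):

/* Compare two keys, each a pointer to an int. Returns a negative,
 * zero, or positive value, in the style of strcmp(). */
int intcmp( const void * v1, const void * v2 )
{
    int a = *( (const int *) v1 );
    int b = *( (const int *) v2 );
    return ( a < b ) ? -1 : ( ( a > b ) ? 1 : 0 );
}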
To actually run MapReduce, we first parse our arguments (lines 74-105 in main()), then set up the scheduler_args_t struct at lines 106-164. Notably, we define the data set, provide pointers to the map, reduce, and key comparison functions, specify the default splitter and partition functions (see [1]), and provide a pointer to the location where the result sum will be stored.
We also provide some hints to the runtime that may enable faster execution: the size of a single element, the L1 data cache size, and so on. The runtime takes these hints into account when partitioning the data for mapping and when making other data- and control-allocation decisions.
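Schematically, the setup looks something like the following sketch (assuming <string.h> for memset). The field names are assumptions based on the scheduler_args_t definition in MapReduceScheduler.h and the description in [1], and bigArray and numInts are hypothetical names standing in for the corresponding variables in main.C; the header in your tarball is authoritative.

/* Sketch of filling in scheduler_args_t; field names are assumptions
 * based on MapReduceScheduler.h and [1]. */
final_data_t result;
scheduler_args_t sched_args;
memset( &sched_args, 0, sizeof(sched_args) );
sched_args.task_data = bigArray;               /* the input array */
sched_args.data_size = numInts * sizeof(int);  /* total input bytes */
sched_args.map       = sumarray_map;
sched_args.reduce    = sumarray_reduce;
sched_args.splitter  = NULL;     /* NULL: assume the default splitter */
sched_args.partition = NULL;     /* NULL: assume the hash partitioner */
sched_args.key_cmp   = intcmp;
sched_args.result    = &result;  /* final <key,value> pairs land here */
sched_args.unit_size = sizeof(int);    /* hint: size of one element */
sched_args.L1_cache_size = 64 * 1024;  /* hint: L1-D cache, in bytes */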
To actually invoke our MapReduce algorithm, we call map_reduce_scheduler(). Since we wrote the map and reduce functions, we know that there will be only one emitted <key,value> pair, and that its value will contain the array sum.
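The call, and the retrieval of the single result pair, look roughly like this; the final_data_t layout (a data array of keyval_t plus a length) is an assumption based on the released headers.

/* Run the whole pipeline; this blocks until map, reduce, and the
 * final merge complete. The result layout is an assumption based on
 * the final_data_t definition in MapReduceScheduler.h. */
map_reduce_scheduler( &sched_args );

/* Exactly one pair comes back, since we emitted one unique key. */
long sum = *( (long *) result.data[0].val );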
Tables 1 and 2 in [1] are useful references for configuring the runtime.
To make development easier, we're going to write our code in AFS on a machine that supports a lot of nice features, like X-windowed applications. A clever Makefile will allow us to actually build on our Solaris-based Sun T2000, chianti. To enable remote builds, copy remoteMake.tcsh from the tarball to your home directory on chianti, set up RSA authentication (a guide is available here), and modify the local Makefile to reflect your username and whatever directory structure you prefer. The rsync utility will take care of keeping the local and remote directories synchronized.
You will want to make directories on chianti commensurate with your Makefile settings before running make.
farfo(7)% make
Syncing Remote Directory...
rsync -av . joeuser@chianti:afs_sync/mapreduce_trivial
building file list ... done
sent 235 bytes received 20 bytes 170.00 bytes/sec
total size is 63002 speedup is 247.07
Initiating Remote Build...
ssh chianti ./remoteMake.tcsh afs_sync/mapreduce_trivial
Making .generated/ path for object files...
mkdir .generated
Making bin/ path for binaries...
mkdir bin
*Compiling fatals.cpp...
g++ -c -O3 fatals.cpp -o .generated/fatals.o
*Compiling MapReduceScheduler.c...
g++ -c -O3 -D_SOLARIS_ MapReduceScheduler.c -o .generated/MapReduceScheduler.o
MapReduceScheduler.c:144: warning: non-local variable '
*Compiling main.C...
g++ -c -O3 main.C -o .generated/main.o
***Making binary sumArray...
g++ -lm -lpthread -lrt -lcpc .generated/fatals.o .generated/MapReduceScheduler.o .generated/main.o -o bin/sumArray
My work here is done.
Just as we did with building, we can run the algorithm remotely as well, via the run.tcsh script. Edit it to match your Makefile's settings and run the script, passing one argument (the size of the array).
farfo(8)% ./run.tcsh 100
MapReduce Sum: 4950
True Sum: 4950
MapReduce Time: 53669080
Linear Scan Time: 612
Speedup: 1.14032e-05x
Correct Result.
Just for fun, experiment with the array size and see how much speedup our toy program can get. We don't do a lot of error checking, so very large arrays will crash. Uncomment the print statements and notice that the runtime opts for single-threaded execution when the array size is small.
[1] C. Ranger, R. Raghuraman, A. Penmetsa, G. Bradski, and C. Kozyrakis. Evaluating MapReduce for Multi-core and Multiprocessor Systems. In HPCA 2007.