1 /* Copyright (c) 2007, Stanford University
  2 * All rights reserved.
  3 *
  4 * Redistribution and use in source and binary forms, with or without
  5 * modification, are permitted provided that the following conditions are met:
  6 *     * Redistributions of source code must retain the above copyright
  7 *       notice, this list of conditions and the following disclaimer.
  8 *     * Redistributions in binary form must reproduce the above copyright
  9 *       notice, this list of conditions and the following disclaimer in the
 10 *       documentation and/or other materials provided with the distribution.
 11 *     * Neither the name of Stanford University nor the
 12 *       names of its contributors may be used to endorse or promote products
 13 *       derived from this software without specific prior written permission.
 14 *
 15 * THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
 16 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 17 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 18 * DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
 19 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 20 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 21 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 22 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 24 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 25 */
 26 
 27 #ifndef _MAP_REDUCE_SCHEDULER_H_
 28 #define _MAP_REDUCE_SCHEDULER_H_
 29 
 30 /* Standard data types for the function arguments and results */
 31 
 32 
 33 /* Argument to map function. This is specified by the splitter function.
 34  * length - number of elements of data. The default splitter function gives 
 35             length in terms of the # of elements of unit_size bytes.
 36  * data - data to process of a user defined type
 37  */
 38 typedef struct
 39 {
 40    int length;
 41    void *data;
 42 } map_args_t;
 43 
 44 /* Single element of result
 45  * key - pointer to the key
 46  * val - pointer to the value
 47  */
 48 typedef struct
 49 {
 50    void * key;
 51    void * val;
 52 } keyval_t;
 53 
 54 /* List of results
 55  * length - number of key value pairs
 56  * data - array of key value pairs
 57  */
 58 typedef struct
 59 {
 60    int length;
 61    keyval_t *data;
 62 } final_data_t;
 63 
 64 /* Scheduler function pointer type definitions */
 65 
 66 /* Map function takes in map_args_t, as supplied by the splitter
 67  * and emit_intermediate() should be called on any key value pairs 
 68  * in the intermediate result set.
 69  */
 70 typedef void (*map_t)(map_args_t *);
 71 
 72 
 73 /* Reduce function takes in a key pointer, a list of value pointers, and a 
 74  * length of the list. emit() should be called on any key value pairs 
 75  * in the result set.
 76  */
 77 typedef void (*reduce_t)(void *, void **, int);
 78 
 79 /* Splitter function takes in a pointer to the input data, an interger of
 80  * the number of bytes requested, and an uninitialized pointer to a 
 81  * map_args_t pointer. The result is stored in map_args_t. The splitter
 82  * should return 1 if the result is valid or 0 if there is no more data.
 83  */
 84 typedef int (*splitter_t)(void *, int, map_args_t *);
 85 
 86 /* Partition function takes in the number of reduce tasks, a pointer to
 87  * a key, and the lendth of the key in bytes. It assigns a key to a reduce task.
 88  * The value returned is the # of the reduce task where the key will be processed. 
 89  * This value should be the same for all keys that are equal.
 90  */
 91 typedef int (*partition_t)(int, void *, int);
 92 
 93 /* key_cmp(key1, key2) returns:
 94  *   0 if key1 == key2
 95  *   + if key1 > key2
 96  *   - if key1 < key2 
 97  */
 98 typedef int (*key_cmp_t)(const void *, const void*);
 99 
100 /* The arguments to operate the scheduler */
101 typedef struct
102 {
103    void * task_data;    /* The data to run MapReduce on.
104                          * If splitter is NULL, this should be an array. */
105    int data_size;       /* Total # of bytes of data */
106    int unit_size;       /* # of bytes for one element (if necessary, on average) */
107 
108    map_t map;           /* Map function pointer, must be user defined */
109    reduce_t reduce;     /* If NULL, identity reduce function is used, 
110                          * which emits a keyval pair for each val. */
111    splitter_t splitter; /* If NULL, the array splitter is used.*/
112    key_cmp_t key_cmp;   /* Key comparison function, must be user defined.*/
113 
114    final_data_t *result;/* Pointer to output data, must be allocated by user */
115 
116    /*** Optional arguments must be zero if not used ***/
117    partition_t partition;      /* Default partition function is a hash function */
118    int use_one_queue_per_task; /* Creates one emit queue for each reduce task,
119                                 * instead of per reduce thread. This improves
120                                 * time to emit if data is emitted in order,
121                                 * but can increase merge time. */
122    int L1_cache_size;          /* Size of L1 cache in bytes */
123    int num_map_threads;        /* # of threads to run map tasks on.
124                                 * Default is one per processor */
125    int num_reduce_threads;     /* # of threads to run reduce tasks on.
126                                 * Default is one per processor */
127    int num_merge_threads;      /* # of threads to run merge tasks on.
128                                 * Default is one per processor */
129    int num_procs;              /* Maximum number of processors to use.
130                                 * TODO: Specify a processor set. */
131    float key_match_factor;     /* Magic number that describes the ratio of 
132                                 * the input data size to the output data size.
133                                 * This is used as a hint. */
134 } scheduler_args_t;
135 
136 /* Scheduler defined functions */
137 
138 /* The main MapReduce engine. This is the function called by the application.
139  * It is responsible for creating and scheduling all map and reduce tasks, and
140  * also organizes and maintains the data which is passed from application to 
141  * map tasks, map tasks to reduce tasks, and reduce tasks back to the
142  * application. Results are stored in args->result. A return value less than zero
143  * represents an error. This function is not thread safe. 
144  */
145 int map_reduce_scheduler(scheduler_args_t * args);
146 
/* To be called from the map function.
 *
 * Stores a key of key_size bytes, together with a value, in the intermediate
 * queues for processing by a reduce task. The scheduler calls the partition
 * function to decide which reduce task the key is assigned to.
 */
void emit_intermediate(void *key, void *val, int key_size);
153 
/* Same contract as emit_intermediate(): stores a key of key_size bytes and a
 * value in the intermediate queues for processing by a reduce task. This
 * variant is presumably meant to be inlined inside the scheduler's own
 * translation unit -- NOTE(review): confirm.
 *
 * Deliberately declared WITHOUT `inline`: C99/C11 6.7.4 requires a function
 * with external linkage that is declared `inline` to also be defined in the
 * same translation unit. That does not hold for application files that only
 * include this header (undefined behavior; GCC warns "declared but never
 * defined"). A plain declaration is compatible with an `inline` definition in
 * the implementing .c file; provided that file includes this header, its
 * definition becomes an external definition that other files can link to.
 */
void emit_intermediate_inline(void *key, void *val, int key_size);
157 
/* To be called from the reduce function.
 *
 * Stores a key and a value in the reduce queue; the pair ends up in the
 * final result array.
 */
void emit(void *key, void *val);
162 
/* Same contract as emit(): stores a key and a value in the reduce queue so
 * the pair ends up in the final result array. This variant is presumably
 * meant to be inlined inside the scheduler's own translation unit --
 * NOTE(review): confirm.
 *
 * Deliberately declared WITHOUT `inline`: C99/C11 6.7.4 requires a function
 * with external linkage that is declared `inline` to also be defined in the
 * same translation unit. That does not hold for application files that only
 * include this header (undefined behavior; GCC warns "declared but never
 * defined"). A plain declaration is compatible with an `inline` definition in
 * the implementing .c file; provided that file includes this header, its
 * definition becomes an external definition that other files can link to.
 */
void emit_inline(void *key, void *val);
166 
/* Built-in partition function, implemented as a hash of the key.
 *
 * It has external linkage so that a user-defined partition function can
 * call it.
 */
int default_partition(int reduce_tasks, void *key, int key_size);
171 
172 #endif // _MAP_REDUCE_SCHEDULER_H_