1 /* Copyright (c) 2007, Stanford University
2 * All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are met:
6 * * Redistributions of source code must retain the above copyright
7 * notice, this list of conditions and the following disclaimer.
8 * * Redistributions in binary form must reproduce the above copyright
9 * notice, this list of conditions and the following disclaimer in the
10 * documentation and/or other materials provided with the distribution.
11 * * Neither the name of Stanford University nor the
12 * names of its contributors may be used to endorse or promote products
13 * derived from this software without specific prior written permission.
14 *
15 * THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
16 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 * DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
19 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 */
26
27 #ifndef _MAP_REDUCE_SCHEDULER_H_
28 #define _MAP_REDUCE_SCHEDULER_H_
29
30 /* Standard data types for the function arguments and results */
31
32
/* Work unit handed to the map function. The splitter function is what
 * fills it in.
 */
typedef struct {
    /* Number of elements in data. The default splitter function counts
     * elements of unit_size bytes each. */
    int length;
    /* The data to process; its type is defined by the user. */
    void *data;
} map_args_t;
43
/* One key/value pair of a result. */
typedef struct {
    void *key;  /* pointer to the key */
    void *val;  /* pointer to the value */
} keyval_t;
53
54 /* List of results
55 * length - number of key value pairs
56 * data - array of key value pairs
57 */
58 typedef struct
59 {
60 int length;
61 keyval_t *data;
62 } final_data_t;
63
64 /* Scheduler function pointer type definitions */
65
66 /* Map function takes in map_args_t, as supplied by the splitter
67 * and emit_intermediate() should be called on any key value pairs
68 * in the intermediate result set.
69 */
70 typedef void (*map_t)(map_args_t *);
71
72
/* Signature of the reduce function. Its arguments are, in order: a pointer
 * to the key, a list of value pointers, and the length of that list. It
 * should call emit() on every key value pair that belongs in the result set.
 */
typedef void (*reduce_t)(void *key, void **vals, int vals_len);
78
79 /* Splitter function takes in a pointer to the input data, an interger of
80 * the number of bytes requested, and an uninitialized pointer to a
81 * map_args_t pointer. The result is stored in map_args_t. The splitter
82 * should return 1 if the result is valid or 0 if there is no more data.
83 */
84 typedef int (*splitter_t)(void *, int, map_args_t *);
85
/* Signature of the partition function, which assigns a key to a reduce task.
 * Its arguments are, in order: the number of reduce tasks, a pointer to the
 * key, and the length of the key in bytes. The return value is the # of the
 * reduce task where the key will be processed; it should be the same for all
 * keys that are equal.
 */
typedef int (*partition_t)(int reduce_tasks, void *key, int key_size);
92
/* Signature of the key comparison function. key_cmp(key1, key2) returns:
 *   zero      when key1 == key2
 *   positive  when key1 >  key2
 *   negative  when key1 <  key2
 */
typedef int (*key_cmp_t)(const void *key1, const void *key2);
99
100 /* The arguments to operate the scheduler */
101 typedef struct
102 {
103 void * task_data; /* The data to run MapReduce on.
104 * If splitter is NULL, this should be an array. */
105 int data_size; /* Total # of bytes of data */
106 int unit_size; /* # of bytes for one element (if necessary, on average) */
107
108 map_t map; /* Map function pointer, must be user defined */
109 reduce_t reduce; /* If NULL, identity reduce function is used,
110 * which emits a keyval pair for each val. */
111 splitter_t splitter; /* If NULL, the array splitter is used.*/
112 key_cmp_t key_cmp; /* Key comparison function, must be user defined.*/
113
114 final_data_t *result;/* Pointer to output data, must be allocated by user */
115
116 /*** Optional arguments must be zero if not used ***/
117 partition_t partition; /* Default partition function is a hash function */
118 int use_one_queue_per_task; /* Creates one emit queue for each reduce task,
119 * instead of per reduce thread. This improves
120 * time to emit if data is emitted in order,
121 * but can increase merge time. */
122 int L1_cache_size; /* Size of L1 cache in bytes */
123 int num_map_threads; /* # of threads to run map tasks on.
124 * Default is one per processor */
125 int num_reduce_threads; /* # of threads to run reduce tasks on.
126 * Default is one per processor */
127 int num_merge_threads; /* # of threads to run merge tasks on.
128 * Default is one per processor */
129 int num_procs; /* Maximum number of processors to use.
130 * TODO: Specify a processor set. */
131 float key_match_factor; /* Magic number that describes the ratio of
132 * the input data size to the output data size.
133 * This is used as a hint. */
134 } scheduler_args_t;
135
136 /* Scheduler defined functions */
137
138 /* The main MapReduce engine. This is the function called by the application.
139 * It is responsible for creating and scheduling all map and reduce tasks, and
140 * also organizes and maintains the data which is passed from application to
141 * map tasks, map tasks to reduce tasks, and reduce tasks back to the
142 * application. Results are stored in args->result. A return value less than zero
143 * represents an error. This function is not thread safe.
144 */
145 int map_reduce_scheduler(scheduler_args_t * args);
146
/* To be called from the map function. Stores a key of key_size bytes,
 * together with a value, in the intermediate queues so that the reduce task
 * can process them. The scheduler will call the partition function to assign
 * the key to a reduce task.
 */
void emit_intermediate(void *key, void *val, int key_size);
153
/* Inline-oriented variant of emit_intermediate(); it takes the same
 * arguments (key, value, key size in bytes).
 *
 * The prototype intentionally carries no `inline` specifier. This header
 * supplies no definition, and C99/C11 (6.7.4) require a function with
 * external linkage that is declared `inline` to be defined in the same
 * translation unit. A plain declaration is valid everywhere, and it lets the
 * implementing source file provide the external definition that callers
 * link against.
 */
void emit_intermediate_inline(void *key, void *val, int key_size);
157
/* To be called from the reduce function. Stores a key and a value in the
 * reduce queue; the pair will be in the final result array.
 */
void emit(void *key, void *val);
162
/* Inline-oriented variant of emit(); it takes the same arguments (key and
 * value).
 *
 * The prototype intentionally carries no `inline` specifier. This header
 * supplies no definition, and C99/C11 (6.7.4) require a function with
 * external linkage that is declared `inline` to be defined in the same
 * translation unit. A plain declaration is valid everywhere, and it lets the
 * implementing source file provide the external definition that callers
 * link against.
 */
void emit_inline(void *key, void *val);
166
/* The built in partition function, which is a hash. It is exposed globally
 * so that a user defined partition function can call it.
 */
int default_partition(int reduce_tasks, void *key, int key_size);
171
172 #endif // _MAP_REDUCE_SCHEDULER_H_