Information about MapReduce is available on the [wiki:RepyMapReduce main MapReduce page].
In this first assignment, you will build a simple MapReduce framework with one primary node and one worker node. The primary node will acquire data, hand it off to the worker, and wait for the worker to finish. The worker will perform a map() pass and a reduce() pass on the data and send the result back to the primary node. Since only one worker is involved in the computation, the partition step is left for future assignments. This code will serve as a base for the future assignments that will expand the scalability and robustness of the framework.
For the worker, begin by implementing a pipeline. Where will the worker call its mapping and reducing functions from? Put these calls in the script portion of the mapred.repy file, after ''if callfunc == 'initialize':''. First, the worker needs to call a do_map function, passing the initial data as a parameter. This data has a strict format: each line is treated as a key-value pair (lines separated by the newline character `\n`), and the key is separated from the value with a tab (the first tab in the line, `\t`).
In order to ensure that the implementation of the worker stays general, we're going to enforce that any algorithm-specific functionality be included in a separate file. For this reason, mapper.repy and reducer.repy will need to be included at the top of the file. These files will supply the map_func(key, value) and reduce_func(key, value_list) functions to the worker's do_map() and do_reduce() helper functions, respectively. Two sample files (mapper.repy, reducer.repy) are included in the skeleton code. After generating key-value pairs, the worker will call the map_func(key, value) function from mapper.repy for each key-value pair, collect the results, and return the resulting list of key-value pairs from do_map().
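The map pass described above can be sketched in plain Python as follows. The function names do_map and map_func come from the assignment; the word-count body of map_func is only a stand-in here (the real one lives in mapper.repy), so the sketch is self-contained:

```python
def map_func(key, value):
    # Stand-in for the mapper.repy function: a word-count mapper
    # that emits (word, 1) for each word in the value.
    return [(word, 1) for word in value.split()]

def do_map(data):
    # data is a string of "key\tvalue" lines separated by "\n";
    # the key is everything before the FIRST tab in each line.
    results = []
    for line in data.split("\n"):
        if not line:
            continue  # skip blank lines (e.g. a trailing newline)
        key, _, value = line.partition("\t")
        results.extend(map_func(key, value))
    return results
```

For example, `do_map("doc1\tthe cat sat\n")` yields the list `[("the", 1), ("cat", 1), ("sat", 1)]`, which is then handed on to the reduce stage.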
With the results from the do_map() function, do_reduce can now be called after some simplification. The do_map function will have returned a list of (key, value) tuples, but the reduce_func expects a key with a list of values! This means the worker will have to combine the (key, value) tuples into a dictionary where each key maps to the list of values collected for it. After this conversion, do_reduce() can be called while passing in this {key1: [v1, v2, ..., vn], ...} dictionary.
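This grouping step is a few lines of plain Python; the helper name group_pairs is not part of the assignment, just an illustration:

```python
def group_pairs(pairs):
    # Combine a list of (key, value) tuples into {key: [v1, v2, ...]},
    # preserving the order in which values were emitted for each key.
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return grouped
```

The resulting dictionary is exactly the shape do_reduce() expects.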
Similar to do_map(), do_reduce() will call reduce_func(key, value_list) for each (key, value list) pair. After collecting the results of reduce_func(), the key-value pairs can be expanded back out into the flat format as before (`"key\tvalue\n"`).
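A matching sketch of the reduce pass, again with a stand-in word-count reduce_func (the real one lives in reducer.repy):

```python
def reduce_func(key, value_list):
    # Stand-in for the reducer.repy function: sum the counts for a word.
    return sum(value_list)

def do_reduce(grouped):
    # grouped is a {key: [v1, v2, ...]} dictionary. Call reduce_func
    # once per key and flatten the results back into "key\tvalue\n" lines.
    output = ""
    for key in sorted(grouped):
        output += key + "\t" + str(reduce_func(key, grouped[key])) + "\n"
    return output
```

Sorting the keys is optional, but it makes the output deterministic and easier to diff while debugging.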
In this step, the mappri.repy program will embody the preliminary design of how the primary node communicates with the single MapReduce peer. In order to hand off the initial data to the MapReduce peer, the primary needs to be able to read a data file, copy its contents, and send it to the peer. To do this, mappri.repy must take some arguments on the command line:
mappri.repy <init data file> <peer ip>:<peer port>
Your MapReduce primary will take these parameters on the command line, do some basic error checking, attempt to connect to the replica, and pass it a message with the initial data. The primary node should then wait until the peer has completed its computation and receive the peer's output. The primary can either write this to a file of your choosing or print it to the screen, then exit.
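The argument checking might look like the sketch below. This is plain Python (in Repy the arguments arrive via callargs rather than sys.argv), and the helper name parse_args is purely illustrative:

```python
def parse_args(argv):
    # Expect exactly: <init data file> <peer ip>:<peer port>
    if len(argv) != 2:
        raise ValueError("usage: mappri.repy <init data file> <peer ip>:<peer port>")
    datafile, peer = argv
    # Split on the LAST colon so the port is everything after it.
    ip, sep, port = peer.rpartition(":")
    if not sep or not port.isdigit():
        raise ValueError("peer must be given as <ip>:<port>")
    return datafile, ip, int(port)
```

With the arguments validated, the primary reads the data file, connects to the peer, sends the contents, and blocks waiting for the result.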
In order to allow communication between the primary and peer, it is necessary to ''serialize'' all communication between the two. A complicated data structure such as a dictionary cannot be represented as a string without some distinct structure and possibly some limitations. Since repy does not have any built-in modules to support ''pickling'' (the Python term for serializing objects), this responsibility must fall to you.
It is important to note that a socket receives an arbitrary number of bytes per call. How does the program using the socket know when the message has been entirely received if the socket has no idea how long the message is? There are many ways to solve this, but one of the easiest is to preface the message with its size. This way, the receiving side can look at the first few bytes (iterating if it doesn't find the * separator), parse how long the message should be, and then receive the entire message from the socket. An example of what a message could look like is below:
<size>*<message message message>
For easier readability for yourself and your grader, encapsulate this sending and receiving functionality into methods: call recv_msg() to receive a message and send_msg() to compose and send an outgoing one, instead of repeating the functionality wherever communication is required.
It may be necessary, now or in later MapReduce assignments, to serialize more complicated Python structures such as dictionaries. The appeal of dictionaries in a MapReduce setting makes sense; a dictionary can hold a mapping of a key to a list of values, e.g. {key1: [v1, v2, v3], key2: [v3, v5], ...}.
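One simple scheme is to reuse the tab/newline conventions already in the data format: one line per key, with the key and its values joined by tabs. This sketch assumes keys and values contain no tabs or newlines; the function names are illustrative, not part of the assignment:

```python
def serialize_dict(d):
    # {key: [v1, v2]} -> "key\tv1\tv2" lines joined by "\n".
    # Assumes keys/values contain no tab or newline characters.
    lines = []
    for key in sorted(d):
        lines.append("\t".join([key] + [str(v) for v in d[key]]))
    return "\n".join(lines)

def deserialize_dict(s):
    # Inverse of serialize_dict; all values come back as strings.
    d = {}
    for line in s.split("\n"):
        if not line:
            continue
        parts = line.split("\t")
        d[parts[0]] = parts[1:]
    return d
```

Note the limitation: everything round-trips as strings, so the receiver must convert values back to the types it expects.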
- Work sequentially! Start with step 1 and ensure that if you directly feed in data, it will be mapped and reduced correctly. Move on to developing the skeleton for step 2, and think about how you can pass strings using TCP messages for step 3.
- To save the trouble of destroying and recreating sockets, remember that keeping several sockets alive is cheap, while repeatedly creating them is comparatively expensive. Take advantage of the global mycontext dictionary and reuse the same socket to respond to the primary, or vice versa.
- Break your assignment into multiple methods. Do most of your computation in methods, not in the script part of the repy file. This allows for greater headway going forward, and is clearer to other people reading your program.
- Make sure you do not name your own functions map(), reduce(), or hash(). These are built-in Python functions, so use other, related names instead!
Turn in a tar of your repy code including the following files:
- mappri.repy - Your primary node code
- mapred.repy - Your worker node code
- README - A readme file detailing any bugs, limitations, or special operating instructions of your program