Streaming Usage
You can use Hadoop streaming support with MongoDB in Python, Ruby, or Node.js. There are source code examples for each language.
Using Hadoop Streaming with MongoDB requires that your Hadoop distribution include the patches for the following issues:
- HADOOP-1722 - Make streaming to handle non-utf8 byte array
- HADOOP-5450 - Add support for application-specific typecodes to typed bytes
- MAPREDUCE-764 - TypedBytesInput's readRaw() does not preserve custom type codes
For the mainline Apache Hadoop distribution, these patches were merged in the 0.21.0 release. We have also verified that the Cloudera distribution (while still based on 0.20.x) includes these patches in CDH3 Update 1; anecdotal evidence (which needs confirmation) suggests they may have been present since CDH2, and they likely exist in CDH3 as well.
The streaming assembly is built automatically as part of the general build cycle, except when building against Hadoop 1.0; no jar is generated in that case, because 1.0 lacks support for streaming.
The resulting jar file is runnable with `hadoop jar` and contains all of the dependencies necessary to run the job.
Each individual scripting language will have different requirements for working with MongoDB + Hadoop Streaming. Once you have the jar file built for mongo-hadoop-streaming, you will need to build and deploy the support libraries for your chosen language.
It will also be necessary to ensure these libraries are available on each Hadoop node in your cluster, along with the mongo-hadoop-core driver as outlined in the main setup instructions. However, you do not need to distribute the mongo-hadoop-streaming jar anywhere.
For distributions of Hadoop which support streaming, you can use MongoDB collections as the input or output for these jobs. The arguments needed to run a Hadoop streaming job with MongoDB support are described below.
- Launch the job with `$HADOOP/bin/hadoop jar <location of streaming jar> …`
- Depending on which Hadoop release you use, the jar needed for streaming may be located in a different directory; commonly it is found somewhere like `$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*.jar`.
- Provide dependencies for the job either by placing each jar file in one of your Hadoop classpath directories, or by listing it explicitly on the command line with `-libjars <jar file>`. You will need to do this for the Java MongoDB driver as well as the Mongo-Hadoop core library.
- When using a MongoDB collection as input, add the arguments `-jobconf mongo.input.uri=<input mongo URI>` and `-inputformat com.mongodb.hadoop.mapred.MongoInputFormat`.
- When using a MongoDB collection as output, add the arguments `-jobconf mongo.output.uri=<output mongo URI>`, `-jobconf mapred.output.committer.class=com.mongodb.hadoop.mapred.output.MongoOutputCommitter`, and `-outputformat com.mongodb.hadoop.mapred.MongoOutputFormat`.
- When using BSON as input, use `-inputformat com.mongodb.hadoop.mapred.BSONFileInputFormat`.
- Specify locations for the `-input` and `-output` arguments. These are required even when using MongoDB for input or output; in that case you can point them at temporary directories.
- Always add the arguments `-jobconf stream.io.identifier.resolver.class=com.mongodb.hadoop.streaming.io.MongoIdentifierResolver` and `-io mongodb` so that the decoders/encoders needed by streaming can be resolved at run time.
- Specify a `-mapper <mapper script>` and a `-reducer <reducer script>`.
- Pass any other arguments needed with `-jobconf`, for example `mongo.input.query` or other options for controlling splitting behavior or filtering.
Here is a full example of a streaming command broken into separate lines for readability:
$HADOOP_HOME/bin/hadoop jar
$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming*
-libjars /Users/mike/projects/mongo-hadoop/streaming/target/mongo-hadoop-streaming.jar
-input /tmp/in
-output /tmp/out
-inputformat com.mongodb.hadoop.mapred.MongoInputFormat
-outputformat com.mongodb.hadoop.mapred.MongoOutputFormat
-io mongodb
-jobconf mongo.input.uri=mongodb://127.0.0.1:4000/mongo_hadoop.yield_historical.in?readPreference=primary
-jobconf mongo.output.uri=mongodb://127.0.0.1:4000/mongo_hadoop.yield_historical.out
-jobconf stream.io.identifier.resolver.class=com.mongodb.hadoop.streaming.io.MongoIdentifierResolver
-mapper /Users/mike/projects/mongo-hadoop/streaming/examples/treasury/mapper.py
-reducer /Users/mike/projects/mongo-hadoop/streaming/examples/treasury/reducer.py
-jobconf mongo.input.query={_id:{\$gt:{\$date:883440000000}}}
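For comparison, the `mongo.input.query` filter in the example above corresponds roughly to the following pymongo query. This is only an illustrative sketch, not part of the streaming job itself; the host and collection names are taken from the example URIs.

```python
# Sketch: the same _id filter expressed with pymongo, for comparison only.
# The $date value in mongo.input.query is a timestamp in milliseconds since the epoch.
from datetime import datetime, timezone
from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1:4000")
cutoff = datetime.fromtimestamp(883440000000 / 1000.0, tz=timezone.utc)
cursor = client["mongo_hadoop"]["yield_historical.in"].find({"_id": {"$gt": cutoff}})
```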
Also, refer to the TestStreaming class in the test suite for more concrete examples.
Important note: if you need to use print or any other kind of text output when debugging streaming Map/Reduce scripts, be sure that you are writing the debug statements to stderr or some kind of log file. Using stdin or stdout for any purpose other than communicating with the Hadoop Streaming layer will interfere with the encoding and decoding of data.
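For example, in a Python streaming script you can route debug output to stderr like this (a minimal sketch; the helper name and message are only placeholders):

```python
import sys

# Debug output must go to stderr (or a log file), never to stdout,
# because stdout is reserved for communicating with the Hadoop Streaming layer.
def debug(msg):
    sys.stderr.write(str(msg) + "\n")

debug("processing document batch...")  # placeholder message
```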
##### Setup
To use Python for streaming, first install the Python package pymongo_hadoop using pip or easy_install. (For best performance, ensure that you are using the C extensions for BSON.)
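You can confirm that the C extensions are in use from a Python shell; `bson.has_c()` (part of the bson package that ships with pymongo) returns True when the C extensions are compiled and loaded:

```python
# Quick sanity check: True means the C extensions for BSON are available.
import bson
print(bson.has_c())
```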
##### Mapper
To implement a mapper, write a function which accepts an iterable sequence of documents and calls yield to produce each output document, then call BSONMapper() against that function. For example:
from pymongo_hadoop import BSONMapper

def mapper(documents):
    for doc in documents:
        yield {'_id': doc['user_id'], 'city': doc['location']['city']}

BSONMapper(mapper)
##### Reducer
To implement a reducer, write a function which accepts two arguments: a key and an iterable sequence of documents matching that key. Compute your reduce output and pass it back to Hadoop with return, then call BSONReducer() against this function. For example:
from pymongo_hadoop import BSONReducer

def reducer(key, values):
    _count = _sum = 0
    for v in values:
        _count += 1
        _sum += v['bc10Year']
    return {'_id': key, 'avg': _sum / _count,
            'count': _count, 'sum': _sum}

BSONReducer(reducer)
Ruby support for MongoDB and Hadoop offers two alternative ways of writing mapper and reducer functions: one version operates on documents, and the other works on key-value pairs. These functions are called map and kvmap, and reduce and kvreduce, respectively.
You can install MongoDB Hadoop support for Ruby with gem install mongo-hadoop.
The current Ruby implementation of streaming requires that the user define a block to serve as the map function and pass it to the MongoHadoop.map or MongoHadoop.kvmap method.
The block passed to map takes a single argument, the document to be processed by the mapper function, and must emit (return) one document.
The key-value mapper, kvmap, takes two arguments, a key and a value, and must return an array in which the first element is the key to be emitted and the second is a value document.
#!/usr/bin/env ruby
require 'mongo-hadoop'

MongoHadoop.map do |document|
  { :_id => document['_id'], :value => document['somefield'] }
end
Creating a reducer is similar: the user defines a block to serve as the reduce function.
The block passed to reduce takes two arguments, a key and an array of values. It should iterate over the values and reduce the data by producing a single document that has an _id field with the key and other fields with some reduction of the data (sum, count, etc.).
The key-value reducer functions similarly in that it takes a key and an array of values, but returns an array with the key as the first element and a value as the second.
#!/usr/bin/env ruby
require 'mongo-hadoop'

MongoHadoop.reduce do |key, values|
  count = sum = 0
  values.each do |value|
    count += 1
    sum += value['num']
  end
  { :_id => key, :average => sum / count }
end
##### Setup
Install the Node.js mongo-hadoop library with npm install node_mongo_hadoop.
##### Mapper
Write a function that accepts two arguments: the input document, and a callback function. Call the callback function with the output of your map function. Then pass that function as an argument to node_mongo_hadoop.MapBSONStream. For example:
var node_mongo_hadoop = require('node_mongo_hadoop')

function mapFunc(doc, callback){
    if(doc.headers && doc.headers.From && doc.headers.To){
        var from_field = doc['headers']['From']
        var to_field = doc['headers']['To']
        to_field.split(',').forEach(function(to){
            // Emit one output document per recipient.
            callback( {'_id': {'f': from_field, 't': to.trim()}, 'count': 1} )
        });
    }
}

node_mongo_hadoop.MapBSONStream(mapFunc);
##### Reducer
Write a function that accepts three arguments: the key from the map phase, an array of documents for that key, and a callback. Call the callback function with the output result of reduce.
var node_mongo_hadoop = require('node_mongo_hadoop')

function reduceFunc(key, values, callback){
    var count = 0;
    values.forEach(function(v){
        count += v.count
    });
    callback( {'_id': key, 'count': count } );
}

node_mongo_hadoop.ReduceBSONStream(reduceFunc);