I recently started investigating Apache Spark as a framework for data mining. Spark builds upon Apache Hadoop and supports many more kinds of operations than plain map-reduce. It also supports iterative algorithms and streaming data.
Since Spark builds upon Hadoop and HDFS, it is compatible with any HDFS data source. Our server uses MongoDB, so we naturally turned to the mongo-hadoop connector, which allows reading and writing directly from a Mongo database.
However, it was far from obvious (at least for a beginner with Spark) how to use and configure mongo-hadoop together with Spark. After a lot of experimentation, frustration, and a few emails to the Spark user mailing list, I got it working in both Java and Scala. I wrote this tutorial to save others the exasperation.
Read below for details. The impatient can just grab the example application code.
Versions and APIs
The Hadoop ecosystem is fraught with different library versions and incompatible APIs. The major API change came in Hadoop 0.20, where the old org.apache.hadoop.mapred API was replaced by org.apache.hadoop.mapreduce. The API change is reflected in other libraries as well: mongo-hadoop has the packages com.mongodb.hadoop.mapred (old) and com.mongodb.hadoop (new), while SparkContext contains the methods hadoopRDD (old API) and newAPIHadoopRDD (new API).
You need to take care to use the correct versions of each API. This is made even more confusing since in most cases the class names of the two APIs are the same, and only the package differs. If you get mysterious errors, double-check that you’re using the APIs consistently.
The example application uses Hadoop 2.2.0 and the new API.
Apache Spark depends on a multitude of supporting libraries ranging from Apache Commons and Hadoop to slf4j and Jetty. Don’t even try to manage the library dependencies yourself — use Maven, Ivy, SBT or something similar.
The example application uses SBT with the Akka Maven repositories. The Maven repositories contain the mongo-hadoop connector for several different Hadoop versions, but not for 2.2.0. Therefore the mongo-hadoop connector is included as an unmanaged library.
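As a rough sketch, the build.sbt could look something like the following (the project name and version numbers here are assumptions — adjust them to your setup):

```scala
// build.sbt — a minimal sketch; the name and version numbers are assumptions
name := "mongo-spark-example"

scalaVersion := "2.10.3"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

// The mongo-hadoop connector built against Hadoop 2.2.0 is not in the
// repositories, so its jar (and the MongoDB Java driver it depends on)
// go under lib/ as unmanaged libraries.
```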
Using mongo-hadoop from Spark
The mongo-hadoop configuration parameters are passed using a Configuration object (from the Hadoop packages). The most important parameters are mongo.input.uri and mongo.output.uri, which provide the Mongo host, port, authentication, and the database and collection names. You can also provide other configuration options, for example a Mongo query to limit the input data.
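As a sketch, setting up the configuration might look like this (the URIs follow the example application's beowulf database described below; mongo.input.query is an optional parameter shown here for illustration):

```java
// A sketch of the mongo-hadoop configuration
// (org.apache.hadoop.conf.Configuration).
Configuration config = new Configuration();
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/beowulf.input");
config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/beowulf.output");
// Optionally restrict the input documents with a Mongo query:
// config.set("mongo.input.query", "{\"text\": {\"$exists\": true}}");
```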
Each Mongo collection is loaded as a separate RDD using the newAPIHadoopRDD method of the SparkContext:
JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(config, MongoInputFormat.class, Object.class, BSONObject.class);
This uses the new API, and MongoInputFormat must be imported from com.mongodb.hadoop. For the old API you would use the hadoopRDD method and the MongoInputFormat from com.mongodb.hadoop.mapred instead.
The type returned is JavaPairRDD<Object, BSONObject>. The key is in fact an ObjectId instance containing the Mongo object ID of the document, and the value is the BSON document itself.
Saving an RDD back to Mongo uses the saveAsNewAPIHadoopFile method:
rdd.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);
Only the last two parameters seem to be relevant (though the first argument must still be a valid HDFS URI). The RDD is again of type JavaPairRDD<Object, BSONObject>. However, due to a bug, the key cannot be an ObjectId. If you want to provide the object ID explicitly, use a String; if you want the Mongo driver to generate the ID automatically, set the key to null (as the example application does).
The example app
The example application contains a simple word count algorithm in both Java and Scala. Both read from a MongoDB running on localhost, using the database beowulf and the collection input. The documents contain just a text field, on which the word count is run. The results are stored in the same database in the collection output, with each document containing a word and its number of occurrences.
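The Java version can be sketched roughly as follows. This is not the verbatim example code: it assumes a Spark version whose Java API accepts lambdas (older versions use anonymous inner classes, and some method names such as mapToPair differ across releases), and the output field names word and count are chosen here for illustration — check the actual example code for the real names.

```java
// JavaWordCount — a minimal sketch, not the verbatim example application.
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;
import scala.Tuple2;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;

public class JavaWordCount {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaWordCount");

        Configuration config = new Configuration();
        config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/beowulf.input");
        config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/beowulf.output");

        // Load the input collection as (ObjectId, BSONObject) pairs.
        JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
                config, MongoInputFormat.class, Object.class, BSONObject.class);

        // Split the "text" field of each document into words and count them.
        JavaPairRDD<String, Integer> counts = documents
                .flatMap(doc -> Arrays.asList(
                        doc._2().get("text").toString().split("\\s+")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        // Build (null, BSONObject) pairs; the null key lets the Mongo
        // driver generate the object IDs automatically.
        JavaPairRDD<Object, BSONObject> save = counts.mapToPair(tuple -> {
            BSONObject bson = new BasicBSONObject();
            bson.put("word", tuple._1());
            bson.put("count", tuple._2());
            return new Tuple2<Object, BSONObject>(null, bson);
        });
        save.saveAsNewAPIHadoopFile("file:///bogus", Object.class,
                Object.class, MongoOutputFormat.class, config);

        sc.stop();
    }
}
```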
The example requires a MongoDB instance running on localhost, with Scala 2.10 and SBT installed. You can then import the example data, run the programs, and print out the results with the following commands:
mongoimport -d beowulf -c input beowulf.json
sbt 'run-main JavaWordCount'
sbt 'run-main ScalaWordCount'
mongo beowulf --eval 'printjson(db.output.find().toArray())' | less