Using Spark with MongoDB

I recently started investigating Apache Spark as a framework for data mining. Spark builds upon Apache Hadoop and supports many more operations than just map and reduce. It also supports iterative algorithms and streaming data.

Since Spark builds upon Hadoop and HDFS, it can read from any data source that provides a Hadoop InputFormat. Our server uses MongoDB, so we naturally turned to the mongo-hadoop connector, which reads from and writes to a Mongo database directly.

However, it was far from obvious (at least for a beginner with Spark) how to use and configure mongo-hadoop together with Spark. After a lot of experimentation, frustration, and a few emails to the Spark user mailing list, I got it working in both Java and Scala. I wrote this tutorial to save others the exasperation.

Read below for details. The impatient can just grab the example application code.

Versions and APIs

The Hadoop ecosystem is fraught with different library versions and incompatible APIs. The major API change came in Hadoop 0.20, where the old org.apache.hadoop.mapred API was superseded by org.apache.hadoop.mapreduce. The change is reflected in other libraries as well: mongo-hadoop has the packages com.mongodb.hadoop.mapred (old) and com.mongodb.hadoop (new), while SparkContext has the methods hadoopRDD (old) and newAPIHadoopRDD (new).

Take care to use the correct version of each API. This is all the more confusing because in most cases the class names in the two APIs are the same and only the package differs. If you get mysterious errors, double-check that you're using the APIs consistently.

The example application uses Hadoop 2.2.0 and the new API.

Library dependencies

Apache Spark depends on a multitude of supporting libraries ranging from Apache Commons and Hadoop to slf4j and Jetty. Don’t even try to manage the library dependencies yourself — use Maven, Ivy, SBT or something similar.

The example application uses SBT with the Akka Maven repositories. The Maven repositories contain the mongo-hadoop connector for several different Hadoop versions, but not for 2.2.0. Therefore the mongo-hadoop connector is included as an unmanaged library.

Using mongo-hadoop from Spark

The mongo-hadoop configuration parameters are passed in a Hadoop Configuration object (org.apache.hadoop.conf.Configuration). The most important parameters are mongo.input.uri and mongo.output.uri, which specify the Mongo host, port, authentication, database and collection names. You can also provide other configuration options, for example a Mongo query (mongo.input.query) to limit the data that is read.
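As a rough illustration, the settings the example app needs could be assembled as below. This is a minimal sketch assuming the default MongoDB port 27017 and no authentication (credentials would go into the URI as mongodb://user:password@host:port/database.collection). The class name MongoConfigSketch and the filter query are hypothetical. A plain Map stands in for Hadoop's Configuration so the snippet runs on its own; with Hadoop on the classpath you would create a new Configuration() and call config.set(key, value) for each entry.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: a plain Map stands in for org.apache.hadoop.conf.Configuration
// so the sketch runs without Hadoop on the classpath.
class MongoConfigSketch {

    // Builds a connection URI of the form mongodb://host:port/database.collection.
    static String mongoUri(String host, int port, String database, String collection) {
        return "mongodb://" + host + ":" + port + "/" + database + "." + collection;
    }

    // Settings for the example app; with Hadoop, call config.set(key, value) for each entry.
    static Map<String, String> exampleSettings() {
        Map<String, String> settings = new LinkedHashMap<>();
        // Read from collection "input" in database "beowulf" (assumes default port 27017).
        settings.put("mongo.input.uri", mongoUri("localhost", 27017, "beowulf", "input"));
        // Write to collection "output" in the same database.
        settings.put("mongo.output.uri", mongoUri("localhost", 27017, "beowulf", "output"));
        // Hypothetical filter: only read documents that have a text field.
        settings.put("mongo.input.query", "{\"text\": {\"$exists\": true}}");
        return settings;
    }

    public static void main(String[] args) {
        exampleSettings().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Running main prints the three key/value pairs; the two URIs come out as mongodb://localhost:27017/beowulf.input and mongodb://localhost:27017/beowulf.output.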

Each Mongo collection is loaded as a separate RDD using the SparkContext:

JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(config, MongoInputFormat.class, Object.class, BSONObject.class);

This uses the new API, and MongoInputFormat must be imported from com.mongodb.hadoop. For the old API you would use the hadoopRDD method and com.mongodb.hadoop.mapred.MongoInputFormat.

The returned RDD contains (key, value) pairs; in Java its type is JavaPairRDD<Object, BSONObject>. The key is the Mongo object ID of the document (typically an ObjectId instance), and the value contains the BSON document itself.

Saving an RDD back to Mongo uses the saveAsNewAPIHadoopFile method:

rdd.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

Only the last two arguments (the output format class and the configuration) seem to be relevant, though the first argument must still be a valid HDFS URI. The RDD again holds (Object, BSONObject) pairs. However, due to a bug, the key of each pair cannot be an ObjectId. If you want to provide the object ID explicitly, use a String. If you want the Mongo driver to generate the ID automatically, set the key to null (as the example application does).
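To make the key rule concrete, here is a minimal plain-Java sketch. It assumes you either want to carry the input document's ID over as a String or drop it and let the driver generate one; the names OutputKeySketch, outputKey and outputPair are hypothetical, and a generic document type stands in for BSONObject so the sketch runs without the Mongo driver. In real Spark code the pair would be a Tuple2<Object, BSONObject>.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical helper showing which keys are safe to save through MongoOutputFormat.
class OutputKeySketch {

    // Returns null (the driver then generates a fresh _id) or the input ID as a String.
    // For an ObjectId, toString() yields its hex form; the ObjectId itself is never returned.
    static Object outputKey(Object inputId, boolean keepId) {
        if (!keepId || inputId == null) {
            return null;
        }
        return inputId.toString();
    }

    // Pairs a document with its output key, mirroring a (key, BSONObject) tuple.
    static <D> Map.Entry<Object, D> outputPair(Object inputId, D document, boolean keepId) {
        return new SimpleEntry<>(outputKey(inputId, keepId), document);
    }
}
```

Note the trade-off of keeping the ID: the saved _id becomes a string, so it will not match the original ObjectId in later queries.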

The example app

The example application contains a simple word count algorithm in both Java and Scala. Both versions read from the collection input in the database beowulf on a MongoDB instance running on localhost. The documents contain just a text field, on which the word count is run.

The results are stored in the collection output in the same database, each document containing the fields word and count.
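The algorithm itself is small. Below is a single-machine Java sketch of what both versions compute, assuming plain whitespace tokenization (the example app may split words differently); WordCountSketch and its methods are hypothetical names, and Maps stand in for BSONObject. In Spark the counting loop would roughly correspond to a flatMap over the text, a mapping to (word, 1) pairs and a reduceByKey that sums them, and each output document would then be paired with a null key before saving.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-machine version of the word count; Maps stand in for BSONObject.
class WordCountSketch {

    // Counts whitespace-separated words in the "text" field of each input document.
    static Map<String, Integer> countWords(List<Map<String, Object>> inputDocs) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted for predictable output
        for (Map<String, Object> doc : inputDocs) {
            Object text = doc.get("text");
            if (!(text instanceof String)) {
                continue; // skip documents without a text field
            }
            // Assumption: whitespace tokenization (the flatMap step in Spark).
            for (String word : ((String) text).split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum); // the reduceByKey step
                }
            }
        }
        return counts;
    }

    // Turns the counts into output documents with the fields word and count.
    static List<Map<String, Object>> toOutputDocs(Map<String, Integer> counts) {
        List<Map<String, Object>> docs = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            Map<String, Object> doc = new LinkedHashMap<>();
            doc.put("word", e.getKey());
            doc.put("count", e.getValue());
            docs.add(doc);
        }
        return docs;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = new ArrayList<>();
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("text", "the king and the hall"); // hypothetical sample text
        input.add(doc);
        toOutputDocs(countWords(input)).forEach(System.out::println);
    }
}
```

For the sample text, main prints {word=and, count=1}, {word=hall, count=1}, {word=king, count=1} and {word=the, count=2}, one per line, in alphabetical order because of the TreeMap.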

The example requires MongoDB running on localhost, as well as Scala 2.10 and SBT. Once those are in place, you can import the example data, run the programs and print out the results with the following commands:

mongoimport -d beowulf -c input beowulf.json
sbt 'run-main JavaWordCount'
sbt 'run-main ScalaWordCount'
mongo beowulf --eval 'printjson(db.output.find().toArray())' | less

31 Responses to Using Spark with MongoDB

  1. Jim Somerville says:

    Great post, thanks. I have just learned about Spark, and was wondering about using MongoDB as the datasource.

  2. Greg says:

    Hi, very useful post. I’m trying to run your example source code and getting the following error:
    [error] /Users/greghines/Code/mongo-spark/src/main/java/JavaWordCount.java:57: illegal start of type
    [error] return new Tuple2<>(s, 1);
    [error] ^
    [error] /Users/greghines/Code/mongo-spark/src/main/java/JavaWordCount.java:74: illegal start of type
    [error] return new Tuple2<>(null, bson);
    [error] ^
    [error] 2 errors

    Any suggestions? (I’m using Java 1.6 for what that matters.)
    thanks, Greg

    • Sampo N. says:

      The code uses the diamond operator (<>), which was introduced in Java 7. Writing out the generic type arguments for Tuple2 explicitly should make the code compile on Java 6 as well, though I hit an UnsupportedClassVersionError when trying, possibly caused by a library dependency.

  3. Greg says:

    Follow-up question: I want to play around with the MongoDB connection in the Spark shell. I know I need to include additional libraries so that commands such as “import org.bson.BSONObject” will work. I had assumed that such libraries were part of the mongo-hadoop package, but when I download and compile it, I can’t find any jar file containing org.bson. Also, once I have found the correct jar file, where would I put it in the Spark directory? (I’m not running a real Hadoop cluster, just the spark-shell that comes with the basic Spark download.)

    • feugy says:

      The org.bson package comes with the mongo-java-driver jar.
      Check that it is properly included in your build.

  4. feugy says:

    Hello and thank you for this excellent post.

    I have a question regarding the “mongodb://xxx” URLs used for the input and output URIs.
    I can’t figure out how Hadoop knows when to use the mongo-hadoop classes, because we do not configure a FileSystem for the “mongodb” scheme in the job configuration.
    In my own cluster, I’m stuck on a

    Caused by: java.io.IOException: No FileSystem for scheme: mongodb
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)

    exception when I try to read from Mongo.
    I put the mongo-hadoop and mongo-java-driver jars in every possible location of my Hadoop installation, with no success.

    (Here is my code 🙂
    val config = new Configuration()
    config.set("mongo.input.uri", mongoUrl)
    context.newAPIHadoopRDD(config, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject]).flatMap( (arg) => parser.parse(arg._1, arg._2))

  5. aironman2k says:

    Good job, but why the hell didn't you create the sample project using Maven? Where is the pom.xml file? As you said yourself, you could run into a jar-hell dependency problem if you try to use this project as a first step.

    • Sampo N. says:

      For Scala projects SBT seems to be the de-facto standard. For example Spark’s own documentation samples use SBT. We wanted to use primarily Scala, so I chose SBT.

  6. sevvi says:

    Thanks for the post. Made my life so much easier. 🙂

  7. dselivanov says:

    Great post, thank you!
    I have one issue: I can’t get mongo.input.query in the config to work. This is what I do:
    config.set("mongo.input.query", "{\"id\": {\"$lt\": 1000}}")
    But my whole input collection goes to Spark…
    Can anyone help?
    Note: the id field in my query is not the same as _id.

    • dselivanov says:

      The code above actually worked, but not as I expected. mongo-hadoop first splits the collection into chunks and then runs your mongo.input.query on each chunk. So I can’t simply filter a (possibly very big) collection before running my Spark job. If you want to run a job on a subset of a collection, the best solution is probably to copy that subset into a new collection and run the job on it. I hope this will be helpful for somebody.

      • Vishwas says:

        I have a collection that grows by more than 300 records per day (approx. 1M documents). I want to find only the data from the last 24 hours. How do I find it? Do I need to copy the last 24 hours of data into a separate collection?

      • Stefano says:

        I’ve been running into exactly the same trouble, and wondered for a while how such a query+filter behavior could possibly have been designed! But then I found this illuminating suggestion (https://groups.google.com/forum/#!msg/mongodb-user/X_PYMNzcc3A/z2kRPCf2EwAJ), which according to my tests so far makes the query work exactly as you wish: first the filters are applied, then the positive matches are split and dispatched to Spark. I am getting encouraging results in my tests after setting the following entries in the conf structure:
        "mongo.input.splits.filter_empty": "true",
        "mongo.splitter.class": "com.mongodb.hadoop.splitter.MongoPaginatingSplitter",
        "mongo.input.splits.min_docs": "1000",
        "mongo.input.split.use_range_queries": "true"

        I hope this helps,
        Stefano

  8. Chris says:

    Great post.
    I am wondering if it is possible to use Python to do the same thing?

  9. Excellent post! It’s awesome. In the Scala WordCount example, how could I change the output from one MongoDB document per tuple to a single document containing all tuples? Thanks

    • Sampo N. says:

      Three options:
      1) Use RDD transformations to create an RDD that contains only one element and store that RDD. Not sure how this would work, and it seems a bit hackish.
      2) Reduce the RDD into a single Scala result, from which you create a new RDD with only one element and store that.
      3) Reduce the RDD to the result, and directly use the Mongo driver to store it instead of using rdd.save. (That’s what we use.)
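A minimal plain-Java sketch of the reduce step behind options 2 and 3, assuming the single document has the hypothetical shape {"counts": {word: count, ...}}; SingleDocumentSketch is a made-up name. In Spark, the per-word counts could be brought to the driver with collectAsMap() on the pair RDD before building the document. One caveat: MongoDB limits a single document to 16 MB, so a very large vocabulary may not fit into one document.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: collapse all (word, count) pairs into ONE document.
class SingleDocumentSketch {

    // Builds {"counts": {word: count, ...}}, summing counts if a word appears twice.
    static Map<String, Object> toSingleDocument(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by word
        for (Map.Entry<String, Integer> p : pairs) {
            counts.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("counts", counts);
        return doc;
    }
}
```

The resulting document could then be stored directly with the Mongo driver (option 3) or wrapped in a one-element RDD (option 2).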

    • Adstan says:

      This is something I’m trying to accomplish as well. Has anything you’ve tried worked so far?

  10. thermo says:

    With a large dataset, more than 1024 splits on the data result in an exception due to more than 1024 connections to MongoDB 😦

    15/01/19 21:11:37 ERROR MongoRecordReader: Exception reading next key/val from mongo: Read operation to server /127.0.0.1:27017 failed on database test

  11. ltrejo says:

    Hi. Great post! It’s just what I was looking for, but I have a small problem when I try to submit it to my cluster. It compiles and works perfectly with ‘sbt run’, but I get a “java.lang.NoClassDefFoundError” referencing “MongoInputFormat.class” when using ‘spark-submit’. The class is located where it’s supposed to be. The same happens if I switch to “BSONInputFormat.class”. I’d appreciate it if you could help me a little. Thanks in advance!

    • Sampo N. says:

      Unfortunately my experience using Spark is rather limited, so I’m unable to help here. I suggest you try a Spark user forum.

      • Luis Rodríguez Trejo says:

        Finally I solved it by using the --jars option, giving the paths of my jars. Nevertheless, now I get a ClassCastException at saveAsNewAPIHadoopFile, which seems to be an internal Spark problem. I post this just in case it helps anyone. Regards.

      • Luis Rodríguez Trejo says:

        Sorry! I wanted to say IllegalStateException.

  12. Pingback: [MongoDB][Spark] Using Spark on MongoDB – 剑客 (Jianke) | Focusing on tech and the Internet

  13. xulei says:

    I have run into the same problem: Exception in thread “main” java.lang.NoClassDefFoundError: com/mongodb/hadoop/MongoInputFormat. How can I fix it? Is it a bug?

  14. sunny says:

    Hello, when I use Spark to read data from MongoDB (about two hundred million records), it takes about 3 hours. Can anyone tell me how to reduce the time?

  15. Rahul Dev says:

    Interesting article. I am sure you will be excited to learn about MongoDB and Apache Spark from the experts at MongoDB. Jason Ma, Principal Product Marketing Manager, and Bryan Reinero, Developer Advocate at MongoDB, are speaking about how to integrate MongoDB with Apache Spark. Join now for this interesting session at https://www.dezyre.com/free-webinar/mongodb-and-apache-spark/66?call_back=DF.
