Accessing Data Stored in MongoDB with Stratosphere

28 Jan 2014

We recently merged a pull request that allows you to use any existing Hadoop InputFormat with Stratosphere. So you can now (in the 0.5-SNAPSHOT and upwards versions) define a Hadoop-based data source:

HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));

We describe in the following article how to access data stored in MongoDB with Stratosphere. This allows users to join data from multiple sources (e.g. MonogDB and HDFS) or perform machine learning with the documents stored in MongoDB.

The approach here is to use the MongoInputFormat that was developed for Apache Hadoop but now also runs with Stratosphere.

JobConf conf = new JobConf();
HadoopDataSource src = new HadoopDataSource(new MongoInputFormat(), conf, "Read from Mongodb", new WritableWrapperConverter());

Example Program

The example program reads data from the enron dataset that contains about 500k internal e-mails. The data is stored in MongoDB and the Stratosphere program counts the number of e-mails per day.

The complete code of this sample program is available on GitHub.

Prepare MongoDB and the Data

  • Install MongoDB
  • Download the enron dataset from their website.
  • Unpack and load it

bash bunzip2 enron_mongo.tar.bz2 tar xvf enron_mongo.tar mongorestore dump/enron_mail/messages.bson

We used Robomongo to visually examine the dataset stored in MongoDB.

Build MongoInputFormat

MongoDB offers an InputFormat for Hadoop on their GitHub page. The code is not available in any Maven repository, so we have to build the jar file on our own.

  • Check out the repository
git clone
cd mongo-hadoop
  • Set the appropriate Hadoop version in the build.sbt, we used 1.1.
hadoopRelease in ThisBuild := "1.1"
  • Build the input format
./sbt package

The jar-file is now located in core/target.

The Stratosphere Program

Now we have everything prepared to run the Stratosphere program. I only ran it on my local computer, out of Eclipse. To do that, check out the code …

git clone

… and import it as a Maven project into your Eclipse. You have to manually add the previously built mongo-hadoop jar-file as a dependency. You can now press the “Run” button and see how Stratosphere executes the little program. It was running for about 8 seconds on the 1.5 GB dataset.

The result (located in /tmp/enronCountByDay) now looks like this.

11,Fri Sep 26 10:00:00 CEST 1997
154,Tue Jun 29 10:56:00 CEST 1999
292,Tue Aug 10 12:11:00 CEST 1999
185,Thu Aug 12 18:35:00 CEST 1999
26,Fri Mar 19 12:33:00 CET 1999

There is one thing left I want to point out here. MongoDB represents objects stored in the database as JSON-documents. Since Stratosphere’s standard types do not support JSON documents, I was using the WritableWrapper here. This wrapper allows to use any Hadoop datatype with Stratosphere.

The following code example shows how the JSON-documents are accessed in Stratosphere.

public void map(Record record, Collector<Record> out) throws Exception {
	Writable valWr = record.getField(1, WritableWrapper.class).value();
	BSONWritable value = (BSONWritable) valWr;
	Object headers = value.getDoc().get("headers");
	BasicDBObject headerOb = (BasicDBObject) headers;
	String date = (String) headerOb.get("Date");
	// further date processing

Please use the comments if you have questions or if you want to showcase your own MongoDB-Stratosphere integration.

Written by Robert Metzger (@rmetzger_).

comments powered by Disqus