Have you ever wanted to join two collections of data in MongoDB?  I’ve worked on a project that had half a dozen systems, each dumping data into its own collection, and we then needed to run analytics over all of that data joined together.  Using the new incremental MapReduce features in MongoDB 1.8, I can pre-join all these data sets into rich, nested documents that an analytics tool can then query in literally hundreds of different ways while giving the user instant results.  It’s amazing.  I thought I’d take the opportunity to give a simple demonstration of how this is possible.

There are two main tricks to doing this.  The first is to realize that the fields you join on will be the keys that you emit from your mapper functions. The second is to have the mapping functions for each collection output data in the same format. Then you can use the same reduction function for all of the collections and run mapReduce using the “reduce” output method.
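
To make the two tricks concrete before touching real data, here is a rough plain-JavaScript sketch that imitates two mapReduce passes merging into one output. Nothing in it is MongoDB API: the sample documents, the field names, and the runPass helper are all made up for illustration, and runPass only approximates what the “reduce” output method does.

```javascript
// Plain-JavaScript imitation of two map passes sharing one reducer.
// All names and sample documents below are hypothetical.

// Two tiny "collections" that hold the join field under different names.
var people = [
  { name: "ann", city: "Oslo" },
  { name: "bob", city: "Lima" }
];
var orders = [
  { customer: "ann", total: 10 },
  { customer: "ann", total: 5 },
  { customer: "bob", total: 7 }
];

// Trick 1: both mappers emit the join field as the key.
// Trick 2: both mappers emit values with the same shape.
function mapPeople(doc, emit) {
  emit(doc.name, { city: doc.city, total: 0 });
}
function mapOrders(doc, emit) {
  emit(doc.customer, { city: null, total: doc.total });
}

// One reducer handles values coming from either mapper.
function reduce(key, values) {
  var result = { city: null, total: 0 };
  values.forEach(function (value) {
    result.total += value.total;
    if (result.city === null && value.city !== null) {
      result.city = value.city;
    }
  });
  return result;
}

// Imitates one pass that merges its results into an existing output object.
// If a key is already present, the old and new values are reduced together.
function runPass(docs, map, output) {
  var groups = {};
  docs.forEach(function (doc) {
    map(doc, function (key, value) {
      (groups[key] = groups[key] || []).push(value);
    });
  });
  Object.keys(groups).forEach(function (key) {
    var values = groups[key];
    // A key with a single emitted value is not passed through reduce.
    var reduced = values.length > 1 ? reduce(key, values) : values[0];
    output[key] = Object.prototype.hasOwnProperty.call(output, key)
      ? reduce(key, [output[key], reduced])
      : reduced;
  });
  return output;
}

var joined = {};
runPass(people, mapPeople, joined);
runPass(orders, mapOrders, joined);
console.log(joined);
```

After both passes, each key holds a single document that combines the city from one collection with the summed totals from the other, which is the shape of result the rest of this post builds with real data.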

Let’s walk through a simple but interesting example. We will find out how much money the US government gave to the countries with the lowest life expectancy in 2009. The data sets I will be using are:

  1. A list of US Overseas Loans and Grants from data.gov: Download (already formatted for mongoimport)
  2. A list of all the countries in the world and their average life expectancy: Download (already formatted for mongoimport)

To import the above files into mongo:

mongoimport --file us_economic_assistance.csv --headerline --type csv -d mr_demo -c us_economic_assistance --drop --ignoreBlanks
mongoimport --file life_expectancy.tsv --headerline --type tsv -d mr_demo -c life_expectancy --drop --ignoreBlanks

The map function for the life_expectancy collection:

life_expect_map = function() {
  // Simply emit the age and 0 for the dollar amount.
  // The dollar amount will come from the other collection.
  emit(this.country, {life_expectancy: this.age, dollars: 0});
}

The map function for the US Economic Aid in 2009:

us_econ_map = function() {
  // The data set contains grant amounts going back to 1946.  I
  // am only interested in 2009 grants.
  if (this.FY2009 !== undefined && this.FY2009 !== null) {
    emit(this.country_name, {
      dollars: this.FY2009,
      life_expectancy: 0
    });
  }
}
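
If you want to sanity-check a mapper without a database, one option is to stub out emit and call the function with this bound to a sample document. The sketch below does that in plain JavaScript (for example under Node); the stub emit, the sample documents, and the country names are all invented for illustration. It also shows why the undefined check matters: with --ignoreBlanks, a blank FY2009 cell means the field is simply absent from the imported document.

```javascript
// Stand-in for the emit() that MongoDB provides inside a map function.
// It only records what the mapper emits.
var emitted = [];
function emit(key, value) {
  emitted.push({ key: key, value: value });
}

// Copy of the US Economic Aid mapper from this post.
var us_econ_map = function () {
  if (this.FY2009 !== undefined && this.FY2009 !== null) {
    emit(this.country_name, {
      dollars: this.FY2009,
      life_expectancy: 0
    });
  }
};

// Made-up documents: one with a 2009 amount, one where the FY2009 field
// is missing (as after --ignoreBlanks), and one where it is null.
us_econ_map.call({ country_name: "Exampleland", FY2009: 1200 });
us_econ_map.call({ country_name: "Exampleland", FY2008: 900 });
us_econ_map.call({ country_name: "Otherland", FY2009: null });

console.log(emitted);
```

Only the first sample document produces an emit; the other two are skipped by the guard.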

First, note that the key emitted by both mappers is the country’s name.  This is the field I am joining on.  Second, notice that the shape of the emitted documents is the same in both map functions. Here is the reduction function that we will use for both collections:

r = function(key, values) {
    var result = {dollars: 0, life_expectancy: 0};

    values.forEach(function(value) {
        // Sum up all the money from all the 2009 grants for this
        // country (key)
        result.dollars += (value.dollars !== null) ? value.dollars : 0;
        // Only set life expectancy once
        if (result.life_expectancy === 0 &&
            value.life_expectancy !== null
        ) {
            result.life_expectancy = value.life_expectancy;
        }
    });

    return result;
}
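
Because the “reduce” output method feeds an already-reduced value back into the reducer alongside new values, the reducer has to return the same shape it accepts and should give the same answer no matter how the values are grouped. Here is a quick plain-JavaScript check of that property for the reducer above; the three sample values and the key "X" are made up.

```javascript
// Copy of the reducer from this post.
var r = function (key, values) {
  var result = { dollars: 0, life_expectancy: 0 };

  values.forEach(function (value) {
    result.dollars += (value.dollars !== null) ? value.dollars : 0;
    if (result.life_expectancy === 0 &&
        value.life_expectancy !== null
    ) {
      result.life_expectancy = value.life_expectancy;
    }
  });

  return result;
};

// Made-up values in the shape both mappers emit.
var a = { life_expectancy: 50.5, dollars: 0 };
var b = { life_expectancy: 0, dollars: 100 };
var c = { life_expectancy: 0, dollars: 250 };

// Reduce everything at once, in two steps, and in a different order.
var allAtOnce = r("X", [a, b, c]);
var inTwoSteps = r("X", [r("X", [a, b]), c]);
var otherOrder = r("X", [c, r("X", [b, a])]);

console.log(allAtOnce, inTwoSteps, otherOrder);
```

All three calls should print the same document. If they ever differ for your own reducer, the joined output will depend on the order in which the collections are processed.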

Now run mapReduce over both collections, using the reduce output option. With this option, if a document with the same key already exists in the output collection, the reduce function is called with the existing value and the new value, and the result is written back to the output collection.

  > res = db.life_expectancy.mapReduce(life_expect_map, r, {out: {reduce: 'joined'}})
  {
    "result" : "joined",
    "timeMillis" : 22,
    "counts" : {
      "input" : 222,
      "emit" : 222,
      "output" : 222
    },
    "ok" : 1,
  }
  > res = db.us_economic_assistance.mapReduce(us_econ_map, r, {out: {reduce: 'joined'}})
  {
    "result" : "joined",
    "timeMillis" : 184,
    "counts" : {
      "input" : 2446,
      "emit" : 1353,
      "output" : 252
    },
    "ok" : 1,
  }
  > db.joined.find({'value.dollars': {$gt:0}, 'value.life_expectancy': {$gt:0}}).sort({'value.life_expectancy':1}).limit(10)
  { "_id" : "Angola", "value" : { "dollars" : 54227598, "life_expectancy" : 38.76 } }
  { "_id" : "Afghanistan", "value" : { "dollars" : 3046294847, "life_expectancy" : 45.02 } }
  { "_id" : "Nigeria", "value" : { "dollars" : 498281606, "life_expectancy" : 47.56 } }
  { "_id" : "Chad", "value" : { "dollars" : 221842294, "life_expectancy" : 48.33 } }
  { "_id" : "Swaziland", "value" : { "dollars" : 22843193, "life_expectancy" : 48.66 } }
  { "_id" : "Guinea-Bissau", "value" : { "dollars" : 1857000, "life_expectancy" : 48.7 } }
  { "_id" : "South Africa", "value" : { "dollars" : 569625000, "life_expectancy" : 49.33 } }
  { "_id" : "Zimbabwe", "value" : { "dollars" : 285470926, "life_expectancy" : 49.64 } }
  { "_id" : "Central African Republic", "value" : { "dollars" : 36109353, "life_expectancy" : 50.07 } }
  { "_id" : "Somalia", "value" : { "dollars" : 179253637, "life_expectancy" : 50.4 } }

Download the example data sets and try it yourself.  There ya have it: a simple way to use MapReduce to pre-join data sets for analysis. Enjoy!