MongoDB is a fantastic tool for real-time analytics because you can pre-join multiple data sources into rich, nested documents and then query those documents blazingly fast.  Sometimes, though, the source data that feeds MongoDB lives inside an Oracle database.  So how do you keep the data in MongoDB consistent with Oracle and synced in near real time?  Oracle Streams Advanced Queuing!

There are tons of open source and proprietary queuing systems that could be used, so why choose Advanced Queuing (AQ from here on)?  Transaction support.  With AQ, your enqueue and dequeue operations happen in the same transaction as any other DML your Oracle application is performing, which makes it a very simple way to publish messages only for committed units of work.

The rest of this article walks through an example using a very simple table of customers. We want documents in MongoDB updated as soon as data in Oracle has been committed.  The flow of data will look something like this:

[Diagram: Oracle → AQ → MongoDB]

Before we get into any coding, let's first create the queue that will be used for communicating DML operations to the outside world (MongoDB).  This consists of four steps:

  1. Create an object type for the message payload
  2. Create a queue table
  3. Create the queue
  4. Start the queue

And here is what it looks like:

-- Create an object type to hold the message payload.
CREATE TYPE message_t AS OBJECT (json VARCHAR2(4000));
/

-- Create the queue table.  This will persist the messages
-- to disk when the transaction commits.
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table => 'myqueue_tab',
    queue_payload_type => 'message_t'
  );

  -- Create a queue using the new queue table.
  DBMS_AQADM.CREATE_QUEUE(
    queue_name => 'myqueue',
    queue_table => 'myqueue_tab'
  );

  DBMS_AQADM.START_QUEUE(
    queue_name => 'myqueue'
  );
END;
/
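
If you want to double-check the setup before moving on, the AQ data dictionary views will show the new queue and confirm that it is enabled (this step is optional):

SELECT name, queue_table, enqueue_enabled, dequeue_enabled
  FROM user_queues
 WHERE name = 'MYQUEUE';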

Now that the queue setup is done, let's start writing some code to enqueue messages.  We will do this with a trigger on the Oracle table that serializes the DML event.  For simplicity's sake, we'll serialize the DML operation to JSON.  We could just as easily serialize to XML or some other format if desired.

CREATE TABLE customer (
  customer_id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR2(40),
  address VARCHAR2(40),
  city VARCHAR2(40)
);

CREATE TRIGGER customer_queue_trig
  AFTER INSERT OR UPDATE OR DELETE ON customer
  FOR EACH ROW
DECLARE
  json VARCHAR2(4000);
BEGIN
  -- For simplicity's sake I am just doing simple concatenations.  Production
  -- JSON serialization code should do character escaping (double quotes,
  -- newlines, etc).
  IF INSERTING OR UPDATING THEN
    json := '{"id":' || :new.customer_id || ',"name":"' || :new.name || '"';
    json := json||',"address":"' || :new.address || '","city":"';
    json := json|| :new.city || '","dml_type":"';
    json := json|| CASE WHEN INSERTING THEN 'I' ELSE 'U' END || '"}';
  ELSE
    json := '{"id":' || :old.customer_id || ',"dml_type":"D"}';
  END IF;

  enqueue_message(json);
END;
/
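
To make the output concrete: for the customer row inserted later in this article (customer_id 1, Nathan), an INSERT fires the trigger and enqueues a payload that looks like this:

{"id":1,"name":"Nathan","address":"123 S. Main","city":"Mytown","dml_type":"I"}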

Now that we have a trigger serializing the data, let's look at the enqueue_message convenience procedure called at the bottom of the trigger.  It is a simple wrapper around DBMS_AQ.ENQUEUE:

CREATE PROCEDURE enqueue_message(payload VARCHAR2) AS
  msg message_t := message_t(NULL);
  msg_id RAW(16);
  enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
BEGIN
  msg.json := payload;
  message_properties.priority := 1;  -- give all messages same priority
  DBMS_AQ.ENQUEUE(
    queue_name => 'myqueue',
    enqueue_options => enqueue_options,
    message_properties => message_properties,
    payload => msg,
    msgid => msg_id);
END;
/
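
With the trigger and wrapper in place, any committed insert, update, or delete on customer leaves a message waiting on the queue.  When debugging, it can be handy to peek at pending messages directly in the queue table (a convenience only; the supported way to consume them is DEQUEUE).  The payload lives in the USER_DATA column, and a table alias is needed to reach the object attribute:

SELECT qt.msgid, qt.user_data.json AS payload
  FROM myqueue_tab qt;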

The msgid is assigned by Oracle AQ and is returned as an OUT parameter from ENQUEUE.  We don't really need it for anything in this simple example.  The trigger enqueues the message, but it only becomes visible to consumers once the insert into the customer table is committed.  To dequeue it, let's write another simple wrapper procedure that can be called from Ruby (my language of choice for this).

CREATE PROCEDURE dequeue_message(payload OUT VARCHAR2) AS
  msg message_t := message_t(NULL);
  msg_id RAW(16);
  dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
BEGIN
  DBMS_AQ.DEQUEUE(
    queue_name => 'myqueue',
    dequeue_options => dequeue_options,
    message_properties => message_properties,
    payload => msg,
    msgid => msg_id
  );
  payload := msg.json;
END;
/
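
Before wiring up Ruby, a quick smoke test from SQL*Plus doesn't hurt.  This minimal sketch assumes a message is already waiting (otherwise DEQUEUE simply blocks until one arrives), and it rolls back so the message goes right back on the queue for the real consumer:

SET SERVEROUTPUT ON
DECLARE
  l_json VARCHAR2(4000);
BEGIN
  dequeue_message(l_json);
  DBMS_OUTPUT.PUT_LINE(l_json);
  ROLLBACK;  -- undo the dequeue so the consumer below still sees the message
END;
/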

At this point we just need to call this procedure from our queue consumer code and process the message, upserting the appropriate document in MongoDB.  By default, calling DEQUEUE blocks, waiting forever until there is a message in the queue.  Since we don't want our queue consumer to have to poll for messages, this behavior works fine.  You can change it to never wait, or to wait with a timeout, by setting dequeue_options.wait (for example to DBMS_AQ.NO_WAIT or to a number of seconds); see the documentation for details.  Here is a simple consumer that dequeues from Oracle, writes to Mongo, then issues a commit on the Oracle connection, which removes the message from the queue.

#!/usr/bin/env ruby

require 'oci8'
require 'mongo'
require 'yajl'

oc = OCI8.new('queue_user', 'mypass', 'orcl')
col = Mongo::Connection.new.db('customer_db').collection('customer')

cur = oc.parse('BEGIN dequeue_message(:p); END;')
cur.bind_param(':p', nil, String, 4000)

while true
  # retrieve message
  cur.exec()
  json = Yajl::Parser.parse(cur[':p'])

  # print
  puts json.inspect

  if json['dml_type'] == 'D'
    col.remove({'_id' => json['id']})
  else
    # rename customer_id key to _id
    json['_id'] = json.delete('id')
    json.delete('dml_type')
    col.update(
      {'_id' => json['_id']},
      json,
      {:upsert => true}
    )
  end

  # remove from AQ.  dequeue isn't complete until this happens
  oc.commit
end

To scale horizontally, simply run as many of these consumers as desired.  For now, let's run one consumer and then watch the collection in Mongo change after each Oracle COMMIT.

Oracle:

INSERT INTO customer VALUES(1, 'Nathan', '123 S. Main', 'Mytown');
COMMIT;

Mongo:

db.customer.find()
{ "_id" : 1, "name" : "Nathan", "address" : "123 S. Main", "city" : "Mytown" }

Oracle:

UPDATE customer SET name = 'Luke' WHERE customer_id = 1;
COMMIT;

Mongo:

db.customer.find()
{ "_id" : 1, "name" : "Luke", "address" : "123 S. Main", "city" : "Mytown" }

Now let's roll back an insert to verify that only committed data in Oracle gets enqueued and pulled into Mongo:

INSERT INTO customer VALUES(2, 'John Doe', '123 N. Main', 'Mytown');
ROLLBACK;

We should only have one document in Mongo:

db.customer.find()
{ "_id" : 1, "name" : "Luke", "address" : "123 S. Main", "city" : "Mytown" }

Hooray!!  Now to wrap up, let's delete everything:

DELETE FROM customer;
COMMIT;

Mongo:

db.customer.find()

No documents come back: the DELETE was propagated through the queue and the collection is now empty.