One of the challenges you face when using asynchronous work queues to modify the state of a document in MongoDB is keeping the data consistent when work units are applied out of order. Here is a simple, if somewhat contrived, example of the problem. Let’s say you have a web application that lets students enroll in and unenroll from classes. Rather than making the user wait for a synchronous update to the data storage system, the web application puts these operations into a queuing system (it doesn’t matter which: AMQP, Oracle Streams Advanced Queues, etc.) for future processing. Let’s say there are five queue consumers that each take a message from the queue, process it, and write the result to a MongoDB document. This is a pretty common use case.
The issue that application designers have to deal with in this type of situation is making sure that the messages are not applied to the database out of order. If a student enrolls in CS 101, suddenly regrets it, and unenrolls moments later via the web application, we want those operations to be applied to the MongoDB document in the order that they happened: insert, then delete. If the operations are applied in the opposite order (delete, then insert), the document will incorrectly show that the student is enrolled in CS 101. This race condition can happen because one queue consumer can receive the insert message while another consumer receives the delete message.
A simple solution is to have only one queue consumer, which serializes the processing of all queue operations. For simple, low-volume systems this is the best approach. But it doesn’t scale.
Here is a more scalable solution that involves no application-layer locks, guarantees eventual consistency of the data, and scales to N queue consumers to handle large volumes or spikes of messages.
- First, modify the web application to stamp each message with an ever-increasing sequence number. A message might look like this:
{ "operation":"insert", "student_id" : 1, "class" : "CS 101", "sequence_number" : 1000 }
- Add this new sequence number to your document schema in MongoDB, along with a “deleted class list”. We will use this delete list to keep track of delete operations from the queue. For example, if your document looked like this:
{ ... "student_id" : 1, "name" : "John Doe", "classes" : ["CS 101"] }
It would now look like:
{ ... "student_id" : 1, "name" : "John Doe", "classes" : [{"name" : "CS 101", "seq" : 1000}], "deleted_classes": [] }
- Use MongoDB’s atomic upsert capability to selectively apply the insert and delete operations (via $push and $pull), depending on which sequence number already exists in the document.
For queued insert operations, we want to do a MongoDB upsert only where the class name doesn’t already exist in classes (to avoid duplicates) and the class either isn’t in deleted_classes or is there with a lower sequence number than the insert. We then use $push to add the new class to classes and $pull to remove the class from deleted_classes.
Similarly, for deletes we apply an upsert that makes sure the class does not already exist in deleted_classes (to avoid duplicates) and that the class either isn’t in classes or is there with a lower sequence number than our delete. We then use $push to add the class to deleted_classes and $pull to remove it from classes.
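The two rules above are mirror images of each other, so they can be sketched as one helper that builds the filter and update documents from a queued message. This is a minimal sketch, not a definitive implementation: the function name buildGuardedUpdate is hypothetical, and the field names are taken from the message and document examples above.

```javascript
// Sketch only: builds the filter and update documents described above from a
// queued message. buildGuardedUpdate is a hypothetical name; the field names
// follow the article's message and document schema.
function buildGuardedUpdate(msg) {
  const isInsert = msg.operation === "insert";
  // The array the class moves into, and the array it is removed from.
  const target = isInsert ? "classes" : "deleted_classes";
  const other = isInsert ? "deleted_classes" : "classes";
  const byName = { name: msg.class };

  const filter = {
    student_id: msg.student_id,
    // Skip if the class is already in the target array (avoids duplicates).
    [target]: { $not: { $elemMatch: byName } },
    // Apply only if the opposite array has no entry for the class,
    // or has one with a lower sequence number.
    $or: [
      { [other]: { $not: { $elemMatch: byName } } },
      { [other]: { $elemMatch: { name: msg.class, seq: { $lt: msg.sequence_number } } } },
    ],
  };
  const update = {
    $push: { [target]: { name: msg.class, seq: msg.sequence_number } },
    $pull: { [other]: byName },
  };
  return { filter, update, options: { upsert: true } };
}

const op = buildGuardedUpdate({
  operation: "insert", student_id: 1, class: "CS 101", sequence_number: 1001,
});
console.log(JSON.stringify(op, null, 2));
```

With the Node.js driver, the result could presumably be passed along as updateOne(op.filter, op.update, op.options) on whatever collection holds the student documents; that collection is an assumption here, since the article does not name one.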
It sounds trickier than it actually is. Stick with me on this and let’s walk through an example step by step. First, let’s walk through a situation where we have an insert and a delete applied in that order:
- Initial document state:
{ ... "student_id" : 1, "name" : "John Doe", "classes" : [], "deleted_classes": [] }
- The student enrolls in CS 101 (the insert operation) and the web application stamps the message with sequence number 1001. The MongoDB update looks like:
update(
  { "student_id" : 1,
    "classes" : { "$not" : { "$elemMatch" : { "name" : "CS 101" } } },
    "$or" : [
      { "deleted_classes" : { "$not" : { "$elemMatch" : { "name" : "CS 101" } } } },
      { "deleted_classes" : { "$elemMatch" : { "name" : "CS 101", "seq" : { "$lt" : 1001 } } } }
    ] },
  { "$push" : { "classes" : { "name" : "CS 101", "seq" : 1001 } },
    "$pull" : { "deleted_classes" : { "name" : "CS 101" } } },
  true // upsert
)
Resulting in a document that looks like:
{ ... "student_id" : 1, "name" : "John Doe", "classes" : [{"name" : "CS 101", "seq" : 1001}], "deleted_classes": [] }
- The student then unenrolls from CS 101 (the delete operation) and the web application stamps the message with sequence number 1002. The MongoDB update looks like:
update(
  { "student_id" : 1,
    "deleted_classes" : { "$not" : { "$elemMatch" : { "name" : "CS 101" } } },
    "$or" : [
      { "classes" : { "$not" : { "$elemMatch" : { "name" : "CS 101" } } } },
      { "classes" : { "$elemMatch" : { "name" : "CS 101", "seq" : { "$lt" : 1002 } } } }
    ] },
  { "$push" : { "deleted_classes" : { "name" : "CS 101", "seq" : 1002 } },
    "$pull" : { "classes" : { "name" : "CS 101" } } },
  true // upsert
)
Resulting in a document that looks like:
{ ... "student_id" : 1, "name" : "John Doe", "classes" : [], "deleted_classes": [{"name" : "CS 101", "seq" : 1002}] }
Now let’s examine what happens when the operations are applied in the opposite order (delete, then insert) because two queue consumers are racing to modify the same document.
- Initial document state:
{ ... "student_id" : 1, "name" : "John Doe", "classes" : [], "deleted_classes": [] }
- The student unenrolls from CS 101 (the delete operation) and the web application has stamped the message with sequence number 1002. Again, the MongoDB update looks like:
update(
  { "student_id" : 1,
    "deleted_classes" : { "$not" : { "$elemMatch" : { "name" : "CS 101" } } },
    "$or" : [
      { "classes" : { "$not" : { "$elemMatch" : { "name" : "CS 101" } } } },
      { "classes" : { "$elemMatch" : { "name" : "CS 101", "seq" : { "$lt" : 1002 } } } }
    ] },
  { "$push" : { "deleted_classes" : { "name" : "CS 101", "seq" : 1002 } },
    "$pull" : { "classes" : { "name" : "CS 101" } } },
  true // upsert
)
Resulting in a document that looks like:
{ ... "student_id" : 1, "name" : "John Doe", "classes" : [], "deleted_classes": [{"name" : "CS 101", "seq" : 1002}] }
- The student’s enrollment in CS 101 (the insert operation), which the web application stamped with sequence number 1001, arrives second. Again, the MongoDB update looks like:
update(
  { "student_id" : 1,
    "classes" : { "$not" : { "$elemMatch" : { "name" : "CS 101" } } },
    "$or" : [
      { "deleted_classes" : { "$not" : { "$elemMatch" : { "name" : "CS 101" } } } },
      { "deleted_classes" : { "$elemMatch" : { "name" : "CS 101", "seq" : { "$lt" : 1001 } } } }
    ] },
  { "$push" : { "classes" : { "name" : "CS 101", "seq" : 1001 } },
    "$pull" : { "deleted_classes" : { "name" : "CS 101" } } },
  true // upsert
)
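However, this time the operation does not succeed, because the $or clause in the update query filter does not match: a delete with a sequence number higher than 1001 has already been applied for this class. (Since the update is an upsert, this relies on a unique index on student_id; without one, a filter that matches nothing would make MongoDB insert a second document for the student instead of failing with a duplicate key error.) So again we end up with the same document: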
{ ... "student_id" : 1, "name" : "John Doe", "classes" : [], "deleted_classes": [{"name" : "CS 101", "seq" : 1002}] }
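To check the two walkthroughs side by side, here is a small in-memory model of the guarded updates. It does not talk to MongoDB at all; it only mirrors the filter conditions and the $push/$pull pair in plain JavaScript, and the names applyMessage and replay are hypothetical. Treat it as a sketch of the idea under those assumptions.

```javascript
// In-memory model of the guarded updates (no MongoDB involved). The function
// names are hypothetical; the conditions mirror the query filters above.
function applyMessage(doc, msg) {
  const isInsert = msg.operation === "insert";
  const target = isInsert ? "classes" : "deleted_classes";
  const other = isInsert ? "deleted_classes" : "classes";

  const alreadyInTarget = doc[target].some((c) => c.name === msg.class);
  const opposite = doc[other].find((c) => c.name === msg.class);
  const oppositeIsOlder = !opposite || opposite.seq < msg.sequence_number;

  // The filter would not match: leave the document untouched.
  if (alreadyInTarget || !oppositeIsOlder) return false;

  doc[target].push({ name: msg.class, seq: msg.sequence_number }); // $push
  doc[other] = doc[other].filter((c) => c.name !== msg.class); // $pull
  return true;
}

// Applies the messages, in the given order, to a fresh copy of the
// initial document from the walkthrough.
function replay(messages) {
  const doc = { student_id: 1, name: "John Doe", classes: [], deleted_classes: [] };
  for (const msg of messages) applyMessage(doc, msg);
  return doc;
}

const enroll = { operation: "insert", student_id: 1, class: "CS 101", sequence_number: 1001 };
const unenroll = { operation: "delete", student_id: 1, class: "CS 101", sequence_number: 1002 };

const inOrder = replay([enroll, unenroll]);
const outOfOrder = replay([unenroll, enroll]);
console.log(JSON.stringify(inOrder));
console.log(JSON.stringify(outOfOrder));
```

Both orders should print the same document, with classes empty and CS 101 recorded in deleted_classes at sequence 1002.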
So there you have it: a lockless way to apply out-of-order work units to a MongoDB database from an asynchronous message queuing system.
If you need to deal with the possibility of more than one operation being applied out of order, you can follow each MongoDB upsert operation with a separate (non-upserting) update to modify existing values in classes. It is possible that you would only need to change the sequence number. For example, suppose your application can see an insert (sequence number 1), a delete (sequence number 2), then another insert (sequence number 3), and the consumers apply them in the order 1, 3, 2. Without the second update, the final insert would be lost: the insert with sequence number 3 isn’t applied because the class already exists in classes from sequence number 1, and when the delete with sequence number 2 is then applied, it removes the sequence number 1 entry. With the second update, the existing entry’s sequence number is raised to 3, so the late delete no longer matches. Anyway, it is a fairly simple modification that handles as many out-of-order operations as desired.
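The 1, 3, 2 scenario can be traced with another in-memory model. The sketch below is one possible reading of the follow-up update, not the only one: it assumes the second step raises the sequence number of an existing entry only when the incoming number is higher, and it applies the same step to deleted_classes for symmetry. The name applyWithFollowUp is hypothetical, and the MongoDB form of the second step is given only as a comment.

```javascript
// In-memory model only (no MongoDB). applyWithFollowUp is a hypothetical name.
function applyWithFollowUp(doc, msg) {
  const isInsert = msg.operation === "insert";
  const target = isInsert ? "classes" : "deleted_classes";
  const other = isInsert ? "deleted_classes" : "classes";
  const seq = msg.sequence_number;

  // Step 1: the guarded upsert from the walkthrough.
  const existing = doc[target].find((c) => c.name === msg.class);
  const opposite = doc[other].find((c) => c.name === msg.class);
  if (!existing && (!opposite || opposite.seq < seq)) {
    doc[target].push({ name: msg.class, seq: seq });
    doc[other] = doc[other].filter((c) => c.name !== msg.class);
  }

  // Step 2: the follow-up, non-upserting update. Assumed to be roughly:
  //   filter: { student_id, <target>: { $elemMatch: { name, seq: { $lt: seq } } } }
  //   update: { $set: { "<target>.$.seq": seq } }
  const stale = doc[target].find((c) => c.name === msg.class && c.seq < seq);
  if (stale) stale.seq = seq;
}

const doc = { student_id: 1, classes: [], deleted_classes: [] };
const message = (operation, sequence_number) =>
  ({ operation, student_id: 1, class: "CS 101", sequence_number });

// Consumers apply the messages in the order 1, 3, 2.
for (const msg of [message("insert", 1), message("insert", 3), message("delete", 2)]) {
  applyWithFollowUp(doc, msg);
}
console.log(JSON.stringify(doc));
```

In this model the student ends up enrolled in CS 101 with sequence number 3, and the late delete with sequence number 2 is ignored.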
Happy asynchronous queuing!