From ba63548d4d6be25ac203dc4cd02f7c579e51cfa2 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Mon, 19 Aug 2013 16:24:05 -0700 Subject: [PATCH] checkpoint monday morning --- packages/mongo-livedata/mongo_driver.js | 70 ++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index d43b3e82c9..9cbb4082d2 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -139,6 +139,13 @@ MongoConnection = function (url, connectionOptions) { MongoConnection.prototype.close = function() { var self = this; + + // XXX probably untested + var oplogHandle = self._oplogHandle; + self._oplogHandle = null; + if (oplogHandle) + oplogHandle.stop(); + // Use Future.wrap so that errors get thrown. This happens to // work even outside a fiber since the 'close' method is not // actually asynchronous. @@ -219,15 +226,70 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { var cursorDescription = new CursorDescription( OPLOG_COLLECTION, oplogSelector, {tailable: true}); + + var callbacksByCollection = {}; + var handle = oplogConnection.tail(cursorDescription, function (doc) { // Don't register the handle until after we've gotten one doc. + // XXX do we want to actually process this doc? if (!self._oplogHandle) self._oplogHandle = handle; - console.log("OPLOG TAILING SEZ:", doc); + if (!doc.ns && doc.ns.length > dbName.length + 1 && + doc.ns.substr(0, dbName.length + 1) === (dbName + '.')) + throw new Error("Unexpected ns"); + + var collectionName = doc.ns.substr(dbName.length + 1); + + _.each(callbacksByCollection[collectionName], function (callback) { + callback(doc); + }); }); + + var nextId = 0; + handle.onOplogEntry = function (collectionName, callback) { + if (!_.has(callbacksByCollection, collectionName)) + callbacksByCollection[collectionName] = {}; + var callbackId = nextId++; + callbacksByCollection[collectionName][callbackId] = callback; + return { + stop: function () { + delete callbacksByCollection[collectionName][callbackId]; + } + }; + }; }; +MongoConnection.prototype._observeChangesWithOplog = function ( + cursorDescription, callbacks) { + var self = this; + var oplogHandle = self._oplogHandle.onOplogEntry(cursorDescription.collectionName, function (op) { + console.log("A CHANGE TO THE DOC", op); + }); + + // XXX let's do this with race conditions first! + + var idSet = {}; + + if (callbacks.added) { + var initialCursor = new Cursor(self, cursorDescription); + initialCursor.forEach(function (initialDoc) { + var id = initialDoc._id; + delete initialDoc._id; + idSet[id] = true; + callbacks.added(id, initialDoc); + }); + } + + var observeHandle = { + stop: function () { + oplogHandle.stop(); + } + }; + return observeHandle; +}; + + //////////// Public API ////////// // The write methods block until the database has confirmed the write (it may @@ -937,6 +999,12 @@ MongoConnection.prototype._observeChanges = function ( return self._observeChangesTailable(cursorDescription, ordered, callbacks); } + // XXX maybe this should actually use deduping too? + if (self._oplogHandle && !ordered + && cursorSupportedByOplogTailing(cursorDescription)) { + return self._observeChangesWithOplog(cursorDescription, callbacks); + } + var observeKey = JSON.stringify( _.extend({ordered: ordered}, cursorDescription));