From cae886b2f0d245d3f8888ccfb0b42eff0345ee5a Mon Sep 17 00:00:00 2001 From: David Glasser Date: Mon, 19 Aug 2013 20:05:15 -0700 Subject: [PATCH] remove processing works. --- packages/mongo-livedata/mongo_driver.js | 26 +++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index 9cbb4082d2..d84c486e3a 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -229,12 +229,7 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { var callbacksByCollection = {}; - var handle = oplogConnection.tail(cursorDescription, function (doc) { - // Don't register the handle until after we've gotten one doc. - // XXX do we want to actually process this doc? - if (!self._oplogHandle) - self._oplogHandle = handle; - + self._oplogHandle = oplogConnection.tail(cursorDescription, function (doc) { if (!doc.ns && doc.ns.length > dbName.length + 1 && doc.ns.substr(0, dbName.length + 1) === (dbName + '.')) throw new Error("Unexpected ns"); @@ -247,7 +242,7 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { }); var nextId = 0; - handle.onOplogEntry = function (collectionName, callback) { + self._oplogHandle.onOplogEntry = function (collectionName, callback) { if (!_.has(callbacksByCollection, collectionName)) callbacksByCollection[collectionName] = {}; var callbackId = nextId++; @@ -263,14 +258,25 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { MongoConnection.prototype._observeChangesWithOplog = function ( cursorDescription, callbacks) { var self = this; - var oplogHandle = self._oplogHandle.onOplogEntry(cursorDescription.collectionName, function (op) { - console.log("A CHANGE TO THE DOC", op); - }); // XXX let's do this with race conditions first! var idSet = {}; + var oplogHandle = self._oplogHandle.onOplogEntry(cursorDescription.collectionName, function (op) { + if (op.op === 'd') { + // XXX check that ObjectId works here. (ie use idStringify or something) + var id = op.o._id; + if (_.has(idSet, id)) { + delete idSet[id]; + if (callbacks.removed) + callbacks.removed(id); + } + } else { + console.log("A CHANGE TO THE DOC", op); + } + }); + if (callbacks.added) { var initialCursor = new Cursor(self, cursorDescription); initialCursor.forEach(function (initialDoc) {