diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index aca94e98dc..815c4a1636 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -154,10 +154,7 @@ MongoConnection = function (url, connectionOptions) { // settle a little before thinking too hard about this if (process.env.XXX_OPLOG_URL && !connectionOptions.isOplog) { var dbName = Npm.require('url').parse(url).pathname.substr(1); - // Defer this, because it blocks. If we start observing cursors before the - // oplog handle is ready, they just don't get to use the oplog. - Meteor.defer(_.bind(self._startOplogTailing, - self, process.env.XXX_OPLOG_URL, dbName)); + self._startOplogTailing(process.env.XXX_OPLOG_URL, dbName); } }; @@ -259,75 +256,99 @@ MongoConnection.prototype._callWhenOplogProcessed = function (callback) { MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { var self = this; - var oplogConnection = new MongoConnection(oplogUrl, {isOplog: true}); - // Find the last oplog entry. Blocks until the connection is ready. - - var lastOplogEntry = oplogConnection.findOne( - OPLOG_COLLECTION, {}, {sort: {$natural: -1}}); - - var oplogSelector = { - ns: new RegExp('^' + quotemeta(dbName) + '\\.'), - // XXX also handle drop collection, etc - op: {$in: ['i', 'u', 'd']} - }; - if (lastOplogEntry) - oplogSelector.ts = {$gt: lastOplogEntry.ts}; - - var cursorDescription = new CursorDescription( - OPLOG_COLLECTION, oplogSelector, {tailable: true}); - + var stopped = false; + var tailHandle = null; + var readyFuture = new Future(); + var nextId = 0; var callbacksByCollection = {}; - var processSequence = function (doc) { - if (doc.op !== 'i' && doc.op !== 'u') - return; - var serverId = (doc.op === 'i' ? doc.o._id : doc.o2._id); - if (serverId !== myServerId) - return; - var sequenceId = - (doc.op === 'i' ? doc.o.sequence : (doc.o.$set && doc.o.$set.sequence)); - if (typeof sequenceId !== 'number') - return; - // Process all sequence points up to this point. - while (!_.isEmpty(pendingSequences) - && pendingSequences[0].sequenceId <= sequenceId) { - var sequence = pendingSequences.shift(); - sequence.callback(); + self._oplogHandle = { + stop: function () { + if (stopped) + return; + stopped = true; + if (tailHandle) + tailHandle.stop(); + }, + + onOplogEntry: function (collectionName, callback) { + if (stopped) + throw new Error("Called onOplogEntry on stopped handle!"); + + // Calling onOplogEntry requires us to wait for the tailing to be ready. + readyFuture.wait(); + + callback = Meteor.bindEnvironment(callback, function (err) { + Meteor._debug("Error in oplog callback", err.stack); + }); + if (!_.has(callbacksByCollection, collectionName)) + callbacksByCollection[collectionName] = {}; + var callbackId = nextId++; + callbacksByCollection[collectionName][callbackId] = callback; + return { + stop: function () { + delete callbacksByCollection[collectionName][callbackId]; + } + }; } }; - self._oplogHandle = oplogConnection.tail(cursorDescription, function (doc) { - if (!doc.ns && doc.ns.length > dbName.length + 1 && - doc.ns.substr(0, dbName.length + 1) === (dbName + '.')) - throw new Error("Unexpected ns"); + // Actually setting up the connection and tail blocks, so we do it "later". + Meteor.defer(function () { + var oplogConnection = new MongoConnection(oplogUrl, {isOplog: true}); - var collectionName = doc.ns.substr(dbName.length + 1); + // Find the last oplog entry. Blocks until the connection is ready. + var lastOplogEntry = oplogConnection.findOne( + OPLOG_COLLECTION, {}, {sort: {$natural: -1}}); - if (collectionName === SEQUENCE_COLLECTION) { - processSequence(doc); - return; - } + var oplogSelector = { + ns: new RegExp('^' + quotemeta(dbName) + '\\.'), + // XXX also handle drop collection, etc + op: {$in: ['i', 'u', 'd']} + }; + if (lastOplogEntry) + oplogSelector.ts = {$gt: lastOplogEntry.ts}; - _.each(callbacksByCollection[collectionName], function (callback) { - callback(EJSON.clone(doc)); - }); - }); + var cursorDescription = new CursorDescription( + OPLOG_COLLECTION, oplogSelector, {tailable: true}); - var nextId = 0; - self._oplogHandle.onOplogEntry = function (collectionName, callback) { - callback = Meteor.bindEnvironment(callback, function (err) { - Meteor._debug("Error in oplog callback", err.stack); - }); - if (!_.has(callbacksByCollection, collectionName)) - callbacksByCollection[collectionName] = {}; - var callbackId = nextId++; - callbacksByCollection[collectionName][callbackId] = callback; - return { - stop: function () { - delete callbacksByCollection[collectionName][callbackId]; + var processSequence = function (doc) { + if (doc.op !== 'i' && doc.op !== 'u') + return; + var serverId = (doc.op === 'i' ? doc.o._id : doc.o2._id); + if (serverId !== myServerId) + return; + var sequenceId = + (doc.op === 'i' ? doc.o.sequence : + (doc.o.$set && doc.o.$set.sequence)); + if (typeof sequenceId !== 'number') + return; + // Process all sequence points up to this point. + while (!_.isEmpty(pendingSequences) + && pendingSequences[0].sequenceId <= sequenceId) { + var sequence = pendingSequences.shift(); + sequence.callback(); } }; - }; + + tailHandle = oplogConnection.tail(cursorDescription, function (doc) { + if (!doc.ns && doc.ns.length > dbName.length + 1 && + doc.ns.substr(0, dbName.length + 1) === (dbName + '.')) + throw new Error("Unexpected ns"); + + var collectionName = doc.ns.substr(dbName.length + 1); + + if (collectionName === SEQUENCE_COLLECTION) { + processSequence(doc); + return; + } + + _.each(callbacksByCollection[collectionName], function (callback) { + callback(EJSON.clone(doc)); + }); + }); + readyFuture.return(); + }); };