diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index 3e9113d8a9..65d1aa9335 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -213,6 +213,26 @@ var quotemeta = function (str) { return String(str).replace(/(\W)/g, '\\$1'); }; +// Calls `callback` once the oplog has been processed up to a point that is +// roughly "now". Specifically, it does a dummy write which is then detected +// by the connection's oplog tailer. +// XXX This could be a read instead of a write, getting the last `ts` +// in oplog? +MongoConnection.prototype._callWhenOplogProcessed = function (callback) { + var self = this; + + var sequenceId = nextSequenceId++; + pendingSequences.push({sequenceId: sequenceId, + callback: callback}); + + // Use direct write to Node Mongo driver so we don't end up with recursive + // fence stuff. Need to disable 'safe' because we aren't providing a callback. + var writeCollection = self._getCollection(SEQUENCE_COLLECTION); + writeCollection.update({_id: myServerId}, {$set: {sequence: sequenceId}}, + {upsert: true, safe: false}); +}; + + MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { var self = this; @@ -289,161 +309,6 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { }; }; -MongoConnection.prototype._observeChangesWithOplog = function ( - cursorDescription, callbacks) { - var self = this; - - // XXX let's do this with race conditions first! - // - // the real way will involve special oplog handling during the initial cursor - // read. specifically: - // - // 1) start reading the oplog. for every document that could conceivably be - // relevant, cache a bit of information about what we saw. (eg, cache - // document for inserts, removal fact for removes, "needs poll" for updates. - // most recent overrides.) - // - // 2) read the initial set and send added messages. - // - // 3) write a sentinel to some field. - // - // 4) wait until that sentinel comes up through the oplog. - // - // 5) use the cached information (compared to what we already know) to send - // messages about things that changed right about then - // - // 6) now that we're in the "steady state", process ops more directly - - // XXX NOW: replace idSet/changedFields with simply currently published - // results, ok??? that should simplify things, and allow the implementation of - // "replace" (noodles) - - // XXX DOC: map id -> currently published fields - // (which of course is also the same as what is tracked in merge box, - // ah well) - var published = new IdMap; - - var selector = LocalCollection._compileSelector(cursorDescription.selector); - - // XXX add mutates its argument, which could get confusing - var add = function (doc) { - var id = doc._id; - delete doc._id; - published.set(id, doc); - callbacks.added && callbacks.added(id, doc); - }; - - var remove = function (id) { - published.remove(id); - callbacks.removed && callbacks.removed(id); - }; - - // XXX the ordering here is wrong - var initialCursor = new Cursor(self, cursorDescription); - initialCursor.forEach(function (initialDoc) { - add(initialDoc); - }); - - var oplogHandle = self._oplogHandle.onOplogEntry(cursorDescription.collectionName, function (op) { - var id; - if (op.op === 'd') { - // XXX check that ObjectId works here - id = op.o._id; - if (published.has(id)) - remove(id); - - // XXX this needs to cancel any in-progress "ID lookup" for the document - } else if (op.op === 'i') { - id = op.o._id; - if (published.has(id)) - throw new Error("insert found for already-existing ID"); - - // XXX what if selector yields? for now it can't but later it could have - // $where - if (selector(op.o)) { - add(op.o); - } - } else if (op.op === 'u') { - id = op.o2._id; - - // Is this a modifier ($set/$unset, which may require us to poll the - // database to figure out if the whole document matches the selector) or a - // replacement (in which case we can just directly re-evaluate the - // selector)? - var isModifier = _.has(op.o, '$set') || _.has(op.o, '$unset'); - - var newDoc; - if (isModifier) { - // XXX problem is, the result of this findOne is delivered at a random - // time, not necessarily synced with other stuff that may be coming down - // the oplog. also, we shouldn't read fields that aren't - // necessary to evaluate selector or to publish. - newDoc = self._docFetcher.fetch(cursorDescription.collectionName, id, - op.ts.toString()); - } else { - newDoc = _.extend({_id: id}, op.o); - } - - var matchesNow = newDoc && selector(newDoc); - var matchedBefore = published.has(id); - if (matchesNow && !matchedBefore) { - add(newDoc); - } else if (matchedBefore && !matchesNow) { - remove(id); - } else if (matchesNow) { - var oldDoc = published.get(id); - if (!oldDoc) - throw Error("thought that " + id + " was there!"); - delete newDoc._id; - published.set(id, newDoc); - if (callbacks.changed) { - var changed = LocalCollection._makeChangedFields(newDoc, oldDoc); - if (!_.isEmpty(changed)) { - callbacks.changed(id, changed); - } - } - } - } else { - console.log("SURPRISING FOR NOW OPERATION (eg drop collection)", op); - } - }); - - // XXX ordering w.r.t. everything else? - var listenersHandle = listenAll( - cursorDescription, function (notification, complete) { - // If we're not in a write fence, we don't have to do anything. That's - // because - var fence = DDPServer._CurrentWriteFence.get(); - if (!fence) { - complete(); - return; - } - var sequenceId = nextSequenceId++; - var write = fence.beginWrite(); - pendingSequences.push({sequenceId: sequenceId, - callback: function () { - write.committed(); - }}); - - // Use direct write to Node Mongo driver so we don't end up with recursive - // fence stuff. Need to disable 'safe' because we aren't providing a - // callback. - var writeCollection = self._getCollection(SEQUENCE_COLLECTION); - writeCollection.update({_id: myServerId}, {$set: {sequence: sequenceId}}, - {upsert: true, safe: false}); - complete(); - } - ); - - var observeHandle = { - stop: function () { - listenersHandle.stop(); - oplogHandle.stop(); - } - }; - return observeHandle; -}; - //////////// Public API ////////// @@ -858,7 +723,7 @@ var CursorDescription = function (collectionName, selector, options) { self.options = options || {}; }; -var Cursor = function (mongo, cursorDescription) { +Cursor = function (mongo, cursorDescription) { var self = this; self._mongo = mongo; @@ -1212,7 +1077,7 @@ MongoConnection.prototype._observeChanges = function ( // here, so that updates to different specific IDs don't cause us to poll. // listenCallback is the same kind of (notification, complete) callback passed // to InvalidationCrossbar.listen. -var listenAll = function (cursorDescription, listenCallback) { +listenAll = function (cursorDescription, listenCallback) { var listeners = []; var listenOnTrigger = function (trigger) { listeners.push(DDPServer._InvalidationCrossbar.listen( diff --git a/packages/mongo-livedata/oplog.js b/packages/mongo-livedata/oplog.js new file mode 100644 index 0000000000..11ef7018db --- /dev/null +++ b/packages/mongo-livedata/oplog.js @@ -0,0 +1,145 @@ +MongoConnection.prototype._observeChangesWithOplog = function ( + cursorDescription, callbacks) { + var self = this; + + // XXX let's do this with race conditions first! + // + // the real way will involve special oplog handling during the initial cursor + // read. specifically: + // + // 1) start reading the oplog. for every document that could conceivably be + // relevant, cache a bit of information about what we saw. (eg, cache + // document for inserts, removal fact for removes, "needs poll" for updates. + // most recent overrides.) + // + // 2) read the initial set and send added messages. + // + // 3) write a sentinel to some field. + // + // 4) wait until that sentinel comes up through the oplog. + // + // 5) use the cached information (compared to what we already know) to send + // messages about things that changed right about then + // + // 6) now that we're in the "steady state", process ops more directly + + // XXX NOW: replace idSet/changedFields with simply currently published + // results, ok??? that should simplify things, and allow the implementation of + // "replace" (noodles) + + // XXX DOC: map id -> currently published fields + // (which of course is also the same as what is tracked in merge box, + // ah well) + var published = new IdMap; + + var selector = LocalCollection._compileSelector(cursorDescription.selector); + + // XXX add mutates its argument, which could get confusing + var add = function (doc) { + var id = doc._id; + delete doc._id; + published.set(id, doc); + callbacks.added && callbacks.added(id, doc); + }; + + var remove = function (id) { + published.remove(id); + callbacks.removed && callbacks.removed(id); + }; + + // XXX the ordering here is wrong + var initialCursor = new Cursor(self, cursorDescription); + initialCursor.forEach(function (initialDoc) { + add(initialDoc); + }); + + var oplogHandle = self._oplogHandle.onOplogEntry(cursorDescription.collectionName, function (op) { + var id; + if (op.op === 'd') { + // XXX check that ObjectId works here + id = op.o._id; + if (published.has(id)) + remove(id); + + // XXX this needs to cancel any in-progress "ID lookup" for the document + } else if (op.op === 'i') { + id = op.o._id; + if (published.has(id)) + throw new Error("insert found for already-existing ID"); + + // XXX what if selector yields? for now it can't but later it could have + // $where + if (selector(op.o)) { + add(op.o); + } + } else if (op.op === 'u') { + id = op.o2._id; + + // Is this a modifier ($set/$unset, which may require us to poll the + // database to figure out if the whole document matches the selector) or a + // replacement (in which case we can just directly re-evaluate the + // selector)? + var isModifier = _.has(op.o, '$set') || _.has(op.o, '$unset'); + + var newDoc; + if (isModifier) { + // XXX problem is, the result of this findOne is delivered at a random + // time, not necessarily synced with other stuff that may be coming down + // the oplog. also, we shouldn't read fields that aren't + // necessary to evaluate selector or to publish. + newDoc = self._docFetcher.fetch(cursorDescription.collectionName, id, + op.ts.toString()); + } else { + newDoc = _.extend({_id: id}, op.o); + } + + var matchesNow = newDoc && selector(newDoc); + var matchedBefore = published.has(id); + if (matchesNow && !matchedBefore) { + add(newDoc); + } else if (matchedBefore && !matchesNow) { + remove(id); + } else if (matchesNow) { + var oldDoc = published.get(id); + if (!oldDoc) + throw Error("thought that " + id + " was there!"); + delete newDoc._id; + published.set(id, newDoc); + if (callbacks.changed) { + var changed = LocalCollection._makeChangedFields(newDoc, oldDoc); + if (!_.isEmpty(changed)) { + callbacks.changed(id, changed); + } + } + } + } else { + console.log("SURPRISING FOR NOW OPERATION (eg drop collection)", op); + } + }); + + // XXX ordering w.r.t. everything else? + var listenersHandle = listenAll( + cursorDescription, function (notification, complete) { + // If we're not in a write fence, we don't have to do anything. That's + // because + var fence = DDPServer._CurrentWriteFence.get(); + if (!fence) { + complete(); + return; + } + var write = fence.beginWrite(); + self._callWhenOplogProcessed(function () { + write.committed(); + }); + complete(); + } + ); + + var observeHandle = { + stop: function () { + listenersHandle.stop(); + oplogHandle.stop(); + } + }; + return observeHandle; +}; diff --git a/packages/mongo-livedata/package.js b/packages/mongo-livedata/package.js index b9179eb05e..4fbf18d2a4 100644 --- a/packages/mongo-livedata/package.js +++ b/packages/mongo-livedata/package.js @@ -38,8 +38,8 @@ Package.on_use(function (api) { // For tests only. api.export('MongoTest', 'server', {testOnly: true}); - api.add_files(['id_map.js', 'doc_fetcher.js'], 'server'); - api.add_files('mongo_driver.js', 'server'); + api.add_files(['id_map.js', 'doc_fetcher.js', 'mongo_driver.js', + 'oplog.js'], 'server'); api.add_files('local_collection_driver.js', ['client', 'server']); api.add_files('remote_collection_driver.js', 'server'); api.add_files('collection.js', ['client', 'server']);