From fbfda21dfbe5778f1e4813e54eac69afb597f67e Mon Sep 17 00:00:00 2001 From: David Glasser Date: Wed, 18 Sep 2013 18:51:35 -0700 Subject: [PATCH] whoa, we're halfway there --- packages/mongo-livedata/id_map.js | 4 + packages/mongo-livedata/mongo_driver.js | 6 +- packages/mongo-livedata/oplog.js | 139 +++++++++++++----------- 3 files changed, 81 insertions(+), 68 deletions(-) diff --git a/packages/mongo-livedata/id_map.js b/packages/mongo-livedata/id_map.js index 57ee9cd8d9..9510797731 100644 --- a/packages/mongo-livedata/id_map.js +++ b/packages/mongo-livedata/id_map.js @@ -24,6 +24,10 @@ _.extend(IdMap.prototype, { var key = LocalCollection._idStringify(id); return _.has(self._map, key); }, + isEmpty: function () { + var self = this; + return _.isEmpty(self._map); + }, // XXX used? setDefault: function (id, def) { var self = this; diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index 65d1aa9335..2610ee4fc4 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -244,10 +244,8 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { var oplogSelector = { ns: new RegExp('^' + quotemeta(dbName) + '\\.'), - $or: [ - {op: {$in: ['i', 'u', 'd']}}, - {op: 'c', 'o.drop': {$exists: true}} - ] + // XXX also handle drop collection, etc + op: {$in: ['i', 'u', 'd']} }; if (lastOplogEntry) oplogSelector.ts = {$gt: lastOplogEntry.ts}; diff --git a/packages/mongo-livedata/oplog.js b/packages/mongo-livedata/oplog.js index 11ef7018db..f22814fcb5 100644 --- a/packages/mongo-livedata/oplog.js +++ b/packages/mongo-livedata/oplog.js @@ -1,45 +1,39 @@ +var Future = Npm.require('fibers/future'); + +var PHASE = { + INITIALIZING: 1, + FETCHING: 2, + STEADY: 3 +}; + +var idForOp = function (op) { + if (op.op === 'd') + return op.o._id; + else if (op.op === 'i') + return op.o._id; + else if (op.op === 'u') + return op.o2._id; + else + throw Error("Unknown op: " + EJSON.stringify(op)); +}; + MongoConnection.prototype._observeChangesWithOplog = function ( cursorDescription, callbacks) { var self = this; - // XXX let's do this with race conditions first! - // - // the real way will involve special oplog handling during the initial cursor - // read. specifically: - // - // 1) start reading the oplog. for every document that could conceivably be - // relevant, cache a bit of information about what we saw. (eg, cache - // document for inserts, removal fact for removes, "needs poll" for updates. - // most recent overrides.) - // - // 2) read the initial set and send added messages. - // - // 3) write a sentinel to some field. - // - // 4) wait until that sentinel comes up through the oplog. - // - // 5) use the cached information (compared to what we already know) to send - // messages about things that changed right about then - // - // 6) now that we're in the "steady state", process ops more directly + var phase = PHASE.INITIALIZING; - // XXX NOW: replace idSet/changedFields with simply currently published - // results, ok??? that should simplify things, and allow the implementation of - // "replace" (noodles) - - // XXX DOC: map id -> currently published fields - // (which of course is also the same as what is tracked in merge box, - // ah well) var published = new IdMap; - var selector = LocalCollection._compileSelector(cursorDescription.selector); - // XXX add mutates its argument, which could get confusing + var curiousity = new IdMap; + var add = function (doc) { var id = doc._id; - delete doc._id; - published.set(id, doc); - callbacks.added && callbacks.added(id, doc); + var fields = EJSON.clone(doc); + delete fields._id; + published.set(id, fields); + callbacks.added && callbacks.added(id, EJSON.clone(fields)); }; var remove = function (id) { @@ -47,53 +41,46 @@ MongoConnection.prototype._observeChangesWithOplog = function ( callbacks.removed && callbacks.removed(id); }; - // XXX the ordering here is wrong - var initialCursor = new Cursor(self, cursorDescription); - initialCursor.forEach(function (initialDoc) { - add(initialDoc); - }); + var beCurious = function () { + throw Error("I AM CURIOUS") + }; - var oplogHandle = self._oplogHandle.onOplogEntry(cursorDescription.collectionName, function (op) { - var id; + var oplogEntryHandlers = {}; + oplogEntryHandlers[PHASE.INITIALIZING] = function (op) { + curiousity.set(idForOp(op), op.ts.toString()); + }; + oplogEntryHandlers[PHASE.FETCHING] = function (op) { + // XXX now + }; + oplogEntryHandlers[PHASE.STEADY] = function (op) { + var id = idForOp(op); if (op.op === 'd') { - // XXX check that ObjectId works here - id = op.o._id; if (published.has(id)) remove(id); - - // XXX this needs to cancel any in-progress "ID lookup" for the document } else if (op.op === 'i') { - id = op.o._id; if (published.has(id)) throw new Error("insert found for already-existing ID"); // XXX what if selector yields? for now it can't but later it could have // $where - if (selector(op.o)) { + if (selector(op.o)) add(op.o); - } } else if (op.op === 'u') { - id = op.o2._id; - // Is this a modifier ($set/$unset, which may require us to poll the // database to figure out if the whole document matches the selector) or a // replacement (in which case we can just directly re-evaluate the // selector)? var isModifier = _.has(op.o, '$set') || _.has(op.o, '$unset'); - var newDoc; if (isModifier) { - // XXX problem is, the result of this findOne is delivered at a random - // time, not necessarily synced with other stuff that may be coming down - // the oplog. also, we shouldn't read fields that aren't - // necessary to evaluate selector or to publish. - newDoc = self._docFetcher.fetch(cursorDescription.collectionName, id, - op.ts.toString()); - } else { - newDoc = _.extend({_id: id}, op.o); + curiousity.set(id, op.ts.toString()); + phase = PHASE.FETCHING; + beCurious(); + return; } - var matchesNow = newDoc && selector(newDoc); + var newDoc = _.extend({_id: id}, op.o); + var matchesNow = selector(newDoc); var matchedBefore = published.has(id); if (matchesNow && !matchedBefore) { add(newDoc); @@ -107,15 +94,21 @@ MongoConnection.prototype._observeChangesWithOplog = function ( published.set(id, newDoc); if (callbacks.changed) { var changed = LocalCollection._makeChangedFields(newDoc, oldDoc); - if (!_.isEmpty(changed)) { + if (!_.isEmpty(changed)) callbacks.changed(id, changed); - } } } } else { - console.log("SURPRISING FOR NOW OPERATION (eg drop collection)", op); + throw Error("XXX SURPRISING OPERATION: " + op); } - }); + }; + + + var oplogHandle = self._oplogHandle.onOplogEntry( + cursorDescription.collectionName, function (op) { + oplogEntryHandlers[phase](op); + } + ); // XXX ordering w.r.t. everything else? var listenersHandle = listenAll( @@ -135,11 +128,29 @@ MongoConnection.prototype._observeChangesWithOplog = function ( } ); - var observeHandle = { + var initialCursor = new Cursor(self, cursorDescription); + initialCursor.forEach(function (initialDoc) { + add(initialDoc); + }); + + var catchUpFuture = new Future; + self._callWhenOplogProcessed(catchUpFuture.resolver()); + catchUpFuture.wait(); + + if (phase !== PHASE.INITIALIZING) + throw Error("Phase unexpectedly " + phase); + + if (curiousity.isEmpty()) { + phase = PHASE.STEADY; + } else { + phase = PHASE.FETCHING; + Meteor.defer(beCurious); + } + + return { stop: function () { listenersHandle.stop(); oplogHandle.stop(); } }; - return observeHandle; };