From ff000110a01aa72c183f14f739e6e2806f96f2e1 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Thu, 19 Sep 2013 10:23:43 -0700 Subject: [PATCH] only one mongo-livedata test fails --- packages/mongo-livedata/id_map.js | 11 +++++ packages/mongo-livedata/oplog.js | 79 ++++++++++++++++++++++--------- 2 files changed, 68 insertions(+), 22 deletions(-) diff --git a/packages/mongo-livedata/id_map.js b/packages/mongo-livedata/id_map.js index 9510797731..cf513fa49c 100644 --- a/packages/mongo-livedata/id_map.js +++ b/packages/mongo-livedata/id_map.js @@ -28,6 +28,17 @@ _.extend(IdMap.prototype, { var self = this; return _.isEmpty(self._map); }, + clear: function () { + var self = this; + self._map = {}; + }, + each: function (iterator) { + var self = this; + _.each(self._map, function (value, key, obj) { + var context = this; + iterator.call(context, value, LocalCollection._idParse(key), obj); + }); + }, // XXX used? setDefault: function (id, def) { var self = this; diff --git a/packages/mongo-livedata/oplog.js b/packages/mongo-livedata/oplog.js index f22814fcb5..0ffe9739bd 100644 --- a/packages/mongo-livedata/oplog.js +++ b/packages/mongo-livedata/oplog.js @@ -1,3 +1,4 @@ +var Fiber = Npm.require('fibers'); var Future = Npm.require('fibers/future'); var PHASE = { @@ -41,8 +42,57 @@ MongoConnection.prototype._observeChangesWithOplog = function ( callbacks.removed && callbacks.removed(id); }; + // XXX mutates newDoc, that's weird + var handleDoc = function (id, newDoc) { + var matchesNow = newDoc && selector(newDoc); + var matchedBefore = published.has(id); + if (matchesNow && !matchedBefore) { + add(newDoc); + } else if (matchedBefore && !matchesNow) { + remove(id); + } else if (matchesNow) { + var oldDoc = published.get(id); + if (!oldDoc) + throw Error("thought that " + id + " was there!"); + delete newDoc._id; + published.set(id, newDoc); + if (callbacks.changed) { + var changed = LocalCollection._makeChangedFields( + EJSON.clone(newDoc), oldDoc); + if (!_.isEmpty(changed)) + callbacks.changed(id, changed); + } + } + }; + var beCurious = function () { - throw Error("I AM CURIOUS") + phase = PHASE.FETCHING; + while (!curiousity.isEmpty()) { + if (phase !== PHASE.FETCHING) + throw new Error("Surprising phase in beCurious: " + phase); + + var futures = []; + curiousity.each(function (cacheKey, id) { + // Run each until they yield. This implies that curiousity should not be + // updated during this loop. + Fiber(function () { + var f = new Future; + futures.push(f); + var doc = self._docFetcher.fetch(cursorDescription.collectionName, id, + cacheKey); + handleDoc(id, doc); + f.return(); + }).run(); + }); + curiousity.clear(); + Future.wait(futures); + // Throw if any throw. + // XXX this means the observe will now be stalled + _.each(futures, function (f) { + f.get(); + }); + } + phase = PHASE.STEADY; }; var oplogEntryHandlers = {}; @@ -50,7 +100,9 @@ MongoConnection.prototype._observeChangesWithOplog = function ( curiousity.set(idForOp(op), op.ts.toString()); }; oplogEntryHandlers[PHASE.FETCHING] = function (op) { - // XXX now + // XXX we can probably actually handle some operations directly (eg, + // insert/remove/replace if they don't conflict with "outstanding" fetches) + curiousity.set(idForOp(op), op.ts.toString()); }; oplogEntryHandlers[PHASE.STEADY] = function (op) { var id = idForOp(op); @@ -74,30 +126,12 @@ MongoConnection.prototype._observeChangesWithOplog = function ( if (isModifier) { curiousity.set(id, op.ts.toString()); - phase = PHASE.FETCHING; beCurious(); return; } - var newDoc = _.extend({_id: id}, op.o); - var matchesNow = selector(newDoc); - var matchedBefore = published.has(id); - if (matchesNow && !matchedBefore) { - add(newDoc); - } else if (matchedBefore && !matchesNow) { - remove(id); - } else if (matchesNow) { - var oldDoc = published.get(id); - if (!oldDoc) - throw Error("thought that " + id + " was there!"); - delete newDoc._id; - published.set(id, newDoc); - if (callbacks.changed) { - var changed = LocalCollection._makeChangedFields(newDoc, oldDoc); - if (!_.isEmpty(changed)) - callbacks.changed(id, changed); - } - } + + handleDoc(id, newDoc); } else { throw Error("XXX SURPRISING OPERATION: " + op); } @@ -121,6 +155,7 @@ MongoConnection.prototype._observeChangesWithOplog = function ( return; } var write = fence.beginWrite(); + // XXX this also has to wait for steady!!! self._callWhenOplogProcessed(function () { write.committed(); });