From a93f742b3aaeb8f4af3cd1c149fa126393bea712 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Thu, 12 Sep 2013 20:49:37 -0700 Subject: [PATCH] ok, rewrite is done. now "replace" updates work too. also give up on idea of knowing what fields changed (more correct, does require an in memory diff). many tests pass. --- packages/mongo-livedata/mongo_driver.js | 107 ++++++++++-------------- 1 file changed, 43 insertions(+), 64 deletions(-) diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index 933ab7a460..531ee16254 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -273,6 +273,9 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { var nextId = 0; self._oplogHandle.onOplogEntry = function (collectionName, callback) { + callback = Meteor.bindEnvironment(callback, function (err) { + Meteor._debug("Error in oplog callback", err.stack); + }); if (!_.has(callbacksByCollection, collectionName)) callbacksByCollection[collectionName] = {}; var callbackId = nextId++; @@ -285,21 +288,6 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { }; }; -// XXX you can actually get a replacement doc instead of $set/$unset! this -// completely messes with the attempt to do a non-ID-polling process of -// updates... -var modifierTopLevelFields = function (mod) { - var fields = {}; - _.each(mod, function (mapping, op) { - if (op !== '$set' && op != '$unset') - throw new Error("Unknown oplog operation " + op); - _.each(mapping, function (value, field) { - fields[field.split('.')[0]] = true; - }); - }); - return _.keys(fields); -}; - MongoConnection.prototype._observeChangesWithOplog = function ( cursorDescription, callbacks) { var self = this; @@ -334,12 +322,9 @@ MongoConnection.prototype._observeChangesWithOplog = function ( // ah well) var published = new IdMap; - // XXX KILL THESE - var idSet = new IdMap; - var changedFields = new IdMap; - var selector = LocalCollection._compileSelector(cursorDescription.selector); + // XXX add mutates its argument, which could get confusing var add = function (doc) { var id = doc._id; delete doc._id; @@ -359,6 +344,8 @@ MongoConnection.prototype._observeChangesWithOplog = function ( id = op.o._id; if (published.has(id)) remove(id); + + // XXX this needs to cancel any in-progress "ID lookup" for the document } else if (op.op === 'i') { id = op.o._id; if (published.has(id)) @@ -370,54 +357,45 @@ MongoConnection.prototype._observeChangesWithOplog = function ( add(op.o); } else if (op.op === 'u') { id = op.o2._id; - var fields = changedFields.get(id); - if (!fields) { - fields = {}; - changedFields.set(id, fields); - Fiber(function (){ - // XXX problem is, the result of this findOne is delivered at a random - // time, not necessarily synced with other stuff that may be coming - // down the oplog. how much does this matter? - var updatedDoc = self.findOne( - cursorDescription.collectionName, {_id: id}); - // XXX in what circumstances does this !== fields? - var myChangedFields = changedFields.get(id); - // Did we process a remove while we were waiting? - if (!myChangedFields) - return; + // Is this a modifier ($set/$unset, which may require us to poll the + // database to figure out if the whole document matches the selector) or a + // replacement (in which case we can just directly re-evaluate the + // selector)? + var isModifier = _.has(op.o, '$set') || _.has(op.o, '$unset'); - // Delete this record from myChangedFields atomically before anything - // that might yield (even selector might yield if it has $where!) - changedFields.remove(id); - - var matchesNow = updatedDoc && selector(updatedDoc); - var matchedBefore = idSet.has(id); - - if (matchesNow && !matchedBefore) { - add(updatedDoc); - } else if (matchedBefore && !matchesNow) { - remove(id); - } else if (matchesNow) { - if (callbacks.changed) { - // XXX this assumes that every field we saw a set/unset on - // actually changed. otherwise we may send out something - // redundant. - var changed = {}; - _.each(myChangedFields, function (unused, fieldName) { - changed[fieldName] = _.has(updatedDoc, fieldName) - ? updatedDoc[fieldName] : undefined; - }); - callbacks.changed(id, changed); - } - } - }).run(); + var newDoc; + if (isModifier) { + // XXX problem is, the result of this findOne is delivered at a random + // time, not necessarily synced with other stuff that may be coming down + // the oplog. also, we should coalesce multiple pings of the same + // document ("ID queue"). also, we shouldn't read fields that aren't + // necessary to evaluate selector or to publish. + newDoc = self.findOne(cursorDescription.collectionName, {_id: id}); + } else { + newDoc = op.o; + } + + var matchesNow = newDoc && selector(newDoc); + var matchedBefore = published.has(id); + if (matchesNow && !matchedBefore) { + add(newDoc); + } else if (matchedBefore && !matchesNow) { + remove(id); + } else if (matchesNow) { + var oldDoc = published.get(id); + if (!oldDoc) + throw Error("thought that " + id + " was there!"); + published.set(id, newDoc); + if (callbacks.changed) { + var changed = LocalCollection._makeChangedFields(newDoc, oldDoc); + if (!_.isEmpty(changed)) { + callbacks.changed(id, changed); + } + } } - _.each(modifierTopLevelFields(op.o), function (field) { - fields[field] = true; - }); } else { - console.log("A CHANGE TO THE DOC", op); + console.log("SURPRISING FOR NOW OPERATION (eg drop collection)", op); } }); @@ -1602,7 +1580,8 @@ var cursorSupportedByOplogTailing = function (cursorDescription) { // For now, we're just dealing with equality queries: no $operators, regexps, // or $and/$or/$where/etc clauses. We can expand the scope of what we're - // comfortable processing later. + // comfortable processing later. ($where will get pretty scary since it will + // allow selector processing to yield!) return _.all(cursorDescription.selector, function (value, field) { // No logical operators like $and. if (field.substr(0, 1) === '$')