diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index 8400373390..5fb94ec4b1 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -287,15 +287,11 @@ _.extend(OplogObserveDriver.prototype, { self._removeBuffered(id); } }, - _handleDoc: function (id, newDoc, mustMatchNow) { + _handleDoc: function (id, newDoc) { var self = this; newDoc = _.clone(newDoc); var matchesNow = newDoc && self._matcher.documentMatches(newDoc).result; - if (mustMatchNow && !matchesNow) { - throw Error("expected " + EJSON.stringify(newDoc) + " to match " - + EJSON.stringify(self._cursorDescription)); - } var publishedBefore = self._published.has(id); var bufferedBefore = self._limit && self._unpublishedBuffer.has(id); @@ -678,9 +674,18 @@ _.extend(OplogObserveDriver.prototype, { // If self has a buffer and limit, the new fetched result will be // limited correctly as the query has sort specifier. newResults.forEach(function (doc, id) { - // "true" here means to throw if we think this doc doesn't match the - // selector. - self._handleDoc(id, doc, true); + self._handleDoc(id, doc); + }); + + // Sanity-check that everything we tried to put into _published ended up + // there. + // XXX if this is slow, remove it later + if (self._published.size() !== newResults.size()) { + throw Error("failed to copy newResults into _published!"); + } + self._published.forEach(function (doc, id) { + if (!newResults.has(id)) + throw Error("_published has a doc that newResults doesn't; " + id); }); // Finally, replace the buffer