diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index 50fcd10564..098cf10eed 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -49,7 +49,11 @@ OplogObserveDriver = function (options) { self._unpublishedBuffer = null; self._published = new LocalCollection._IdMap; } - self._justUpdatedBuffer = false; + + // Indicates if it is safe to insert a new document at the end of the buffer + // for this query. i.e. it is known that there are no documents matching the + // selector those are not in published or buffer. + self._safeAppendToBuffer = false; self._stopped = false; self._stopHandles = []; @@ -203,7 +207,7 @@ _.extend(OplogObserveDriver.prototype, { } self._unpublishedBuffer.remove(maxBufferedId); - self._justUpdatedBuffer = false; + self._safeAppendToBuffer = false; } }, _removeBuffered: function (id) { @@ -241,7 +245,7 @@ _.extend(OplogObserveDriver.prototype, { // outside of the buffer easily. if (!limit || self._published.size() < limit || comparator(maxPublished, fields) > 0) { self._addPublished(id, fields); - } else if ((self._justUpdatedBuffer && self._unpublishedBuffer.size() < limit) || (maxBuffered && comparator(maxBuffered, fields) > 0)) { + } else if ((self._safeAppendToBuffer && self._unpublishedBuffer.size() < limit) || (maxBuffered && comparator(maxBuffered, fields) > 0)) { self._addBuffered(id, fields); } }, @@ -299,10 +303,10 @@ _.extend(OplogObserveDriver.prototype, { self._removePublished(id); // but it can move into buffered now, check it var maxBuffered = self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId()); - if (self._justUpdatedBuffer || (maxBuffered && comparator(newDoc, maxBuffered) < 0)) + if (self._safeAppendToBuffer || (maxBuffered && comparator(newDoc, maxBuffered) < 0)) self._addBuffered(id, newDoc); else - self._justUpdatedBuffer = false; + self._safeAppendToBuffer = false; } } else if (bufferedBefore) { oldDoc = self._unpublishedBuffer.get(id); @@ -315,11 +319,11 @@ _.extend(OplogObserveDriver.prototype, { // the buffered doc was updated, it could move to published if (comparator(newDoc, maxPublished) < 0) { self._addPublished(id, newDoc); - } else if (self._justUpdatedBuffer || (maxBuffered && comparator(newDoc, maxBuffered) < 0)) { + } else if (self._safeAppendToBuffer || (maxBuffered && comparator(newDoc, maxBuffered) < 0)) { // stays in buffer self._unpublishedBuffer.set(id, newDoc); } else { - self._justUpdatedBuffer = false; + self._safeAppendToBuffer = false; } } else { throw new Error("cachedBefore implies either of publishedBefore or bufferedBefore is true."); @@ -467,7 +471,9 @@ _.extend(OplogObserveDriver.prototype, { initialCursor.forEach(function (initialDoc) { self._addMatching(initialDoc); }); - self._justUpdatedBuffer = true; + + self._safeAppendToBuffer = initialCursor.count() < self._limit * 2; + if (self._stopped) throw new Error("oplog stopped quite early"); // Allow observeChanges calls to return. (After this, it's possible for @@ -634,7 +640,8 @@ _.extend(OplogObserveDriver.prototype, { delete doc._id; self._addBuffered(id, doc); }); - self._justUpdatedBuffer = true; + + self._safeAppendToBuffer = newBuffer.size() < self._limit; }, // This stop function is invoked from the onStop of the ObserveMultiplexer, so