diff --git a/packages/mongo-livedata/mongo_livedata_tests.js b/packages/mongo-livedata/mongo_livedata_tests.js index d41f711670..2a9675aef8 100644 --- a/packages/mongo-livedata/mongo_livedata_tests.js +++ b/packages/mongo-livedata/mongo_livedata_tests.js @@ -2976,3 +2976,52 @@ testAsyncMulti("mongo-livedata - undefined find options", [ test.equal(result, self.doc); } ]); + +// Regression test for #2274. +Meteor.isServer && testAsyncMulti("mongo-livedata - observe limit bug", [ + function (test, expect) { + var self = this; + self.coll = new Meteor.Collection(Random.id()); + var state = {}; + var callbacks = { + changed: function (newDoc) { + state[newDoc._id] = newDoc; + }, + added: function (newDoc) { + state[newDoc._id] = newDoc; + }, + removed: function (oldDoc) { + delete state[oldDoc._id]; + } + }; + self.observe = self.coll.find( + {}, {limit: 1, sort: {sortField: -1}}).observe(callbacks); + + // Insert some documents. + runInFence(function () { + self.id0 = self.coll.insert({sortField: 0, toDelete: true}); + self.id1 = self.coll.insert({sortField: 1, toDelete: true}); + self.id2 = self.coll.insert({sortField: 2, toDelete: true}); + }); + test.equal(_.keys(state), [self.id2]); + + // Mutate the one in the unpublished buffer and the one below the + // buffer. Before the fix for #2274, this left the observe state machine in + // a broken state where the buffer was empty but it wasn't try to re-fill + // it. + runInFence(function () { + self.coll.update({_id: {$ne: self.id2}}, + {$set: {toDelete: false}}, + {multi: 1}); + }); + test.equal(_.keys(state), [self.id2]); + + // Now remove the one published document. This should slide up id1 from the + // buffer, but this didn't work before the #2274 fix. + runInFence(function () { + self.coll.remove({toDelete: true}); + }); + test.equal(_.keys(state), [self.id1]); + } +]); + diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index dd1bb94c1b..ac03d24cb0 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -389,8 +389,10 @@ _.extend(OplogObserveDriver.prototype, { } } else if (bufferedBefore) { oldDoc = self._unpublishedBuffer.get(id); - // remove the old version manually so we don't trigger the querying - // immediately + // remove the old version manually instead of using _removeBuffered so + // we don't trigger the querying immediately. if we end this block with + // the buffer empty, we will need to trigger the query poll manually + // too. self._unpublishedBuffer.remove(id); var maxPublished = self._published.get(self._published.maxElementId()); @@ -411,6 +413,11 @@ _.extend(OplogObserveDriver.prototype, { } else { // Throw away from both published set and buffer self._safeAppendToBuffer = false; + // Normally this check would have been done in _removeBuffered but we + // didn't use it, so we need to do it ourself now. + if (! self._unpublishedBuffer.size()) { + self._needToPollQuery(); + } } } else { throw new Error("cachedBefore implies either of publishedBefore or bufferedBefore is true.");