Fix oplog error "Buffer inexplicably empty"

We were violating the contract "buffer is never empty in STEADY phase
unless everything matching fits into published", which is maintained by
_removeBuffered, by moving something from _unpublishedBuffer without
going through _removeBuffered.

Specifically, if we had already set _safeAppendToBuffer to
false (because we knew of some matching document below buffer) and did a
modification to a document in buffer, we could leave buffer empty
without triggering a repoll.

Fixes #2274.
This commit is contained in:
David Glasser
2014-07-16 20:13:25 -07:00
parent c8d7821deb
commit 8e005cadd3
2 changed files with 58 additions and 2 deletions

View File

@@ -2976,3 +2976,52 @@ testAsyncMulti("mongo-livedata - undefined find options", [
test.equal(result, self.doc);
}
]);
// Regression test for #2274.
Meteor.isServer && testAsyncMulti("mongo-livedata - observe limit bug", [
function (test, expect) {
var self = this;
self.coll = new Meteor.Collection(Random.id());
var state = {};
var callbacks = {
changed: function (newDoc) {
state[newDoc._id] = newDoc;
},
added: function (newDoc) {
state[newDoc._id] = newDoc;
},
removed: function (oldDoc) {
delete state[oldDoc._id];
}
};
self.observe = self.coll.find(
{}, {limit: 1, sort: {sortField: -1}}).observe(callbacks);
// Insert some documents.
runInFence(function () {
self.id0 = self.coll.insert({sortField: 0, toDelete: true});
self.id1 = self.coll.insert({sortField: 1, toDelete: true});
self.id2 = self.coll.insert({sortField: 2, toDelete: true});
});
test.equal(_.keys(state), [self.id2]);
// Mutate the one in the unpublished buffer and the one below the
// buffer. Before the fix for #2274, this left the observe state machine in
// a broken state where the buffer was empty but it wasn't try to re-fill
// it.
runInFence(function () {
self.coll.update({_id: {$ne: self.id2}},
{$set: {toDelete: false}},
{multi: 1});
});
test.equal(_.keys(state), [self.id2]);
// Now remove the one published document. This should slide up id1 from the
// buffer, but this didn't work before the #2274 fix.
runInFence(function () {
self.coll.remove({toDelete: true});
});
test.equal(_.keys(state), [self.id1]);
}
]);

View File

@@ -389,8 +389,10 @@ _.extend(OplogObserveDriver.prototype, {
}
} else if (bufferedBefore) {
oldDoc = self._unpublishedBuffer.get(id);
// remove the old version manually so we don't trigger the querying
// immediately
// remove the old version manually instead of using _removeBuffered so
// we don't trigger the querying immediately. if we end this block with
// the buffer empty, we will need to trigger the query poll manually
// too.
self._unpublishedBuffer.remove(id);
var maxPublished = self._published.get(self._published.maxElementId());
@@ -411,6 +413,11 @@ _.extend(OplogObserveDriver.prototype, {
} else {
// Throw away from both published set and buffer
self._safeAppendToBuffer = false;
// Normally this check would have been done in _removeBuffered but we
// didn't use it, so we need to do it ourself now.
if (! self._unpublishedBuffer.size()) {
self._needToPollQuery();
}
}
} else {
throw new Error("cachedBefore implies either of publishedBefore or bufferedBefore is true.");