diff --git a/packages/mongo-livedata/mongo_livedata_tests.js b/packages/mongo-livedata/mongo_livedata_tests.js index c6100a41d5..12618963fa 100644 --- a/packages/mongo-livedata/mongo_livedata_tests.js +++ b/packages/mongo-livedata/mongo_livedata_tests.js @@ -770,13 +770,21 @@ if (Meteor.isServer) { }; var ins = function (doc) { - var id; - runInFence(function () { - id = coll.insert(doc); }); + var id; runInFence(function () { id = coll.insert(doc); }); return id; }; - var rem = function (sel) { runInFence(function () { coll.remove(sel); }); }; + var upd = function (sel, mod, opt) { + runInFence(function () { + coll.update(sel, mod, opt); + }); + }; + // compares arrays a and b w/o looking at order + var setsEqual = function (a, b) { + a = _.map(a, EJSON.stringify); + b = _.map(b, EJSON.stringify); + return _.isEmpty(_.difference(a, b)); + }; // Insert a doc and start observing. var docId1 = ins({foo: 22, bar: 5}); @@ -819,6 +827,45 @@ if (Meteor.isServer) { test.equal(o.output.shift(), {removed: docId5}); test.equal(o.output.shift(), {added: docId2}); + // Current state is [3 5 6] 7] + // Add some negative numbers overflowing the buffer. + // New documents will take the published place, [3 5 6] will take the buffer + // and 7 will be outside of the buffer in MongoDB. + var docId6 = ins({ foo: 22, bar: -1 }); + var docId7 = ins({ foo: 22, bar: -2 }); + var docId8 = ins({ foo: 22, bar: -3 }); + test.length(o.output, 6); + var expected = [{added: docId6}, {removed: docId2}, + {added: docId7}, {removed: docId1}, + {added: docId8}, {removed: docId3}]; + + test.equal(o.output, expected); + o.output.splice(0, 6); + + // Now the state is [-3 -2 -1] 3 5 6] 7 + // If we update first 3 docs (increment them by 20), it would be + // interesting. + upd({ bar: { $lt: 0 }}, { $inc: { bar: 20 } }, { multi: true }); + + // The updated documents can't find their place in published and they can't + // be buffered as we are not aware of the situation outside of the buffer. + // But since our buffer becomes empty, it will be refilled partially with + // updated documents. + test.length(o.output, 6); + expected = [{removed: docId6}, {added: docId4}, + {removed: docId7}, {added: docId1}, + {removed: docId8}, {added: docId2}]; + + // Note: since we are updating multiple things, the order of updates may + // differ from launch to launch. That's why we compare even positions + // (removes) w/o looking at ordering. + test.isTrue(setsEqual([o.output[0], o.output[2], o.output[4]], + [expected[0], expected[2], expected[4]])); + test.equal([o.output[1], o.output[3], o.output[5]], + [expected[1], expected[3], expected[5]]); + // The new arrangment is [3 5 6] 7 17 18] 19 + + o.handle.stop(); onComplete(); }); diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index 2ee1ea5f1b..0febd194e3 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -261,6 +261,10 @@ _.extend(OplogObserveDriver.prototype, { // Unordered case where the document stays in published once it matches // or the case when we don't have enough matching docs to publish or the // changed but matching doc will stay in published anyways. + // XXX: We rely on the emptiness of buffer. Be sure to maintain the fact + // that buffer can't be empty if there are matching documents not + // published. Notably, we don't want to schedule repoll and continue + // relying on this property. if (!self._limit || self._unpublishedBuffer.size() === 0 || comparator(newDoc, minBuffered) < 1) { self._changePublished(id, oldDoc, newDoc); } else {