diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index 098cf10eed..50f33aa679 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -28,11 +28,11 @@ OplogObserveDriver = function (options) { // There are several properties ordered driver implements: // - _limit is a positive number // - _comparator is a function-comparator by which the query is ordered - // - _unpublishedBuffer is non-null collection, + // - _unpublishedBuffer is non-null Min/Max Heap, // the empty buffer in STEADY phase implies that the // everything that matches the queries selector fits // into published set. - // - _published implements maxElementId method in addition to IdMap methods + // - _published - Min Heap (also implements IdMap methods) // We don't support $near and other geo-queries so it's OK to initialize the // comparator only once in the constructor. @@ -159,7 +159,7 @@ _.extend(OplogObserveDriver.prototype, { var overflowingDocId = self._published.maxElementId(); var overflowingDoc = self._published.get(overflowingDocId); - if (_.isEqual(overflowingDocId, id)) { + if (EJSON.equals(overflowingDocId, id)) { throw new Error("The document just added is overflowing the published set"); } @@ -202,7 +202,7 @@ _.extend(OplogObserveDriver.prototype, { if (self._unpublishedBuffer.size() > self._limit) { var maxBufferedId = self._unpublishedBuffer.maxElementId(); - if (_.isEqual(maxBufferedId, id)) { + if (EJSON.equals(maxBufferedId, id)) { throw new Error("The document just added to buffer is overflowing the buffer"); } @@ -216,7 +216,7 @@ _.extend(OplogObserveDriver.prototype, { // To keep the contract "buffer is never empty in STEADY phase unless the // everything matching fits into published" true, we poll everything as soon // as we see the buffer becoming empty. - if (! self._unpublishedBuffer.size()) + if (! self._unpublishedBuffer.size() && ! self._safeAppendToBuffer) self._needToPollQuery(); }, // Called when a document has joined the "Matching" results set. @@ -468,11 +468,13 @@ _.extend(OplogObserveDriver.prototype, { // XXX needs more thought on non-zero skip // XXX "2" here is a "magic number" var initialCursor = self._cursorForQuery({ limit: self._limit * 2 }); + var fetchedDocsCount = 0; initialCursor.forEach(function (initialDoc) { self._addMatching(initialDoc); + fetchedDocsCount++; }); - self._safeAppendToBuffer = initialCursor.count() < self._limit * 2; + self._safeAppendToBuffer = fetchedDocsCount < self._limit * 2; if (self._stopped) throw new Error("oplog stopped quite early"); @@ -704,9 +706,10 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) { if (options._disableOplog) return false; - // This option (which are mostly used for sorted cursors) require us to figure - // out where a given document fits in an order to know if it's included or - // not. We do it only if skip is not defined or 0. + // skip is not supported: to support it we would need to keep track of all + // "skipped" documents or at least their ids. + // limit w/o a sort specifier is not supported: current implementation needs a + // determent way to order documents. if (options.skip || (options.limit && !options.sort)) return false; // If a fields projection option is given check if it is supported by