diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index ac33779eae..dbfbcbf930 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -20,14 +20,22 @@ OplogObserveDriver = function (options) { self._mongoHandle = options.mongoHandle; self._multiplexer = options.multiplexer; + if (options.ordered) { + throw Error("OplogObserveDriver only supports unordered observeChanges"); + } + if (options.cursorDescription.options.limit) { // There are several properties ordered driver implements: // - _limit is a positive number // - _comparator is a function-comparator by which the query is ordered - // - _unpublishedBuffer is non-null collection + // - _unpublishedBuffer is non-null collection, + // the empty buffer in STEADY phase implies that the + // everything that matches the queries selector fits + // into published set. // - _published implements maxElementId method in addition to IdMap methods - // XXX replace with doubly-heaps and shit once we get these working + // We don't support $near and other geo-queries so it's OK to initialize the + // comparator only once in the constructor. var sorter = new Minimongo.Sorter(options.cursorDescription.options.sort); var comparator = sorter.getComparator(); self._limit = self._cursorDescription.options.limit; @@ -191,6 +199,9 @@ _.extend(OplogObserveDriver.prototype, { _removeBuffered: function (id) { var self = this; self._unpublishedBuffer.remove(id); + // To keep the contract "buffer is never empty in STEADY phase unless the + // everything matching fits into published" true, we poll everything as soon + // as we see the buffer becoming empty. if (! self._unpublishedBuffer.size()) self._needToPollQuery(); }, @@ -256,7 +267,7 @@ _.extend(OplogObserveDriver.prototype, { self._addMatching(newDoc); } else if (cachedBefore && !matchesNow) { self._removeMatching(id); - } else if (matchesNow) { + } else if (cachedBefore && matchesNow) { delete newDoc._id; var oldDoc = self._published.get(id); var comparator = self._comparator; @@ -264,14 +275,14 @@ _.extend(OplogObserveDriver.prototype, { self._unpublishedBuffer.get(self._unpublishedBuffer.minElementId()); if (publishedBefore) { - // Unordered case where the document stays in published once it matches + // Unlimited case where the document stays in published once it matches // or the case when we don't have enough matching docs to publish or the // changed but matching doc will stay in published anyways. // XXX: We rely on the emptiness of buffer. Be sure to maintain the fact // that buffer can't be empty if there are matching documents not // published. Notably, we don't want to schedule repoll and continue // relying on this property. - if (!self._limit || self._unpublishedBuffer.size() === 0 || comparator(newDoc, minBuffered) < 1) { + if (!self._limit || self._unpublishedBuffer.size() === 0 || comparator(newDoc, minBuffered) <= 0) { self._changePublished(id, oldDoc, newDoc); } else { // after the change doc doesn't stay in the published, remove it @@ -291,6 +302,8 @@ _.extend(OplogObserveDriver.prototype, { var maxPublished = self._published.get(self._published.maxElementId()); if (comparator(newDoc, maxPublished) < 0) self._addPublished(id, newDoc); + } else { + throw new Error("cachedBefore implies either of publishedBefore or bufferedBefore is true."); } } }, @@ -571,7 +584,7 @@ _.extend(OplogObserveDriver.prototype, { _publishNewResults: function (newResults, newBuffer) { var self = this; - // If the query is ordered and there is a buffer, shut down so it doesn't + // If the query is limited and there is a buffer, shut down so it doesn't // stay in a way. if (self._limit) { self._unpublishedBuffer.clear(); @@ -590,7 +603,7 @@ _.extend(OplogObserveDriver.prototype, { // Now do adds and changes. // If self has a buffer and limit, the new fetched result will be - // ordered correctly as the query has sort specifier. + // limited correctly as the query has sort specifier. newResults.forEach(function (doc, id) { // "true" here means to throw if we think this doc doesn't match the // selector. @@ -667,8 +680,8 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) { // This option (which are mostly used for sorted cursors) require us to figure // out where a given document fits in an order to know if it's included or - // not, and we don't track that information when doing oplog tailing. - if (options.limit && (options.skip || !options.sort)) return false; + // not. We do it only if skip is not defined or 0. + if (options.skip || (options.limit && !options.sort)) return false; // If a fields projection option is given check if it is supported by // minimongo (some operators are not supported). @@ -688,7 +701,9 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) { // as Mongo, and can yield!) // - $near (has "interesting" properties in MongoDB, like the possibility // of returning an ID multiple times, though even polling maybe - // have a bug there + // have a bug there) + // XXX: once we support it, we would need to think more on how we + // initialize the comparators when we create the driver. return !matcher.hasWhere() && !matcher.hasGeoQuery(); };