More comments, explicity, fixed ambiguities

This commit is contained in:
Slava Kim
2014-02-17 17:25:01 -08:00
parent c03701ef1c
commit 07a18984b1

View File

@@ -20,14 +20,22 @@ OplogObserveDriver = function (options) {
self._mongoHandle = options.mongoHandle;
self._multiplexer = options.multiplexer;
if (options.ordered) {
throw Error("OplogObserveDriver only supports unordered observeChanges");
}
if (options.cursorDescription.options.limit) {
// There are several properties ordered driver implements:
// - _limit is a positive number
// - _comparator is a function-comparator by which the query is ordered
// - _unpublishedBuffer is non-null collection
// - _unpublishedBuffer is non-null collection,
// the empty buffer in STEADY phase implies that the
// everything that matches the queries selector fits
// into published set.
// - _published implements maxElementId method in addition to IdMap methods
// XXX replace with doubly-heaps and shit once we get these working
// We don't support $near and other geo-queries so it's OK to initialize the
// comparator only once in the constructor.
var sorter = new Minimongo.Sorter(options.cursorDescription.options.sort);
var comparator = sorter.getComparator();
self._limit = self._cursorDescription.options.limit;
@@ -191,6 +199,9 @@ _.extend(OplogObserveDriver.prototype, {
_removeBuffered: function (id) {
var self = this;
self._unpublishedBuffer.remove(id);
// To keep the contract "buffer is never empty in STEADY phase unless the
// everything matching fits into published" true, we poll everything as soon
// as we see the buffer becoming empty.
if (! self._unpublishedBuffer.size())
self._needToPollQuery();
},
@@ -256,7 +267,7 @@ _.extend(OplogObserveDriver.prototype, {
self._addMatching(newDoc);
} else if (cachedBefore && !matchesNow) {
self._removeMatching(id);
} else if (matchesNow) {
} else if (cachedBefore && matchesNow) {
delete newDoc._id;
var oldDoc = self._published.get(id);
var comparator = self._comparator;
@@ -264,14 +275,14 @@ _.extend(OplogObserveDriver.prototype, {
self._unpublishedBuffer.get(self._unpublishedBuffer.minElementId());
if (publishedBefore) {
// Unordered case where the document stays in published once it matches
// Unlimited case where the document stays in published once it matches
// or the case when we don't have enough matching docs to publish or the
// changed but matching doc will stay in published anyways.
// XXX: We rely on the emptiness of buffer. Be sure to maintain the fact
// that buffer can't be empty if there are matching documents not
// published. Notably, we don't want to schedule repoll and continue
// relying on this property.
if (!self._limit || self._unpublishedBuffer.size() === 0 || comparator(newDoc, minBuffered) < 1) {
if (!self._limit || self._unpublishedBuffer.size() === 0 || comparator(newDoc, minBuffered) <= 0) {
self._changePublished(id, oldDoc, newDoc);
} else {
// after the change doc doesn't stay in the published, remove it
@@ -291,6 +302,8 @@ _.extend(OplogObserveDriver.prototype, {
var maxPublished = self._published.get(self._published.maxElementId());
if (comparator(newDoc, maxPublished) < 0)
self._addPublished(id, newDoc);
} else {
throw new Error("cachedBefore implies either of publishedBefore or bufferedBefore is true.");
}
}
},
@@ -571,7 +584,7 @@ _.extend(OplogObserveDriver.prototype, {
_publishNewResults: function (newResults, newBuffer) {
var self = this;
// If the query is ordered and there is a buffer, shut down so it doesn't
// If the query is limited and there is a buffer, shut down so it doesn't
// stay in a way.
if (self._limit) {
self._unpublishedBuffer.clear();
@@ -590,7 +603,7 @@ _.extend(OplogObserveDriver.prototype, {
// Now do adds and changes.
// If self has a buffer and limit, the new fetched result will be
// ordered correctly as the query has sort specifier.
// limited correctly as the query has sort specifier.
newResults.forEach(function (doc, id) {
// "true" here means to throw if we think this doc doesn't match the
// selector.
@@ -667,8 +680,8 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) {
// This option (which are mostly used for sorted cursors) require us to figure
// out where a given document fits in an order to know if it's included or
// not, and we don't track that information when doing oplog tailing.
if (options.limit && (options.skip || !options.sort)) return false;
// not. We do it only if skip is not defined or 0.
if (options.skip || (options.limit && !options.sort)) return false;
// If a fields projection option is given check if it is supported by
// minimongo (some operators are not supported).
@@ -688,7 +701,9 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) {
// as Mongo, and can yield!)
// - $near (has "interesting" properties in MongoDB, like the possibility
// of returning an ID multiple times, though even polling maybe
// have a bug there
// have a bug there)
// XXX: once we support it, we would need to think more on how we
// initialize the comparators when we create the driver.
return !matcher.hasWhere() && !matcher.hasGeoQuery();
};