Don't repoll if everything fits into buffer;

Use EJSON.equals
Better comments
This commit is contained in:
Slava Kim
2014-02-21 19:47:23 -08:00
parent 23ec5007e9
commit 9ccdc0b80c

View File

@@ -28,11 +28,11 @@ OplogObserveDriver = function (options) {
// There are several properties ordered driver implements:
// - _limit is a positive number
// - _comparator is a function-comparator by which the query is ordered
// - _unpublishedBuffer is non-null collection,
// - _unpublishedBuffer is non-null Min/Max Heap,
// the empty buffer in STEADY phase implies that the
// everything that matches the queries selector fits
// into published set.
// - _published implements maxElementId method in addition to IdMap methods
// - _published - Min Heap (also implements IdMap methods)
// We don't support $near and other geo-queries so it's OK to initialize the
// comparator only once in the constructor.
@@ -159,7 +159,7 @@ _.extend(OplogObserveDriver.prototype, {
var overflowingDocId = self._published.maxElementId();
var overflowingDoc = self._published.get(overflowingDocId);
if (_.isEqual(overflowingDocId, id)) {
if (EJSON.equals(overflowingDocId, id)) {
throw new Error("The document just added is overflowing the published set");
}
@@ -202,7 +202,7 @@ _.extend(OplogObserveDriver.prototype, {
if (self._unpublishedBuffer.size() > self._limit) {
var maxBufferedId = self._unpublishedBuffer.maxElementId();
if (_.isEqual(maxBufferedId, id)) {
if (EJSON.equals(maxBufferedId, id)) {
throw new Error("The document just added to buffer is overflowing the buffer");
}
@@ -216,7 +216,7 @@ _.extend(OplogObserveDriver.prototype, {
// To keep the contract "buffer is never empty in STEADY phase unless the
// everything matching fits into published" true, we poll everything as soon
// as we see the buffer becoming empty.
if (! self._unpublishedBuffer.size())
if (! self._unpublishedBuffer.size() && ! self._safeAppendToBuffer)
self._needToPollQuery();
},
// Called when a document has joined the "Matching" results set.
@@ -468,11 +468,13 @@ _.extend(OplogObserveDriver.prototype, {
// XXX needs more thought on non-zero skip
// XXX "2" here is a "magic number"
var initialCursor = self._cursorForQuery({ limit: self._limit * 2 });
var fetchedDocsCount = 0;
initialCursor.forEach(function (initialDoc) {
self._addMatching(initialDoc);
fetchedDocsCount++;
});
self._safeAppendToBuffer = initialCursor.count() < self._limit * 2;
self._safeAppendToBuffer = fetchedDocsCount < self._limit * 2;
if (self._stopped)
throw new Error("oplog stopped quite early");
@@ -704,9 +706,10 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) {
if (options._disableOplog)
return false;
// This option (which are mostly used for sorted cursors) require us to figure
// out where a given document fits in an order to know if it's included or
// not. We do it only if skip is not defined or 0.
// skip is not supported: to support it we would need to keep track of all
// "skipped" documents or at least their ids.
// limit w/o a sort specifier is not supported: current implementation needs a
// determent way to order documents.
if (options.skip || (options.limit && !options.sort)) return false;
// If a fields projection option is given check if it is supported by