_justUpdatedBuffer => _safeAppendToBuffer

This commit is contained in:
Slava Kim
2014-02-19 12:44:19 -08:00
parent 6144213649
commit 23ec5007e9

View File

@@ -49,7 +49,11 @@ OplogObserveDriver = function (options) {
self._unpublishedBuffer = null;
self._published = new LocalCollection._IdMap;
}
self._justUpdatedBuffer = false;
// Indicates if it is safe to insert a new document at the end of the buffer
// for this query. i.e. it is known that there are no documents matching the
// selector those are not in published or buffer.
self._safeAppendToBuffer = false;
self._stopped = false;
self._stopHandles = [];
@@ -203,7 +207,7 @@ _.extend(OplogObserveDriver.prototype, {
}
self._unpublishedBuffer.remove(maxBufferedId);
self._justUpdatedBuffer = false;
self._safeAppendToBuffer = false;
}
},
_removeBuffered: function (id) {
@@ -241,7 +245,7 @@ _.extend(OplogObserveDriver.prototype, {
// outside of the buffer easily.
if (!limit || self._published.size() < limit || comparator(maxPublished, fields) > 0) {
self._addPublished(id, fields);
} else if ((self._justUpdatedBuffer && self._unpublishedBuffer.size() < limit) || (maxBuffered && comparator(maxBuffered, fields) > 0)) {
} else if ((self._safeAppendToBuffer && self._unpublishedBuffer.size() < limit) || (maxBuffered && comparator(maxBuffered, fields) > 0)) {
self._addBuffered(id, fields);
}
},
@@ -299,10 +303,10 @@ _.extend(OplogObserveDriver.prototype, {
self._removePublished(id);
// but it can move into buffered now, check it
var maxBuffered = self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId());
if (self._justUpdatedBuffer || (maxBuffered && comparator(newDoc, maxBuffered) < 0))
if (self._safeAppendToBuffer || (maxBuffered && comparator(newDoc, maxBuffered) < 0))
self._addBuffered(id, newDoc);
else
self._justUpdatedBuffer = false;
self._safeAppendToBuffer = false;
}
} else if (bufferedBefore) {
oldDoc = self._unpublishedBuffer.get(id);
@@ -315,11 +319,11 @@ _.extend(OplogObserveDriver.prototype, {
// the buffered doc was updated, it could move to published
if (comparator(newDoc, maxPublished) < 0) {
self._addPublished(id, newDoc);
} else if (self._justUpdatedBuffer || (maxBuffered && comparator(newDoc, maxBuffered) < 0)) {
} else if (self._safeAppendToBuffer || (maxBuffered && comparator(newDoc, maxBuffered) < 0)) {
// stays in buffer
self._unpublishedBuffer.set(id, newDoc);
} else {
self._justUpdatedBuffer = false;
self._safeAppendToBuffer = false;
}
} else {
throw new Error("cachedBefore implies either of publishedBefore or bufferedBefore is true.");
@@ -467,7 +471,9 @@ _.extend(OplogObserveDriver.prototype, {
initialCursor.forEach(function (initialDoc) {
self._addMatching(initialDoc);
});
self._justUpdatedBuffer = true;
self._safeAppendToBuffer = initialCursor.count() < self._limit * 2;
if (self._stopped)
throw new Error("oplog stopped quite early");
// Allow observeChanges calls to return. (After this, it's possible for
@@ -634,7 +640,8 @@ _.extend(OplogObserveDriver.prototype, {
delete doc._id;
self._addBuffered(id, doc);
});
self._justUpdatedBuffer = true;
self._safeAppendToBuffer = newBuffer.size() < self._limit;
},
// This stop function is invoked from the onStop of the ObserveMultiplexer, so