Factor tailing code out of observe-changes code.

This commit is contained in:
David Glasser
2013-08-14 19:49:18 -07:00
parent 878dfe9a1f
commit a832b11211

View File

@@ -804,6 +804,58 @@ _.extend(SynchronousCursor.prototype, {
}
});
MongoConnection.prototype.tail = function (cursorDescription, docCallback) {
var self = this;
if (!cursorDescription.options.tailable)
throw new Error("Can only tail a tailable cursor");
var cursor = self._createSynchronousCursor(cursorDescription);
var stopped = false;
var lastTS = undefined;
Meteor.defer(function () {
while (true) {
if (stopped)
return;
try {
var doc = cursor._nextObject();
} catch (err) {
// There's no good way to figure out if this was actually an error
// from Mongo. Ah well. But either way, we need to retry the cursor
// (unless the failure was because the observe got stopped).
doc = null;
}
if (stopped)
return;
if (doc) {
// If a tailable cursor contains a "ts" field, use it to recreate the
// cursor on error. ("ts" is a standard that Mongo uses internally for
// the oplog, and there's a special flag that lets you do binary search
// on it instead of needing to use an index.)
lastTS = doc.ts;
docCallback(doc);
} else {
var newSelector = _.clone(cursorDescription.selector);
if (lastTS) {
newSelector.ts = {$gt: lastTS};
}
// XXX maybe set replay flag
cursor = self._createSynchronousCursor(new CursorDescription(
cursorDescription.collectionName,
newSelector,
cursorDescription.options));
}
}
});
return {
stop: function () {
stopped = true;
cursor.close();
}
};
};
var nextObserveHandleId = 1;
var ObserveHandle = function (liveResultsSet, callbacks) {
var self = this;
@@ -1209,59 +1261,18 @@ MongoConnection.prototype._observeChangesTailable = function (
+ " tailable cursor without a "
+ (ordered ? "addedBefore" : "added") + " callback");
}
var cursor = self._createSynchronousCursor(cursorDescription);
var stopped = false;
var lastTS = undefined;
Meteor.defer(function () {
while (true) {
if (stopped)
return;
try {
var doc = cursor._nextObject();
} catch (err) {
// There's no good way to figure out if this was actually an error from
// Mongo. Ah well. But either way, we need to retry the cursor (unless
// the failure was because the observe got stopped).
doc = null;
}
if (stopped)
return;
if (doc) {
var id = doc._id;
delete doc._id;
// If a tailable cursor contains a "ts" field, use it to recreate the
// cursor on error, and don't publish the field. ("ts" is a standard
// that Mongo uses internally for the oplog, and there's a special flag
// that lets you do binary search on it instead of needing to use an
// index.)
lastTS = doc.ts;
delete doc.ts;
if (ordered) {
callbacks.addedBefore(id, doc, null);
} else {
callbacks.added(id, doc);
}
} else {
var newSelector = _.clone(cursorDescription.selector);
if (lastTS) {
newSelector.ts = {$gt: lastTS};
}
// XXX maybe set replay flag
cursor = self._createSynchronousCursor(new CursorDescription(
cursorDescription.collectionName,
newSelector,
cursorDescription.options));
}
return self.tail(cursorDescription, function (doc) {
var id = doc._id;
delete doc._id;
// The ts is an implementation detail. Hide it.
delete doc.ts;
if (ordered) {
callbacks.addedBefore(id, doc, null);
} else {
callbacks.added(id, doc);
}
});
return {
stop: function () {
stopped = true;
cursor.close();
}
};
};
// Does our oplog tailing code support this cursor? For now, we are being very