From a832b11211cf7bc6c2148ce29e14b0c8248d2be9 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Wed, 14 Aug 2013 19:49:18 -0700 Subject: [PATCH] Factor tailing code out of observe-changes code. --- packages/mongo-livedata/mongo_driver.js | 111 +++++++++++++----------- 1 file changed, 61 insertions(+), 50 deletions(-) diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index 788afe70d7..841057ef88 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -804,6 +804,58 @@ _.extend(SynchronousCursor.prototype, { } }); +MongoConnection.prototype.tail = function (cursorDescription, docCallback) { + var self = this; + if (!cursorDescription.options.tailable) + throw new Error("Can only tail a tailable cursor"); + + var cursor = self._createSynchronousCursor(cursorDescription); + + var stopped = false; + var lastTS = undefined; + Meteor.defer(function () { + while (true) { + if (stopped) + return; + try { + var doc = cursor._nextObject(); + } catch (err) { + // There's no good way to figure out if this was actually an error + // from Mongo. Ah well. But either way, we need to retry the cursor + // (unless the failure was because the observe got stopped). + doc = null; + } + if (stopped) + return; + if (doc) { + // If a tailable cursor contains a "ts" field, use it to recreate the + // cursor on error. ("ts" is a standard that Mongo uses internally for + // the oplog, and there's a special flag that lets you do binary search + // on it instead of needing to use an index.) + lastTS = doc.ts; + docCallback(doc); + } else { + var newSelector = _.clone(cursorDescription.selector); + if (lastTS) { + newSelector.ts = {$gt: lastTS}; + } + // XXX maybe set replay flag + cursor = self._createSynchronousCursor(new CursorDescription( + cursorDescription.collectionName, + newSelector, + cursorDescription.options)); + } + } + }); + + return { + stop: function () { + stopped = true; + cursor.close(); + } + }; +}; + var nextObserveHandleId = 1; var ObserveHandle = function (liveResultsSet, callbacks) { var self = this; @@ -1209,59 +1261,18 @@ MongoConnection.prototype._observeChangesTailable = function ( + " tailable cursor without a " + (ordered ? "addedBefore" : "added") + " callback"); } - var cursor = self._createSynchronousCursor(cursorDescription); - var stopped = false; - var lastTS = undefined; - Meteor.defer(function () { - while (true) { - if (stopped) - return; - try { - var doc = cursor._nextObject(); - } catch (err) { - // There's no good way to figure out if this was actually an error from - // Mongo. Ah well. But either way, we need to retry the cursor (unless - // the failure was because the observe got stopped). - doc = null; - } - if (stopped) - return; - if (doc) { - var id = doc._id; - delete doc._id; - // If a tailable cursor contains a "ts" field, use it to recreate the - // cursor on error, and don't publish the field. ("ts" is a standard - // that Mongo uses internally for the oplog, and there's a special flag - // that lets you do binary search on it instead of needing to use an - // index.) - lastTS = doc.ts; - delete doc.ts; - if (ordered) { - callbacks.addedBefore(id, doc, null); - } else { - callbacks.added(id, doc); - } - } else { - var newSelector = _.clone(cursorDescription.selector); - if (lastTS) { - newSelector.ts = {$gt: lastTS}; - } - // XXX maybe set replay flag - cursor = self._createSynchronousCursor(new CursorDescription( - cursorDescription.collectionName, - newSelector, - cursorDescription.options)); - } + return self.tail(cursorDescription, function (doc) { + var id = doc._id; + delete doc._id; + // The ts is an implementation detail. Hide it. + delete doc.ts; + if (ordered) { + callbacks.addedBefore(id, doc, null); + } else { + callbacks.added(id, doc); } }); - - return { - stop: function () { - stopped = true; - cursor.close(); - } - }; }; // Does our oplog tailing code support this cursor? For now, we are being very