Initial implementation of tailable cursors.

They really only work well (ie, restart properly after reconnect) if they have
an increasing BSON Timestamp() field named ts.  Sorry, that's just how Mongo
works.

Use them for "meteor logs" in Galaxy mode.
This commit is contained in:
David Glasser
2013-05-17 11:20:28 -07:00
parent f260ce2887
commit 61327d97ce
3 changed files with 130 additions and 29 deletions

View File

@@ -370,6 +370,10 @@ _.each(['forEach', 'map', 'rewind', 'fetch', 'count'], function (method) {
Cursor.prototype[method] = function () {
var self = this;
// You can only observe a tailable cursor.
if (self._cursorDescription.options.tailable)
throw new Error("Cannot call " + method + " on a tailable cursor");
if (!self._synchronousCursor)
self._synchronousCursor = self._mongo._createSynchronousCursor(
self._cursorDescription, true);
@@ -420,27 +424,42 @@ _Mongo.prototype._createSynchronousCursor = function (cursorDescription,
var collection = self._getCollection(cursorDescription.collectionName);
var options = cursorDescription.options;
var mongoOptions = {
sort: options.sort,
limit: options.limit,
skip: options.skip
};
// Do we want a tailable cursor (which only works on capped collections)?
if (options.tailable) {
// We want a tailable cursor...
mongoOptions.tailable = true;
// ... and for the server to wait a bit if any getMore has no data (rather
// than making us put the relevant sleeps in the client)...
mongoOptions.awaitdata = true;
// ... and to keep querying the server indefinitely rather than just 5 times
// if there's no more data.
mongoOptions.numberOfRetries = -1;
}
var dbCursor = collection.find(
replaceTypes(cursorDescription.selector, replaceMeteorAtomWithMongo),
options.fields, {
sort: options.sort,
limit: options.limit,
skip: options.skip
});
options.fields, mongoOptions);
return new SynchronousCursor(dbCursor,
useTransform &&
cursorDescription.options &&
cursorDescription.options.transform);
return new SynchronousCursor(dbCursor, cursorDescription);
};
var SynchronousCursor = function (dbCursor, transform) {
var SynchronousCursor = function (dbCursor, cursorDescription) {
var self = this;
if (transform)
self._transform = Deps._makeNonreactive(transform);
else
self._transform = transform;
self._dbCursor = dbCursor;
self._cursorDescription = cursorDescription;
if (cursorDescription.options.transform)
self._transform = Deps._makeNonreactive(
cursorDescription.options.transform);
else
self._transform = null;
// Need to specify that the callback is the first argument to nextObject,
// since otherwise when we try to call it with no args the driver will
// interpret "undefined" first arg as an options hash and crash.
@@ -458,12 +477,15 @@ _.extend(SynchronousCursor.prototype, {
if (!doc || !doc._id) return null;
doc = replaceTypes(doc, replaceMongoAtomWithMeteor);
// Did Mongo give us duplicate documents in the same cursor? If so, ignore
// this one. (Do this before the transform, since transform might return
// some unrelated value.)
var strId = Meteor.idStringify(doc._id);
if (self._visitedIds[strId]) continue;
self._visitedIds[strId] = true;
if (!self._cursorDescription.options.tailable) {
// Did Mongo give us duplicate documents in the same cursor? If so,
// ignore this one. (Do this before the transform, since transform might
// return some unrelated value.) We don't do this for tailable cursors,
// because we want to maintain O(1) memory usage.
var strId = Meteor.idStringify(doc._id);
if (self._visitedIds[strId]) continue;
self._visitedIds[strId] = true;
}
if (self._transform)
doc = self._transform(doc);
@@ -508,6 +530,13 @@ _.extend(SynchronousCursor.prototype, {
self._visitedIds = {};
},
// Mostly usable for tailable cursors.
count: function () {
var self = this;
self._dbCursor.close();
},
fetch: function () {
var self = this;
return self.map(_.identity);
@@ -554,6 +583,11 @@ ObserveHandle.prototype.stop = function () {
_Mongo.prototype._observeChanges = function (
cursorDescription, ordered, callbacks) {
var self = this;
if (cursorDescription.options.tailable) {
return self._observeChangesTailable(cursorDescription, ordered, callbacks);
}
var observeKey = JSON.stringify(
_.extend({ordered: ordered}, cursorDescription));
@@ -882,6 +916,73 @@ _.extend(LiveResultsSet.prototype, {
}
});
_Mongo.prototype._observeChangesTailable = function (
cursorDescription, ordered, callbacks) {
var self = this;
// Tailable cursors only ever call added/addedBefore callbacks, so it's an
// error if you didn't provide them.
if ((ordered && !callbacks.addedBefore) ||
(!ordered && !callbacks.added)) {
throw new Error("Can't observe an " + (ordered ? "ordered" : "unordered")
+ " tailable cursor without a "
+ (ordered ? "addedBefore" : "added") + " callback");
}
var cursor = self._createSynchronousCursor(cursorDescription);
var stopped = false;
var lastTS = undefined;
Meteor.defer(function () {
while (true) {
if (stopped)
return;
try {
var doc = cursor._nextObject();
} catch (err) {
// There's no good way to figure out if this was actually an error from
// Mongo. Ah well. But either way, we need to retry the cursor (unless
// the failure was because the observe got stopped).
doc = null;
}
if (stopped)
return;
if (doc) {
var id = doc._id;
delete doc._id;
// If a tailable cursor contains a "ts" field, use it to recreate the
// cursor on error, and don't publish the field. ("ts" is a standard
// that Mongo uses internally for the oplog, and there's a special flag
// that lets you do binary search on it instead of needing to use an
// index.)
lastTS = doc.ts;
delete doc.ts;
if (ordered) {
callbacks.addedBefore(id, doc, null);
} else {
callbacks.added(id, doc);
}
} else {
var newSelector = _.clone(cursorDescription.selector);
if (lastTS) {
newSelector.ts = {$gt: lastTS};
}
// XXX maybe set replay flag
cursor = self._createSynchronousCursor(new CursorDescription(
cursorDescription.collectionName,
newSelector,
cursorDescription.options));
}
}
});
return {
stop: function () {
stopped = true;
cursor.close();
}
};
};
_.extend(Meteor, {
_Mongo: _Mongo
});

View File

@@ -199,8 +199,8 @@ exports.logs = function (options) {
var ok = logReader.registerStore('logs', {
update: function (msg) {
// Ignore all messages but 'added'
if (msg.msg !== 'added')
// Ignore all messages but 'changed'
if (msg.msg !== 'changed')
return;
var obj = msg.fields.obj;
obj = Log.parse(obj);
@@ -211,7 +211,8 @@ exports.logs = function (options) {
if (!ok)
throw new Error("Couldn't connect to logs mongodb.");
prettySub(logReader, "logsForApp", [options.app], {
prettySub(logReader, "logsForApp", [options.app,
{streaming: options.streaming}], {
"no-such-app": "No such app: " + options.app
});

View File

@@ -833,10 +833,9 @@ Fiber(function () {
name: "logs",
help: "Show logs for specified site",
func: function (argv) {
if (_.isString(argv.f))
argv._.unshift(argv.f);
argv = require('optimist').boolean('f').argv;
if (argv.help || argv._.length < 1 || argv._.length > 2) {
if (argv.help || argv._.length !== 2) {
process.stdout.write(
"Usage: meteor logs [-f] <site>\n" +
"\n" +
@@ -853,11 +852,11 @@ Fiber(function () {
var deployGalaxy = require('./deploy-galaxy.js');
deployGalaxy.logs({
context: context,
app: argv._[0],
streaming: argv.f
app: argv._[1],
streaming: !!argv.f
});
} else {
deploy.logs(argv._[0]);
deploy.logs(argv._[1]);
}
}
});