diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index b0e8c72dc8..193c7ed8ea 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -370,6 +370,10 @@ _.each(['forEach', 'map', 'rewind', 'fetch', 'count'], function (method) { Cursor.prototype[method] = function () { var self = this; + // You can only observe a tailable cursor. + if (self._cursorDescription.options.tailable) + throw new Error("Cannot call " + method + " on a tailable cursor"); + if (!self._synchronousCursor) self._synchronousCursor = self._mongo._createSynchronousCursor( self._cursorDescription, true); @@ -420,27 +424,42 @@ _Mongo.prototype._createSynchronousCursor = function (cursorDescription, var collection = self._getCollection(cursorDescription.collectionName); var options = cursorDescription.options; + var mongoOptions = { + sort: options.sort, + limit: options.limit, + skip: options.skip + }; + + // Do we want a tailable cursor (which only works on capped collections)? + if (options.tailable) { + // We want a tailable cursor... + mongoOptions.tailable = true; + // ... and for the server to wait a bit if any getMore has no data (rather + // than making us put the relevant sleeps in the client)... + mongoOptions.awaitdata = true; + // ... and to keep querying the server indefinitely rather than just 5 times + // if there's no more data. + mongoOptions.numberOfRetries = -1; + } + var dbCursor = collection.find( replaceTypes(cursorDescription.selector, replaceMeteorAtomWithMongo), - options.fields, { - sort: options.sort, - limit: options.limit, - skip: options.skip - }); + options.fields, mongoOptions); - return new SynchronousCursor(dbCursor, - useTransform && - cursorDescription.options && - cursorDescription.options.transform); + return new SynchronousCursor(dbCursor, cursorDescription); }; -var SynchronousCursor = function (dbCursor, transform) { +var SynchronousCursor = function (dbCursor, cursorDescription) { var self = this; - if (transform) - self._transform = Deps._makeNonreactive(transform); - else - self._transform = transform; self._dbCursor = dbCursor; + self._cursorDescription = cursorDescription; + + if (cursorDescription.options.transform) + self._transform = Deps._makeNonreactive( + cursorDescription.options.transform); + else + self._transform = null; + // Need to specify that the callback is the first argument to nextObject, // since otherwise when we try to call it with no args the driver will // interpret "undefined" first arg as an options hash and crash. @@ -458,12 +477,15 @@ _.extend(SynchronousCursor.prototype, { if (!doc || !doc._id) return null; doc = replaceTypes(doc, replaceMongoAtomWithMeteor); - // Did Mongo give us duplicate documents in the same cursor? If so, ignore - // this one. (Do this before the transform, since transform might return - // some unrelated value.) - var strId = Meteor.idStringify(doc._id); - if (self._visitedIds[strId]) continue; - self._visitedIds[strId] = true; + if (!self._cursorDescription.options.tailable) { + // Did Mongo give us duplicate documents in the same cursor? If so, + // ignore this one. (Do this before the transform, since transform might + // return some unrelated value.) We don't do this for tailable cursors, + // because we want to maintain O(1) memory usage. + var strId = Meteor.idStringify(doc._id); + if (self._visitedIds[strId]) continue; + self._visitedIds[strId] = true; + } if (self._transform) doc = self._transform(doc); @@ -508,6 +530,13 @@ _.extend(SynchronousCursor.prototype, { self._visitedIds = {}; }, + // Mostly usable for tailable cursors. + count: function () { + var self = this; + + self._dbCursor.close(); + }, + fetch: function () { var self = this; return self.map(_.identity); @@ -554,6 +583,11 @@ ObserveHandle.prototype.stop = function () { _Mongo.prototype._observeChanges = function ( cursorDescription, ordered, callbacks) { var self = this; + + if (cursorDescription.options.tailable) { + return self._observeChangesTailable(cursorDescription, ordered, callbacks); + } + var observeKey = JSON.stringify( _.extend({ordered: ordered}, cursorDescription)); @@ -882,6 +916,73 @@ _.extend(LiveResultsSet.prototype, { } }); +_Mongo.prototype._observeChangesTailable = function ( + cursorDescription, ordered, callbacks) { + var self = this; + + // Tailable cursors only ever call added/addedBefore callbacks, so it's an + // error if you didn't provide them. + if ((ordered && !callbacks.addedBefore) || + (!ordered && !callbacks.added)) { + throw new Error("Can't observe an " + (ordered ? "ordered" : "unordered") + + " tailable cursor without a " + + (ordered ? "addedBefore" : "added") + " callback"); + } + var cursor = self._createSynchronousCursor(cursorDescription); + + var stopped = false; + var lastTS = undefined; + Meteor.defer(function () { + while (true) { + if (stopped) + return; + try { + var doc = cursor._nextObject(); + } catch (err) { + // There's no good way to figure out if this was actually an error from + // Mongo. Ah well. But either way, we need to retry the cursor (unless + // the failure was because the observe got stopped). + doc = null; + } + if (stopped) + return; + if (doc) { + var id = doc._id; + delete doc._id; + // If a tailable cursor contains a "ts" field, use it to recreate the + // cursor on error, and don't publish the field. ("ts" is a standard + // that Mongo uses internally for the oplog, and there's a special flag + // that lets you do binary search on it instead of needing to use an + // index.) + lastTS = doc.ts; + delete doc.ts; + if (ordered) { + callbacks.addedBefore(id, doc, null); + } else { + callbacks.added(id, doc); + } + } else { + var newSelector = _.clone(cursorDescription.selector); + if (lastTS) { + newSelector.ts = {$gt: lastTS}; + } + // XXX maybe set replay flag + cursor = self._createSynchronousCursor(new CursorDescription( + cursorDescription.collectionName, + newSelector, + cursorDescription.options)); + } + } + }); + + return { + stop: function () { + stopped = true; + cursor.close(); + } + }; +}; + _.extend(Meteor, { _Mongo: _Mongo }); diff --git a/tools/deploy-galaxy.js b/tools/deploy-galaxy.js index fa70d9c064..9e95138450 100644 --- a/tools/deploy-galaxy.js +++ b/tools/deploy-galaxy.js @@ -199,8 +199,8 @@ exports.logs = function (options) { var ok = logReader.registerStore('logs', { update: function (msg) { - // Ignore all messages but 'added' - if (msg.msg !== 'added') + // Ignore all messages but 'changed' + if (msg.msg !== 'changed') return; var obj = msg.fields.obj; obj = Log.parse(obj); @@ -211,7 +211,8 @@ exports.logs = function (options) { if (!ok) throw new Error("Couldn't connect to logs mongodb."); - prettySub(logReader, "logsForApp", [options.app], { + prettySub(logReader, "logsForApp", [options.app, + {streaming: options.streaming}], { "no-such-app": "No such app: " + options.app }); diff --git a/tools/meteor.js b/tools/meteor.js index 1de3ebf072..f80a6e89d7 100644 --- a/tools/meteor.js +++ b/tools/meteor.js @@ -833,10 +833,9 @@ Fiber(function () { name: "logs", help: "Show logs for specified site", func: function (argv) { - if (_.isString(argv.f)) - argv._.unshift(argv.f); + argv = require('optimist').boolean('f').argv; - if (argv.help || argv._.length < 1 || argv._.length > 2) { + if (argv.help || argv._.length !== 2) { process.stdout.write( "Usage: meteor logs [-f] \n" + "\n" + @@ -853,11 +852,11 @@ Fiber(function () { var deployGalaxy = require('./deploy-galaxy.js'); deployGalaxy.logs({ context: context, - app: argv._[0], - streaming: argv.f + app: argv._[1], + streaming: !!argv.f }); } else { - deploy.logs(argv._[0]); + deploy.logs(argv._[1]); } } });