From aae9fc23db74663fc2a7411888a50b21da12db61 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Wed, 9 Oct 2013 18:13:11 -0700 Subject: [PATCH] replace sequencer writes with reads. --- packages/mongo-livedata/mongo_driver.js | 131 +++++++++++++----------- packages/mongo-livedata/oplog.js | 4 +- 2 files changed, 75 insertions(+), 60 deletions(-) diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index 00c2c96e89..53f58f8a0e 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -219,13 +219,6 @@ MongoConnection.prototype._maybeBeginWrite = function () { var OPLOG_COLLECTION = 'oplog.rs'; -var SEQUENCE_COLLECTION = 'meteor_livedata_Sequencer'; -// XXX This is problematic if our RNG isn't seeded well enough. -var myServerId = Random.id(); -var nextSequenceId = 1; -// XXX doc -var pendingSequences = []; - // Like Perl's quotemeta: quotes all regexp metacharacters. See // https://github.com/substack/quotemeta/blob/master/index.js // XXX this is duplicated with accounts_server.js @@ -233,34 +226,23 @@ var quotemeta = function (str) { return String(str).replace(/(\W)/g, '\\$1'); }; -// Calls `callback` once the oplog has been processed up to a point that is -// roughly "now". Specifically, it does a dummy write which is then detected -// by the connection's oplog tailer. -// XXX This could be a read instead of a write, getting the last `ts` -// in oplog? -MongoConnection.prototype._callWhenOplogProcessed = function (callback) { - var self = this; - - var sequenceId = nextSequenceId++; - pendingSequences.push({sequenceId: sequenceId, - callback: callback}); - - // Use direct write to Node Mongo driver so we don't end up with recursive - // fence stuff. Need to disable 'safe' because we aren't providing a callback. - var writeCollection = self._getCollection(SEQUENCE_COLLECTION); - writeCollection.update({_id: myServerId}, {$set: {sequence: sequenceId}}, - {upsert: true, safe: false}); -}; - - MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { var self = this; var stopped = false; + var oplogConnection = null; var tailHandle = null; var readyFuture = new Future(); var nextId = 0; var callbacksByCollection = {}; + var lastProcessedTS = null; + var baseOplogSelector = { + ns: new RegExp('^' + quotemeta(dbName) + '\\.'), + // XXX also handle drop collection, etc + op: {$in: ['i', 'u', 'd']} + }; + // XXX doc + var pendingSequencers = []; self._oplogHandle = { stop: function () { @@ -290,47 +272,75 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { delete callbacksByCollection[collectionName][callbackId]; } }; + }, + + // Calls `callback` once the oplog has been processed up to a point that is + // roughly "now": specifically, once we've processed all ops that are + // currently visible. + // XXX become convinced that this is actually safe even if oplogConnection + // is some kind of pool + callWhenProcessedLatest: function (callback) { + if (stopped) + throw new Error("Called callWhenProcessedLatest on stopped handle!"); + + // Calling onOplogEntry requries us to wait for the oplog connection to be + // ready. + readyFuture.wait(); + + // Except for during startup, we DON'T block. + Fiber(function () { + // We need to make the selector at least as restrictive as the actual + // tailing selector (ie, we need to specify the DB name) or else we + // might find a TS that won't show up in the actual tail stream. + var lastEntry = oplogConnection.findOne( + OPLOG_COLLECTION, baseOplogSelector, {sort: {$natural: -1}}); + if (!lastEntry) { + // Really, nothing in the oplog? Well, we've processed everything. + callback(); + return; + } + var ts = lastEntry.ts; + if (!ts) + throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry)); + + if (lastProcessedTS && ts.lessThanOrEqual(lastProcessedTS)) { + // We've already caught up to here. + callback(); + return; + } + + if (!_.isEmpty(pendingSequencers) + && _.last(pendingSequencers).ts.greaterThan(ts)) { + throw Error("found misordered oplog"); + } + + pendingSequencers.push({ts: ts, + callback: callback}); + }).run(); } }; // Actually setting up the connection and tail blocks, so we do it "later". Meteor.defer(function () { - var oplogConnection = new MongoConnection(oplogUrl); + oplogConnection = new MongoConnection(oplogUrl); // Find the last oplog entry. Blocks until the connection is ready. var lastOplogEntry = oplogConnection.findOne( OPLOG_COLLECTION, {}, {sort: {$natural: -1}}); - var oplogSelector = { - ns: new RegExp('^' + quotemeta(dbName) + '\\.'), - // XXX also handle drop collection, etc - op: {$in: ['i', 'u', 'd']} - }; - if (lastOplogEntry) + var oplogSelector = _.clone(baseOplogSelector); + if (lastOplogEntry) { + // Start after the last entry that currently exists. oplogSelector.ts = {$gt: lastOplogEntry.ts}; + // If there are any calls to callWhenProcessedLatest before any other + // oplog entries show up, allow callWhenProcessedLatest to call its + // callback immediately. + lastProcessedTS = lastOplogEntry.ts; + } var cursorDescription = new CursorDescription( OPLOG_COLLECTION, oplogSelector, {tailable: true}); - var processSequence = function (doc) { - if (doc.op !== 'i' && doc.op !== 'u') - return; - var serverId = (doc.op === 'i' ? doc.o._id : doc.o2._id); - if (serverId !== myServerId) - return; - var sequenceId = - (doc.op === 'i' ? doc.o.sequence : - (doc.o.$set && doc.o.$set.sequence)); - if (typeof sequenceId !== 'number') - return; - // Process all sequence points up to this point. - while (!_.isEmpty(pendingSequences) - && pendingSequences[0].sequenceId <= sequenceId) { - var sequence = pendingSequences.shift(); - sequence.callback(); - } - }; - tailHandle = oplogConnection.tail(cursorDescription, function (doc) { if (!(doc.ns && doc.ns.length > dbName.length + 1 && doc.ns.substr(0, dbName.length + 1) === (dbName + '.'))) @@ -338,14 +348,19 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) { var collectionName = doc.ns.substr(dbName.length + 1); - if (collectionName === SEQUENCE_COLLECTION) { - processSequence(doc); - return; - } - _.each(callbacksByCollection[collectionName], function (callback) { callback(EJSON.clone(doc)); }); + + // Now that we've processed this operation, process pending sequencers. + if (!doc.ts) + throw Error("oplog entry without ts: " + EJSON.stringify(doc)); + lastProcessedTS = doc.ts; + while (!_.isEmpty(pendingSequencers) + && pendingSequencers[0].ts.lessThanOrEqual(lastProcessedTS)) { + var sequencer = pendingSequencers.shift(); + sequencer.callback(); + } }); readyFuture.return(); }); diff --git a/packages/mongo-livedata/oplog.js b/packages/mongo-livedata/oplog.js index a182a9f442..5786b0fd6c 100644 --- a/packages/mongo-livedata/oplog.js +++ b/packages/mongo-livedata/oplog.js @@ -172,7 +172,7 @@ MongoConnection.prototype._observeChangesWithOplog = function ( } var write = fence.beginWrite(); // XXX this also has to wait for steady!!! - self._callWhenOplogProcessed(function () { + self._oplogHandle.callWhenProcessedLatest(function () { write.committed(); }); complete(); @@ -185,7 +185,7 @@ MongoConnection.prototype._observeChangesWithOplog = function ( }); var catchUpFuture = new Future; - self._callWhenOplogProcessed(catchUpFuture.resolver()); + self._oplogHandle.callWhenProcessedLatest(catchUpFuture.resolver()); catchUpFuture.wait(); if (phase !== PHASE.INITIALIZING)