Block on first call to observeChanges if there is an oplog handle that is still

initializing.
This commit is contained in:
David Glasser
2013-10-07 18:09:53 -07:00
parent 11bf813f90
commit a8201cc17e

View File

@@ -154,10 +154,7 @@ MongoConnection = function (url, connectionOptions) {
// settle a little before thinking too hard about this
if (process.env.XXX_OPLOG_URL && !connectionOptions.isOplog) {
var dbName = Npm.require('url').parse(url).pathname.substr(1);
// Defer this, because it blocks. If we start observing cursors before the
// oplog handle is ready, they just don't get to use the oplog.
Meteor.defer(_.bind(self._startOplogTailing,
self, process.env.XXX_OPLOG_URL, dbName));
self._startOplogTailing(process.env.XXX_OPLOG_URL, dbName);
}
};
@@ -259,75 +256,99 @@ MongoConnection.prototype._callWhenOplogProcessed = function (callback) {
MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) {
var self = this;
var oplogConnection = new MongoConnection(oplogUrl, {isOplog: true});
// Find the last oplog entry. Blocks until the connection is ready.
var lastOplogEntry = oplogConnection.findOne(
OPLOG_COLLECTION, {}, {sort: {$natural: -1}});
var oplogSelector = {
ns: new RegExp('^' + quotemeta(dbName) + '\\.'),
// XXX also handle drop collection, etc
op: {$in: ['i', 'u', 'd']}
};
if (lastOplogEntry)
oplogSelector.ts = {$gt: lastOplogEntry.ts};
var cursorDescription = new CursorDescription(
OPLOG_COLLECTION, oplogSelector, {tailable: true});
var stopped = false;
var tailHandle = null;
var readyFuture = new Future();
var nextId = 0;
var callbacksByCollection = {};
var processSequence = function (doc) {
if (doc.op !== 'i' && doc.op !== 'u')
return;
var serverId = (doc.op === 'i' ? doc.o._id : doc.o2._id);
if (serverId !== myServerId)
return;
var sequenceId =
(doc.op === 'i' ? doc.o.sequence : (doc.o.$set && doc.o.$set.sequence));
if (typeof sequenceId !== 'number')
return;
// Process all sequence points up to this point.
while (!_.isEmpty(pendingSequences)
&& pendingSequences[0].sequenceId <= sequenceId) {
var sequence = pendingSequences.shift();
sequence.callback();
self._oplogHandle = {
stop: function () {
if (stopped)
return;
stopped = true;
if (tailHandle)
tailHandle.stop();
},
onOplogEntry: function (collectionName, callback) {
if (stopped)
throw new Error("Called onOplogEntry on stopped handle!");
// Calling onOplogEntry requires us to wait for the tailing to be ready.
readyFuture.wait();
callback = Meteor.bindEnvironment(callback, function (err) {
Meteor._debug("Error in oplog callback", err.stack);
});
if (!_.has(callbacksByCollection, collectionName))
callbacksByCollection[collectionName] = {};
var callbackId = nextId++;
callbacksByCollection[collectionName][callbackId] = callback;
return {
stop: function () {
delete callbacksByCollection[collectionName][callbackId];
}
};
}
};
self._oplogHandle = oplogConnection.tail(cursorDescription, function (doc) {
if (!doc.ns && doc.ns.length > dbName.length + 1 &&
doc.ns.substr(0, dbName.length + 1) === (dbName + '.'))
throw new Error("Unexpected ns");
// Actually setting up the connection and tail blocks, so we do it "later".
Meteor.defer(function () {
var oplogConnection = new MongoConnection(oplogUrl, {isOplog: true});
var collectionName = doc.ns.substr(dbName.length + 1);
// Find the last oplog entry. Blocks until the connection is ready.
var lastOplogEntry = oplogConnection.findOne(
OPLOG_COLLECTION, {}, {sort: {$natural: -1}});
if (collectionName === SEQUENCE_COLLECTION) {
processSequence(doc);
return;
}
var oplogSelector = {
ns: new RegExp('^' + quotemeta(dbName) + '\\.'),
// XXX also handle drop collection, etc
op: {$in: ['i', 'u', 'd']}
};
if (lastOplogEntry)
oplogSelector.ts = {$gt: lastOplogEntry.ts};
_.each(callbacksByCollection[collectionName], function (callback) {
callback(EJSON.clone(doc));
});
});
var cursorDescription = new CursorDescription(
OPLOG_COLLECTION, oplogSelector, {tailable: true});
var nextId = 0;
self._oplogHandle.onOplogEntry = function (collectionName, callback) {
callback = Meteor.bindEnvironment(callback, function (err) {
Meteor._debug("Error in oplog callback", err.stack);
});
if (!_.has(callbacksByCollection, collectionName))
callbacksByCollection[collectionName] = {};
var callbackId = nextId++;
callbacksByCollection[collectionName][callbackId] = callback;
return {
stop: function () {
delete callbacksByCollection[collectionName][callbackId];
var processSequence = function (doc) {
if (doc.op !== 'i' && doc.op !== 'u')
return;
var serverId = (doc.op === 'i' ? doc.o._id : doc.o2._id);
if (serverId !== myServerId)
return;
var sequenceId =
(doc.op === 'i' ? doc.o.sequence :
(doc.o.$set && doc.o.$set.sequence));
if (typeof sequenceId !== 'number')
return;
// Process all sequence points up to this point.
while (!_.isEmpty(pendingSequences)
&& pendingSequences[0].sequenceId <= sequenceId) {
var sequence = pendingSequences.shift();
sequence.callback();
}
};
};
tailHandle = oplogConnection.tail(cursorDescription, function (doc) {
if (!doc.ns && doc.ns.length > dbName.length + 1 &&
doc.ns.substr(0, dbName.length + 1) === (dbName + '.'))
throw new Error("Unexpected ns");
var collectionName = doc.ns.substr(dbName.length + 1);
if (collectionName === SEQUENCE_COLLECTION) {
processSequence(doc);
return;
}
_.each(callbacksByCollection[collectionName], function (callback) {
callback(EJSON.clone(doc));
});
});
readyFuture.return();
});
};