Use "selector matches specific IDs" code in oplog

This commit is contained in:
David Glasser
2013-11-26 12:01:37 -08:00
parent 58e1749104
commit 8f664a1236
3 changed files with 90 additions and 59 deletions

View File

@@ -2,13 +2,16 @@
// The "invalidation crossbar" is a specific instance used by the DDP server to
// implement write fence notifications.
DDPServer._Crossbar = function () {
DDPServer._Crossbar = function (options) {
var self = this;
options = options || {};
self.next_id = 1;
self.nextId = 1;
// map from listener id to object. each object has keys 'trigger',
// 'callback'.
self.listeners = {};
self.factPackage = options.factPackage || "livedata";
self.factName = options.factName || null;
};
_.extend(DDPServer._Crossbar.prototype, {
@@ -29,14 +32,18 @@ _.extend(DDPServer._Crossbar.prototype, {
// yields.
listen: function (trigger, callback) {
var self = this;
var id = self.next_id++;
var id = self.nextId++;
self.listeners[id] = {trigger: EJSON.clone(trigger), callback: callback};
Package.facts && Package.facts.Facts.incrementServerFact(
"livedata", "crossbar-listeners", 1);
if (self.factName && Package.facts) {
Package.facts.Facts.incrementServerFact(
self.factPackage, self.factName, 1);
}
return {
stop: function () {
Package.facts && Package.facts.Facts.incrementServerFact(
"livedata", "crossbar-listeners", -1);
if (self.factName && Package.facts) {
Package.facts.Facts.incrementServerFact(
self.factPackage, self.factName, -1);
}
delete self.listeners[id];
}
};
@@ -54,6 +61,7 @@ _.extend(DDPServer._Crossbar.prototype, {
fire: function (notification, onComplete) {
var self = this;
var callbacks = [];
// XXX consider refactoring to "index" on "collection"
_.each(self.listeners, function (l) {
if (self._matches(notification, l.trigger))
callbacks.push(l.callback);
@@ -103,4 +111,6 @@ _.extend(DDPServer._Crossbar.prototype, {
}
});
DDPServer._InvalidationCrossbar = new DDPServer._Crossbar;
DDPServer._InvalidationCrossbar = new DDPServer._Crossbar({
factName: "invalidation-crossbar-listeners"
});

View File

@@ -250,8 +250,9 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl,
var stopped = false;
var tailHandle = null;
var readyFuture = new Future();
var nextId = 0;
var callbacksByCollection = {};
var crossbar = new DDPServer._Crossbar({
factPackage: "mongo-livedata", factName: "oplog-watchers"
});
var lastProcessedTS = null;
// Lazily calculate the basic selector. Don't call baseOplogSelector() at the
// top level of this function, because we don't want this function to block.
@@ -277,27 +278,28 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl,
// XXX should close connections too
},
onOplogEntry: function (collectionName, callback) {
onOplogEntry: function (trigger, callback) {
if (stopped)
throw new Error("Called onOplogEntry on stopped handle!");
// Calling onOplogEntry requires us to wait for the tailing to be ready.
readyFuture.wait();
callback = Meteor.bindEnvironment(callback, function (err) {
var originalCallback = callback;
callback = Meteor.bindEnvironment(function (notification, onComplete) {
// XXX can we avoid this clone by making oplog.js careful?
try {
originalCallback(EJSON.clone(notification));
} finally {
onComplete();
}
}, function (err) {
Meteor._debug("Error in oplog callback", err.stack);
});
if (!_.has(callbacksByCollection, collectionName))
callbacksByCollection[collectionName] = {};
var callbackId = nextId++;
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "oplog-watchers", 1);
callbacksByCollection[collectionName][callbackId] = callback;
var listenHandle = crossbar.listen(trigger, callback);
return {
stop: function () {
delete callbacksByCollection[collectionName][callbackId];
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "oplog-watchers", -1);
listenHandle.stop();
}
};
},
@@ -311,8 +313,8 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl,
if (stopped)
throw new Error("Called waitUntilCaughtUp on stopped handle!");
// Calling onOplogEntry requries us to wait for the oplog connection to be
// ready.
// Calling waitUntilCaughtUp requries us to wait for the oplog connection
// to be ready.
readyFuture.wait();
// We need to make the selector at least as restrictive as the actual
@@ -403,16 +405,21 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl,
doc.ns.substr(0, dbName.length + 1) === (dbName + '.')))
throw new Error("Unexpected ns");
var collectionName = doc.ns.substr(dbName.length + 1);
var trigger = {collection: doc.ns.substr(dbName.length + 1), op: doc};
// Is it a special command and the collection name is hidden somewhere in
// operator?
if (collectionName === "$cmd")
collectionName = doc.o.drop;
if (trigger.collection === "$cmd") {
trigger.collection = doc.o.drop;
trigger.dropCollection = true;
} else {
// All other ops have an id.
trigger.id = idForOp(doc);
}
_.each(callbacksByCollection[collectionName], function (callback) {
callback(EJSON.clone(doc));
});
var f = new Future;
crossbar.fire(trigger, f.resolver());
f.wait();
// Now that we've processed this operation, process pending sequencers.
if (!doc.ts)
@@ -1181,23 +1188,17 @@ MongoConnection.prototype._observeChanges = function (
// here, so that updates to different specific IDs don't cause us to poll.
// listenCallback is the same kind of (notification, complete) callback passed
// to InvalidationCrossbar.listen.
listenAll = function (cursorDescription, listenCallback) {
var listeners = [];
var listenOnTrigger = function (trigger) {
forEachTrigger(cursorDescription, function (trigger) {
// The "drop collection" event is used by the oplog crossbar, not the
// invalidation crossbar.
if (trigger.dropCollection)
return;
listeners.push(DDPServer._InvalidationCrossbar.listen(
trigger, listenCallback));
};
var key = {collection: cursorDescription.collectionName};
var specificIds = LocalCollection._idsMatchedBySelector(
cursorDescription.selector);
if (specificIds) {
_.each(specificIds, function (id) {
listenOnTrigger(_.extend({id: id}, key));
});
} else {
listenOnTrigger(key);
}
});
return {
stop: function () {
@@ -1208,6 +1209,20 @@ listenAll = function (cursorDescription, listenCallback) {
};
};
forEachTrigger = function (cursorDescription, triggerCallback) {
var key = {collection: cursorDescription.collectionName};
var specificIds = LocalCollection._idsMatchedBySelector(
cursorDescription.selector);
if (specificIds) {
_.each(specificIds, function (id) {
triggerCallback(_.extend({id: id}, key));
});
triggerCallback(_.extend({dropCollection: true}, key));
} else {
triggerCallback(key);
}
};
var MongoPollster = function (cursorDescription, mongoHandle, ordered,
multiplexer, testOnlyPollCallback) {
var self = this;

View File

@@ -7,7 +7,7 @@ var PHASE = {
STEADY: 3
};
var idForOp = function (op) {
idForOp = function (op) {
if (op.op === 'd')
return op.o._id;
else if (op.op === 'i')
@@ -25,6 +25,7 @@ observeChangesWithOplog = function (cursorDescription,
mongoHandle,
multiplexer) {
var stopped = false;
var stopHandles = [];
Package.facts && Package.facts.Facts.incrementServerFact(
"mongo-livedata", "oplog-observers", 1);
@@ -186,22 +187,26 @@ observeChangesWithOplog = function (cursorDescription,
};
oplogEntryHandlers[PHASE.FETCHING] = oplogEntryHandlers[PHASE.STEADY];
var oplogEntryHandle = mongoHandle._oplogHandle.onOplogEntry(
cursorDescription.collectionName, function (op) {
if (op.op === 'c') {
published.forEach(function (fields, id) {
remove(id);
});
} else {
// All other operators should be handled depending on phase
oplogEntryHandlers[phase](op);
forEachTrigger(cursorDescription, function (trigger) {
stopHandles.push(mongoHandle._oplogHandle.onOplogEntry(
trigger, function (notification) {
var op = notification.op;
if (op.op === 'c') {
// XXX actually, drop collection needs to be handled by doing a
// re-query
published.forEach(function (fields, id) {
remove(id);
});
} else {
// All other operators should be handled depending on phase
oplogEntryHandlers[phase](op);
}
}
}
);
));
});
// XXX ordering w.r.t. everything else?
var listenersHandle = listenAll(
stopHandles.push(listenAll(
cursorDescription, function (notification, complete) {
// If we're not in a write fence, we don't have to do anything.
var fence = DDPServer._CurrentWriteFence.get();
@@ -225,7 +230,7 @@ observeChangesWithOplog = function (cursorDescription,
}
});
}
);
));
// observeChangesWithOplog cannot yield (because the manipulation of
// mongoHandle._observeMultiplexers needs to be yield-free); calling
@@ -268,8 +273,9 @@ observeChangesWithOplog = function (cursorDescription,
if (stopped)
return;
stopped = true;
listenersHandle.stop();
oplogEntryHandle.stop();
_.each(stopHandles, function (handle) {
handle.stop();
});
published = null;
selector = null;