checkpoint monday morning

This commit is contained in:
David Glasser
2013-08-19 16:24:05 -07:00
parent 123c06ac3f
commit ba63548d4d

View File

@@ -139,6 +139,13 @@ MongoConnection = function (url, connectionOptions) {
MongoConnection.prototype.close = function() {
var self = this;
// XXX probably untested
var oplogHandle = self._oplogHandle;
self._oplogHandle = null;
if (oplogHandle)
oplogHandle.stop();
// Use Future.wrap so that errors get thrown. This happens to
// work even outside a fiber since the 'close' method is not
// actually asynchronous.
@@ -219,15 +226,70 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) {
var cursorDescription = new CursorDescription(
OPLOG_COLLECTION, oplogSelector, {tailable: true});
var callbacksByCollection = {};
var handle = oplogConnection.tail(cursorDescription, function (doc) {
// Don't register the handle until after we've gotten one doc.
// XXX do we want to actually process this doc?
if (!self._oplogHandle)
self._oplogHandle = handle;
console.log("OPLOG TAILING SEZ:", doc);
if (!doc.ns && doc.ns.length > dbName.length + 1 &&
doc.ns.substr(0, dbName.length + 1) === (dbName + '.'))
throw new Error("Unexpected ns");
var collectionName = doc.ns.substr(dbName.length + 1);
_.each(callbacksByCollection[collectionName], function (callback) {
callback(doc);
});
});
var nextId = 0;
handle.onOplogEntry = function (collectionName, callback) {
if (!_.has(callbacksByCollection, collectionName))
callbacksByCollection[collectionName] = {};
var callbackId = nextId++;
callbacksByCollection[collectionName][callbackId] = callback;
return {
stop: function () {
delete callbacksByCollection[collectionName][callbackId];
}
};
};
};
MongoConnection.prototype._observeChangesWithOplog = function (
cursorDescription, callbacks) {
var self = this;
var oplogHandle = self._oplogHandle.onOplogEntry(cursorDescription.collectionName, function (op) {
console.log("A CHANGE TO THE DOC", op);
});
// XXX let's do this with race conditions first!
var idSet = {};
if (callbacks.added) {
var initialCursor = new Cursor(self, cursorDescription);
initialCursor.forEach(function (initialDoc) {
var id = initialDoc._id;
delete initialDoc._id;
idSet[id] = true;
callbacks.added(id, initialDoc);
});
}
var observeHandle = {
stop: function () {
oplogHandle.stop();
}
};
return observeHandle;
};
//////////// Public API //////////
// The write methods block until the database has confirmed the write (it may
@@ -937,6 +999,12 @@ MongoConnection.prototype._observeChanges = function (
return self._observeChangesTailable(cursorDescription, ordered, callbacks);
}
// XXX maybe this should actually use deduping too?
if (self._oplogHandle && !ordered
&& cursorSupportedByOplogTailing(cursorDescription)) {
return self._observeChangesWithOplog(cursorDescription, callbacks);
}
var observeKey = JSON.stringify(
_.extend({ordered: ordered}, cursorDescription));