whoa, we're halfway there

This commit is contained in:
David Glasser
2013-09-18 18:51:35 -07:00
parent c5c19114ab
commit fbfda21dfb
3 changed files with 81 additions and 68 deletions

View File

@@ -24,6 +24,10 @@ _.extend(IdMap.prototype, {
var key = LocalCollection._idStringify(id);
return _.has(self._map, key);
},
isEmpty: function () {
var self = this;
return _.isEmpty(self._map);
},
// XXX used?
setDefault: function (id, def) {
var self = this;

View File

@@ -244,10 +244,8 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) {
var oplogSelector = {
ns: new RegExp('^' + quotemeta(dbName) + '\\.'),
$or: [
{op: {$in: ['i', 'u', 'd']}},
{op: 'c', 'o.drop': {$exists: true}}
]
// XXX also handle drop collection, etc
op: {$in: ['i', 'u', 'd']}
};
if (lastOplogEntry)
oplogSelector.ts = {$gt: lastOplogEntry.ts};

View File

@@ -1,45 +1,39 @@
var Future = Npm.require('fibers/future');
var PHASE = {
INITIALIZING: 1,
FETCHING: 2,
STEADY: 3
};
var idForOp = function (op) {
if (op.op === 'd')
return op.o._id;
else if (op.op === 'i')
return op.o._id;
else if (op.op === 'u')
return op.o2._id;
else
throw Error("Unknown op: " + EJSON.stringify(op));
};
MongoConnection.prototype._observeChangesWithOplog = function (
cursorDescription, callbacks) {
var self = this;
// XXX let's do this with race conditions first!
//
// the real way will involve special oplog handling during the initial cursor
// read. specifically:
//
// 1) start reading the oplog. for every document that could conceivably be
// relevant, cache a bit of information about what we saw. (eg, cache
// document for inserts, removal fact for removes, "needs poll" for updates.
// most recent overrides.)
//
// 2) read the initial set and send added messages.
//
// 3) write a sentinel to some field.
//
// 4) wait until that sentinel comes up through the oplog.
//
// 5) use the cached information (compared to what we already know) to send
// messages about things that changed right about then
//
// 6) now that we're in the "steady state", process ops more directly
var phase = PHASE.INITIALIZING;
// XXX NOW: replace idSet/changedFields with simply currently published
// results, ok??? that should simplify things, and allow the implementation of
// "replace" (noodles)
// XXX DOC: map id -> currently published fields
// (which of course is also the same as what is tracked in merge box,
// ah well)
var published = new IdMap;
var selector = LocalCollection._compileSelector(cursorDescription.selector);
// XXX add mutates its argument, which could get confusing
var curiousity = new IdMap;
var add = function (doc) {
var id = doc._id;
delete doc._id;
published.set(id, doc);
callbacks.added && callbacks.added(id, doc);
var fields = EJSON.clone(doc);
delete fields._id;
published.set(id, fields);
callbacks.added && callbacks.added(id, EJSON.clone(fields));
};
var remove = function (id) {
@@ -47,53 +41,46 @@ MongoConnection.prototype._observeChangesWithOplog = function (
callbacks.removed && callbacks.removed(id);
};
// XXX the ordering here is wrong
var initialCursor = new Cursor(self, cursorDescription);
initialCursor.forEach(function (initialDoc) {
add(initialDoc);
});
var beCurious = function () {
throw Error("I AM CURIOUS")
};
var oplogHandle = self._oplogHandle.onOplogEntry(cursorDescription.collectionName, function (op) {
var id;
var oplogEntryHandlers = {};
oplogEntryHandlers[PHASE.INITIALIZING] = function (op) {
curiousity.set(idForOp(op), op.ts.toString());
};
oplogEntryHandlers[PHASE.FETCHING] = function (op) {
// XXX now
};
oplogEntryHandlers[PHASE.STEADY] = function (op) {
var id = idForOp(op);
if (op.op === 'd') {
// XXX check that ObjectId works here
id = op.o._id;
if (published.has(id))
remove(id);
// XXX this needs to cancel any in-progress "ID lookup" for the document
} else if (op.op === 'i') {
id = op.o._id;
if (published.has(id))
throw new Error("insert found for already-existing ID");
// XXX what if selector yields? for now it can't but later it could have
// $where
if (selector(op.o)) {
if (selector(op.o))
add(op.o);
}
} else if (op.op === 'u') {
id = op.o2._id;
// Is this a modifier ($set/$unset, which may require us to poll the
// database to figure out if the whole document matches the selector) or a
// replacement (in which case we can just directly re-evaluate the
// selector)?
var isModifier = _.has(op.o, '$set') || _.has(op.o, '$unset');
var newDoc;
if (isModifier) {
// XXX problem is, the result of this findOne is delivered at a random
// time, not necessarily synced with other stuff that may be coming down
// the oplog. also, we shouldn't read fields that aren't
// necessary to evaluate selector or to publish.
newDoc = self._docFetcher.fetch(cursorDescription.collectionName, id,
op.ts.toString());
} else {
newDoc = _.extend({_id: id}, op.o);
curiousity.set(id, op.ts.toString());
phase = PHASE.FETCHING;
beCurious();
return;
}
var matchesNow = newDoc && selector(newDoc);
var newDoc = _.extend({_id: id}, op.o);
var matchesNow = selector(newDoc);
var matchedBefore = published.has(id);
if (matchesNow && !matchedBefore) {
add(newDoc);
@@ -107,15 +94,21 @@ MongoConnection.prototype._observeChangesWithOplog = function (
published.set(id, newDoc);
if (callbacks.changed) {
var changed = LocalCollection._makeChangedFields(newDoc, oldDoc);
if (!_.isEmpty(changed)) {
if (!_.isEmpty(changed))
callbacks.changed(id, changed);
}
}
}
} else {
console.log("SURPRISING FOR NOW OPERATION (eg drop collection)", op);
throw Error("XXX SURPRISING OPERATION: " + op);
}
});
};
var oplogHandle = self._oplogHandle.onOplogEntry(
cursorDescription.collectionName, function (op) {
oplogEntryHandlers[phase](op);
}
);
// XXX ordering w.r.t. everything else?
var listenersHandle = listenAll(
@@ -135,11 +128,29 @@ MongoConnection.prototype._observeChangesWithOplog = function (
}
);
var observeHandle = {
var initialCursor = new Cursor(self, cursorDescription);
initialCursor.forEach(function (initialDoc) {
add(initialDoc);
});
var catchUpFuture = new Future;
self._callWhenOplogProcessed(catchUpFuture.resolver());
catchUpFuture.wait();
if (phase !== PHASE.INITIALIZING)
throw Error("Phase unexpectedly " + phase);
if (curiousity.isEmpty()) {
phase = PHASE.STEADY;
} else {
phase = PHASE.FETCHING;
Meteor.defer(beCurious);
}
return {
stop: function () {
listenersHandle.stop();
oplogHandle.stop();
}
};
return observeHandle;
};