ok, rewrite is done.

now "replace" updates work too. also give up on idea of knowing
what fields changed (more correct, does require an in memory diff).

many tests pass.
This commit is contained in:
David Glasser
2013-09-12 20:49:37 -07:00
parent 0344e946fd
commit a93f742b3a

View File

@@ -273,6 +273,9 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) {
var nextId = 0;
self._oplogHandle.onOplogEntry = function (collectionName, callback) {
callback = Meteor.bindEnvironment(callback, function (err) {
Meteor._debug("Error in oplog callback", err.stack);
});
if (!_.has(callbacksByCollection, collectionName))
callbacksByCollection[collectionName] = {};
var callbackId = nextId++;
@@ -285,21 +288,6 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) {
};
};
// XXX you can actually get a replacement doc instead of $set/$unset! this
// completely messes with the attempt to do a non-ID-polling process of
// updates...
var modifierTopLevelFields = function (mod) {
var fields = {};
_.each(mod, function (mapping, op) {
if (op !== '$set' && op != '$unset')
throw new Error("Unknown oplog operation " + op);
_.each(mapping, function (value, field) {
fields[field.split('.')[0]] = true;
});
});
return _.keys(fields);
};
MongoConnection.prototype._observeChangesWithOplog = function (
cursorDescription, callbacks) {
var self = this;
@@ -334,12 +322,9 @@ MongoConnection.prototype._observeChangesWithOplog = function (
// ah well)
var published = new IdMap;
// XXX KILL THESE
var idSet = new IdMap;
var changedFields = new IdMap;
var selector = LocalCollection._compileSelector(cursorDescription.selector);
// XXX add mutates its argument, which could get confusing
var add = function (doc) {
var id = doc._id;
delete doc._id;
@@ -359,6 +344,8 @@ MongoConnection.prototype._observeChangesWithOplog = function (
id = op.o._id;
if (published.has(id))
remove(id);
// XXX this needs to cancel any in-progress "ID lookup" for the document
} else if (op.op === 'i') {
id = op.o._id;
if (published.has(id))
@@ -370,54 +357,45 @@ MongoConnection.prototype._observeChangesWithOplog = function (
add(op.o);
} else if (op.op === 'u') {
id = op.o2._id;
var fields = changedFields.get(id);
if (!fields) {
fields = {};
changedFields.set(id, fields);
Fiber(function (){
// XXX problem is, the result of this findOne is delivered at a random
// time, not necessarily synced with other stuff that may be coming
// down the oplog. how much does this matter?
var updatedDoc = self.findOne(
cursorDescription.collectionName, {_id: id});
// XXX in what circumstances does this !== fields?
var myChangedFields = changedFields.get(id);
// Did we process a remove while we were waiting?
if (!myChangedFields)
return;
// Is this a modifier ($set/$unset, which may require us to poll the
// database to figure out if the whole document matches the selector) or a
// replacement (in which case we can just directly re-evaluate the
// selector)?
var isModifier = _.has(op.o, '$set') || _.has(op.o, '$unset');
// Delete this record from myChangedFields atomically before anything
// that might yield (even selector might yield if it has $where!)
changedFields.remove(id);
var matchesNow = updatedDoc && selector(updatedDoc);
var matchedBefore = idSet.has(id);
if (matchesNow && !matchedBefore) {
add(updatedDoc);
} else if (matchedBefore && !matchesNow) {
remove(id);
} else if (matchesNow) {
if (callbacks.changed) {
// XXX this assumes that every field we saw a set/unset on
// actually changed. otherwise we may send out something
// redundant.
var changed = {};
_.each(myChangedFields, function (unused, fieldName) {
changed[fieldName] = _.has(updatedDoc, fieldName)
? updatedDoc[fieldName] : undefined;
});
callbacks.changed(id, changed);
}
}
}).run();
var newDoc;
if (isModifier) {
// XXX problem is, the result of this findOne is delivered at a random
// time, not necessarily synced with other stuff that may be coming down
// the oplog. also, we should coalesce multiple pings of the same
// document ("ID queue"). also, we shouldn't read fields that aren't
// necessary to evaluate selector or to publish.
newDoc = self.findOne(cursorDescription.collectionName, {_id: id});
} else {
newDoc = op.o;
}
var matchesNow = newDoc && selector(newDoc);
var matchedBefore = published.has(id);
if (matchesNow && !matchedBefore) {
add(newDoc);
} else if (matchedBefore && !matchesNow) {
remove(id);
} else if (matchesNow) {
var oldDoc = published.get(id);
if (!oldDoc)
throw Error("thought that " + id + " was there!");
published.set(id, newDoc);
if (callbacks.changed) {
var changed = LocalCollection._makeChangedFields(newDoc, oldDoc);
if (!_.isEmpty(changed)) {
callbacks.changed(id, changed);
}
}
}
_.each(modifierTopLevelFields(op.o), function (field) {
fields[field] = true;
});
} else {
console.log("A CHANGE TO THE DOC", op);
console.log("SURPRISING FOR NOW OPERATION (eg drop collection)", op);
}
});
@@ -1602,7 +1580,8 @@ var cursorSupportedByOplogTailing = function (cursorDescription) {
// For now, we're just dealing with equality queries: no $operators, regexps,
// or $and/$or/$where/etc clauses. We can expand the scope of what we're
// comfortable processing later.
// comfortable processing later. ($where will get pretty scary since it will
// allow selector processing to yield!)
return _.all(cursorDescription.selector, function (value, field) {
// No logical operators like $and.
if (field.substr(0, 1) === '$')