only one mongo-livedata test fails

This commit is contained in:
David Glasser
2013-09-19 10:23:43 -07:00
parent fbfda21dfb
commit ff000110a0
2 changed files with 68 additions and 22 deletions

View File

@@ -28,6 +28,17 @@ _.extend(IdMap.prototype, {
var self = this;
return _.isEmpty(self._map);
},
clear: function () {
var self = this;
self._map = {};
},
each: function (iterator) {
var self = this;
_.each(self._map, function (value, key, obj) {
var context = this;
iterator.call(context, value, LocalCollection._idParse(key), obj);
});
},
// XXX used?
setDefault: function (id, def) {
var self = this;

View File

@@ -1,3 +1,4 @@
var Fiber = Npm.require('fibers');
var Future = Npm.require('fibers/future');
var PHASE = {
@@ -41,8 +42,57 @@ MongoConnection.prototype._observeChangesWithOplog = function (
callbacks.removed && callbacks.removed(id);
};
// XXX mutates newDoc, that's weird
var handleDoc = function (id, newDoc) {
var matchesNow = newDoc && selector(newDoc);
var matchedBefore = published.has(id);
if (matchesNow && !matchedBefore) {
add(newDoc);
} else if (matchedBefore && !matchesNow) {
remove(id);
} else if (matchesNow) {
var oldDoc = published.get(id);
if (!oldDoc)
throw Error("thought that " + id + " was there!");
delete newDoc._id;
published.set(id, newDoc);
if (callbacks.changed) {
var changed = LocalCollection._makeChangedFields(
EJSON.clone(newDoc), oldDoc);
if (!_.isEmpty(changed))
callbacks.changed(id, changed);
}
}
};
var beCurious = function () {
throw Error("I AM CURIOUS")
phase = PHASE.FETCHING;
while (!curiousity.isEmpty()) {
if (phase !== PHASE.FETCHING)
throw new Error("Surprising phase in beCurious: " + phase);
var futures = [];
curiousity.each(function (cacheKey, id) {
// Run each until they yield. This implies that curiousity should not be
// updated during this loop.
Fiber(function () {
var f = new Future;
futures.push(f);
var doc = self._docFetcher.fetch(cursorDescription.collectionName, id,
cacheKey);
handleDoc(id, doc);
f.return();
}).run();
});
curiousity.clear();
Future.wait(futures);
// Throw if any throw.
// XXX this means the observe will now be stalled
_.each(futures, function (f) {
f.get();
});
}
phase = PHASE.STEADY;
};
var oplogEntryHandlers = {};
@@ -50,7 +100,9 @@ MongoConnection.prototype._observeChangesWithOplog = function (
curiousity.set(idForOp(op), op.ts.toString());
};
oplogEntryHandlers[PHASE.FETCHING] = function (op) {
// XXX now
// XXX we can probably actually handle some operations directly (eg,
// insert/remove/replace if they don't conflict with "outstanding" fetches)
curiousity.set(idForOp(op), op.ts.toString());
};
oplogEntryHandlers[PHASE.STEADY] = function (op) {
var id = idForOp(op);
@@ -74,30 +126,12 @@ MongoConnection.prototype._observeChangesWithOplog = function (
if (isModifier) {
curiousity.set(id, op.ts.toString());
phase = PHASE.FETCHING;
beCurious();
return;
}
var newDoc = _.extend({_id: id}, op.o);
var matchesNow = selector(newDoc);
var matchedBefore = published.has(id);
if (matchesNow && !matchedBefore) {
add(newDoc);
} else if (matchedBefore && !matchesNow) {
remove(id);
} else if (matchesNow) {
var oldDoc = published.get(id);
if (!oldDoc)
throw Error("thought that " + id + " was there!");
delete newDoc._id;
published.set(id, newDoc);
if (callbacks.changed) {
var changed = LocalCollection._makeChangedFields(newDoc, oldDoc);
if (!_.isEmpty(changed))
callbacks.changed(id, changed);
}
}
handleDoc(id, newDoc);
} else {
throw Error("XXX SURPRISING OPERATION: " + op);
}
@@ -121,6 +155,7 @@ MongoConnection.prototype._observeChangesWithOplog = function (
return;
}
var write = fence.beginWrite();
// XXX this also has to wait for steady!!!
self._callWhenOplogProcessed(function () {
write.committed();
});