Merge branch 'better-serverside-observe-diffing'

This commit is contained in:
Geoff Schmidt
2012-03-17 04:36:36 -07:00
4 changed files with 231 additions and 42 deletions

View File

@@ -57,6 +57,17 @@ _.extend(Meteor._WriteFence.prototype, {
self.completion_callbacks.push(func);
},
// Convenience function. Arms the fence, then blocks until it fires.
armAndWait: function () {
var self = this;
var future = new Future;
self.onAllCommitted(function () {
future['return']();
});
self.arm();
future.wait();
},
_maybeFire: function () {
var self = this;
if (self.fired)

View File

@@ -382,8 +382,7 @@ _Mongo.LiveResultsSet = function (cursor, options) {
// previous results snapshot. on each poll cycle, diffs against
// results drives the callbacks.
self.results = {};
self.indexes = {};
self.results = [];
// state for polling
self.dirty = false; // do we need polling?
@@ -431,17 +430,6 @@ _Mongo.LiveResultsSet = function (cursor, options) {
10 * 1000 /* 10 seconds */);
};
_Mongo.LiveResultsSet.prototype._fetchResults = function (results, indexes) {
var self = this;
var index = 0;
self.cursor.rewind();
self.cursor.forEach(function (obj) {
results[obj._id] = obj;
indexes[obj._id] = index++;
});
};
_Mongo.LiveResultsSet.prototype._unthrottled_markDirty = function () {
var self = this;
@@ -467,32 +455,67 @@ _Mongo.LiveResultsSet.prototype._unthrottled_markDirty = function () {
_Mongo.LiveResultsSet.prototype._doPoll = function () {
var self = this;
var old_results = self.results;
var old_indexes = self.indexes;
var new_results = {};
var new_indexes = {};
// Get the new query results
self.cursor.rewind();
var new_results = self.cursor.fetch();
var present_in_new = {}, present_in_old = {};
var callbacks = [];
self._fetchResults(new_results, new_indexes);
_.each(new_results, function (obj) {
if (self.added && !old_results[obj._id])
self.added(obj, new_indexes[obj._id]);
else if (self.changed && !_.isEqual(new_results[obj._id], old_results[obj._id]))
self.changed(obj, old_indexes[obj._id], old_results[obj._id]);
if (self.moved && new_indexes[obj._id] !== old_indexes[obj._id])
self.moved(obj, old_indexes[obj._id], new_indexes[obj._id]);
// Generate some indexes to speed up the process
_.each(new_results, function (doc) {
present_in_new[doc._id] = true;
});
_.each(self.results, function (doc) {
present_in_old[doc._id] = true;
});
for (var id in old_results)
if (self.removed && !(id in new_results))
self.removed(old_results[id], old_indexes[id]);
// If documents left the query, remove them from self.results
for (var i = 0; i < self.results.length; i++) {
if (!(self.results[i]._id in present_in_new)) {
self.removed && self.removed(self.results[i], i);
self.results.splice(i, 1);
i--;
}
}
self.results = new_results;
self.indexes = new_indexes;
// Now new_results is a (non-strict) superset of self.results, so we
// can be sure that new_results is at least as long as self.results.
// Transform self.results into new_results
// XXX this is O(N^2) in the worst case, but O(N) in typical cases
for (var i = 0; i < new_results.length; i++) {
// Newly added documents
if (!(new_results[i]._id in present_in_old)) {
self.added && self.added(new_results[i], i);
self.results.splice(i, 0, new_results[i]);
continue;
}
// Find the offset of new_results[i] in self.results (if
// present). Note that we check the most likely case first
// (old_offset === i)
var old_offset;
for (var j = i; j < self.results.length; j++)
if (self.results[j]._id === new_results[i]._id) {
old_offset = j;
break;
}
if (old_offset === undefined)
throw new Error("Document in index, but missing from array?");
// Changed documents
if (!_.isEqual(self.results[old_offset], new_results[i])) {
self.changed && self.changed(new_results[i], old_offset,
self.results[old_offset]);
self.results[i] = new_results[i];
}
// Moved documents
if (old_offset !== i) {
self.moved && self.moved(new_results[i], old_offset, i);
self.results.splice(old_offset, 1);
self.results.splice(i, 0, new_results[i]);
}
}
};
_Mongo.LiveResultsSet.prototype.stop = function () {

View File

@@ -32,3 +32,108 @@ testAsyncMulti("mongo-livedata - database failure reporting", [
});
}
]);
// XXX namespacing
Meteor._LivedataTestCollection =
new Meteor.Collection("livedata_test_collection");
Tinytest.add("mongo-livedata - basics", function (test) {
var coll = Meteor._LivedataTestCollection;
var run = test.runId();
var log = '';
var obs = coll.find({run: run}, {sort: ["x"]}).observe({
added: function (doc, before_index) {
log += 'a(' + doc.x + ',' + before_index + ')';
},
changed: function (new_doc, at_index, old_doc) {
log += 'c(' + new_doc.x + ',' + at_index + ',' + old_doc.x + ')';
},
moved: function (doc, old_index, new_index) {
log += 'm(' + doc.x + ',' + old_index + ',' + new_index + ')';
},
removed: function (doc, at_index) {
log += 'r(' + doc.x + ',' + at_index + ')';
}
});
var expectObserve = function (expected, f) {
if (Meteor.is_client) {
f();
} else {
var fence = new Meteor._WriteFence;
Meteor._CurrentWriteFence.withValue(fence, f);
fence.armAndWait();
}
if (!(expected instanceof Array))
expected = [expected];
test.include(expected, log);
log = '';
};
test.equal(coll.find({run: run}).count(), 0);
test.equal(coll.findOne("abc"), undefined);
test.equal(coll.findOne({run: run}), undefined);
expectObserve('a(1,0)', function () {
var id = coll.insert({run: run, x: 1});
test.equal(id.length, 36);
test.equal(coll.find({run: run}).count(), 1);
test.equal(coll.findOne(id).x, 1);
test.equal(coll.findOne({run: run}).x, 1);
});
expectObserve('a(4,1)', function () {
var id2 = coll.insert({run: run, x: 4});
test.equal(coll.find({run: run}).count(), 2);
test.equal(coll.find({_id: id2}).count(), 1);
test.equal(coll.findOne(id2).x, 4);
});
test.equal(coll.findOne({run: run}, {sort: ["x"], skip: 0}).x, 1);
test.equal(coll.findOne({run: run}, {sort: ["x"], skip: 1}).x, 4);
test.equal(coll.findOne({run: run}, {sort: {x: -1}, skip: 0}).x, 4);
test.equal(coll.findOne({run: run}, {sort: {x: -1}, skip: 1}).x, 1);
var cur = coll.find({run: run}, {sort: ["x"]});
var total = 0;
cur.forEach(function (doc) {
total *= 10;
total += doc.x;
})
test.equal(total, 14);
cur.rewind();
test.equal(cur.map(function (doc) {
return doc.x * 2;
}), [2, 8]);
test.equal(_.pluck(coll.find({run: run}, {sort: {x: -1}}).fetch(), "x"),
[4, 1]);
expectObserve('c(3,0,1)c(6,1,4)', function () {
coll.update({run: run}, {$inc: {x: 2}}, {multi: true});
test.equal(_.pluck(coll.find({run: run}, {sort: {x: -1}}).fetch(), "x"),
[6, 3]);
});
expectObserve(['c(13,0,3)m(13,0,1)', 'm(6,1,0)c(13,1,3)'], function () {
coll.update({run: run, x: 3}, {$inc: {x: 10}}, {multi: true});
test.equal(_.pluck(coll.find({run: run}, {sort: {x: -1}}).fetch(), "x"),
[13, 6]);
});
expectObserve('r(13,1)', function () {
coll.remove({run: run, x: {$gt: 10}});
test.equal(coll.find({run: run}).count(), 1);
});
expectObserve('r(6,0)', function () {
coll.remove({run: run});
test.equal(coll.find({run: run}).count(), 0);
});
obs.stop();
});

View File

@@ -119,13 +119,6 @@ _.extend(TestCaseResults.prototype, {
this.fail({type: "instanceOf"}); // XXX what other data?
},
length: function (obj, expected_length) {
if (obj.length === expected_length)
this.ok();
else
this.fail({type: "length"}); // XXX what other data?
},
// XXX nodejs assert.throws can take an expected error, as a class,
// regular expression, or predicate function..
throws: function (f) {
@@ -155,7 +148,64 @@ _.extend(TestCaseResults.prototype, {
this.fail({type: "true"});
else
this.ok();
},
isNull: function (v) {
if (v === null)
this.ok();
else
this.fail({type: "null"});
},
isNotNull: function (v) {
if (v === null)
this.fail({type: "true"});
else
this.ok();
},
isUndefined: function (v) {
if (v === undefined)
this.ok();
else
this.fail({type: "undefined"});
},
isNaN: function (v) {
if (isNaN(v))
this.ok();
else
this.fail({type: "NaN"});
},
include: function (s, v) {
var pass = false;
if (s instanceof Array)
pass = _.indexOf(s, v) !== -1;
else if (typeof s === "object")
pass = v in s;
else if (typeof s === "string")
for (var i = 0; i < s.length; i++)
if (s.charAt(i) === v) {
pass = true;
break;
}
else
/* fail -- not something that contains other things */;
if (pass)
this.ok();
else
this.fail({type: "include", sequence: s, should_contain_value: v});
},
// XXX should change to lengthOf to match vowsjs
length: function (obj, expected_length) {
if (obj.length === expected_length)
this.ok();
else
this.fail({type: "length"}); // XXX what other data?
}
});
/******************************************************************************/