Merge branch 'oplog-limits-buffered' into devel

This commit is contained in:
Slava Kim
2014-03-03 23:28:54 -08:00
20 changed files with 1456 additions and 179 deletions

1
packages/binary-heap/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.build*

View File

@@ -0,0 +1,138 @@
Tinytest.add("binary-heap - simple max-heap tests", function (test) {
  // Exercise the basic MaxHeap API: set/get/remove/has/clear/setDefault.
  var numericOrder = function (a, b) { return a - b; };
  var heap = new MaxHeap(numericOrder);

  heap.set("a", 1);
  heap.set("b", 233);
  heap.set("c", -122);
  heap.set("d", 0);
  heap.set("e", 0);

  test.equal(heap.size(), 5);
  test.equal(heap.maxElementId(), "b");
  test.equal(heap.get("b"), 233);

  // Removing the max element promotes the next-largest value.
  heap.remove("b");
  test.equal(heap.size(), 4);
  test.equal(heap.maxElementId(), "a");

  // Updating an existing key can change which element is the max.
  heap.set("e", 44);
  test.equal(heap.maxElementId(), "e");

  test.equal(heap.get("b"), null);
  test.isTrue(heap.has("a"));
  test.isFalse(heap.has("dd"));

  heap.clear();
  test.isFalse(heap.has("a"));
  test.equal(heap.size(), 0);

  // setDefault inserts only when the key is absent.
  test.equal(heap.setDefault("a", 12345), 12345);
  test.equal(heap.setDefault("a", 55555), 12345);
  test.equal(heap.size(), 1);
  test.equal(heap.maxElementId(), "a");
});
Tinytest.add("binary-heap - big test for max-heap", function (test) {
  // Insert 80 shuffled integers, then drain the heap from the top; the
  // drained sequence must come out in descending order.
  var positives = _.shuffle(_.range(1, 41));
  var negatives = _.shuffle(_.range(-1, -41, -1));
  var everyNumber = negatives.concat(positives);
  var heap = new MaxHeap(function (a, b) { return a - b; });

  _.each(everyNumber, function (n) { heap.set(n, n); });

  var drained = [];
  _.times(positives.length + negatives.length, function () {
    var topId = heap.maxElementId();
    drained.push(heap.get(topId));
    heap.remove(topId);
  });

  everyNumber.sort(function (a, b) { return b - a; });
  test.equal(drained, everyNumber);
});
Tinytest.add("binary-heap - min-max heap tests", function (test) {
  // Same API walk-through as the simple max-heap test, but also checks that
  // minElementId tracks the smallest value.
  var heap = new MinMaxHeap(function (a, b) { return a - b; });

  heap.set("a", 1);
  heap.set("b", 233);
  heap.set("c", -122);
  heap.set("d", 0);
  heap.set("e", 0);

  test.equal(heap.size(), 5);
  test.equal(heap.maxElementId(), "b");
  test.equal(heap.get("b"), 233);
  test.equal(heap.minElementId(), "c");

  heap.remove("b");
  test.equal(heap.size(), 4);
  test.equal(heap.minElementId(), "c");

  // Lowering an existing value can change which element is the min.
  heap.set("e", -123);
  test.equal(heap.minElementId(), "e");

  test.equal(heap.get("b"), null);
  test.isTrue(heap.has("a"));
  test.isFalse(heap.has("dd"));

  heap.clear();
  test.isFalse(heap.has("a"));
  test.equal(heap.size(), 0);

  // setDefault inserts only when the key is absent.
  test.equal(heap.setDefault("a", 12345), 12345);
  test.equal(heap.setDefault("a", 55555), 12345);
  test.equal(heap.size(), 1);
  test.equal(heap.maxElementId(), "a");
  test.equal(heap.minElementId(), "a");
});
Tinytest.add("binary-heap - big test for min-max-heap", function (test) {
  // Stress test: N positive and N negative keys are inserted (checking the
  // heap's internal invariants after every operation), every entry is then
  // overwritten once more in shuffled order, and finally the heap is drained
  // from both ends to verify global ordering.
  var N = 500;
  var positiveNumbers = _.shuffle(_.range(1, N + 1));
  var negativeNumbers = _.shuffle(_.range(-1, -N - 1, -1));
  var allNumbers = positiveNumbers.concat(negativeNumbers);
  var heap = new MinMaxHeap(function (a, b) { return a - b; });
  var output = [];

  // First pass: key n maps to value n.
  var initialSets = _.clone(allNumbers);
  _.each(allNumbers, function (n) {
    heap.set(n, n);
    heap._selfCheck();
    heap._minHeap._selfCheck();
  });

  // Second pass: key -n gets value n, overwriting every existing entry.
  allNumbers = _.shuffle(allNumbers);
  var secondarySets = _.clone(allNumbers);
  _.each(allNumbers, function (n) {
    heap.set(-n, n);
    heap._selfCheck();
    heap._minHeap._selfCheck();
  });

  // Drain from the min end; values must come out in ascending order.
  _.times(positiveNumbers.length + negativeNumbers.length, function () {
    var minId = heap.minElementId();
    output.push(heap.get(minId));
    heap.remove(minId);
    heap._selfCheck(); heap._minHeap._selfCheck();
  });
  test.equal(heap.size(), 0);

  allNumbers.sort(function (a, b) { return a - b; });
  // Record the exact shuffled orders so a failure is reproducible.
  var initialTestText = "initial sets: " + initialSets.toString() +
        "; secondary sets: " + secondarySets.toString();
  test.equal(output, allNumbers, initialTestText);

  // Rebuild the same heap and drain from the max end; values must descend.
  // (The first _.each below was missing its statement terminator.)
  _.each(initialSets, function (n) { heap.set(n, n); });
  _.each(secondarySets, function (n) { heap.set(-n, n); });
  allNumbers.sort(function (a, b) { return b - a; });
  output = [];
  _.times(positiveNumbers.length + negativeNumbers.length, function () {
    var maxId = heap.maxElementId();
    output.push(heap.get(maxId));
    heap.remove(maxId);
    heap._selfCheck(); heap._minHeap._selfCheck();
  });
  test.equal(output, allNumbers, initialTestText);
});

View File

@@ -0,0 +1,226 @@
// Constructor of Heap
// - comparator - Function - given two items returns a number
// - options:
//   - initData - Array - Optional - the initial data in a format:
//     Object:
//       - id - String - unique id of the item
//       - value - Any - the data value
//     each value is retained
//   - IdMap - Constructor - Optional - custom IdMap class to store id->index
//     mappings internally. Standard IdMap is used by default.
MaxHeap = function (comparator, options) {
  if (! _.isFunction(comparator))
    throw new Error('Passed comparator is invalid, should be a comparison function');
  var self = this;
  // A C-style comparator that is given two values and returns a number:
  // negative if the first value is less than the second, positive if the
  // first value is greater than the second, and zero if they are equal.
  self._comparator = comparator;
  options = _.defaults(options || {}, { IdMap: IdMap });
  // _heapIdx maps an id to the index in the heap array where the
  // corresponding value is located.
  self._heapIdx = new options.IdMap;
  // The heap data-structure implemented as a 0-based contiguous array where
  // every item on index idx is a node in a complete binary tree. Every node
  // can have children on indexes idx*2+1 and idx*2+2, except for the leaves.
  // Every node has a parent on index (idx-1)>>1.
  self._heap = [];
  // If the initial array is passed, we can build the heap in linear time
  // complexity (O(N)) compared to linearithmic time complexity (O(nlogn)) if
  // we push elements one by one.
  if (_.isArray(options.initData))
    self._initFromData(options.initData);
};
_.extend(MaxHeap.prototype, {
  // Builds a new heap in-place in linear time based on passed data.
  _initFromData: function (data) {
    var self = this;
    // Copy the records so later heap mutations don't affect the caller's
    // array.
    self._heap = _.map(data, function (o) {
      return { id: o.id, value: o.value };
    });
    _.each(data, function (o, i) {
      self._heapIdx.set(o.id, i);
    });
    if (! data.length)
      return;
    // Heapify: start from the first non-leaf - the parent of the last leaf -
    // and sift every node down. This is the classic O(N) heap construction.
    for (var i = parentIdx(data.length - 1); i >= 0; i--)
      self._downHeap(i);
  },
  // Sifts the node at idx down until it is not smaller than its children.
  _downHeap: function (idx) {
    var self = this;
    while (leftChildIdx(idx) < self.size()) {
      var left = leftChildIdx(idx);
      var right = rightChildIdx(idx);
      // The loop condition guarantees the left child exists; the right one
      // may not.
      var largest = self._maxIndex(idx, left);
      if (right < self.size())
        largest = self._maxIndex(largest, right);
      if (largest === idx)
        break;
      self._swap(largest, idx);
      idx = largest;
    }
  },
  // Sifts the node at idx up until its parent is not smaller than it.
  _upHeap: function (idx) {
    var self = this;
    while (idx > 0) {
      var parent = parentIdx(idx);
      if (self._maxIndex(parent, idx) === idx) {
        self._swap(parent, idx);
        idx = parent;
      } else {
        break;
      }
    }
  },
  // Returns whichever of idxA/idxB holds the greater value. On a tie the
  // first argument wins (comparator result >= 0 keeps idxA).
  _maxIndex: function (idxA, idxB) {
    var self = this;
    var valueA = self._get(idxA);
    var valueB = self._get(idxB);
    return self._comparator(valueA, valueB) >= 0 ? idxA : idxB;
  },
  // Internal: gets the raw value placed on the idx-th place in the heap.
  _get: function (idx) {
    var self = this;
    return self._heap[idx].value;
  },
  // Swaps two records in the heap array, keeping _heapIdx in sync.
  _swap: function (idxA, idxB) {
    var self = this;
    var recA = self._heap[idxA];
    var recB = self._heap[idxB];
    self._heapIdx.set(recA.id, idxB);
    self._heapIdx.set(recB.id, idxA);
    self._heap[idxA] = recB;
    self._heap[idxB] = recA;
  },
  // Returns the value stored under id, or null when id is absent.
  get: function (id) {
    var self = this;
    if (! self.has(id))
      return null;
    return self._get(self._heapIdx.get(id));
  },
  // Inserts a new id->value pair or updates the value of an existing id,
  // restoring the heap invariant afterwards.
  set: function (id, value) {
    var self = this;
    if (self.has(id)) {
      if (self.get(id) === value)
        return;
      var idx = self._heapIdx.get(id);
      self._heap[idx].value = value;
      // Fix the new value's position:
      // either bubble the new value up if it is greater than its parent
      self._upHeap(idx);
      // or bubble it down if it is smaller than one of its children
      self._downHeap(idx);
    } else {
      self._heapIdx.set(id, self._heap.length);
      self._heap.push({ id: id, value: value });
      self._upHeap(self._heap.length - 1);
    }
  },
  // Removes id from the heap; no-op when absent.
  remove: function (id) {
    var self = this;
    if (self.has(id)) {
      var last = self._heap.length - 1;
      var idx = self._heapIdx.get(id);
      if (idx !== last) {
        // Move the last record into the vacated slot, then fix its position.
        self._swap(idx, last);
        self._heap.pop();
        self._heapIdx.remove(id);
        self._upHeap(idx);
        self._downHeap(idx);
      } else {
        self._heap.pop();
        self._heapIdx.remove(id);
      }
    }
  },
  has: function (id) {
    var self = this;
    return self._heapIdx.has(id);
  },
  // True when the heap holds no elements. (The unused `id` parameter of the
  // original signature was dropped; callers pass no arguments.)
  empty: function () {
    var self = this;
    return !self.size();
  },
  clear: function () {
    var self = this;
    self._heap = [];
    self._heapIdx.clear();
  },
  // iterate over values in no particular order
  forEach: function (iterator) {
    var self = this;
    _.each(self._heap, function (obj) {
      return iterator(obj.value, obj.id);
    });
  },
  size: function () {
    var self = this;
    return self._heap.length;
  },
  // Returns the existing value for id, or inserts def and returns it.
  setDefault: function (id, def) {
    var self = this;
    if (self.has(id))
      return self.get(id);
    self.set(id, def);
    return def;
  },
  // Returns a copy of this heap holding the same id->value pairs.
  clone: function () {
    var self = this;
    // BUGFIX: the constructor takes an options object; passing the raw heap
    // array meant options.initData was undefined and every clone came back
    // empty.
    return new MaxHeap(self._comparator, { initData: self._heap });
  },
  // Id of the greatest element, or null when the heap is empty.
  maxElementId: function () {
    var self = this;
    return self.size() ? self._heap[0].id : null;
  },
  // Invariant check: throws unless every node compares >= its children.
  _selfCheck: function () {
    var self = this;
    for (var i = 1; i < self._heap.length; i++)
      if (self._maxIndex(parentIdx(i), i) !== parentIdx(i))
        throw new Error("An item with id " + self._heap[i].id +
                        " has a parent younger than it: " +
                        self._heap[parentIdx(i)].id);
  }
});
// Index arithmetic for a 0-based array-backed complete binary tree:
// node i has children at 2i+1 and 2i+2 and a parent at floor((i-1)/2).
function leftChildIdx (i) {
  return 2 * i + 1;
}
function rightChildIdx (i) {
  return 2 * i + 2;
}
function parentIdx (i) {
  return (i - 1) >> 1;
}

View File

@@ -0,0 +1,55 @@
// This implementation of Min/Max-Heap is just a subclass of Max-Heap
// with a Min-Heap as an encapsulated property.
//
// Most of the operations are just proxy methods to call the same method on both
// heaps.
//
// This implementation takes 2*N memory but is fairly simple to write and
// understand. And the constant factor of a simple Heap is usually smaller
// compared to other two-way priority queues like Min/Max Heaps
// (http://www.cs.otago.ac.nz/staffpriv/mike/Papers/MinMaxHeaps/MinMaxHeaps.pdf)
// and Interval Heaps
// (http://www.cise.ufl.edu/~sahni/dsaac/enrich/c13/double.htm)
//
// - comparator - Function - same contract as MaxHeap's comparator
// - options - Object - Optional - forwarded to both underlying heaps
MinMaxHeap = function (comparator, options) {
  var self = this;
  // `self` acts as the max-heap; initialize it via the parent constructor.
  MaxHeap.call(self, comparator, options);
  // The companion min-heap is a max-heap over the inverted comparator.
  self._minHeap = new MaxHeap(function (a, b) {
    return -comparator(a, b);
  }, options);
};
Meteor._inherits(MinMaxHeap, MaxHeap);

_.extend(MinMaxHeap.prototype, {
  // Mutating operations run the inherited max-heap implementation and are
  // mirrored on the encapsulated min-heap so the two stay in sync.
  set: function (id, value) {
    var self = this;
    MaxHeap.prototype.set.apply(self, arguments);
    self._minHeap.set(id, value);
  },
  remove: function (id) {
    var self = this;
    MaxHeap.prototype.remove.apply(self, arguments);
    self._minHeap.remove(id);
  },
  clear: function () {
    var self = this;
    MaxHeap.prototype.clear.apply(self, arguments);
    self._minHeap.clear();
  },
  setDefault: function (id, def) {
    var self = this;
    MaxHeap.prototype.setDefault.apply(self, arguments);
    return self._minHeap.setDefault(id, def);
  },
  // Returns a copy of this heap holding the same id->value pairs.
  clone: function () {
    var self = this;
    // BUGFIX: the constructor expects an options object; passing the raw
    // heap array meant `initData` was ignored and the clone came back empty.
    return new MinMaxHeap(self._comparator, { initData: self._heap });
  },
  // Id of the smallest element, or null when the heap is empty.
  minElementId: function () {
    var self = this;
    return self._minHeap.maxElementId();
  }
});

View File

@@ -0,0 +1,18 @@
Package.describe({
  summary: "Binary Heap datastructure implementation",
  internal: true
});

Package.on_use(function (api) {
  api.export('MaxHeap');
  api.export('MinMaxHeap');
  // underscore for collection helpers; id-map for the default id->index map.
  api.use(['underscore', 'id-map']);
  api.add_files(['max-heap.js', 'min-max-heap.js']);
});

Package.on_test(function (api) {
  api.use('tinytest');
  api.use('binary-heap');
  api.add_files('binary-heap-tests.js');
});

1
packages/id-map/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
.build*

77
packages/id-map/id-map.js Normal file
View File

@@ -0,0 +1,77 @@
// Constructor of IdMap - a dictionary keyed by arbitrary ids.
// - idStringify - Function - Optional - serializes an id to the string used
//   internally as a property name; defaults to JSON.stringify.
// - idParse - Function - Optional - inverse of idStringify; defaults to
//   JSON.parse.
IdMap = function (idStringify, idParse) {
  var self = this;
  self._map = {};
  self._idStringify = idStringify || JSON.stringify;
  self._idParse = idParse || JSON.parse;
};
// Some of these methods are designed to match methods on OrderedDict, since
// (eg) ObserveMultiplex and _CachingChangeObserver use them interchangeably.
// (Conceivably, this should be replaced with "UnorderedDict" with a specific
// set of methods that overlap between the two.)
_.extend(IdMap.prototype, {
  // Value stored under `id`, or undefined when absent.
  get: function (id) {
    var self = this;
    return self._map[self._idStringify(id)];
  },
  set: function (id, value) {
    var self = this;
    self._map[self._idStringify(id)] = value;
  },
  remove: function (id) {
    var self = this;
    delete self._map[self._idStringify(id)];
  },
  has: function (id) {
    var self = this;
    return _.has(self._map, self._idStringify(id));
  },
  empty: function () {
    var self = this;
    return _.isEmpty(self._map);
  },
  clear: function () {
    var self = this;
    self._map = {};
  },
  // Iterates over the items in the map. Return `false` to break the loop.
  forEach: function (iterator) {
    var self = this;
    // don't use _.each, because we can't break out of it.
    var stringKeys = _.keys(self._map);
    for (var i = 0; i < stringKeys.length; i++) {
      var strKey = stringKeys[i];
      var keepGoing = iterator.call(null, self._map[strKey],
                                    self._idParse(strKey));
      if (keepGoing === false)
        return;
    }
  },
  size: function () {
    var self = this;
    return _.size(self._map);
  },
  // Returns the value under `id`, inserting `def` first when `id` is absent.
  setDefault: function (id, def) {
    var self = this;
    var key = self._idStringify(id);
    if (_.has(self._map, key))
      return self._map[key];
    self._map[key] = def;
    return def;
  },
  // Assumes that values are EJSON-cloneable, and that we don't need to clone
  // IDs (ie, that nobody is going to mutate an ObjectId).
  clone: function () {
    var self = this;
    var copy = new IdMap(self._idStringify, self._idParse);
    self.forEach(function (value, id) {
      copy.set(id, EJSON.clone(value));
    });
    return copy;
  }
});

View File

@@ -0,0 +1,11 @@
Package.describe({
  summary: "Dictionary data structure allowing non-string keys",
  internal: true
});

Package.on_use(function (api) {
  api.export('IdMap');
  // json for default key (de)serialization; ejson for value cloning.
  api.use(['underscore', 'json', 'ejson']);
  api.add_files([ 'id-map.js' ]);
});

View File

@@ -111,5 +111,26 @@ _.extend(Meteor, {
return fut.wait();
return result;
};
},
  // Sets child's prototype to a new object whose prototype is parent's
  // prototype. Used as:
  //   Meteor._inherits(ClassB, ClassA);
  //   _.extend(ClassB.prototype, { ... })
  // Inspired by CoffeeScript's `extend` and Google Closure's `goog.inherits`.
  // Returns Child.
  _inherits: function (Child, Parent) {
    // copy static fields
    _.each(Parent, function (prop, field) {
      Child[field] = prop;
    });

    // a middle member of prototype chain: takes the prototype from the Parent
    // without invoking Parent's constructor.
    var Middle = function () {
      this.constructor = Child;
    };
    Middle.prototype = Parent.prototype;
    Child.prototype = new Middle();
    // Expose the parent prototype for super-calls, CoffeeScript-style.
    Child.__super__ = Parent.prototype;
    return Child;
  }
});

View File

@@ -1,74 +1,7 @@
// A LocalCollection-flavored IdMap: ids are serialized with
// LocalCollection._idStringify / _idParse so ObjectIds and other non-string
// ids work as keys.
LocalCollection._IdMap = function () {
  var self = this;
  // The IdMap base constructor initializes self._map itself, so the former
  // redundant `self._map = {};` here was dropped.
  IdMap.call(self, LocalCollection._idStringify, LocalCollection._idParse);
};

// Some of these methods are designed to match methods on OrderedDict, since
// (eg) ObserveMultiplex and _CachingChangeObserver use them interchangeably.
// (Conceivably, this should be replaced with "UnorderedDict" with a specific
// set of methods that overlap between the two.)
Meteor._inherits(LocalCollection._IdMap, IdMap);
// NOTE(review): these overrides duplicate the generic IdMap methods
// (id-map.js) with LocalCollection._idStringify/_idParse hard-coded. Given
// the Meteor._inherits(LocalCollection._IdMap, IdMap) call and the
// stringify/parse functions now passed to the base constructor, this whole
// block looks redundant — confirm and consider removing it.
_.extend(LocalCollection._IdMap.prototype, {
  get: function (id) {
    var self = this;
    var key = LocalCollection._idStringify(id);
    return self._map[key];
  },
  set: function (id, value) {
    var self = this;
    var key = LocalCollection._idStringify(id);
    self._map[key] = value;
  },
  remove: function (id) {
    var self = this;
    var key = LocalCollection._idStringify(id);
    delete self._map[key];
  },
  has: function (id) {
    var self = this;
    var key = LocalCollection._idStringify(id);
    return _.has(self._map, key);
  },
  empty: function () {
    var self = this;
    return _.isEmpty(self._map);
  },
  clear: function () {
    var self = this;
    self._map = {};
  },
  // Iterates over the items in the map. Return `false` to break the loop.
  forEach: function (iterator) {
    var self = this;
    // don't use _.each, because we can't break out of it.
    var keys = _.keys(self._map);
    for (var i = 0; i < keys.length; i++) {
      var breakIfFalse = iterator.call(null, self._map[keys[i]],
                                       LocalCollection._idParse(keys[i]));
      if (breakIfFalse === false)
        return;
    }
  },
  size: function () {
    var self = this;
    return _.size(self._map);
  },
  // Returns the value under `id`, inserting `def` first when `id` is absent.
  setDefault: function (id, def) {
    var self = this;
    var key = LocalCollection._idStringify(id);
    if (_.has(self._map, key))
      return self._map[key];
    self._map[key] = def;
    return def;
  },
  // Assumes that values are EJSON-cloneable, and that we don't need to clone
  // IDs (ie, that nobody is going to mutate an ObjectId).
  clone: function () {
    var self = this;
    var clone = new LocalCollection._IdMap;
    self.forEach(function (value, id) {
      clone.set(id, EJSON.clone(value));
    });
    return clone;
  }
});

View File

@@ -332,6 +332,31 @@ Tinytest.add("minimongo - selector and projection combination", function (test)
});
Tinytest.add("minimongo - sorter and projection combination", function (test) {
  // Asserts that folding sortSpec's key paths into projection `proj` yields
  // exactly `expected`; `desc` labels the assertion.
  function testSorterProjectionComb (sortSpec, proj, expected, desc) {
    var sorter = new Minimongo.Sorter(sortSpec);
    test.equal(sorter.combineIntoProjection(proj), expected, desc);
  }

  // Test with inclusive projection: sort paths are added as included fields.
  testSorterProjectionComb({ a: 1, b: 1 }, { b: 1, c: 1, d: 1 }, { a: true, b: true, c: true, d: true }, "simplest incl");
  testSorterProjectionComb({ a: 1, b: -1 }, { b: 1, c: 1, d: 1 }, { a: true, b: true, c: true, d: true }, "simplest incl");
  testSorterProjectionComb({ 'a.c': 1 }, { b: 1 }, { 'a.c': true, b: true }, "dot path incl");
  // Numeric path components (array indexes) are elided from sort paths.
  testSorterProjectionComb({ 'a.1.c': 1 }, { b: 1 }, { 'a.c': true, b: true }, "dot num path incl");
  testSorterProjectionComb({ 'a.1.c': 1 }, { b: 1, a: 1 }, { a: true, b: true }, "dot num path incl overlap");
  testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, { b: 1 }, { 'a.c': true, 'a.b': true, b: true }, "dot num path incl");
  testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, {}, {}, "dot num path with empty incl");

  // Test with exclusive projection: sort paths must not remain excluded.
  testSorterProjectionComb({ a: 1, b: 1 }, { b: 0, c: 0, d: 0 }, { c: false, d: false }, "simplest excl");
  testSorterProjectionComb({ a: 1, b: -1 }, { b: 0, c: 0, d: 0 }, { c: false, d: false }, "simplest excl");
  testSorterProjectionComb({ 'a.c': 1 }, { b: 0 }, { b: false }, "dot path excl");
  testSorterProjectionComb({ 'a.1.c': 1 }, { b: 0 }, { b: false }, "dot num path excl");
  testSorterProjectionComb({ 'a.1.c': 1 }, { b: 0, a: 0 }, { b: false }, "dot num path excl overlap");
  testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, { b: 0 }, { b: false }, "dot num path excl");
});
(function () {
// TODO: Tests for "can selector become true by modifier" are incomplete,
// absent or test the functionality of "not ideal" implementation (test checks

View File

@@ -1568,7 +1568,7 @@ Tinytest.add("minimongo - ordering", function (test) {
// document ordering under a sort specification
var verify = function (sorts, docs) {
_.each(_.isArray(sorts) ? sorts : [sorts], function (sort) {
var sorter = new MinimongoTest.Sorter(sort);
var sorter = new Minimongo.Sorter(sort);
assert_ordering(test, sorter.getComparator(), docs);
});
};
@@ -1596,15 +1596,15 @@ Tinytest.add("minimongo - ordering", function (test) {
[{c: 1}, {a: 1, b: 2}, {a: 1, b: 3}, {a: 2, b: 0}]);
test.throws(function () {
new MinimongoTest.Sorter("a");
new Minimongo.Sorter("a");
});
test.throws(function () {
new MinimongoTest.Sorter(123);
new Minimongo.Sorter(123);
});
// No sort spec implies everything equal.
test.equal(new MinimongoTest.Sorter({}).getComparator()({a:1}, {a:2}), 0);
test.equal(new Minimongo.Sorter({}).getComparator()({a:1}, {a:2}), 0);
// All sorts of array edge cases!
// Increasing sort sorts by the smallest element it finds; 1 < 2.

View File

@@ -7,7 +7,7 @@ Package.on_use(function (api) {
api.export('LocalCollection');
api.export('Minimongo');
api.export('MinimongoTest', { testOnly: true });
api.use(['underscore', 'json', 'ejson', 'ordered-dict', 'deps',
api.use(['underscore', 'json', 'ejson', 'id-map', 'ordered-dict', 'deps',
'random', 'ordered-dict']);
// This package is used for geo-location queries such as $near
api.use('geojson-utils');
@@ -28,7 +28,8 @@ Package.on_use(function (api) {
// Functionality used only by oplog tailing on the server side
api.add_files([
'selector_projection.js',
'selector_modifier.js'
'selector_modifier.js',
'sorter_projection.js'
], 'server');
});

View File

@@ -3,7 +3,7 @@
// @returns Object - projection object (same as fields option of mongo cursor)
Minimongo.Matcher.prototype.combineIntoProjection = function (projection) {
var self = this;
var selectorPaths = self._getPathsElidingNumericKeys();
var selectorPaths = Minimongo._pathsElidingNumericKeys(self._getPaths());
// Special case for $where operator in the selector - projection should depend
// on all fields of the document. getSelectorPaths returns a list of paths
@@ -12,12 +12,23 @@ Minimongo.Matcher.prototype.combineIntoProjection = function (projection) {
if (_.contains(selectorPaths, ''))
return {};
return combineImportantPathsIntoProjection(selectorPaths, projection);
};
Minimongo._pathsElidingNumericKeys = function (paths) {
var self = this;
return _.map(paths, function (path) {
return _.reject(path.split('.'), isNumericKey).join('.');
});
};
combineImportantPathsIntoProjection = function (paths, projection) {
var prjDetails = projectionDetails(projection);
var tree = prjDetails.tree;
var mergedProjection = {};
// merge the paths to include
tree = pathsToTree(selectorPaths,
tree = pathsToTree(paths,
function (path) { return true; },
function (node, path, fullPath) { return true; },
tree);
@@ -40,13 +51,6 @@ Minimongo.Matcher.prototype.combineIntoProjection = function (projection) {
}
};
Minimongo.Matcher.prototype._getPathsElidingNumericKeys = function () {
var self = this;
return _.map(self._getPaths(), function (path) {
return _.reject(path.split('.'), isNumericKey).join('.');
});
};
// Returns a set of key paths similar to
// { 'foo.bar': 1, 'a.b.c': 1 }
var treeToPaths = function (tree, prefix) {

View File

@@ -14,17 +14,19 @@
Sorter = function (spec) {
var self = this;
var sortSpecParts = [];
var sortSpecParts = self._sortSpecParts = [];
if (spec instanceof Array) {
for (var i = 0; i < spec.length; i++) {
if (typeof spec[i] === "string") {
sortSpecParts.push({
path: spec[i],
lookup: makeLookupFunction(spec[i]),
ascending: true
});
} else {
sortSpecParts.push({
path: spec[i][0],
lookup: makeLookupFunction(spec[i][0]),
ascending: spec[i][1] !== "desc"
});
@@ -33,12 +35,13 @@ Sorter = function (spec) {
} else if (typeof spec === "object") {
for (var key in spec) {
sortSpecParts.push({
path: key,
lookup: makeLookupFunction(key),
ascending: spec[key] >= 0
});
}
} else {
throw Error("Bad sort specification: ", JSON.stringify(spec));
throw Error("Bad sort specification: " + JSON.stringify(spec));
}
// reduceValue takes in all the possible values for the sort key along various
@@ -118,7 +121,12 @@ Sorter.prototype.getComparator = function (options) {
}]);
};
MinimongoTest.Sorter = Sorter;
Sorter.prototype._getPaths = function () {
var self = this;
return _.pluck(self._sortSpecParts, 'path');
};
Minimongo.Sorter = Sorter;
// Given an array of comparators
// (functions (a,b)->(negative or positive or zero)), returns a single

View File

@@ -0,0 +1,6 @@
// Combines the sorter's key paths into the given projection (same format as
// the `fields` option of a mongo cursor) so that every field needed for
// sorting is fetched. Numeric path components are elided first, mirroring
// Matcher.prototype.combineIntoProjection.
// @param projection - Object - the projection to combine with
// @returns Object - the combined projection
Sorter.prototype.combineIntoProjection = function (projection) {
  var self = this;
  var specPaths = Minimongo._pathsElidingNumericKeys(self._getPaths());
  return combineImportantPathsIntoProjection(specPaths, projection);
};

View File

@@ -745,6 +745,425 @@ if (Meteor.isServer) {
});
x++;
});
// compares arrays a and b w/o looking at order: set equality over the EJSON
// serializations of the elements (note: element multiplicity is ignored,
// since _.difference drops duplicates)
var setsEqual = function (a, b) {
  a = _.map(a, EJSON.stringify);
  b = _.map(b, EJSON.stringify);
  return _.isEmpty(_.difference(a, b)) && _.isEmpty(_.difference(b, a));
};
// This test mainly checks the correctness of oplog code dealing with limited
// queries. Compatibility with poll-diff is added as well.
Tinytest.addAsync("mongo-livedata - observe sorted, limited " + idGeneration, function (test, onComplete) {
  var run = test.runId();
  var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions);

  // Observes {foo: 22} sorted by bar ascending, limited to 3 documents.
  // Every callback is appended to `output`; `state` mirrors the currently
  // published documents by id.
  var observer = function () {
    var state = {};
    var output = [];
    var callbacks = {
      changed: function (newDoc) {
        output.push({changed: newDoc._id});
        state[newDoc._id] = newDoc;
      },
      added: function (newDoc) {
        output.push({added: newDoc._id});
        state[newDoc._id] = newDoc;
      },
      removed: function (oldDoc) {
        output.push({removed: oldDoc._id});
        delete state[oldDoc._id];
      }
    };
    var handle = coll.find({foo: 22},
                           {sort: {bar: 1}, limit: 3}).observe(callbacks);
    return {output: output, handle: handle, state: state};
  };
  var clearOutput = function (o) { o.output.splice(0, o.output.length); };

  // Fence-wrapped write helpers: observe callbacks have fired by the time
  // these return.
  var ins = function (doc) {
    var id; runInFence(function () { id = coll.insert(doc); });
    return id;
  };
  var rem = function (sel) { runInFence(function () { coll.remove(sel); }); };
  var upd = function (sel, mod, opt) {
    runInFence(function () {
      coll.update(sel, mod, opt);
    });
  };
  // tests '_id' subfields for all documents in oplog buffer
  var testOplogBufferIds = function (ids) {
    var bufferIds = [];
    o.handle._multiplexer._observeDriver._unpublishedBuffer.forEach(function (x, id) {
      bufferIds.push(id);
    });

    test.isTrue(setsEqual(ids, bufferIds), "expected: " + ids + "; got: " + bufferIds);
  };
  var testSafeAppendToBufferFlag = function (expected) {
    if (expected)
      test.isTrue(o.handle._multiplexer._observeDriver._safeAppendToBuffer);
    else
      test.isFalse(o.handle._multiplexer._observeDriver._safeAppendToBuffer);
  };

  // Insert a doc and start observing.
  var docId1 = ins({foo: 22, bar: 5});
  var o = observer();
  var usesOplog = o.handle._multiplexer._observeDriver._usesOplog;
  // Initial add.
  test.length(o.output, 1);
  test.equal(o.output.shift(), {added: docId1});

  // Insert another doc (blocking until observes have fired).
  var docId2 = ins({foo: 22, bar: 6});
  // Observed add.
  test.length(o.output, 1);
  test.equal(o.output.shift(), {added: docId2});

  var docId3 = ins({ foo: 22, bar: 3 });
  test.length(o.output, 1);
  test.equal(o.output.shift(), {added: docId3});

  // Add a non-matching document
  ins({ foo: 13 });
  // It shouldn't be added
  test.length(o.output, 0);

  // Add something that matches but is too big to fit in
  var docId4 = ins({ foo: 22, bar: 7 });
  // It shouldn't be added
  test.length(o.output, 0);

  // Let's add something small enough to fit in
  var docId5 = ins({ foo: 22, bar: -1 });
  // We should get an added and a removed events
  test.length(o.output, 2);
  // doc 2 was removed from the published set as it is too big to be in
  test.isTrue(setsEqual(o.output, [{added: docId5}, {removed: docId2}]));
  clearOutput(o);

  // Now remove something and that doc 2 should be right back
  rem(docId5);
  test.length(o.output, 2);
  test.isTrue(setsEqual(o.output, [{removed: docId5}, {added: docId2}]));
  clearOutput(o);
  usesOplog && testOplogBufferIds([docId4]);
  usesOplog && testSafeAppendToBufferFlag(true);

  // Current state is [3 5 6 | 7]
  // Add some negative numbers overflowing the buffer.
  // New documents will take the published place, [3 5 6] will take the buffer
  // and 7 will be outside of the buffer in MongoDB.
  var docId6 = ins({ foo: 22, bar: -1 });
  var docId7 = ins({ foo: 22, bar: -2 });
  var docId8 = ins({ foo: 22, bar: -3 });
  test.length(o.output, 6);
  var expected = [{added: docId6}, {removed: docId2},
                  {added: docId7}, {removed: docId1},
                  {added: docId8}, {removed: docId3}];
  test.isTrue(setsEqual(o.output, expected));
  clearOutput(o);
  usesOplog && testOplogBufferIds([docId1, docId2, docId3]);
  usesOplog && testSafeAppendToBufferFlag(false);

  // Now the state is [-3 -2 -1 | 3 5 6] 7
  // If we update first 3 docs (increment them by 20), it would be
  // interesting.
  upd({ bar: { $lt: 0 }}, { $inc: { bar: 20 } }, { multi: true });

  // The updated documents can't find their place in published and they can't
  // be buffered as we are not aware of the situation outside of the buffer.
  // But since our buffer becomes empty, it will be refilled partially with
  // updated documents.
  test.length(o.output, 6);
  var expectedRemoves = [{removed: docId6},
                         {removed: docId7},
                         {removed: docId8}];
  var expectedAdds = [{added: docId3},
                      {added: docId1},
                      {added: docId2}];

  test.isTrue(setsEqual(o.output, expectedAdds.concat(expectedRemoves)));
  clearOutput(o);
  usesOplog && testOplogBufferIds([docId4, docId7, docId8]);
  usesOplog && testSafeAppendToBufferFlag(false);

  // The new arrangement is [3 5 6 | 7 17 18] 19
  // By ids: [docId3, docId1, docId2] docId4] docId6 docId7 docId8
  // Remove first 4 docs (3, 1, 2, 4) forcing buffer to become empty and
  // schedule a repoll.
  rem({ bar: { $lt: 10 } });

  // XXX the oplog code analyzes the events one by one: one remove after
  // another. Poll-n-diff code, on the other side, analyzes the batch action
  // of multiple remove. Because of that difference, expected outputs differ.
  if (usesOplog) {
    var expectedRemoves = [{removed: docId3}, {removed: docId1},
                           {removed: docId2}, {removed: docId4}];
    var expectedAdds = [{added: docId4}, {added: docId8},
                        {added: docId7}, {added: docId6}];

    test.length(o.output, 8);
  } else {
    var expectedRemoves = [{removed: docId3}, {removed: docId1},
                           {removed: docId2}];
    var expectedAdds = [{added: docId8}, {added: docId7}, {added: docId6}];

    test.length(o.output, 6);
  }
  test.isTrue(setsEqual(o.output, expectedAdds.concat(expectedRemoves)));
  clearOutput(o);
  usesOplog && testOplogBufferIds([]);
  usesOplog && testSafeAppendToBufferFlag(true);

  // The new arrangement is [17 18 19] or [docId6 docId7 docId8]
  var docId9 = ins({ foo: 22, bar: 21 });
  var docId10 = ins({ foo: 22, bar: 31 });
  var docId11 = ins({ foo: 22, bar: 41 });
  var docId12 = ins({ foo: 22, bar: 51 });

  // Becomes [17 18 19 | 21 31 41] 51
  usesOplog && testOplogBufferIds([docId9, docId10, docId11]);
  usesOplog && testSafeAppendToBufferFlag(false);
  test.length(o.output, 0);
  upd({ bar: { $lt: 20 } }, { $inc: { bar: 5 } }, { multi: true });
  // Becomes [21 22 23 | 24 31 41] 51
  test.length(o.output, 4);
  test.isTrue(setsEqual(o.output, [{removed: docId6},
                                   {added: docId9},
                                   {changed: docId7},
                                   {changed: docId8}]));
  clearOutput(o);
  usesOplog && testOplogBufferIds([docId6, docId10, docId11]);
  usesOplog && testSafeAppendToBufferFlag(false);

  rem(docId9);
  // Becomes [22 23 24 | 31 41] 51
  test.length(o.output, 2);
  test.isTrue(setsEqual(o.output, [{removed: docId9}, {added: docId6}]));
  clearOutput(o);
  usesOplog && testOplogBufferIds([docId10, docId11]);
  usesOplog && testSafeAppendToBufferFlag(false);

  upd({ bar: { $gt: 25 } }, { $inc: { bar: -7.5 } }, { multi: true });
  // Becomes [22 23 23.5 | 24] 33.5 43.5 - 33.5 doesn't update in-place in
  // buffer, because the driver is not sure it can do it and there is no
  // different doc which is less than 33.5.
  test.length(o.output, 2);
  test.isTrue(setsEqual(o.output, [{removed: docId6}, {added: docId10}]));
  clearOutput(o);
  usesOplog && testOplogBufferIds([docId6]);
  usesOplog && testSafeAppendToBufferFlag(false);

  // Force buffer objects to be moved into published set so we can check them
  rem(docId7);
  rem(docId8);
  rem(docId10);
  // Becomes [24 33.5 43.5]
  test.length(o.output, 6);
  test.isTrue(setsEqual(o.output, [{removed: docId7}, {removed: docId8},
                                   {removed: docId10}, {added: docId6},
                                   {added: docId11}, {added: docId12}]));
  test.length(_.keys(o.state), 3);
  test.equal(o.state[docId6], { _id: docId6, foo: 22, bar: 24 });
  test.equal(o.state[docId11], { _id: docId11, foo: 22, bar: 33.5 });
  test.equal(o.state[docId12], { _id: docId12, foo: 22, bar: 43.5 });
  clearOutput(o);
  usesOplog && testOplogBufferIds([]);
  usesOplog && testSafeAppendToBufferFlag(true);

  o.handle.stop();
  onComplete();
});
// Checks that a limited, sorted observe combined with a `fields` projection
// still publishes correct (projected) documents as the window shifts.
Tinytest.addAsync("mongo-livedata - observe sorted, limited, sort fields " + idGeneration, function (test, onComplete) {
  var run = test.runId();
  var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions);

  // Observes everything sorted by x ascending, limited to 2 docs, publishing
  // only the y field; callbacks go to `output`, published docs to `state`.
  var observer = function () {
    var state = {};
    var output = [];
    var callbacks = {
      changed: function (newDoc) {
        output.push({changed: newDoc._id});
        state[newDoc._id] = newDoc;
      },
      added: function (newDoc) {
        output.push({added: newDoc._id});
        state[newDoc._id] = newDoc;
      },
      removed: function (oldDoc) {
        output.push({removed: oldDoc._id});
        delete state[oldDoc._id];
      }
    };
    var handle = coll.find({}, {sort: {x: 1},
                                limit: 2,
                                fields: {y: 1}}).observe(callbacks);

    return {output: output, handle: handle, state: state};
  };
  var clearOutput = function (o) { o.output.splice(0, o.output.length); };
  var ins = function (doc) {
    var id; runInFence(function () { id = coll.insert(doc); });
    return id;
  };
  var rem = function (id) {
    runInFence(function () { coll.remove(id); });
  };

  var o = observer();
  var docId1 = ins({ x: 1, y: 1222 });
  var docId2 = ins({ x: 5, y: 5222 });

  test.length(o.output, 2);
  test.equal(o.output, [{added: docId1}, {added: docId2}]);
  clearOutput(o);

  var docId3 = ins({ x: 7, y: 7222 });
  test.length(o.output, 0);

  var docId4 = ins({ x: -1, y: -1222 });

  // Becomes [docId4 docId1 | docId2 docId3]
  test.length(o.output, 2);
  test.isTrue(setsEqual(o.output, [{added: docId4}, {removed: docId2}]));
  // The x field is absent from the published docs due to the projection.
  test.equal(_.size(o.state), 2);
  test.equal(o.state[docId4], {_id: docId4, y: -1222});
  test.equal(o.state[docId1], {_id: docId1, y: 1222});
  clearOutput(o);

  rem(docId2);
  // Becomes [docId4 docId1 | docId3]
  test.length(o.output, 0);

  rem(docId4);
  // Becomes [docId1 docId3]
  test.length(o.output, 2);
  test.isTrue(setsEqual(o.output, [{added: docId3}, {removed: docId4}]));
  test.equal(_.size(o.state), 2);
  test.equal(o.state[docId3], {_id: docId3, y: 7222});
  test.equal(o.state[docId1], {_id: docId1, y: 1222});
  clearOutput(o);

  onComplete();
});
Tinytest.addAsync("mongo-livedata - observe sorted, limited, big initial set " + idGeneration, function (test, onComplete) {
  var run = test.runId();
  var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions);
  // Observes the collection sorted by (x, y) with limit 3. Records every
  // callback invocation in `output` and mirrors the currently published
  // documents in `state`.
  var observer = function () {
    var state = {};
    var output = [];
    var callbacks = {
      changed: function (newDoc) {
        output.push({changed: newDoc._id});
        state[newDoc._id] = newDoc;
      },
      added: function (newDoc) {
        output.push({added: newDoc._id});
        state[newDoc._id] = newDoc;
      },
      removed: function (oldDoc) {
        output.push({removed: oldDoc._id});
        delete state[oldDoc._id];
      }
    };
    var handle = coll.find({}, {sort: {x: 1, y: 1}, limit: 3})
                  .observe(callbacks);
    return {output: output, handle: handle, state: state};
  };
  var clearOutput = function (o) { o.output.splice(0, o.output.length); };
  // Writes run inside a fence so all observe callbacks have fired by the time
  // the helper returns.
  var ins = function (doc) {
    var id; runInFence(function () { id = coll.insert(doc); });
    return id;
  };
  var rem = function (id) {
    runInFence(function () { coll.remove(id); });
  };
  // tests '_id' subfields for all documents in oplog buffer
  var testOplogBufferIds = function (ids) {
    var bufferIds = [];
    o.handle._multiplexer._observeDriver._unpublishedBuffer.forEach(function (x, id) {
      bufferIds.push(id);
    });
    test.isTrue(setsEqual(ids, bufferIds), "expected: " + ids + "; got: " + bufferIds);
  };
  // Checks the driver's flag recording whether published + buffer are known
  // to contain every document matching the selector.
  var testSafeAppendToBufferFlag = function (expected) {
    if (expected)
      test.isTrue(o.handle._multiplexer._observeDriver._safeAppendToBuffer);
    else
      test.isFalse(o.handle._multiplexer._observeDriver._safeAppendToBuffer);
  };
  // Insert eleven documents before the observer starts, so the initial query
  // has to split the matching set into published (3) and buffered docs.
  var ids = {};
  _.each([2, 4, 1, 3, 5, 5, 9, 1, 3, 2, 5], function (x, i) {
    ids[i] = ins({ x: x, y: i });
  });
  var o = observer();
  // Oplog-internal assertions only apply when the oplog driver is in use.
  var usesOplog = o.handle._multiplexer._observeDriver._usesOplog;
  // x:  [1 1 2 | 2 3 3] 4 5 5 5  9
  // id: [2 7 0 | 9 3 8] 1 4 5 10 6
  test.length(o.output, 3);
  test.isTrue(setsEqual([{added: ids[2]}, {added: ids[7]}, {added: ids[0]}], o.output));
  usesOplog && testOplogBufferIds([ids[9], ids[3], ids[8]]);
  usesOplog && testSafeAppendToBufferFlag(false);
  clearOutput(o);
  rem(ids[0]);
  // x:  [1 1 2 | 3 3] 4 5 5 5  9
  // id: [2 7 9 | 3 8] 1 4 5 10 6
  test.length(o.output, 2);
  test.isTrue(setsEqual([{removed: ids[0]}, {added: ids[9]}], o.output));
  usesOplog && testOplogBufferIds([ids[3], ids[8]]);
  usesOplog && testSafeAppendToBufferFlag(false);
  clearOutput(o);
  rem(ids[7]);
  // x:  [1 2 3 | 3] 4 5 5 5  9
  // id: [2 9 3 | 8] 1 4 5 10 6
  test.length(o.output, 2);
  test.isTrue(setsEqual([{removed: ids[7]}, {added: ids[3]}], o.output));
  usesOplog && testOplogBufferIds([ids[8]]);
  usesOplog && testSafeAppendToBufferFlag(false);
  clearOutput(o);
  rem(ids[3]);
  // x:  [1 2 3 | 4 5 5] 5  9
  // id: [2 9 8 | 1 4 5] 10 6
  test.length(o.output, 2);
  test.isTrue(setsEqual([{removed: ids[3]}, {added: ids[8]}], o.output));
  usesOplog && testOplogBufferIds([ids[1], ids[4], ids[5]]);
  usesOplog && testSafeAppendToBufferFlag(false);
  clearOutput(o);
  // Multi-doc remove: every document with x < 4 goes away at once.
  rem({ x: {$lt: 4} });
  // x:  [4 5 5 | 5  9]
  // id: [1 4 5 | 10 6]
  test.length(o.output, 6);
  test.isTrue(setsEqual([{removed: ids[2]}, {removed: ids[9]}, {removed: ids[8]},
                         {added: ids[5]}, {added: ids[4]}, {added: ids[1]}], o.output));
  usesOplog && testOplogBufferIds([ids[10], ids[6]]);
  usesOplog && testSafeAppendToBufferFlag(true);
  clearOutput(o);
  // Stop the observe handle before finishing, as the other tests in this
  // file do, so the driver doesn't keep tailing this collection.
  o.handle.stop();
  onComplete();
});
}

View File

@@ -7,6 +7,20 @@ var PHASE = {
STEADY: "STEADY"
};
// Exception thrown by _needToPollQuery which unrolls the stack up to the
// enclosing call to finishIfNeedToPollQuery.
var SwitchedToQuery = function () {};
var finishIfNeedToPollQuery = function (f) {
return function () {
try {
f.apply(this, arguments);
} catch (e) {
if (!(e instanceof SwitchedToQuery))
throw e;
}
};
};
// OplogObserveDriver is an alternative to PollingObserveDriver which follows
// the Mongo operation log instead of just re-polling the query. It obeys the
// same simple interface: constructing it starts sending observeChanges
@@ -19,8 +33,44 @@ OplogObserveDriver = function (options) {
self._cursorDescription = options.cursorDescription;
self._mongoHandle = options.mongoHandle;
self._multiplexer = options.multiplexer;
if (options.ordered)
if (options.ordered) {
throw Error("OplogObserveDriver only supports unordered observeChanges");
}
var sortSpec = options.cursorDescription.options.sort;
var sorter = sortSpec && new Minimongo.Sorter(sortSpec);
// We don't support $near and other geo-queries so it's OK to initialize the
// comparator only once in the constructor.
var comparator = sorter && sorter.getComparator();
if (options.cursorDescription.options.limit) {
// There are several properties ordered driver implements:
// - _limit is a positive number
// - _comparator is a function-comparator by which the query is ordered
// - _unpublishedBuffer is non-null Min/Max Heap,
the empty buffer in STEADY phase implies that
everything that matches the query's selector fits
// into published set.
// - _published - Min Heap (also implements IdMap methods)
var heapOptions = { IdMap: LocalCollection._IdMap };
self._limit = self._cursorDescription.options.limit;
self._comparator = comparator;
self._unpublishedBuffer = new MinMaxHeap(comparator, heapOptions);
// We need something that can find Max value in addition to IdMap interface
self._published = new MaxHeap(comparator, heapOptions);
} else {
self._limit = 0;
self._comparator = null;
self._unpublishedBuffer = null;
self._published = new LocalCollection._IdMap;
}
// Indicates if it is safe to insert a new document at the end of the buffer
// for this query. i.e. it is known that there are no documents matching the
// selector those are not in published or buffer.
self._safeAppendToBuffer = false;
self._stopped = false;
self._stopHandles = [];
@@ -30,7 +80,6 @@ OplogObserveDriver = function (options) {
self._registerPhaseChange(PHASE.QUERYING);
self._published = new LocalCollection._IdMap;
var selector = self._cursorDescription.selector;
self._matcher = options.matcher;
var projection = self._cursorDescription.options.fields || {};
@@ -38,6 +87,8 @@ OplogObserveDriver = function (options) {
// Projection function, result of combining important fields for selector and
// existing fields projection
self._sharedProjection = self._matcher.combineIntoProjection(projection);
if (sorter)
self._sharedProjection = sorter.combineIntoProjection(self._sharedProjection);
self._sharedProjectionFn = LocalCollection._compileProjection(
self._sharedProjection);
@@ -51,7 +102,7 @@ OplogObserveDriver = function (options) {
forEachTrigger(self._cursorDescription, function (trigger) {
self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry(
trigger, function (notification) {
Meteor._noYieldsAllowed(function () {
Meteor._noYieldsAllowed(finishIfNeedToPollQuery(function () {
var op = notification.op;
if (notification.dropCollection) {
// Note: this call is not allowed to block on anything (especially
@@ -65,7 +116,7 @@ OplogObserveDriver = function (options) {
else
self._handleOplogEntrySteadyOrFetching(op);
}
});
}));
}
));
});
@@ -101,55 +152,261 @@ OplogObserveDriver = function (options) {
// Give _observeChanges a chance to add the new ObserveHandle to our
// multiplexer, so that the added calls get streamed.
Meteor.defer(function () {
Meteor.defer(finishIfNeedToPollQuery(function () {
self._runInitialQuery();
});
}));
};
_.extend(OplogObserveDriver.prototype, {
_add: function (doc) {
_addPublished: function (id, doc) {
var self = this;
var id = doc._id;
var fields = _.clone(doc);
delete fields._id;
if (self._published.has(id))
throw Error("tried to add something already published " + id);
self._published.set(id, self._sharedProjectionFn(fields));
self._published.set(id, self._sharedProjectionFn(doc));
self._multiplexer.added(id, self._projectionFn(fields));
// After adding this document, the published set might be overflowed
// (exceeding capacity specified by limit). If so, push the maximum element
// to the buffer, we might want to save it in memory to reduce the amount of
// Mongo lookups in the future.
if (self._limit && self._published.size() > self._limit) {
// XXX in theory the size of published is no more than limit+1
if (self._published.size() !== self._limit + 1) {
throw new Error("After adding to published, " +
(self._published.size() - self._limit) +
" documents are overflowing the set");
}
var overflowingDocId = self._published.maxElementId();
var overflowingDoc = self._published.get(overflowingDocId);
if (EJSON.equals(overflowingDocId, id)) {
throw new Error("The document just added is overflowing the published set");
}
self._published.remove(overflowingDocId);
self._multiplexer.removed(overflowingDocId);
self._addBuffered(overflowingDocId, overflowingDoc);
}
},
_remove: function (id) {
_removePublished: function (id) {
var self = this;
if (!self._published.has(id))
throw Error("tried to remove something unpublished " + id);
self._published.remove(id);
self._multiplexer.removed(id);
},
_handleDoc: function (id, newDoc, mustMatchNow) {
var self = this;
newDoc = _.clone(newDoc);
if (! self._limit || self._published.size() === self._limit)
return;
var matchesNow = newDoc && self._matcher.documentMatches(newDoc).result;
if (mustMatchNow && !matchesNow) {
throw Error("expected " + EJSON.stringify(newDoc) + " to match "
+ EJSON.stringify(self._cursorDescription));
if (self._published.size() > self._limit)
throw Error("self._published got too big");
// OK, we are publishing less than the limit. Maybe we should look in the
// buffer to find the next element past what we were publishing before.
if (!self._unpublishedBuffer.empty()) {
// There's something in the buffer; move the first thing in it to
// _published.
var newDocId = self._unpublishedBuffer.minElementId();
var newDoc = self._unpublishedBuffer.get(newDocId);
self._removeBuffered(newDocId);
self._addPublished(newDocId, newDoc);
return;
}
var matchedBefore = self._published.has(id);
// There's nothing in the buffer. This could mean one of a few things.
if (matchesNow && !matchedBefore) {
self._add(newDoc);
} else if (matchedBefore && !matchesNow) {
self._remove(id);
} else if (matchesNow) {
// (a) We could be in the middle of re-running the query (specifically, we
// could be in _publishNewResults). In that case, _unpublishedBuffer is
// empty because we clear it at the beginning of _publishNewResults. In this
// case, our caller already knows the entire answer to the query and we
// don't need to do anything fancy here. Just return.
if (self._phase === PHASE.QUERYING)
return;
// (b) We're pretty confident that the union of _published and
// _unpublishedBuffer contain all documents that match selector. Because
// _unpublishedBuffer is empty, that means we're confident that _published
// contains all documents that match selector. So we have nothing to do.
if (self._safeAppendToBuffer)
return;
// (c) Maybe there are other documents out there that should be in our
// buffer. But in that case, when we emptied _unpublishedBuffer in
// _removeBuffered, we should have called _needToPollQuery, which will
// either put something in _unpublishedBuffer or set _safeAppendToBuffer (or
// both), and it will put us in QUERYING for that whole time. So in fact, we
// shouldn't be able to get here.
throw new Error("Buffer inexplicably empty");
},
_changePublished: function (id, oldDoc, newDoc) {
var self = this;
self._published.set(id, self._sharedProjectionFn(newDoc));
var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc);
changed = self._projectionFn(changed);
if (!_.isEmpty(changed))
self._multiplexer.changed(id, changed);
},
_addBuffered: function (id, doc) {
var self = this;
self._unpublishedBuffer.set(id, self._sharedProjectionFn(doc));
// If something is overflowing the buffer, we just remove it from cache
if (self._unpublishedBuffer.size() > self._limit) {
var maxBufferedId = self._unpublishedBuffer.maxElementId();
self._unpublishedBuffer.remove(maxBufferedId);
// Since something matching is removed from cache (both published set and
// buffer), set flag to false
self._safeAppendToBuffer = false;
}
},
// Is called either to remove the doc completely from matching set or to move
// it to the published set later.
_removeBuffered: function (id) {
var self = this;
self._unpublishedBuffer.remove(id);
// To keep the contract "buffer is never empty in STEADY phase unless
// everything matching fits into published" true, we poll everything as soon
// as we see the buffer becoming empty.
if (! self._unpublishedBuffer.size() && ! self._safeAppendToBuffer)
self._needToPollQuery();
},
// Called when a document has joined the "Matching" results set.
// Takes responsibility of keeping _unpublishedBuffer in sync with _published
// and the effect of limit enforced.
_addMatching: function (doc) {
var self = this;
var id = doc._id;
if (self._published.has(id))
throw Error("tried to add something already published " + id);
if (self._limit && self._unpublishedBuffer.has(id))
throw Error("tried to add something already existed in buffer " + id);
var limit = self._limit;
var comparator = self._comparator;
var maxPublished = (limit && self._published.size() > 0) ?
self._published.get(self._published.maxElementId()) : null;
var maxBuffered = (limit && self._unpublishedBuffer.size() > 0) ?
self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId()) : null;
// The query is unlimited or didn't publish enough documents yet or the new
// document would fit into published set pushing the maximum element out,
// then we need to publish the doc.
var toPublish = ! limit || self._published.size() < limit ||
comparator(doc, maxPublished) < 0;
// Otherwise we might need to buffer it (only in case of limited query).
// Buffering is allowed if the buffer is not filled up yet and all matching
// docs are either in the published set or in the buffer.
var canAppendToBuffer = !toPublish && self._safeAppendToBuffer &&
self._unpublishedBuffer.size() < limit;
// Or if it is small enough to be safely inserted to the middle or the
// beginning of the buffer.
var canInsertIntoBuffer = !toPublish && maxBuffered &&
comparator(doc, maxBuffered) <= 0;
var toBuffer = canAppendToBuffer || canInsertIntoBuffer;
if (toPublish) {
self._addPublished(id, doc);
} else if (toBuffer) {
self._addBuffered(id, doc);
} else {
// dropping it and not saving to the cache
self._safeAppendToBuffer = false;
}
},
// Called when a document leaves the "Matching" results set.
// Takes responsibility of keeping _unpublishedBuffer in sync with _published
// and the effect of limit enforced.
_removeMatching: function (id) {
var self = this;
if (! self._published.has(id) && ! self._limit)
throw Error("tried to remove something matching but not cached " + id);
if (self._published.has(id)) {
self._removePublished(id);
} else if (self._unpublishedBuffer.has(id)) {
self._removeBuffered(id);
}
},
_handleDoc: function (id, newDoc) {
var self = this;
var matchesNow = newDoc && self._matcher.documentMatches(newDoc).result;
var publishedBefore = self._published.has(id);
var bufferedBefore = self._limit && self._unpublishedBuffer.has(id);
var cachedBefore = publishedBefore || bufferedBefore;
if (matchesNow && !cachedBefore) {
self._addMatching(newDoc);
} else if (cachedBefore && !matchesNow) {
self._removeMatching(id);
} else if (cachedBefore && matchesNow) {
var oldDoc = self._published.get(id);
if (!oldDoc)
throw Error("thought that " + id + " was there!");
delete newDoc._id;
self._published.set(id, self._sharedProjectionFn(newDoc));
var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc);
changed = self._projectionFn(changed);
if (!_.isEmpty(changed))
self._multiplexer.changed(id, changed);
var comparator = self._comparator;
var minBuffered = self._limit && self._unpublishedBuffer.size() &&
self._unpublishedBuffer.get(self._unpublishedBuffer.minElementId());
if (publishedBefore) {
// Unlimited case where the document stays in published once it matches
// or the case when we don't have enough matching docs to publish or the
// changed but matching doc will stay in published anyways.
// XXX: We rely on the emptiness of buffer. Be sure to maintain the fact
// that buffer can't be empty if there are matching documents not
// published. Notably, we don't want to schedule repoll and continue
// relying on this property.
var staysInPublished = ! self._limit ||
self._unpublishedBuffer.size() === 0 ||
comparator(newDoc, minBuffered) <= 0;
if (staysInPublished) {
self._changePublished(id, oldDoc, newDoc);
} else {
// after the change doc doesn't stay in the published, remove it
self._removePublished(id);
// but it can move into buffered now, check it
var maxBuffered = self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId());
var toBuffer = self._safeAppendToBuffer ||
(maxBuffered && comparator(newDoc, maxBuffered) <= 0);
if (toBuffer) {
self._addBuffered(id, newDoc);
} else {
// Throw away from both published set and buffer
self._safeAppendToBuffer = false;
}
}
} else if (bufferedBefore) {
oldDoc = self._unpublishedBuffer.get(id);
// remove the old version manually so we don't trigger the querying
// immediately
self._unpublishedBuffer.remove(id);
var maxPublished = self._published.get(self._published.maxElementId());
var maxBuffered = self._unpublishedBuffer.size() && self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId());
// the buffered doc was updated, it could move to published
var toPublish = comparator(newDoc, maxPublished) < 0;
// or stays in buffer even after the change
var staysInBuffer = (! toPublish && self._safeAppendToBuffer) ||
(!toPublish && maxBuffered && comparator(newDoc, maxBuffered) <= 0);
if (toPublish) {
self._addPublished(id, newDoc);
} else if (staysInBuffer) {
// stays in buffer but changes
self._unpublishedBuffer.set(id, newDoc);
} else {
// Throw away from both published set and buffer
self._safeAppendToBuffer = false;
}
} else {
throw new Error("cachedBefore implies either of publishedBefore or bufferedBefore is true.");
}
}
},
_fetchModifiedDocuments: function () {
@@ -157,7 +414,7 @@ _.extend(OplogObserveDriver.prototype, {
self._registerPhaseChange(PHASE.FETCHING);
// Defer, because nothing called from the oplog entry handler may yield, but
// fetch() yields.
Meteor.defer(function () {
Meteor.defer(finishIfNeedToPollQuery(function () {
while (!self._stopped && !self._needToFetch.empty()) {
if (self._phase !== PHASE.FETCHING)
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
@@ -174,36 +431,40 @@ _.extend(OplogObserveDriver.prototype, {
waiting++;
self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName, id, cacheKey,
function (err, doc) {
if (err) {
if (!anyError)
anyError = err;
} else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
// _pollQuery call which should effectively cancel this round of
// fetches. (_pollQuery increments the generation.)
self._handleDoc(id, doc);
finishIfNeedToPollQuery(function (err, doc) {
try {
if (err) {
if (!anyError)
anyError = err;
} else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
// _pollQuery call (eg, in another fiber) which should
// effectively cancel this round of fetches. (_pollQuery
// increments the generation.)
self._handleDoc(id, doc);
}
} finally {
waiting--;
// Because fetch() never calls its callback synchronously, this
// is safe (ie, we won't call fut.return() before the forEach is
// done).
if (waiting === 0)
fut.return();
}
waiting--;
// Because fetch() never calls its callback synchronously, this is
// safe (ie, we won't call fut.return() before the forEach is
// done).
if (waiting === 0)
fut.return();
});
}));
});
fut.wait();
// XXX do this even if we've switched to PHASE.QUERYING?
if (anyError)
throw anyError;
// Exit now if we've had a _pollQuery call.
// Exit now if we've had a _pollQuery call (here or in another fiber).
if (self._phase === PHASE.QUERYING)
return;
self._currentlyFetching = null;
}
self._beSteady();
});
}));
},
_beSteady: function () {
var self = this;
@@ -233,16 +494,18 @@ _.extend(OplogObserveDriver.prototype, {
}
if (op.op === 'd') {
if (self._published.has(id))
self._remove(id);
if (self._published.has(id) || (self._limit && self._unpublishedBuffer.has(id)))
self._removeMatching(id);
} else if (op.op === 'i') {
if (self._published.has(id))
throw new Error("insert found for already-existing ID");
throw new Error("insert found for already-existing ID in published");
if (self._unpublishedBuffer && self._unpublishedBuffer.has(id))
throw new Error("insert found for already-existing ID in buffer");
// XXX what if selector yields? for now it can't but later it could have
// $where
if (self._matcher.documentMatches(op.o).result)
self._add(op.o);
self._addMatching(op.o);
} else if (op.op === 'u') {
// Is this a modifier ($set/$unset, which may require us to poll the
// database to figure out if the whole document matches the selector) or a
@@ -256,12 +519,19 @@ _.extend(OplogObserveDriver.prototype, {
var canDirectlyModifyDoc =
!isReplace && modifierCanBeDirectlyApplied(op.o);
var publishedBefore = self._published.has(id);
var bufferedBefore = self._limit && self._unpublishedBuffer.has(id);
if (isReplace) {
self._handleDoc(id, _.extend({_id: id}, op.o));
} else if (self._published.has(id) && canDirectlyModifyDoc) {
} else if ((publishedBefore || bufferedBefore) && canDirectlyModifyDoc) {
// Oh great, we actually know what the document is, so we can apply
// this directly.
var newDoc = EJSON.clone(self._published.get(id));
var newDoc = self._published.has(id) ?
self._published.get(id) :
self._unpublishedBuffer.get(id);
newDoc = EJSON.clone(newDoc);
newDoc._id = id;
LocalCollection._modify(newDoc, op.o);
self._handleDoc(id, self._sharedProjectionFn(newDoc));
@@ -280,10 +550,8 @@ _.extend(OplogObserveDriver.prototype, {
if (self._stopped)
throw new Error("oplog stopped surprisingly early");
var initialCursor = self._cursorForQuery();
initialCursor.forEach(function (initialDoc) {
self._add(initialDoc);
});
self._runQuery();
if (self._stopped)
throw new Error("oplog stopped quite early");
// Allow observeChanges calls to return. (After this, it's possible for
@@ -319,27 +587,48 @@ _.extend(OplogObserveDriver.prototype, {
++self._fetchGeneration; // ignore any in-flight fetches
self._registerPhaseChange(PHASE.QUERYING);
// Defer so that we don't block.
// Defer so that we don't block. We don't need finishIfNeedToPollQuery here
// because SwitchedToQuery is not called in QUERYING mode.
Meteor.defer(function () {
// subtle note: _published does not contain _id fields, but newResults
// does
var newResults = new LocalCollection._IdMap;
var cursor = self._cursorForQuery();
cursor.forEach(function (doc) {
newResults.set(doc._id, doc);
});
self._publishNewResults(newResults);
self._runQuery();
self._doneQuerying();
});
},
_runQuery: function () {
var self = this;
var newResults = new LocalCollection._IdMap;
var newBuffer = new LocalCollection._IdMap;
// Query 2x documents as the half excluded from the original query will go
// into unpublished buffer to reduce additional Mongo lookups in cases when
// documents are removed from the published set and need a replacement.
// XXX needs more thought on non-zero skip
// XXX 2 is a "magic number" meaning there is an extra chunk of docs for
// buffer if such is needed.
var cursor = self._cursorForQuery({ limit: self._limit * 2 });
cursor.forEach(function (doc, i) {
if (!self._limit || i < self._limit)
newResults.set(doc._id, doc);
else
newBuffer.set(doc._id, doc);
});
self._publishNewResults(newResults, newBuffer);
},
// Transitions to QUERYING and runs another query, or (if already in QUERYING)
// ensures that we will query again later.
//
// This function may not block, because it is called from an oplog entry
// handler.
// handler. However, if we were not already in the QUERYING phase, it throws
// an exception that is caught by the closest surrounding
// finishIfNeedToPollQuery call; this ensures that we don't continue running
// code that was designed for another phase inside PHASE.QUERYING.
//
// (It's also necessary whenever logic in this file yields to check that other
// phases haven't put us into QUERYING mode, though; eg,
// _fetchModifiedDocuments does this.)
_needToPollQuery: function () {
var self = this;
if (self._stopped)
@@ -349,7 +638,7 @@ _.extend(OplogObserveDriver.prototype, {
// pausing FETCHING).
if (self._phase !== PHASE.QUERYING) {
self._pollQuery();
return;
throw new SwitchedToQuery;
}
// We're currently in QUERYING. Set a flag to ensure that we run another
@@ -379,7 +668,7 @@ _.extend(OplogObserveDriver.prototype, {
}
},
_cursorForQuery: function () {
_cursorForQuery: function (optionsOverwrite) {
var self = this;
// The query we run is almost the same as the cursor we are observing, with
@@ -388,6 +677,11 @@ _.extend(OplogObserveDriver.prototype, {
// "shared" projection). And we don't want to apply any transform in the
// cursor, because observeChanges shouldn't use the transform.
var options = _.clone(self._cursorDescription.options);
// Allow the caller to modify the options. Useful to specify different skip
// and limit values.
_.extend(options, optionsOverwrite);
options.fields = self._sharedProjection;
delete options.transform;
// We are NOT deep cloning fields or selector here, which should be OK.
@@ -401,13 +695,20 @@ _.extend(OplogObserveDriver.prototype, {
// Replace self._published with newResults (both are IdMaps), invoking observe
// callbacks on the multiplexer.
// Replace self._unpublishedBuffer with newBuffer.
//
// XXX This is very similar to LocalCollection._diffQueryUnorderedChanges. We
// should really: (a) Unify IdMap and OrderedDict into Unordered/OrderedDict (b)
// Rewrite diff.js to use these classes instead of arrays and objects.
_publishNewResults: function (newResults) {
_publishNewResults: function (newResults, newBuffer) {
var self = this;
// If the query is limited and there is a buffer, shut down so it doesn't
// get in the way.
if (self._limit) {
self._unpublishedBuffer.clear();
}
// First remove anything that's gone. Be careful not to modify
// self._published while iterating over it.
var idsToRemove = [];
@@ -416,15 +717,33 @@ _.extend(OplogObserveDriver.prototype, {
idsToRemove.push(id);
});
_.each(idsToRemove, function (id) {
self._remove(id);
self._removePublished(id);
});
// Now do adds and changes.
// If self has a buffer and limit, the new fetched result will be
// limited correctly as the query has sort specifier.
newResults.forEach(function (doc, id) {
// "true" here means to throw if we think this doc doesn't match the
// selector.
self._handleDoc(id, doc, true);
self._handleDoc(id, doc);
});
// Sanity-check that everything we tried to put into _published ended up
// there.
// XXX if this is slow, remove it later
if (self._published.size() !== newResults.size()) {
throw Error("failed to copy newResults into _published!");
}
self._published.forEach(function (doc, id) {
if (!newResults.has(id))
throw Error("_published has a doc that newResults doesn't; " + id);
});
// Finally, replace the buffer
newBuffer.forEach(function (doc, id) {
self._addBuffered(id, doc);
});
self._safeAppendToBuffer = newBuffer.size() < self._limit;
},
// This stop function is invoked from the onStop of the ObserveMultiplexer, so
@@ -451,6 +770,7 @@ _.extend(OplogObserveDriver.prototype, {
// Proactively drop references to potentially big things.
self._published = null;
self._unpublishedBuffer = null;
self._needToFetch = null;
self._currentlyFetching = null;
self._oplogEntryHandle = null;
@@ -486,10 +806,11 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) {
if (options._disableOplog)
return false;
// This option (which are mostly used for sorted cursors) require us to figure
// out where a given document fits in an order to know if it's included or
// not, and we don't track that information when doing oplog tailing.
if (options.limit || options.skip) return false;
// skip is not supported: to support it we would need to keep track of all
// "skipped" documents or at least their ids.
// limit w/o a sort specifier is not supported: current implementation needs a
// deterministic way to order documents.
if (options.skip || (options.limit && !options.sort)) return false;
// If a fields projection option is given check if it is supported by
// minimongo (some operators are not supported).
@@ -509,7 +830,9 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) {
// as Mongo, and can yield!)
// - $near (has "interesting" properties in MongoDB, like the possibility
// of returning an ID multiple times, though even polling maybe
// have a bug there
// have a bug there)
// XXX: once we support it, we would need to think more on how we
// initialize the comparators when we create the driver.
return !matcher.hasWhere() && !matcher.hasGeoQuery();
};

View File

@@ -4,8 +4,8 @@ Tinytest.add("mongo-livedata - oplog - cursorSupported", function (test) {
var oplogEnabled =
!!MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle;
var supported = function (expected, selector) {
var cursor = OplogCollection.find(selector);
var supported = function (expected, selector, options) {
var cursor = OplogCollection.find(selector, options);
var handle = cursor.observeChanges({added: function () {}});
// If there's no oplog at all, we shouldn't ever use it.
if (!oplogEnabled)
@@ -44,4 +44,10 @@ Tinytest.add("mongo-livedata - oplog - cursorSupported", function (test) {
// Nothing Minimongo doesn't understand. (Minimongo happens to fail to
// implement $elemMatch inside $all which MongoDB supports.)
supported(false, {x: {$all: [{$elemMatch: {y: 2}}]}});
supported(true, {}, { sort: {x:1} });
supported(true, {}, { sort: {x:1}, limit: 5 });
supported(false, {}, { limit: 5 });
supported(false, {}, { skip: 2, limit: 5 });
supported(false, {}, { skip: 2 });
});

View File

@@ -24,6 +24,10 @@ Package.on_use(function (api) {
['client', 'server']);
api.use('check', ['client', 'server']);
// Binary Heap data structure is used to optimize oplog observe driver
// performance.
api.use('binary-heap', 'server');
// Allow us to detect 'insecure'.
api.use('insecure', {weak: true});