diff --git a/packages/binary-heap/.gitignore b/packages/binary-heap/.gitignore new file mode 100644 index 0000000000..677a6fc263 --- /dev/null +++ b/packages/binary-heap/.gitignore @@ -0,0 +1 @@ +.build* diff --git a/packages/binary-heap/binary-heap-tests.js b/packages/binary-heap/binary-heap-tests.js new file mode 100644 index 0000000000..0a487b7b2f --- /dev/null +++ b/packages/binary-heap/binary-heap-tests.js @@ -0,0 +1,138 @@ +Tinytest.add("binary-heap - simple max-heap tests", function (test) { + var h = new MaxHeap(function (a, b) { return a-b; }); + h.set("a", 1); + h.set("b", 233); + h.set("c", -122); + h.set("d", 0); + h.set("e", 0); + + test.equal(h.size(), 5); + test.equal(h.maxElementId(), "b"); + test.equal(h.get("b"), 233); + + h.remove("b"); + test.equal(h.size(), 4); + test.equal(h.maxElementId(), "a"); + h.set("e", 44); + test.equal(h.maxElementId(), "e"); + test.equal(h.get("b"), null); + test.isTrue(h.has("a")); + test.isFalse(h.has("dd")); + + h.clear(); + test.isFalse(h.has("a")); + test.equal(h.size(), 0); + test.equal(h.setDefault("a", 12345), 12345); + test.equal(h.setDefault("a", 55555), 12345); + test.equal(h.size(), 1); + test.equal(h.maxElementId(), "a"); +}); + +Tinytest.add("binary-heap - big test for max-heap", function (test) { + var positiveNumbers = _.shuffle(_.range(1, 41)); + var negativeNumbers = _.shuffle(_.range(-1, -41, -1)); + var allNumbers = negativeNumbers.concat(positiveNumbers); + + var heap = new MaxHeap(function (a, b) { return a-b; }); + var output = []; + + _.each(allNumbers, function (n) { heap.set(n, n); }); + + _.times(positiveNumbers.length + negativeNumbers.length, function () { + var maxId = heap.maxElementId(); + output.push(heap.get(maxId)); + heap.remove(maxId); + }); + + allNumbers.sort(function (a, b) { return b-a; }); + + test.equal(output, allNumbers); +}); + +Tinytest.add("binary-heap - min-max heap tests", function (test) { + var h = new MinMaxHeap(function (a, b) { return a-b; }); + h.set("a", 1); + h.set("b", 233); + h.set("c", -122); + h.set("d", 0); + h.set("e", 0); + + test.equal(h.size(), 5); + test.equal(h.maxElementId(), "b"); + test.equal(h.get("b"), 233); + test.equal(h.minElementId(), "c"); + + h.remove("b"); + test.equal(h.size(), 4); + test.equal(h.minElementId(), "c"); + h.set("e", -123); + test.equal(h.minElementId(), "e"); + test.equal(h.get("b"), null); + test.isTrue(h.has("a")); + test.isFalse(h.has("dd")); + + h.clear(); + test.isFalse(h.has("a")); + test.equal(h.size(), 0); + test.equal(h.setDefault("a", 12345), 12345); + test.equal(h.setDefault("a", 55555), 12345); + test.equal(h.size(), 1); + test.equal(h.maxElementId(), "a"); + test.equal(h.minElementId(), "a"); +}); + +Tinytest.add("binary-heap - big test for min-max-heap", function (test) { + var N = 500; + var positiveNumbers = _.shuffle(_.range(1, N + 1)); + var negativeNumbers = _.shuffle(_.range(-1, -N - 1, -1)); + var allNumbers = positiveNumbers.concat(negativeNumbers); + + var heap = new MinMaxHeap(function (a, b) { return a-b; }); + var output = []; + + var initialSets = _.clone(allNumbers); + _.each(allNumbers, function (n) { + heap.set(n, n); + heap._selfCheck(); + heap._minHeap._selfCheck(); + }); + + allNumbers = _.shuffle(allNumbers); + var secondarySets = _.clone(allNumbers); + + _.each(allNumbers, function (n) { + heap.set(-n, n); + heap._selfCheck(); + heap._minHeap._selfCheck(); + }); + + _.times(positiveNumbers.length + negativeNumbers.length, function () { + var minId = heap.minElementId(); + output.push(heap.get(minId)); + heap.remove(minId); + heap._selfCheck(); heap._minHeap._selfCheck(); + }); + + test.equal(heap.size(), 0); + + allNumbers.sort(function (a, b) { return a-b; }); + + var initialTestText = "initial sets: " + initialSets.toString() + + "; secondary sets: " + secondarySets.toString(); + test.equal(output, allNumbers, initialTestText); + + _.each(initialSets, function (n) { heap.set(n, n); }) + _.each(secondarySets, function (n) { heap.set(-n, n); }); + + allNumbers.sort(function (a, b) { return b-a; }); + output = []; + _.times(positiveNumbers.length + negativeNumbers.length, function () { + var maxId = heap.maxElementId(); + output.push(heap.get(maxId)); + heap.remove(maxId); + heap._selfCheck(); heap._minHeap._selfCheck(); + }); + + test.equal(output, allNumbers, initialTestText); +}); + diff --git a/packages/binary-heap/max-heap.js b/packages/binary-heap/max-heap.js new file mode 100644 index 0000000000..aa08a3fad5 --- /dev/null +++ b/packages/binary-heap/max-heap.js @@ -0,0 +1,226 @@ +// Constructor of Heap +// - comparator - Function - given two items returns a number +// - options: +// - initData - Array - Optional - the initial data in a format: +// Object: +// - id - String - unique id of the item +// - value - Any - the data value +// each value is retained +// - IdMap - Constructor - Optional - custom IdMap class to store id->index +// mappings internally. Standard IdMap is used by default. +MaxHeap = function (comparator, options) { + if (! _.isFunction(comparator)) + throw new Error('Passed comparator is invalid, should be a comparison function'); + var self = this; + + // a C-style comparator that is given two values and returns a number, + // negative if the first value is less than the second, positive if the second + // value is greater than the first and zero if they are equal. + self._comparator = comparator; + + options = _.defaults(options || {}, { IdMap: IdMap }); + + // _heapIdx maps an id to an index in the Heap array the corresponding value + // is located on. + self._heapIdx = new options.IdMap; + + // The Heap data-structure implemented as a 0-based contiguous array where + // every item on index idx is a node in a complete binary tree. Every node can + // have children on indexes idx*2+1 and idx*2+2, except for the leaves. Every + // node has a parent on index (idx-1)/2; + self._heap = []; + + // If the initial array is passed, we can build the heap in linear time + // complexity (O(N)) compared to linearithmic time complexity (O(nlogn)) if + // we push elements one by one. + if (_.isArray(options.initData)) + self._initFromData(options.initData); +}; + +_.extend(MaxHeap.prototype, { + // Builds a new heap in-place in linear time based on passed data + _initFromData: function (data) { + var self = this; + + self._heap = _.map(data, function (o) { + return { id: o.id, value: o.value }; + }); + + _.each(data, function (o, i) { + self._heapIdx.set(o.id, i); + }); + + if (! data.length) + return; + + // start from the first non-leaf - the parent of the last leaf + for (var i = parentIdx(data.length - 1); i >= 0; i--) + self._downHeap(i); + }, + + _downHeap: function (idx) { + var self = this; + + while (leftChildIdx(idx) < self.size()) { + var left = leftChildIdx(idx); + var right = rightChildIdx(idx); + var largest = idx; + + if (left < self.size()) { + largest = self._maxIndex(largest, left); + } + if (right < self.size()) { + largest = self._maxIndex(largest, right); + } + + if (largest === idx) + break; + + self._swap(largest, idx); + idx = largest; + } + }, + + _upHeap: function (idx) { + var self = this; + + while (idx > 0) { + var parent = parentIdx(idx); + if (self._maxIndex(parent, idx) === idx) { + self._swap(parent, idx) + idx = parent; + } else { + break; + } + } + }, + + _maxIndex: function (idxA, idxB) { + var self = this; + var valueA = self._get(idxA); + var valueB = self._get(idxB); + return self._comparator(valueA, valueB) >= 0 ? idxA : idxB; + }, + + // Internal: gets raw data object placed on idxth place in heap + _get: function (idx) { + var self = this; + return self._heap[idx].value; + }, + + _swap: function (idxA, idxB) { + var self = this; + var recA = self._heap[idxA]; + var recB = self._heap[idxB]; + + self._heapIdx.set(recA.id, idxB); + self._heapIdx.set(recB.id, idxA); + + self._heap[idxA] = recB; + self._heap[idxB] = recA; + }, + + get: function (id) { + var self = this; + if (! self.has(id)) + return null; + return self._get(self._heapIdx.get(id)); + }, + set: function (id, value) { + var self = this; + + if (self.has(id)) { + if (self.get(id) === value) + return; + + var idx = self._heapIdx.get(id); + self._heap[idx].value = value; + + // Fix the new value's position + // Either bubble new value up if it is greater than its parent + self._upHeap(idx); + // or bubble it down if it is smaller than one of its children + self._downHeap(idx); + } else { + self._heapIdx.set(id, self._heap.length); + self._heap.push({ id: id, value: value }); + self._upHeap(self._heap.length - 1); + } + }, + remove: function (id) { + var self = this; + + if (self.has(id)) { + var last = self._heap.length - 1; + var idx = self._heapIdx.get(id); + + if (idx !== last) { + self._swap(idx, last); + self._heap.pop(); + self._heapIdx.remove(id); + + // Fix the swapped value's position + self._upHeap(idx); + self._downHeap(idx); + } else { + self._heap.pop(); + self._heapIdx.remove(id); + } + } + }, + has: function (id) { + var self = this; + return self._heapIdx.has(id); + }, + empty: function (id) { + var self = this; + return !self.size(); + }, + clear: function () { + var self = this; + self._heap = []; + self._heapIdx.clear(); + }, + // iterate over values in no particular order + forEach: function (iterator) { + var self = this; + _.each(self._heap, function (obj) { + return iterator(obj.value, obj.id); + }); + }, + size: function () { + var self = this; + return self._heap.length; + }, + setDefault: function (id, def) { + var self = this; + if (self.has(id)) + return self.get(id); + self.set(id, def); + return def; + }, + clone: function () { + var self = this; + var clone = new MaxHeap(self._comparator, self._heap); + return clone; + }, + + maxElementId: function () { + var self = this; + return self.size() ? self._heap[0].id : null; + }, + + _selfCheck: function () { + var self = this; + for (var i = 1; i < self._heap.length; i++) + if (self._maxIndex(parentIdx(i), i) !== parentIdx(i)) + throw new Error("An item with id " + self._heap[i].id + + " has a parent younger than it: " + + self._heap[parentIdx(i)].id); + } +}); + +function leftChildIdx (i) { return i * 2 + 1; } +function rightChildIdx (i) { return i * 2 + 2; } +function parentIdx (i) { return (i - 1) >> 1; } + diff --git a/packages/binary-heap/min-max-heap.js b/packages/binary-heap/min-max-heap.js new file mode 100644 index 0000000000..990a71402d --- /dev/null +++ b/packages/binary-heap/min-max-heap.js @@ -0,0 +1,55 @@ +// This implementation of Min/Max-Heap is just a subclass of Max-Heap +// with a Min-Heap as an encapsulated property. +// +// Most of the operations are just proxy methods to call the same method on both +// heaps. +// +// This implementation takes 2*N memory but is fairly simple to write and +// understand. And the constant factor of a simple Heap is usually smaller +// compared to other two-way priority queues like Min/Max Heaps +// (http://www.cs.otago.ac.nz/staffpriv/mike/Papers/MinMaxHeaps/MinMaxHeaps.pdf) +// and Interval Heaps +// (http://www.cise.ufl.edu/~sahni/dsaac/enrich/c13/double.htm) +MinMaxHeap = function (comparator, options) { + var self = this; + + MaxHeap.call(self, comparator, options); + self._minHeap = new MaxHeap(function (a, b) { + return -comparator(a, b); + }, options); +}; + +Meteor._inherits(MinMaxHeap, MaxHeap); + +_.extend(MinMaxHeap.prototype, { + set: function (id, value) { + var self = this; + MaxHeap.prototype.set.apply(self, arguments); + self._minHeap.set(id, value); + }, + remove: function (id) { + var self = this; + MaxHeap.prototype.remove.apply(self, arguments); + self._minHeap.remove(id); + }, + clear: function () { + var self = this; + MaxHeap.prototype.clear.apply(self, arguments); + self._minHeap.clear(); + }, + setDefault: function (id, def) { + var self = this; + MaxHeap.prototype.setDefault.apply(self, arguments); + return self._minHeap.setDefault(id, def); + }, + clone: function () { + var self = this; + var clone = new MinMaxHeap(self._comparator, self._heap); + return clone; + }, + minElementId: function () { + var self = this; + return self._minHeap.maxElementId(); + } +}); + diff --git a/packages/binary-heap/package.js b/packages/binary-heap/package.js new file mode 100644 index 0000000000..f8c4ae4613 --- /dev/null +++ b/packages/binary-heap/package.js @@ -0,0 +1,18 @@ +Package.describe({ + summary: "Binary Heap datastructure implementation", + internal: true +}); + +Package.on_use(function (api) { + api.export('MaxHeap'); + api.export('MinMaxHeap'); + api.use(['underscore', 'id-map']); + api.add_files(['max-heap.js', 'min-max-heap.js']); +}); + +Package.on_test(function (api) { + api.use('tinytest'); + api.use('binary-heap'); + api.add_files('binary-heap-tests.js'); +}); + diff --git a/packages/id-map/.gitignore b/packages/id-map/.gitignore new file mode 100644 index 0000000000..677a6fc263 --- /dev/null +++ b/packages/id-map/.gitignore @@ -0,0 +1 @@ +.build* diff --git a/packages/id-map/id-map.js b/packages/id-map/id-map.js new file mode 100644 index 0000000000..888fdee63d --- /dev/null +++ b/packages/id-map/id-map.js @@ -0,0 +1,77 @@ +IdMap = function (idStringify, idParse) { + var self = this; + self._map = {}; + self._idStringify = idStringify || JSON.stringify; + self._idParse = idParse || JSON.parse; +}; + +// Some of these methods are designed to match methods on OrderedDict, since +// (eg) ObserveMultiplex and _CachingChangeObserver use them interchangeably. +// (Conceivably, this should be replaced with "UnorderedDict" with a specific +// set of methods that overlap between the two.) + +_.extend(IdMap.prototype, { + get: function (id) { + var self = this; + var key = self._idStringify(id); + return self._map[key]; + }, + set: function (id, value) { + var self = this; + var key = self._idStringify(id); + self._map[key] = value; + }, + remove: function (id) { + var self = this; + var key = self._idStringify(id); + delete self._map[key]; + }, + has: function (id) { + var self = this; + var key = self._idStringify(id); + return _.has(self._map, key); + }, + empty: function () { + var self = this; + return _.isEmpty(self._map); + }, + clear: function () { + var self = this; + self._map = {}; + }, + // Iterates over the items in the map. Return `false` to break the loop. + forEach: function (iterator) { + var self = this; + // don't use _.each, because we can't break out of it. + var keys = _.keys(self._map); + for (var i = 0; i < keys.length; i++) { + var breakIfFalse = iterator.call(null, self._map[keys[i]], + self._idParse(keys[i])); + if (breakIfFalse === false) + return; + } + }, + size: function () { + var self = this; + return _.size(self._map); + }, + setDefault: function (id, def) { + var self = this; + var key = self._idStringify(id); + if (_.has(self._map, key)) + return self._map[key]; + self._map[key] = def; + return def; + }, + // Assumes that values are EJSON-cloneable, and that we don't need to clone + // IDs (ie, that nobody is going to mutate an ObjectId). + clone: function () { + var self = this; + var clone = new IdMap(self._idStringify, self._idParse); + self.forEach(function (value, id) { + clone.set(id, EJSON.clone(value)); + }); + return clone; + } +}); + diff --git a/packages/id-map/package.js b/packages/id-map/package.js new file mode 100644 index 0000000000..876d3406a6 --- /dev/null +++ b/packages/id-map/package.js @@ -0,0 +1,11 @@ +Package.describe({ + summary: "Dictionary data structure allowing non-string keys", + internal: true +}); + +Package.on_use(function (api) { + api.export('IdMap'); + api.use(['underscore', 'json', 'ejson']); + api.add_files([ 'id-map.js' ]); +}); + diff --git a/packages/meteor/helpers.js b/packages/meteor/helpers.js index 9b05246669..341df557b9 100644 --- a/packages/meteor/helpers.js +++ b/packages/meteor/helpers.js @@ -111,5 +111,26 @@ _.extend(Meteor, { return fut.wait(); return result; }; + }, + + // Sets child's prototype to a new object whose prototype is parent's + // prototype. Used as: + // Meteor._inherit(ClassB, ClassA). + // _.extend(ClassB.prototype, { ... }) + // Inspired by CoffeeScript's `extend` and Google Closure's `goog.inherits`. + _inherits: function (Child, Parent) { + // copy static fields + _.each(Parent, function (prop, field) { + Child[field] = prop; + }); + + // a middle member of prototype chain: takes the prototype from the Parent + var Middle = function () { + this.constructor = Child; + }; + Middle.prototype = Parent.prototype; + Child.prototype = new Middle(); + Child.__super__ = Parent.prototype; + return Child; } }); diff --git a/packages/minimongo/id_map.js b/packages/minimongo/id_map.js index 5c2b628155..ba4880980b 100644 --- a/packages/minimongo/id_map.js +++ b/packages/minimongo/id_map.js @@ -1,74 +1,7 @@ LocalCollection._IdMap = function () { var self = this; - self._map = {}; + IdMap.call(self, LocalCollection._idStringify, LocalCollection._idParse); }; -// Some of these methods are designed to match methods on OrderedDict, since -// (eg) ObserveMultiplex and _CachingChangeObserver use them interchangeably. -// (Conceivably, this should be replaced with "UnorderedDict" with a specific -// set of methods that overlap between the two.) +Meteor._inherits(LocalCollection._IdMap, IdMap); -_.extend(LocalCollection._IdMap.prototype, { - get: function (id) { - var self = this; - var key = LocalCollection._idStringify(id); - return self._map[key]; - }, - set: function (id, value) { - var self = this; - var key = LocalCollection._idStringify(id); - self._map[key] = value; - }, - remove: function (id) { - var self = this; - var key = LocalCollection._idStringify(id); - delete self._map[key]; - }, - has: function (id) { - var self = this; - var key = LocalCollection._idStringify(id); - return _.has(self._map, key); - }, - empty: function () { - var self = this; - return _.isEmpty(self._map); - }, - clear: function () { - var self = this; - self._map = {}; - }, - // Iterates over the items in the map. Return `false` to break the loop. - forEach: function (iterator) { - var self = this; - // don't use _.each, because we can't break out of it. - var keys = _.keys(self._map); - for (var i = 0; i < keys.length; i++) { - var breakIfFalse = iterator.call(null, self._map[keys[i]], - LocalCollection._idParse(keys[i])); - if (breakIfFalse === false) - return; - } - }, - size: function () { - var self = this; - return _.size(self._map); - }, - setDefault: function (id, def) { - var self = this; - var key = LocalCollection._idStringify(id); - if (_.has(self._map, key)) - return self._map[key]; - self._map[key] = def; - return def; - }, - // Assumes that values are EJSON-cloneable, and that we don't need to clone - // IDs (ie, that nobody is going to mutate an ObjectId). - clone: function () { - var self = this; - var clone = new LocalCollection._IdMap; - self.forEach(function (value, id) { - clone.set(id, EJSON.clone(value)); - }); - return clone; - } -}); diff --git a/packages/minimongo/minimongo_server_tests.js b/packages/minimongo/minimongo_server_tests.js index efd7808b4c..127e30a8e6 100644 --- a/packages/minimongo/minimongo_server_tests.js +++ b/packages/minimongo/minimongo_server_tests.js @@ -332,6 +332,31 @@ Tinytest.add("minimongo - selector and projection combination", function (test) }); +Tinytest.add("minimongo - sorter and projection combination", function (test) { + function testSorterProjectionComb (sortSpec, proj, expected, desc) { + var sorter = new Minimongo.Sorter(sortSpec); + test.equal(sorter.combineIntoProjection(proj), expected, desc); + } + + // Test with inclusive projection + testSorterProjectionComb({ a: 1, b: 1 }, { b: 1, c: 1, d: 1 }, { a: true, b: true, c: true, d: true }, "simplest incl"); + testSorterProjectionComb({ a: 1, b: -1 }, { b: 1, c: 1, d: 1 }, { a: true, b: true, c: true, d: true }, "simplest incl"); + testSorterProjectionComb({ 'a.c': 1 }, { b: 1 }, { 'a.c': true, b: true }, "dot path incl"); + testSorterProjectionComb({ 'a.1.c': 1 }, { b: 1 }, { 'a.c': true, b: true }, "dot num path incl"); + testSorterProjectionComb({ 'a.1.c': 1 }, { b: 1, a: 1 }, { a: true, b: true }, "dot num path incl overlap"); + testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, { b: 1 }, { 'a.c': true, 'a.b': true, b: true }, "dot num path incl"); + testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, {}, {}, "dot num path with empty incl"); + + // Test with exclusive projection + testSorterProjectionComb({ a: 1, b: 1 }, { b: 0, c: 0, d: 0 }, { c: false, d: false }, "simplest excl"); + testSorterProjectionComb({ a: 1, b: -1 }, { b: 0, c: 0, d: 0 }, { c: false, d: false }, "simplest excl"); + testSorterProjectionComb({ 'a.c': 1 }, { b: 0 }, { b: false }, "dot path excl"); + testSorterProjectionComb({ 'a.1.c': 1 }, { b: 0 }, { b: false }, "dot num path excl"); + testSorterProjectionComb({ 'a.1.c': 1 }, { b: 0, a: 0 }, { b: false }, "dot num path excl overlap"); + testSorterProjectionComb({ 'a.1.c': 1, 'a.2.b': -1 }, { b: 0 }, { b: false }, "dot num path excl"); +}); + + (function () { // TODO: Tests for "can selector become true by modifier" are incomplete, // absent or test the functionality of "not ideal" implementation (test checks diff --git a/packages/minimongo/minimongo_tests.js b/packages/minimongo/minimongo_tests.js index 6c9359761d..f299f2206f 100644 --- a/packages/minimongo/minimongo_tests.js +++ b/packages/minimongo/minimongo_tests.js @@ -1568,7 +1568,7 @@ Tinytest.add("minimongo - ordering", function (test) { // document ordering under a sort specification var verify = function (sorts, docs) { _.each(_.isArray(sorts) ? sorts : [sorts], function (sort) { - var sorter = new MinimongoTest.Sorter(sort); + var sorter = new Minimongo.Sorter(sort); assert_ordering(test, sorter.getComparator(), docs); }); }; @@ -1596,15 +1596,15 @@ Tinytest.add("minimongo - ordering", function (test) { [{c: 1}, {a: 1, b: 2}, {a: 1, b: 3}, {a: 2, b: 0}]); test.throws(function () { - new MinimongoTest.Sorter("a"); + new Minimongo.Sorter("a"); }); test.throws(function () { - new MinimongoTest.Sorter(123); + new Minimongo.Sorter(123); }); // No sort spec implies everything equal. - test.equal(new MinimongoTest.Sorter({}).getComparator()({a:1}, {a:2}), 0); + test.equal(new Minimongo.Sorter({}).getComparator()({a:1}, {a:2}), 0); // All sorts of array edge cases! // Increasing sort sorts by the smallest element it finds; 1 < 2. diff --git a/packages/minimongo/package.js b/packages/minimongo/package.js index 8791cbfe77..4dca1e8433 100644 --- a/packages/minimongo/package.js +++ b/packages/minimongo/package.js @@ -7,7 +7,7 @@ Package.on_use(function (api) { api.export('LocalCollection'); api.export('Minimongo'); api.export('MinimongoTest', { testOnly: true }); - api.use(['underscore', 'json', 'ejson', 'ordered-dict', 'deps', + api.use(['underscore', 'json', 'ejson', 'id-map', 'ordered-dict', 'deps', 'random', 'ordered-dict']); // This package is used for geo-location queries such as $near api.use('geojson-utils'); @@ -28,7 +28,8 @@ Package.on_use(function (api) { // Functionality used only by oplog tailing on the server side api.add_files([ 'selector_projection.js', - 'selector_modifier.js' + 'selector_modifier.js', + 'sorter_projection.js' ], 'server'); }); diff --git a/packages/minimongo/selector_projection.js b/packages/minimongo/selector_projection.js index 73a2727a8e..5f6e101b5b 100644 --- a/packages/minimongo/selector_projection.js +++ b/packages/minimongo/selector_projection.js @@ -3,7 +3,7 @@ // @returns Object - projection object (same as fields option of mongo cursor) Minimongo.Matcher.prototype.combineIntoProjection = function (projection) { var self = this; - var selectorPaths = self._getPathsElidingNumericKeys(); + var selectorPaths = Minimongo._pathsElidingNumericKeys(self._getPaths()); // Special case for $where operator in the selector - projection should depend // on all fields of the document. getSelectorPaths returns a list of paths @@ -12,12 +12,23 @@ Minimongo.Matcher.prototype.combineIntoProjection = function (projection) { if (_.contains(selectorPaths, '')) return {}; + return combineImportantPathsIntoProjection(selectorPaths, projection); +}; + +Minimongo._pathsElidingNumericKeys = function (paths) { + var self = this; + return _.map(paths, function (path) { + return _.reject(path.split('.'), isNumericKey).join('.'); + }); +}; + +combineImportantPathsIntoProjection = function (paths, projection) { var prjDetails = projectionDetails(projection); var tree = prjDetails.tree; var mergedProjection = {}; // merge the paths to include - tree = pathsToTree(selectorPaths, + tree = pathsToTree(paths, function (path) { return true; }, function (node, path, fullPath) { return true; }, tree); @@ -40,13 +51,6 @@ Minimongo.Matcher.prototype.combineIntoProjection = function (projection) { } }; -Minimongo.Matcher.prototype._getPathsElidingNumericKeys = function () { - var self = this; - return _.map(self._getPaths(), function (path) { - return _.reject(path.split('.'), isNumericKey).join('.'); - }); -}; - // Returns a set of key paths similar to // { 'foo.bar': 1, 'a.b.c': 1 } var treeToPaths = function (tree, prefix) { diff --git a/packages/minimongo/sort.js b/packages/minimongo/sort.js index 02218593ec..879d0e48c4 100644 --- a/packages/minimongo/sort.js +++ b/packages/minimongo/sort.js @@ -14,17 +14,19 @@ Sorter = function (spec) { var self = this; - var sortSpecParts = []; + var sortSpecParts = self._sortSpecParts = []; if (spec instanceof Array) { for (var i = 0; i < spec.length; i++) { if (typeof spec[i] === "string") { sortSpecParts.push({ + path: spec[i], lookup: makeLookupFunction(spec[i]), ascending: true }); } else { sortSpecParts.push({ + path: spec[i][0], lookup: makeLookupFunction(spec[i][0]), ascending: spec[i][1] !== "desc" }); @@ -33,12 +35,13 @@ Sorter = function (spec) { } else if (typeof spec === "object") { for (var key in spec) { sortSpecParts.push({ + path: key, lookup: makeLookupFunction(key), ascending: spec[key] >= 0 }); } } else { - throw Error("Bad sort specification: ", JSON.stringify(spec)); + throw Error("Bad sort specification: " + JSON.stringify(spec)); } // reduceValue takes in all the possible values for the sort key along various @@ -118,7 +121,12 @@ Sorter.prototype.getComparator = function (options) { }]); }; -MinimongoTest.Sorter = Sorter; +Sorter.prototype._getPaths = function () { + var self = this; + return _.pluck(self._sortSpecParts, 'path'); +}; + +Minimongo.Sorter = Sorter; // Given an array of comparators // (functions (a,b)->(negative or positive or zero)), returns a single diff --git a/packages/minimongo/sorter_projection.js b/packages/minimongo/sorter_projection.js new file mode 100644 index 0000000000..f02f388f7e --- /dev/null +++ b/packages/minimongo/sorter_projection.js @@ -0,0 +1,6 @@ +Sorter.prototype.combineIntoProjection = function (projection) { + var self = this; + var specPaths = Minimongo._pathsElidingNumericKeys(self._getPaths()); + return combineImportantPathsIntoProjection(specPaths, projection); +}; + diff --git a/packages/mongo-livedata/mongo_livedata_tests.js b/packages/mongo-livedata/mongo_livedata_tests.js index 032e0dbaa7..f155b85fc5 100644 --- a/packages/mongo-livedata/mongo_livedata_tests.js +++ b/packages/mongo-livedata/mongo_livedata_tests.js @@ -745,6 +745,425 @@ if (Meteor.isServer) { }); x++; }); + + // compares arrays a and b w/o looking at order + var setsEqual = function (a, b) { + a = _.map(a, EJSON.stringify); + b = _.map(b, EJSON.stringify); + return _.isEmpty(_.difference(a, b)) && _.isEmpty(_.difference(b, a)); + }; + + // This test mainly checks the correctness of oplog code dealing with limited + // queries. Compitablity with poll-diff is added as well. + Tinytest.addAsync("mongo-livedata - observe sorted, limited " + idGeneration, function (test, onComplete) { + var run = test.runId(); + var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions); + + var observer = function () { + var state = {}; + var output = []; + var callbacks = { + changed: function (newDoc) { + output.push({changed: newDoc._id}); + state[newDoc._id] = newDoc; + }, + added: function (newDoc) { + output.push({added: newDoc._id}); + state[newDoc._id] = newDoc; + }, + removed: function (oldDoc) { + output.push({removed: oldDoc._id}); + delete state[oldDoc._id]; + } + }; + var handle = coll.find({foo: 22}, + {sort: {bar: 1}, limit: 3}).observe(callbacks); + + return {output: output, handle: handle, state: state}; + }; + var clearOutput = function (o) { o.output.splice(0, o.output.length); }; + + var ins = function (doc) { + var id; runInFence(function () { id = coll.insert(doc); }); + return id; + }; + var rem = function (sel) { runInFence(function () { coll.remove(sel); }); }; + var upd = function (sel, mod, opt) { + runInFence(function () { + coll.update(sel, mod, opt); + }); + }; + // tests '_id' subfields for all documents in oplog buffer + var testOplogBufferIds = function (ids) { + var bufferIds = []; + o.handle._multiplexer._observeDriver._unpublishedBuffer.forEach(function (x, id) { + bufferIds.push(id); + }); + + test.isTrue(setsEqual(ids, bufferIds), "expected: " + ids + "; got: " + bufferIds); + }; + var testSafeAppendToBufferFlag = function (expected) { + if (expected) + test.isTrue(o.handle._multiplexer._observeDriver._safeAppendToBuffer); + else + test.isFalse(o.handle._multiplexer._observeDriver._safeAppendToBuffer); + }; + + // Insert a doc and start observing. + var docId1 = ins({foo: 22, bar: 5}); + var o = observer(); + var usesOplog = o.handle._multiplexer._observeDriver._usesOplog; + // Initial add. + test.length(o.output, 1); + test.equal(o.output.shift(), {added: docId1}); + + // Insert another doc (blocking until observes have fired). + var docId2 = ins({foo: 22, bar: 6}); + // Observed add. + test.length(o.output, 1); + test.equal(o.output.shift(), {added: docId2}); + + var docId3 = ins({ foo: 22, bar: 3 }); + test.length(o.output, 1); + test.equal(o.output.shift(), {added: docId3}); + + // Add a non-matching document + ins({ foo: 13 }); + // It shouldn't be added + test.length(o.output, 0); + + // Add something that matches but is too big to fit in + var docId4 = ins({ foo: 22, bar: 7 }); + // It shouldn't be added + test.length(o.output, 0); + + // Let's add something small enough to fit in + var docId5 = ins({ foo: 22, bar: -1 }); + // We should get an added and a removed events + test.length(o.output, 2); + // doc 2 was removed from the published set as it is too big to be in + test.isTrue(setsEqual(o.output, [{added: docId5}, {removed: docId2}])); + clearOutput(o); + + // Now remove something and that doc 2 should be right back + rem(docId5); + test.length(o.output, 2); + test.isTrue(setsEqual(o.output, [{removed: docId5}, {added: docId2}])); + clearOutput(o); + usesOplog && testOplogBufferIds([docId4]); + usesOplog && testSafeAppendToBufferFlag(true); + + // Current state is [3 5 6 | 7] + // Add some negative numbers overflowing the buffer. + // New documents will take the published place, [3 5 6] will take the buffer + // and 7 will be outside of the buffer in MongoDB. + var docId6 = ins({ foo: 22, bar: -1 }); + var docId7 = ins({ foo: 22, bar: -2 }); + var docId8 = ins({ foo: 22, bar: -3 }); + test.length(o.output, 6); + var expected = [{added: docId6}, {removed: docId2}, + {added: docId7}, {removed: docId1}, + {added: docId8}, {removed: docId3}]; + + test.isTrue(setsEqual(o.output, expected)); + clearOutput(o); + usesOplog && testOplogBufferIds([docId1, docId2, docId3]); + usesOplog && testSafeAppendToBufferFlag(false); + + // Now the state is [-3 -2 -1 | 3 5 6] 7 + // If we update first 3 docs (increment them by 20), it would be + // interesting. + upd({ bar: { $lt: 0 }}, { $inc: { bar: 20 } }, { multi: true }); + + // The updated documents can't find their place in published and they can't + // be buffered as we are not aware of the situation outside of the buffer. + // But since our buffer becomes empty, it will be refilled partially with + // updated documents. + test.length(o.output, 6); + var expectedRemoves = [{removed: docId6}, + {removed: docId7}, + {removed: docId8}]; + var expectedAdds = [{added: docId3}, + {added: docId1}, + {added: docId2}]; + + test.isTrue(setsEqual(o.output, expectedAdds.concat(expectedRemoves))); + clearOutput(o); + usesOplog && testOplogBufferIds([docId4, docId7, docId8]); + usesOplog && testSafeAppendToBufferFlag(false); + + // The new arrangement is [3 5 6 | 7 17 18] 19 + // By ids: [docId3, docId1, docId2] docId4] docId6 docId7 docId8 + // Remove first 4 docs (3, 1, 2, 4) forcing buffer to become empty and + // schedule a repoll. + rem({ bar: { $lt: 10 } }); + + // XXX the oplog code analyzes the events one by one: one remove after + // another. Poll-n-diff code, on the other side, analyzes the batch action + // of multiple remove. Because of that difference, expected outputs differ. + if (usesOplog) { + var expectedRemoves = [{removed: docId3}, {removed: docId1}, + {removed: docId2}, {removed: docId4}]; + var expectedAdds = [{added: docId4}, {added: docId8}, + {added: docId7}, {added: docId6}]; + + test.length(o.output, 8); + } else { + var expectedRemoves = [{removed: docId3}, {removed: docId1}, + {removed: docId2}]; + var expectedAdds = [{added: docId8}, {added: docId7}, {added: docId6}]; + + test.length(o.output, 6); + } + + test.isTrue(setsEqual(o.output, expectedAdds.concat(expectedRemoves))); + clearOutput(o); + usesOplog && testOplogBufferIds([]); + usesOplog && testSafeAppendToBufferFlag(true); + + // The new arrangement is [17 18 19] or [docId6 docId7 docId8] + var docId9 = ins({ foo: 22, bar: 21 }); + var docId10 = ins({ foo: 22, bar: 31 }); + var docId11 = ins({ foo: 22, bar: 41 }); + var docId12 = ins({ foo: 22, bar: 51 }); + + // Becomes [17 18 19 | 21 31 41] 51 + usesOplog && testOplogBufferIds([docId9, docId10, docId11]); + usesOplog && testSafeAppendToBufferFlag(false); + test.length(o.output, 0); + upd({ bar: { $lt: 20 } }, { $inc: { bar: 5 } }, { multi: true }); + // Becomes [21 22 23 | 24 31 41] 51 + test.length(o.output, 4); + test.isTrue(setsEqual(o.output, [{removed: docId6}, + {added: docId9}, + {changed: docId7}, + {changed: docId8}])); + clearOutput(o); + usesOplog && testOplogBufferIds([docId6, docId10, docId11]); + usesOplog && testSafeAppendToBufferFlag(false); + + rem(docId9); + // Becomes [22 23 24 | 31 41] 51 + test.length(o.output, 2); + test.isTrue(setsEqual(o.output, [{removed: docId9}, {added: docId6}])); + clearOutput(o); + usesOplog && testOplogBufferIds([docId10, docId11]); + usesOplog && testSafeAppendToBufferFlag(false); + + upd({ bar: { $gt: 25 } }, { $inc: { bar: -7.5 } }, { multi: true }); + // Becomes [22 23 23.5 | 24] 33.5 43.5 - 33.5 doesn't update in-place in + // buffer, because it the driver is not sure it can do it and there is no a + // different doc which is less than 33.5. + test.length(o.output, 2); + test.isTrue(setsEqual(o.output, [{removed: docId6}, {added: docId10}])); + clearOutput(o); + usesOplog && testOplogBufferIds([docId6]); + usesOplog && testSafeAppendToBufferFlag(false); + + // Force buffer objects to be moved into published set so we can check them + rem(docId7); + rem(docId8); + rem(docId10); + // Becomes [24 33.5 43.5] + test.length(o.output, 6); + test.isTrue(setsEqual(o.output, [{removed: docId7}, {removed: docId8}, + {removed: docId10}, {added: docId6}, + {added: docId11}, {added: docId12}])); + + test.length(_.keys(o.state), 3); + test.equal(o.state[docId6], { _id: docId6, foo: 22, bar: 24 }); + test.equal(o.state[docId11], { _id: docId11, foo: 22, bar: 33.5 }); + test.equal(o.state[docId12], { _id: docId12, foo: 22, bar: 43.5 }); + clearOutput(o); + usesOplog && testOplogBufferIds([]); + usesOplog && testSafeAppendToBufferFlag(true); + + o.handle.stop(); + onComplete(); + }); + + Tinytest.addAsync("mongo-livedata - observe sorted, limited, sort fields " + idGeneration, function (test, onComplete) { + var run = test.runId(); + var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions); + + var observer = function () { + var state = {}; + var output = []; + var callbacks = { + changed: function (newDoc) { + output.push({changed: newDoc._id}); + state[newDoc._id] = newDoc; + }, + added: function (newDoc) { + output.push({added: newDoc._id}); + state[newDoc._id] = newDoc; + }, + removed: function (oldDoc) { + output.push({removed: oldDoc._id}); + delete state[oldDoc._id]; + } + }; + var handle = coll.find({}, {sort: {x: 1}, + limit: 2, + fields: {y: 1}}).observe(callbacks); + + return {output: output, handle: handle, state: state}; + }; + var clearOutput = function (o) { o.output.splice(0, o.output.length); }; + var ins = function (doc) { + var id; runInFence(function () { id = coll.insert(doc); }); + return id; + }; + var rem = function (id) { + runInFence(function () { coll.remove(id); }); + }; + + var o = observer(); + + var docId1 = ins({ x: 1, y: 1222 }); + var docId2 = ins({ x: 5, y: 5222 }); + + test.length(o.output, 2); + test.equal(o.output, [{added: docId1}, {added: docId2}]); + clearOutput(o); + + var docId3 = ins({ x: 7, y: 7222 }); + test.length(o.output, 0); + + var docId4 = ins({ x: -1, y: -1222 }); + + // Becomes [docId4 docId1 | docId2 docId3] + test.length(o.output, 2); + test.isTrue(setsEqual(o.output, [{added: docId4}, {removed: docId2}])); + + test.equal(_.size(o.state), 2); + test.equal(o.state[docId4], {_id: docId4, y: -1222}); + test.equal(o.state[docId1], {_id: docId1, y: 1222}); + clearOutput(o); + + rem(docId2); + // Becomes [docId4 docId1 | docId3] + test.length(o.output, 0); + + rem(docId4); + // Becomes [docId1 docId3] + test.length(o.output, 2); + test.isTrue(setsEqual(o.output, [{added: docId3}, {removed: docId4}])); + + test.equal(_.size(o.state), 2); + test.equal(o.state[docId3], {_id: docId3, y: 7222}); + test.equal(o.state[docId1], {_id: docId1, y: 1222}); + clearOutput(o); + + onComplete(); + }); + + Tinytest.addAsync("mongo-livedata - observe sorted, limited, big initial set" + idGeneration, function (test, onComplete) { + var run = test.runId(); + var coll = new Meteor.Collection("observeLimit-"+run, collectionOptions); + + var observer = function () { + var state = {}; + var output = []; + var callbacks = { + changed: function (newDoc) { + output.push({changed: newDoc._id}); + state[newDoc._id] = newDoc; + }, + added: function (newDoc) { + output.push({added: newDoc._id}); + state[newDoc._id] = newDoc; + }, + removed: function (oldDoc) { + output.push({removed: oldDoc._id}); + delete state[oldDoc._id]; + } + }; + var handle = coll.find({}, {sort: {x: 1, y: 1}, limit: 3}) + .observe(callbacks); + + return {output: output, handle: handle, state: state}; + }; + var clearOutput = function (o) { o.output.splice(0, o.output.length); }; + var ins = function (doc) { + var id; runInFence(function () { id = coll.insert(doc); }); + return id; + }; + var rem = function (id) { + runInFence(function () { coll.remove(id); }); + }; + // tests '_id' subfields for all documents in oplog buffer + var testOplogBufferIds = function (ids) { + var bufferIds = []; + o.handle._multiplexer._observeDriver._unpublishedBuffer.forEach(function (x, id) { + bufferIds.push(id); + }); + + test.isTrue(setsEqual(ids, bufferIds), "expected: " + ids + "; got: " + bufferIds); + }; + var testSafeAppendToBufferFlag = function (expected) { + if (expected) + test.isTrue(o.handle._multiplexer._observeDriver._safeAppendToBuffer); + else + test.isFalse(o.handle._multiplexer._observeDriver._safeAppendToBuffer); + }; + + var ids = {}; + _.each([2, 4, 1, 3, 5, 5, 9, 1, 3, 2, 5], function (x, i) { + ids[i] = ins({ x: x, y: i }); + }); + + var o = observer(); + var usesOplog = o.handle._multiplexer._observeDriver._usesOplog; + // x: [1 1 2 | 2 3 3] 4 5 5 5 9 + // id: [2 7 0 | 9 3 8] 1 4 5 10 6 + + test.length(o.output, 3); + test.isTrue(setsEqual([{added: ids[2]}, {added: ids[7]}, {added: ids[0]}], o.output)); + usesOplog && testOplogBufferIds([ids[9], ids[3], ids[8]]); + usesOplog && testSafeAppendToBufferFlag(false); + clearOutput(o); + + rem(ids[0]); + // x: [1 1 2 | 3 3] 4 5 5 5 9 + // id: [2 7 9 | 3 8] 1 4 5 10 6 + test.length(o.output, 2); + test.isTrue(setsEqual([{removed: ids[0]}, {added: ids[9]}], o.output)); + usesOplog && testOplogBufferIds([ids[3], ids[8]]); + usesOplog && testSafeAppendToBufferFlag(false); + clearOutput(o); + + rem(ids[7]); + // x: [1 2 3 | 3] 4 5 5 5 9 + // id: [2 9 3 | 8] 1 4 5 10 6 + test.length(o.output, 2); + test.isTrue(setsEqual([{removed: ids[7]}, {added: ids[3]}], o.output)); + usesOplog && testOplogBufferIds([ids[8]]); + usesOplog && testSafeAppendToBufferFlag(false); + clearOutput(o); + + rem(ids[3]); + // x: [1 2 3 | 4 5 5] 5 9 + // id: [2 9 8 | 1 4 5] 10 6 + test.length(o.output, 2); + test.isTrue(setsEqual([{removed: ids[3]}, {added: ids[8]}], o.output)); + usesOplog && testOplogBufferIds([ids[1], ids[4], ids[5]]); + usesOplog && testSafeAppendToBufferFlag(false); + clearOutput(o); + + rem({ x: {$lt: 4} }); + // x: [4 5 5 | 5 9] + // id: [1 4 5 | 10 6] + test.length(o.output, 6); + test.isTrue(setsEqual([{removed: ids[2]}, {removed: ids[9]}, {removed: ids[8]}, + {added: ids[5]}, {added: ids[4]}, {added: ids[1]}], o.output)); + usesOplog && testOplogBufferIds([ids[10], ids[6]]); + usesOplog && testSafeAppendToBufferFlag(true); + clearOutput(o); + + + onComplete(); + }); } diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index 2cfa2a7775..5c83afdd6e 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -7,6 +7,20 @@ var PHASE = { STEADY: "STEADY" }; +// Exception thrown by _needToPollQuery which unrolls the stack up to the +// enclosing call to finishIfNeedToPollQuery. +var SwitchedToQuery = function () {}; +var finishIfNeedToPollQuery = function (f) { + return function () { + try { + f.apply(this, arguments); + } catch (e) { + if (!(e instanceof SwitchedToQuery)) + throw e; + } + }; +}; + // OplogObserveDriver is an alternative to PollingObserveDriver which follows // the Mongo operation log instead of just re-polling the query. It obeys the // same simple interface: constructing it starts sending observeChanges @@ -19,8 +33,44 @@ OplogObserveDriver = function (options) { self._cursorDescription = options.cursorDescription; self._mongoHandle = options.mongoHandle; self._multiplexer = options.multiplexer; - if (options.ordered) + + if (options.ordered) { throw Error("OplogObserveDriver only supports unordered observeChanges"); + } + + var sortSpec = options.cursorDescription.options.sort; + var sorter = sortSpec && new Minimongo.Sorter(sortSpec); + // We don't support $near and other geo-queries so it's OK to initialize the + // comparator only once in the constructor. + var comparator = sorter && sorter.getComparator(); + + if (options.cursorDescription.options.limit) { + // There are several properties ordered driver implements: + // - _limit is a positive number + // - _comparator is a function-comparator by which the query is ordered + // - _unpublishedBuffer is non-null Min/Max Heap, + // the empty buffer in STEADY phase implies that the + // everything that matches the queries selector fits + // into published set. + // - _published - Min Heap (also implements IdMap methods) + + var heapOptions = { IdMap: LocalCollection._IdMap }; + self._limit = self._cursorDescription.options.limit; + self._comparator = comparator; + self._unpublishedBuffer = new MinMaxHeap(comparator, heapOptions); + // We need something that can find Max value in addition to IdMap interface + self._published = new MaxHeap(comparator, heapOptions); + } else { + self._limit = 0; + self._comparator = null; + self._unpublishedBuffer = null; + self._published = new LocalCollection._IdMap; + } + + // Indicates if it is safe to insert a new document at the end of the buffer + // for this query. i.e. it is known that there are no documents matching the + // selector those are not in published or buffer. + self._safeAppendToBuffer = false; self._stopped = false; self._stopHandles = []; @@ -30,7 +80,6 @@ OplogObserveDriver = function (options) { self._registerPhaseChange(PHASE.QUERYING); - self._published = new LocalCollection._IdMap; var selector = self._cursorDescription.selector; self._matcher = options.matcher; var projection = self._cursorDescription.options.fields || {}; @@ -38,6 +87,8 @@ OplogObserveDriver = function (options) { // Projection function, result of combining important fields for selector and // existing fields projection self._sharedProjection = self._matcher.combineIntoProjection(projection); + if (sorter) + self._sharedProjection = sorter.combineIntoProjection(self._sharedProjection); self._sharedProjectionFn = LocalCollection._compileProjection( self._sharedProjection); @@ -51,7 +102,7 @@ OplogObserveDriver = function (options) { forEachTrigger(self._cursorDescription, function (trigger) { self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry( trigger, function (notification) { - Meteor._noYieldsAllowed(function () { + Meteor._noYieldsAllowed(finishIfNeedToPollQuery(function () { var op = notification.op; if (notification.dropCollection) { // Note: this call is not allowed to block on anything (especially @@ -65,7 +116,7 @@ OplogObserveDriver = function (options) { else self._handleOplogEntrySteadyOrFetching(op); } - }); + })); } )); }); @@ -101,55 +152,261 @@ OplogObserveDriver = function (options) { // Give _observeChanges a chance to add the new ObserveHandle to our // multiplexer, so that the added calls get streamed. - Meteor.defer(function () { + Meteor.defer(finishIfNeedToPollQuery(function () { self._runInitialQuery(); - }); + })); }; _.extend(OplogObserveDriver.prototype, { - _add: function (doc) { + _addPublished: function (id, doc) { var self = this; - var id = doc._id; var fields = _.clone(doc); delete fields._id; - if (self._published.has(id)) - throw Error("tried to add something already published " + id); - self._published.set(id, self._sharedProjectionFn(fields)); + self._published.set(id, self._sharedProjectionFn(doc)); self._multiplexer.added(id, self._projectionFn(fields)); + + // After adding this document, the published set might be overflowed + // (exceeding capacity specified by limit). If so, push the maximum element + // to the buffer, we might want to save it in memory to reduce the amount of + // Mongo lookups in the future. + if (self._limit && self._published.size() > self._limit) { + // XXX in theory the size of published is no more than limit+1 + if (self._published.size() !== self._limit + 1) { + throw new Error("After adding to published, " + + (self._published.size() - self._limit) + + " documents are overflowing the set"); + } + + var overflowingDocId = self._published.maxElementId(); + var overflowingDoc = self._published.get(overflowingDocId); + + if (EJSON.equals(overflowingDocId, id)) { + throw new Error("The document just added is overflowing the published set"); + } + + self._published.remove(overflowingDocId); + self._multiplexer.removed(overflowingDocId); + self._addBuffered(overflowingDocId, overflowingDoc); + } }, - _remove: function (id) { + _removePublished: function (id) { var self = this; - if (!self._published.has(id)) - throw Error("tried to remove something unpublished " + id); self._published.remove(id); self._multiplexer.removed(id); - }, - _handleDoc: function (id, newDoc, mustMatchNow) { - var self = this; - newDoc = _.clone(newDoc); + if (! self._limit || self._published.size() === self._limit) + return; - var matchesNow = newDoc && self._matcher.documentMatches(newDoc).result; - if (mustMatchNow && !matchesNow) { - throw Error("expected " + EJSON.stringify(newDoc) + " to match " - + EJSON.stringify(self._cursorDescription)); + if (self._published.size() > self._limit) + throw Error("self._published got too big"); + + // OK, we are publishing less than the limit. Maybe we should look in the + // buffer to find the next element past what we were publishing before. + + if (!self._unpublishedBuffer.empty()) { + // There's something in the buffer; move the first thing in it to + // _published. + var newDocId = self._unpublishedBuffer.minElementId(); + var newDoc = self._unpublishedBuffer.get(newDocId); + self._removeBuffered(newDocId); + self._addPublished(newDocId, newDoc); + return; } - var matchedBefore = self._published.has(id); + // There's nothing in the buffer. This could mean one of a few things. - if (matchesNow && !matchedBefore) { - self._add(newDoc); - } else if (matchedBefore && !matchesNow) { - self._remove(id); - } else if (matchesNow) { + // (a) We could be in the middle of re-running the query (specifically, we + // could be in _publishNewResults). In that case, _unpublishedBuffer is + // empty because we clear it at the beginning of _publishNewResults. In this + // case, our caller already knows the entire answer to the query and we + // don't need to do anything fancy here. Just return. + if (self._phase === PHASE.QUERYING) + return; + + // (b) We're pretty confident that the union of _published and + // _unpublishedBuffer contain all documents that match selector. Because + // _unpublishedBuffer is empty, that means we're confident that _published + // contains all documents that match selector. So we have nothing to do. + if (self._safeAppendToBuffer) + return; + + // (c) Maybe there are other documents out there that should be in our + // buffer. But in that case, when we emptied _unpublishedBuffer in + // _removeBuffered, we should have called _needToPollQuery, which will + // either put something in _unpublishedBuffer or set _safeAppendToBuffer (or + // both), and it will put us in QUERYING for that whole time. So in fact, we + // shouldn't be able to get here. + + throw new Error("Buffer inexplicably empty"); + }, + _changePublished: function (id, oldDoc, newDoc) { + var self = this; + self._published.set(id, self._sharedProjectionFn(newDoc)); + var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc); + changed = self._projectionFn(changed); + if (!_.isEmpty(changed)) + self._multiplexer.changed(id, changed); + }, + _addBuffered: function (id, doc) { + var self = this; + self._unpublishedBuffer.set(id, self._sharedProjectionFn(doc)); + + // If something is overflowing the buffer, we just remove it from cache + if (self._unpublishedBuffer.size() > self._limit) { + var maxBufferedId = self._unpublishedBuffer.maxElementId(); + + self._unpublishedBuffer.remove(maxBufferedId); + + // Since something matching is removed from cache (both published set and + // buffer), set flag to false + self._safeAppendToBuffer = false; + } + }, + // Is called either to remove the doc completely from matching set or to move + // it to the published set later. + _removeBuffered: function (id) { + var self = this; + self._unpublishedBuffer.remove(id); + // To keep the contract "buffer is never empty in STEADY phase unless the + // everything matching fits into published" true, we poll everything as soon + // as we see the buffer becoming empty. + if (! self._unpublishedBuffer.size() && ! self._safeAppendToBuffer) + self._needToPollQuery(); + }, + // Called when a document has joined the "Matching" results set. + // Takes responsibility of keeping _unpublishedBuffer in sync with _published + // and the effect of limit enforced. + _addMatching: function (doc) { + var self = this; + var id = doc._id; + if (self._published.has(id)) + throw Error("tried to add something already published " + id); + if (self._limit && self._unpublishedBuffer.has(id)) + throw Error("tried to add something already existed in buffer " + id); + + var limit = self._limit; + var comparator = self._comparator; + var maxPublished = (limit && self._published.size() > 0) ? + self._published.get(self._published.maxElementId()) : null; + var maxBuffered = (limit && self._unpublishedBuffer.size() > 0) ? + self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId()) : null; + // The query is unlimited or didn't publish enough documents yet or the new + // document would fit into published set pushing the maximum element out, + // then we need to publish the doc. + var toPublish = ! limit || self._published.size() < limit || + comparator(doc, maxPublished) < 0; + + // Otherwise we might need to buffer it (only in case of limited query). + // Buffering is allowed if the buffer is not filled up yet and all matching + // docs are either in the published set or in the buffer. + var canAppendToBuffer = !toPublish && self._safeAppendToBuffer && + self._unpublishedBuffer.size() < limit; + + // Or if it is small enough to be safely inserted to the middle or the + // beginning of the buffer. + var canInsertIntoBuffer = !toPublish && maxBuffered && + comparator(doc, maxBuffered) <= 0; + + var toBuffer = canAppendToBuffer || canInsertIntoBuffer; + + if (toPublish) { + self._addPublished(id, doc); + } else if (toBuffer) { + self._addBuffered(id, doc); + } else { + // dropping it and not saving to the cache + self._safeAppendToBuffer = false; + } + }, + // Called when a document leaves the "Matching" results set. + // Takes responsibility of keeping _unpublishedBuffer in sync with _published + // and the effect of limit enforced. + _removeMatching: function (id) { + var self = this; + if (! self._published.has(id) && ! self._limit) + throw Error("tried to remove something matching but not cached " + id); + + if (self._published.has(id)) { + self._removePublished(id); + } else if (self._unpublishedBuffer.has(id)) { + self._removeBuffered(id); + } + }, + _handleDoc: function (id, newDoc) { + var self = this; + var matchesNow = newDoc && self._matcher.documentMatches(newDoc).result; + + var publishedBefore = self._published.has(id); + var bufferedBefore = self._limit && self._unpublishedBuffer.has(id); + var cachedBefore = publishedBefore || bufferedBefore; + + if (matchesNow && !cachedBefore) { + self._addMatching(newDoc); + } else if (cachedBefore && !matchesNow) { + self._removeMatching(id); + } else if (cachedBefore && matchesNow) { var oldDoc = self._published.get(id); - if (!oldDoc) - throw Error("thought that " + id + " was there!"); - delete newDoc._id; - self._published.set(id, self._sharedProjectionFn(newDoc)); - var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc); - changed = self._projectionFn(changed); - if (!_.isEmpty(changed)) - self._multiplexer.changed(id, changed); + var comparator = self._comparator; + var minBuffered = self._limit && self._unpublishedBuffer.size() && + self._unpublishedBuffer.get(self._unpublishedBuffer.minElementId()); + + if (publishedBefore) { + // Unlimited case where the document stays in published once it matches + // or the case when we don't have enough matching docs to publish or the + // changed but matching doc will stay in published anyways. + // XXX: We rely on the emptiness of buffer. Be sure to maintain the fact + // that buffer can't be empty if there are matching documents not + // published. Notably, we don't want to schedule repoll and continue + // relying on this property. + var staysInPublished = ! self._limit || + self._unpublishedBuffer.size() === 0 || + comparator(newDoc, minBuffered) <= 0; + + if (staysInPublished) { + self._changePublished(id, oldDoc, newDoc); + } else { + // after the change doc doesn't stay in the published, remove it + self._removePublished(id); + // but it can move into buffered now, check it + var maxBuffered = self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId()); + + var toBuffer = self._safeAppendToBuffer || + (maxBuffered && comparator(newDoc, maxBuffered) <= 0); + + if (toBuffer) { + self._addBuffered(id, newDoc); + } else { + // Throw away from both published set and buffer + self._safeAppendToBuffer = false; + } + } + } else if (bufferedBefore) { + oldDoc = self._unpublishedBuffer.get(id); + // remove the old version manually so we don't trigger the querying + // immediately + self._unpublishedBuffer.remove(id); + + var maxPublished = self._published.get(self._published.maxElementId()); + var maxBuffered = self._unpublishedBuffer.size() && self._unpublishedBuffer.get(self._unpublishedBuffer.maxElementId()); + + // the buffered doc was updated, it could move to published + var toPublish = comparator(newDoc, maxPublished) < 0; + + // or stays in buffer even after the change + var staysInBuffer = (! toPublish && self._safeAppendToBuffer) || + (!toPublish && maxBuffered && comparator(newDoc, maxBuffered) <= 0); + + if (toPublish) { + self._addPublished(id, newDoc); + } else if (staysInBuffer) { + // stays in buffer but changes + self._unpublishedBuffer.set(id, newDoc); + } else { + // Throw away from both published set and buffer + self._safeAppendToBuffer = false; + } + } else { + throw new Error("cachedBefore implies either of publishedBefore or bufferedBefore is true."); + } } }, _fetchModifiedDocuments: function () { @@ -157,7 +414,7 @@ _.extend(OplogObserveDriver.prototype, { self._registerPhaseChange(PHASE.FETCHING); // Defer, because nothing called from the oplog entry handler may yield, but // fetch() yields. - Meteor.defer(function () { + Meteor.defer(finishIfNeedToPollQuery(function () { while (!self._stopped && !self._needToFetch.empty()) { if (self._phase !== PHASE.FETCHING) throw new Error("phase in fetchModifiedDocuments: " + self._phase); @@ -174,36 +431,40 @@ _.extend(OplogObserveDriver.prototype, { waiting++; self._mongoHandle._docFetcher.fetch( self._cursorDescription.collectionName, id, cacheKey, - function (err, doc) { - if (err) { - if (!anyError) - anyError = err; - } else if (!self._stopped && self._phase === PHASE.FETCHING - && self._fetchGeneration === thisGeneration) { - // We re-check the generation in case we've had an explicit - // _pollQuery call which should effectively cancel this round of - // fetches. (_pollQuery increments the generation.) - self._handleDoc(id, doc); + finishIfNeedToPollQuery(function (err, doc) { + try { + if (err) { + if (!anyError) + anyError = err; + } else if (!self._stopped && self._phase === PHASE.FETCHING + && self._fetchGeneration === thisGeneration) { + // We re-check the generation in case we've had an explicit + // _pollQuery call (eg, in another fiber) which should + // effectively cancel this round of fetches. (_pollQuery + // increments the generation.) + self._handleDoc(id, doc); + } + } finally { + waiting--; + // Because fetch() never calls its callback synchronously, this + // is safe (ie, we won't call fut.return() before the forEach is + // done). + if (waiting === 0) + fut.return(); } - waiting--; - // Because fetch() never calls its callback synchronously, this is - // safe (ie, we won't call fut.return() before the forEach is - // done). - if (waiting === 0) - fut.return(); - }); + })); }); fut.wait(); // XXX do this even if we've switched to PHASE.QUERYING? if (anyError) throw anyError; - // Exit now if we've had a _pollQuery call. + // Exit now if we've had a _pollQuery call (here or in another fiber). if (self._phase === PHASE.QUERYING) return; self._currentlyFetching = null; } self._beSteady(); - }); + })); }, _beSteady: function () { var self = this; @@ -233,16 +494,18 @@ _.extend(OplogObserveDriver.prototype, { } if (op.op === 'd') { - if (self._published.has(id)) - self._remove(id); + if (self._published.has(id) || (self._limit && self._unpublishedBuffer.has(id))) + self._removeMatching(id); } else if (op.op === 'i') { if (self._published.has(id)) - throw new Error("insert found for already-existing ID"); + throw new Error("insert found for already-existing ID in published"); + if (self._unpublishedBuffer && self._unpublishedBuffer.has(id)) + throw new Error("insert found for already-existing ID in buffer"); // XXX what if selector yields? for now it can't but later it could have // $where if (self._matcher.documentMatches(op.o).result) - self._add(op.o); + self._addMatching(op.o); } else if (op.op === 'u') { // Is this a modifier ($set/$unset, which may require us to poll the // database to figure out if the whole document matches the selector) or a @@ -256,12 +519,19 @@ _.extend(OplogObserveDriver.prototype, { var canDirectlyModifyDoc = !isReplace && modifierCanBeDirectlyApplied(op.o); + var publishedBefore = self._published.has(id); + var bufferedBefore = self._limit && self._unpublishedBuffer.has(id); + if (isReplace) { self._handleDoc(id, _.extend({_id: id}, op.o)); - } else if (self._published.has(id) && canDirectlyModifyDoc) { + } else if ((publishedBefore || bufferedBefore) && canDirectlyModifyDoc) { // Oh great, we actually know what the document is, so we can apply // this directly. - var newDoc = EJSON.clone(self._published.get(id)); + var newDoc = self._published.has(id) ? + self._published.get(id) : + self._unpublishedBuffer.get(id); + newDoc = EJSON.clone(newDoc); + newDoc._id = id; LocalCollection._modify(newDoc, op.o); self._handleDoc(id, self._sharedProjectionFn(newDoc)); @@ -280,10 +550,8 @@ _.extend(OplogObserveDriver.prototype, { if (self._stopped) throw new Error("oplog stopped surprisingly early"); - var initialCursor = self._cursorForQuery(); - initialCursor.forEach(function (initialDoc) { - self._add(initialDoc); - }); + self._runQuery(); + if (self._stopped) throw new Error("oplog stopped quite early"); // Allow observeChanges calls to return. (After this, it's possible for @@ -319,27 +587,48 @@ _.extend(OplogObserveDriver.prototype, { ++self._fetchGeneration; // ignore any in-flight fetches self._registerPhaseChange(PHASE.QUERYING); - // Defer so that we don't block. + // Defer so that we don't block. We don't need finishIfNeedToPollQuery here + // because SwitchedToQuery is not called in QUERYING mode. Meteor.defer(function () { - // subtle note: _published does not contain _id fields, but newResults - // does - var newResults = new LocalCollection._IdMap; - var cursor = self._cursorForQuery(); - cursor.forEach(function (doc) { - newResults.set(doc._id, doc); - }); - - self._publishNewResults(newResults); - + self._runQuery(); self._doneQuerying(); }); }, + _runQuery: function () { + var self = this; + var newResults = new LocalCollection._IdMap; + var newBuffer = new LocalCollection._IdMap; + + // Query 2x documents as the half excluded from the original query will go + // into unpublished buffer to reduce additional Mongo lookups in cases when + // documents are removed from the published set and need a replacement. + // XXX needs more thought on non-zero skip + // XXX 2 is a "magic number" meaning there is an extra chunk of docs for + // buffer if such is needed. + var cursor = self._cursorForQuery({ limit: self._limit * 2 }); + cursor.forEach(function (doc, i) { + if (!self._limit || i < self._limit) + newResults.set(doc._id, doc); + else + newBuffer.set(doc._id, doc); + }); + + self._publishNewResults(newResults, newBuffer); + }, + // Transitions to QUERYING and runs another query, or (if already in QUERYING) // ensures that we will query again later. // // This function may not block, because it is called from an oplog entry - // handler. + // handler. However, if we were not already in the QUERYING phase, it throws + // an exception that is caught by the closest surrounding + // finishIfNeedToPollQuery call; this ensures that we don't continue running + // close that was designed for another phase inside PHASE.QUERYING. + // + // (It's also necessary whenever logic in this file yields to check that other + // phases haven't put us into QUERYING mode, though; eg, + // _fetchModifiedDocuments does this.) _needToPollQuery: function () { var self = this; if (self._stopped) @@ -349,7 +638,7 @@ _.extend(OplogObserveDriver.prototype, { // pausing FETCHING). if (self._phase !== PHASE.QUERYING) { self._pollQuery(); - return; + throw new SwitchedToQuery; } // We're currently in QUERYING. Set a flag to ensure that we run another @@ -379,7 +668,7 @@ _.extend(OplogObserveDriver.prototype, { } }, - _cursorForQuery: function () { + _cursorForQuery: function (optionsOverwrite) { var self = this; // The query we run is almost the same as the cursor we are observing, with @@ -388,6 +677,11 @@ _.extend(OplogObserveDriver.prototype, { // "shared" projection). And we don't want to apply any transform in the // cursor, because observeChanges shouldn't use the transform. var options = _.clone(self._cursorDescription.options); + + // Allow the caller to modify the options. Useful to specify different skip + // and limit values. + _.extend(options, optionsOverwrite); + options.fields = self._sharedProjection; delete options.transform; // We are NOT deep cloning fields or selector here, which should be OK. @@ -401,13 +695,20 @@ _.extend(OplogObserveDriver.prototype, { // Replace self._published with newResults (both are IdMaps), invoking observe // callbacks on the multiplexer. + // Replace self._unpublishedBuffer with newBuffer. // // XXX This is very similar to LocalCollection._diffQueryUnorderedChanges. We // should really: (a) Unify IdMap and OrderedDict into Unordered/OrderedDict (b) // Rewrite diff.js to use these classes instead of arrays and objects. - _publishNewResults: function (newResults) { + _publishNewResults: function (newResults, newBuffer) { var self = this; + // If the query is limited and there is a buffer, shut down so it doesn't + // stay in a way. + if (self._limit) { + self._unpublishedBuffer.clear(); + } + // First remove anything that's gone. Be careful not to modify // self._published while iterating over it. var idsToRemove = []; @@ -416,15 +717,33 @@ _.extend(OplogObserveDriver.prototype, { idsToRemove.push(id); }); _.each(idsToRemove, function (id) { - self._remove(id); + self._removePublished(id); }); // Now do adds and changes. + // If self has a buffer and limit, the new fetched result will be + // limited correctly as the query has sort specifier. newResults.forEach(function (doc, id) { - // "true" here means to throw if we think this doc doesn't match the - // selector. - self._handleDoc(id, doc, true); + self._handleDoc(id, doc); }); + + // Sanity-check that everything we tried to put into _published ended up + // there. + // XXX if this is slow, remove it later + if (self._published.size() !== newResults.size()) { + throw Error("failed to copy newResults into _published!"); + } + self._published.forEach(function (doc, id) { + if (!newResults.has(id)) + throw Error("_published has a doc that newResults doesn't; " + id); + }); + + // Finally, replace the buffer + newBuffer.forEach(function (doc, id) { + self._addBuffered(id, doc); + }); + + self._safeAppendToBuffer = newBuffer.size() < self._limit; }, // This stop function is invoked from the onStop of the ObserveMultiplexer, so @@ -451,6 +770,7 @@ _.extend(OplogObserveDriver.prototype, { // Proactively drop references to potentially big things. self._published = null; + self._unpublishedBuffer = null; self._needToFetch = null; self._currentlyFetching = null; self._oplogEntryHandle = null; @@ -486,10 +806,11 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) { if (options._disableOplog) return false; - // This option (which are mostly used for sorted cursors) require us to figure - // out where a given document fits in an order to know if it's included or - // not, and we don't track that information when doing oplog tailing. - if (options.limit || options.skip) return false; + // skip is not supported: to support it we would need to keep track of all + // "skipped" documents or at least their ids. + // limit w/o a sort specifier is not supported: current implementation needs a + // deterministic way to order documents. + if (options.skip || (options.limit && !options.sort)) return false; // If a fields projection option is given check if it is supported by // minimongo (some operators are not supported). @@ -509,7 +830,9 @@ OplogObserveDriver.cursorSupported = function (cursorDescription, matcher) { // as Mongo, and can yield!) // - $near (has "interesting" properties in MongoDB, like the possibility // of returning an ID multiple times, though even polling maybe - // have a bug there + // have a bug there) + // XXX: once we support it, we would need to think more on how we + // initialize the comparators when we create the driver. return !matcher.hasWhere() && !matcher.hasGeoQuery(); }; diff --git a/packages/mongo-livedata/oplog_tests.js b/packages/mongo-livedata/oplog_tests.js index b46c00d799..8ecc1099c5 100644 --- a/packages/mongo-livedata/oplog_tests.js +++ b/packages/mongo-livedata/oplog_tests.js @@ -4,8 +4,8 @@ Tinytest.add("mongo-livedata - oplog - cursorSupported", function (test) { var oplogEnabled = !!MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle; - var supported = function (expected, selector) { - var cursor = OplogCollection.find(selector); + var supported = function (expected, selector, options) { + var cursor = OplogCollection.find(selector, options); var handle = cursor.observeChanges({added: function () {}}); // If there's no oplog at all, we shouldn't ever use it. if (!oplogEnabled) @@ -44,4 +44,10 @@ Tinytest.add("mongo-livedata - oplog - cursorSupported", function (test) { // Nothing Minimongo doesn't understand. (Minimongo happens to fail to // implement $elemMatch inside $all which MongoDB supports.) supported(false, {x: {$all: [{$elemMatch: {y: 2}}]}}); + + supported(true, {}, { sort: {x:1} }); + supported(true, {}, { sort: {x:1}, limit: 5 }); + supported(false, {}, { limit: 5 }); + supported(false, {}, { skip: 2, limit: 5 }); + supported(false, {}, { skip: 2 }); }); diff --git a/packages/mongo-livedata/package.js b/packages/mongo-livedata/package.js index f0b1f9e3d2..4f1dfa8866 100644 --- a/packages/mongo-livedata/package.js +++ b/packages/mongo-livedata/package.js @@ -24,6 +24,10 @@ Package.on_use(function (api) { ['client', 'server']); api.use('check', ['client', 'server']); + // Binary Heap data structure is used to optimize oplog observe driver + // performance. + api.use('binary-heap', 'server'); + // Allow us to detect 'insecure'. api.use('insecure', {weak: true});