Fix selectors matching against arrays containing subdocuments.

This is a 0.5.5 regression (with the new selector compiler): the new compiler
handled selectors with "a.b.c" reasonably well if the "c" level contained an
array but not if the "a" or "b" levels did.

This also implements mostly-Mongo-compatible behavior for sorting when arrays
are involved; this is not an 0.5.5 regression, since the old compiler did not
get this right.
This commit is contained in:
David Glasser
2013-02-14 15:18:31 -08:00
parent 1535fe3437
commit ec852121ff
2 changed files with 170 additions and 17 deletions

View File

@@ -210,6 +210,26 @@ Tinytest.add("minimongo - misc", function (test) {
test.equal(b.x.a, 14); // just to document current behavior
});
Tinytest.add("minimongo - lookup", function (test) {
var lookupA = LocalCollection._makeLookupFunction('a');
test.equal(lookupA({}), [undefined]);
test.equal(lookupA({a: 1}), [1]);
test.equal(lookupA({a: [1]}), [[1]]);
var lookupAX = LocalCollection._makeLookupFunction('a.x');
test.equal(lookupAX({a: {x: 1}}), [1]);
test.equal(lookupAX({a: {x: [1]}}), [[1]]);
test.equal(lookupAX({a: 5}), [undefined]);
test.equal(lookupAX({a: [{x: 1}, {x: [2]}, {y: 3}]}),
[1, [2], undefined]);
var lookupA0X = LocalCollection._makeLookupFunction('a.0.x');
test.equal(lookupA0X({a: [{x: 1}]}), [1]);
test.equal(lookupA0X({a: [{x: [1]}]}), [[1]]);
test.equal(lookupA0X({a: 5}), [undefined]);
test.equal(lookupA0X({a: [{x: 1}, {x: [2]}, {y: 3}]}), [1]);
});
Tinytest.add("minimongo - selector_compiler", function (test) {
var matches = function (should_match, selector, doc) {
var does_match = LocalCollection._matches(selector, doc);
@@ -808,6 +828,21 @@ Tinytest.add("minimongo - selector_compiler", function (test) {
nomatch({"dogs.1.name": "Fido"}, {dogs: [{name: "Fido"}, {name: "Rex"}]});
match({"room.1b": "bla"}, {room: {"1b": "bla"}});
match({"dogs.name": "Fido"}, {dogs: [{name: "Fido"}, {name: "Rex"}]});
match({"dogs.name": "Rex"}, {dogs: [{name: "Fido"}, {name: "Rex"}]});
match({"animals.dogs.name": "Fido"},
{animals: [{dogs: [{name: "Rover"}]},
{},
{dogs: [{name: "Fido"}, {name: "Rex"}]}]});
match({"animals.dogs.name": "Fido"},
{animals: [{dogs: {name: "Rex"}},
{dogs: {name: "Fido"}}]});
match({"animals.dogs.name": "Fido"},
{animals: [{dogs: [{name: "Rover"}]},
{},
{dogs: [{name: ["Fido"]}, {name: "Rex"}]}]});
nomatch({"dogs.name": "Fido"}, {dogs: []});
// $elemMatch
match({dogs: {$elemMatch: {name: /e/}}},
{dogs: [{name: "Fido"}, {name: "Rex"}]});
@@ -847,10 +882,19 @@ Tinytest.add("minimongo - ordering", function (test) {
});
};
// note: [] doesn't sort with "arrays", it sorts as "undefined". the position
// of arrays in _typeorder only matters for things like $lt. (This behavior
// verified with MongoDB 2.2.1.) We don't define the relative order of {a: []}
// and {c: 1} is undefined (MongoDB does seem to care but it's not clear how
// or why).
verify([{"a" : 1}, ["a"], [["a", "asc"]]],
[{c: 1}, {a: 1}, {a: {}}, {a: []}, {a: true}]);
[{a: []}, {a: 1}, {a: {}}, {a: true}]);
verify([{"a" : 1}, ["a"], [["a", "asc"]]],
[{c: 1}, {a: 1}, {a: {}}, {a: true}]);
verify([{"a" : -1}, [["a", "desc"]]],
[{a: true}, {a: []}, {a: {}}, {a: 1}, {c: 1}]);
[{a: true}, {a: {}}, {a: 1}, {c: 1}]);
verify([{"a" : -1}, [["a", "desc"]]],
[{a: true}, {a: {}}, {a: 1}, {a: []}]);
verify([{"a" : 1, "b": -1}, ["a", ["b", "desc"]],
[["a", "asc"], ["b", "desc"]]],
@@ -935,6 +979,30 @@ Tinytest.add("minimongo - subkey sort", function (test) {
test.equal(c.find({}, {sort: {'a.nope.c': -1}}).count(), 6);
});
Tinytest.add("minimongo - array sort", function (test) {
var c = new LocalCollection();
// "up" and "down" are the indices that the docs should have when sorted
// ascending and descending by "a.x" respectively. They are not reverses of
// each other: when sorting ascending, you use the minimum value you can find
// in the document, and when sorting descending, you use the maximum value you
// can find. So [1, 4] shows up in the 1 slot when sorting ascending and the 4
// slot when sorting descending.
c.insert({up: 1, down: 1, a: {x: [1, 4]}});
c.insert({up: 2, down: 2, a: [{x: [2]}, {x: 3}]});
c.insert({up: 0, down: 4, a: {x: 0}});
c.insert({up: 3, down: 3, a: {x: 2.5}});
c.insert({up: 4, down: 0, a: {x: 5}});
test.equal(
_.pluck(c.find({}, {sort: {'a.x': 1}}).fetch(), 'up'),
_.range(c.find().count()));
test.equal(
_.pluck(c.find({}, {sort: {'a.x': -1}}).fetch(), 'down'),
_.range(c.find().count()));
});
Tinytest.add("minimongo - modify", function (test) {
var modify = function (doc, mod, result) {

View File

@@ -478,18 +478,61 @@ LocalCollection._matches = function (selector, doc) {
return (LocalCollection._compileSelector(selector))(doc);
};
var makeLookupFunction = function (key) {
// _makeLookupFunction(key) returns a lookup function.
//
// A lookup function takes in a document and returns an array of matching
// values. This array has more than one element if any segment of the key other
// than the last one is an array. ie, any arrays found when doing non-final
// lookups result in this function "branching"; each element in the returned
// array represents the value found at this branch. If any branch doesn't have a
// final value for the full key, its element in the returned list will be
// undefined. It always returns a non-empty array.
//
// _makeLookupFunction('a.x')({a: {x: 1}}) returns [1]
// _makeLookupFunction('a.x')({a: {x: [1]}}) returns [[1]]
// _makeLookupFunction('a.x')({a: 5}) returns [undefined]
// _makeLookupFunction('a.x')({a: [{x: 1},
// {x: [2]},
// {y: 3}]})
// returns [1, [2], undefined]
LocalCollection._makeLookupFunction = function (key) {
var dotLocation = key.indexOf('.');
var first = dotLocation === -1 ? key : key.substr(0, dotLocation);
var lookupRest = dotLocation !== -1 &&
makeLookupFunction(key.substr(dotLocation + 1));
var first, lookupRest, nextIsNumeric;
if (dotLocation === -1) {
first = key;
} else {
first = key.substr(0, dotLocation);
var rest = key.substr(dotLocation + 1);
lookupRest = LocalCollection._makeLookupFunction(rest);
// Is the next (perhaps final) piece numeric (ie, an array lookup?)
nextIsNumeric = /^\d+(\.|$)/.test(rest);
}
return function (doc) {
if (doc == null) // null or undefined
return undefined;
return [undefined];
var firstLevel = doc[first];
if (lookupRest)
return lookupRest(firstLevel);
return firstLevel;
// We don't "branch" at the final level.
if (!lookupRest)
return [firstLevel];
// It's an empty array, and we're not done: we won't find anything.
if (_.isArray(firstLevel) && firstLevel.length === 0)
return [undefined];
// For each result at this level, finish the lookup on the rest of the key,
// and return everything we find. Also, if the next result is a number,
// don't branch here.
//
// Technically, in MongoDB, we should be able to handle the case where
// objects have numeric keys, but Mongo doesn't actually handle this
// consistently yet itself, see eg
// https://jira.mongodb.org/browse/SERVER-2898
// https://github.com/mongodb/mongo/blob/master/jstests/array_match2.js
if (!_.isArray(firstLevel) || nextIsNumeric)
firstLevel = [firstLevel];
return Array.prototype.concat.apply([], _.map(firstLevel, lookupRest));
};
};
@@ -504,10 +547,14 @@ var compileDocumentSelector = function (docSelector) {
throw new Error("Unrecognized logical operator: " + key);
perKeySelectors.push(LOGICAL_OPERATORS[key](subSelector));
} else {
var lookUpByIndex = makeLookupFunction(key);
var lookUpByIndex = LocalCollection._makeLookupFunction(key);
var valueSelectorFunc = compileValueSelector(subSelector);
perKeySelectors.push(function (doc) {
return valueSelectorFunc(lookUpByIndex(doc));
var branchValues = lookUpByIndex(doc);
// We apply the selector to each "branched" value and return true if any
// match. This isn't 100% consistent with MongoDB; eg, see:
// https://jira.mongodb.org/browse/SERVER-8585
return _.any(branchValues, valueSelectorFunc);
});
}
});
@@ -570,12 +617,12 @@ LocalCollection._compileSort = function (spec) {
for (var i = 0; i < spec.length; i++) {
if (typeof spec[i] === "string") {
sortSpecParts.push({
lookup: makeLookupFunction(spec[i]),
lookup: LocalCollection._makeLookupFunction(spec[i]),
ascending: true
});
} else {
sortSpecParts.push({
lookup: makeLookupFunction(spec[i][0]),
lookup: LocalCollection._makeLookupFunction(spec[i][0]),
ascending: spec[i][1] !== "desc"
});
}
@@ -583,7 +630,7 @@ LocalCollection._compileSort = function (spec) {
} else if (typeof spec === "object") {
for (var key in spec) {
sortSpecParts.push({
lookup: makeLookupFunction(key),
lookup: LocalCollection._makeLookupFunction(key),
ascending: spec[key] >= 0
});
}
@@ -594,11 +641,49 @@ LocalCollection._compileSort = function (spec) {
if (sortSpecParts.length === 0)
return function () {return 0;};
// reduceValue takes in all the possible values for the sort key along various
// branches, and returns the min or max value (according to the bool
// findMin). Each value can itself be an array, and we look at its values
// too. (ie, we do a single level of flattening on branchValues, then find the
// min/max.)
var reduceValue = function (branchValues, findMin) {
var reduced;
var first = true;
// Iterate over all the values found in all the branches, and if a value is
// an array itself, iterate over the values in the array separately.
_.each(branchValues, function (branchValue) {
// Value not an array? Pretend it is.
if (!_.isArray(branchValue))
branchValue = [branchValue];
// Value is an empty array? Pretend it was missing, since that's where it
// should be sorted.
if (_.isArray(branchValue) && branchValue.length === 0)
branchValue = [undefined];
_.each(branchValue, function (value) {
// We should get here at least once: lookup functions return non-empty
// arrays, so the outer loop runs at least once, and we prevented
// branchValue from being an empty array.
if (first) {
reduced = value;
first = false;
} else {
// Compare the value we found to the value we found so far, saving it
// if it's less (for an ascending sort) or more (for a descending
// sort).
var cmp = LocalCollection._f._cmp(reduced, value);
if ((findMin && cmp > 0) || (!findMin && cmp < 0))
reduced = value;
}
});
});
return reduced;
};
return function (a, b) {
for (var i = 0; i < sortSpecParts.length; ++i) {
var specPart = sortSpecParts[i];
var aValue = specPart.lookup(a);
var bValue = specPart.lookup(b);
var aValue = reduceValue(specPart.lookup(a), specPart.ascending);
var bValue = reduceValue(specPart.lookup(b), specPart.ascending);
var compare = LocalCollection._f._cmp(aValue, bValue);
if (compare !== 0)
return specPart.ascending ? compare : -compare;