Files
meteor/packages/minimongo/cursor.js
Hugh Willson 3e28d5d49c Adjust observeChanges skip/limit ordered observe error message (#9373)
* Adjust observeChanges skip/limit ordered observe error message

The `Cursor.observeChanges` function throws an error when there
is an attempt to use an unordered observe with skip/limit.
This error can also be thrown when attempting the same thing
with `Cursor.observe`. This is because `Cursor.observe` ends
up calling back to `Cursor.observeChanges`. When using
`Cursor.observe` however, the shared error message
is a bit misleading, as `addedAt` should be used instead of
`addedBefore`. This commit adjusts the shared error message to
cover both `observeChanges` and `observe` scenarios.

Fixes #4996.

* Bump minimongo patch version
2017-11-15 12:59:30 -05:00

522 lines
15 KiB
JavaScript

import LocalCollection from './local_collection.js';
import { hasOwn } from './common.js';
// Cursor: a specification for a particular subset of documents, w/ a defined
// order, limit, and offset. creating a Cursor with LocalCollection.find(),
export default class Cursor {
// don't call this ctor directly. use LocalCollection.find().
constructor(collection, selector, options = {}) {
this.collection = collection;
this.sorter = null;
this.matcher = new Minimongo.Matcher(selector);
if (LocalCollection._selectorIsIdPerhapsAsObject(selector)) {
// stash for fast _id and { _id }
this._selectorId = hasOwn.call(selector, '_id')
? selector._id
: selector;
} else {
this._selectorId = undefined;
if (this.matcher.hasGeoQuery() || options.sort) {
this.sorter = new Minimongo.Sorter(
options.sort || [],
{matcher: this.matcher}
);
}
}
this.skip = options.skip || 0;
this.limit = options.limit;
this.fields = options.fields;
this._projectionFn = LocalCollection._compileProjection(this.fields || {});
this._transform = LocalCollection.wrapTransform(options.transform);
// by default, queries register w/ Tracker when it is available.
if (typeof Tracker !== 'undefined') {
this.reactive = options.reactive === undefined ? true : options.reactive;
}
}
/**
* @summary Returns the number of documents that match a query.
* @memberOf Mongo.Cursor
* @method count
* @param {boolean} [applySkipLimit=true] If set to `false`, the value
* returned will reflect the total
* number of matching documents,
* ignoring any value supplied for
* limit
* @instance
* @locus Anywhere
* @returns {Number}
*/
count(applySkipLimit = true) {
if (this.reactive) {
// allow the observe to be unordered
this._depend({added: true, removed: true}, true);
}
return this._getRawObjects({
ordered: true,
applySkipLimit
}).length;
}
/**
* @summary Return all matching documents as an Array.
* @memberOf Mongo.Cursor
* @method fetch
* @instance
* @locus Anywhere
* @returns {Object[]}
*/
fetch() {
const result = [];
this.forEach(doc => {
result.push(doc);
});
return result;
}
[Symbol.iterator]() {
if (this.reactive) {
this._depend({
addedBefore: true,
removed: true,
changed: true,
movedBefore: true});
}
let index = 0;
const objects = this._getRawObjects({ordered: true});
return {
next: () => {
if (index < objects.length) {
// This doubles as a clone operation.
let element = this._projectionFn(objects[index++]);
if (this._transform)
element = this._transform(element);
return {value: element};
}
return {done: true};
}
};
}
/**
* @callback IterationCallback
* @param {Object} doc
* @param {Number} index
*/
/**
* @summary Call `callback` once for each matching document, sequentially and
* synchronously.
* @locus Anywhere
* @method forEach
* @instance
* @memberOf Mongo.Cursor
* @param {IterationCallback} callback Function to call. It will be called
* with three arguments: the document, a
* 0-based index, and <em>cursor</em>
* itself.
* @param {Any} [thisArg] An object which will be the value of `this` inside
* `callback`.
*/
forEach(callback, thisArg) {
if (this.reactive) {
this._depend({
addedBefore: true,
removed: true,
changed: true,
movedBefore: true});
}
this._getRawObjects({ordered: true}).forEach((element, i) => {
// This doubles as a clone operation.
element = this._projectionFn(element);
if (this._transform) {
element = this._transform(element);
}
callback.call(thisArg, element, i, this);
});
}
getTransform() {
return this._transform;
}
/**
* @summary Map callback over all matching documents. Returns an Array.
* @locus Anywhere
* @method map
* @instance
* @memberOf Mongo.Cursor
* @param {IterationCallback} callback Function to call. It will be called
* with three arguments: the document, a
* 0-based index, and <em>cursor</em>
* itself.
* @param {Any} [thisArg] An object which will be the value of `this` inside
* `callback`.
*/
map(callback, thisArg) {
const result = [];
this.forEach((doc, i) => {
result.push(callback.call(thisArg, doc, i, this));
});
return result;
}
// options to contain:
// * callbacks for observe():
// - addedAt (document, atIndex)
// - added (document)
// - changedAt (newDocument, oldDocument, atIndex)
// - changed (newDocument, oldDocument)
// - removedAt (document, atIndex)
// - removed (document)
// - movedTo (document, oldIndex, newIndex)
//
// attributes available on returned query handle:
// * stop(): end updates
// * collection: the collection this query is querying
//
// iff x is a returned query handle, (x instanceof
// LocalCollection.ObserveHandle) is true
//
// initial results delivered through added callback
// XXX maybe callbacks should take a list of objects, to expose transactions?
// XXX maybe support field limiting (to limit what you're notified on)
/**
* @summary Watch a query. Receive callbacks as the result set changes.
* @locus Anywhere
* @memberOf Mongo.Cursor
* @instance
* @param {Object} callbacks Functions to call to deliver the result set as it
* changes
*/
observe(options) {
return LocalCollection._observeFromObserveChanges(this, options);
}
/**
* @summary Watch a query. Receive callbacks as the result set changes. Only
* the differences between the old and new documents are passed to
* the callbacks.
* @locus Anywhere
* @memberOf Mongo.Cursor
* @instance
* @param {Object} callbacks Functions to call to deliver the result set as it
* changes
*/
observeChanges(options) {
const ordered = LocalCollection._observeChangesCallbacksAreOrdered(options);
// there are several places that assume you aren't combining skip/limit with
// unordered observe. eg, update's EJSON.clone, and the "there are several"
// comment in _modifyAndNotify
// XXX allow skip/limit with unordered observe
if (!options._allow_unordered && !ordered && (this.skip || this.limit)) {
throw new Error(
"Must use an ordered observe with skip or limit (i.e. 'addedBefore' " +
"for observeChanges or 'addedAt' for observe, instead of 'added')."
);
}
if (this.fields && (this.fields._id === 0 || this.fields._id === false)) {
throw Error('You may not observe a cursor with {fields: {_id: 0}}');
}
const distances = (
this.matcher.hasGeoQuery() &&
ordered &&
new LocalCollection._IdMap
);
const query = {
cursor: this,
dirty: false,
distances,
matcher: this.matcher, // not fast pathed
ordered,
projectionFn: this._projectionFn,
resultsSnapshot: null,
sorter: ordered && this.sorter
};
let qid;
// Non-reactive queries call added[Before] and then never call anything
// else.
if (this.reactive) {
qid = this.collection.next_qid++;
this.collection.queries[qid] = query;
}
query.results = this._getRawObjects({ordered, distances: query.distances});
if (this.collection.paused) {
query.resultsSnapshot = ordered ? [] : new LocalCollection._IdMap;
}
// wrap callbacks we were passed. callbacks only fire when not paused and
// are never undefined
// Filters out blacklisted fields according to cursor's projection.
// XXX wrong place for this?
// furthermore, callbacks enqueue until the operation we're working on is
// done.
const wrapCallback = fn => {
if (!fn) {
return () => {};
}
const self = this;
return function(/* args*/) {
if (self.collection.paused) {
return;
}
const args = arguments;
self.collection._observeQueue.queueTask(() => {
fn.apply(this, args);
});
};
};
query.added = wrapCallback(options.added);
query.changed = wrapCallback(options.changed);
query.removed = wrapCallback(options.removed);
if (ordered) {
query.addedBefore = wrapCallback(options.addedBefore);
query.movedBefore = wrapCallback(options.movedBefore);
}
if (!options._suppress_initial && !this.collection.paused) {
const results = ordered ? query.results : query.results._map;
Object.keys(results).forEach(key => {
const doc = results[key];
const fields = EJSON.clone(doc);
delete fields._id;
if (ordered) {
query.addedBefore(doc._id, this._projectionFn(fields), null);
}
query.added(doc._id, this._projectionFn(fields));
});
}
const handle = Object.assign(new LocalCollection.ObserveHandle, {
collection: this.collection,
stop: () => {
if (this.reactive) {
delete this.collection.queries[qid];
}
}
});
if (this.reactive && Tracker.active) {
// XXX in many cases, the same observe will be recreated when
// the current autorun is rerun. we could save work by
// letting it linger across rerun and potentially get
// repurposed if the same observe is performed, using logic
// similar to that of Meteor.subscribe.
Tracker.onInvalidate(() => {
handle.stop();
});
}
// run the observe callbacks resulting from the initial contents
// before we leave the observe.
this.collection._observeQueue.drain();
return handle;
}
// Since we don't actually have a "nextObject" interface, there's really no
// reason to have a "rewind" interface. All it did was make multiple calls
// to fetch/map/forEach return nothing the second time.
// XXX COMPAT WITH 0.8.1
rewind() {}
// XXX Maybe we need a version of observe that just calls a callback if
// anything changed.
_depend(changers, _allow_unordered) {
if (Tracker.active) {
const dependency = new Tracker.Dependency;
const notify = dependency.changed.bind(dependency);
dependency.depend();
const options = {_allow_unordered, _suppress_initial: true};
['added', 'addedBefore', 'changed', 'movedBefore', 'removed']
.forEach(fn => {
if (changers[fn]) {
options[fn] = notify;
}
});
// observeChanges will stop() when this computation is invalidated
this.observeChanges(options);
}
}
_getCollectionName() {
return this.collection.name;
}
// Returns a collection of matching objects, but doesn't deep copy them.
//
// If ordered is set, returns a sorted array, respecting sorter, skip, and
// limit properties of the query provided that options.applySkipLimit is
// not set to false (#1201). If sorter is falsey, no sort -- you get the
// natural order.
//
// If ordered is not set, returns an object mapping from ID to doc (sorter,
// skip and limit should not be set).
//
// If ordered is set and this cursor is a $near geoquery, then this function
// will use an _IdMap to track each distance from the $near argument point in
// order to use it as a sort key. If an _IdMap is passed in the 'distances'
// argument, this function will clear it and use it for this purpose
// (otherwise it will just create its own _IdMap). The observeChanges
// implementation uses this to remember the distances after this function
// returns.
_getRawObjects(options = {}) {
// By default this method will respect skip and limit because .fetch(),
// .forEach() etc... expect this behaviour. It can be forced to ignore
// skip and limit by setting applySkipLimit to false (.count() does this,
// for example)
const applySkipLimit = options.applySkipLimit !== false;
// XXX use OrderedDict instead of array, and make IdMap and OrderedDict
// compatible
const results = options.ordered ? [] : new LocalCollection._IdMap;
// fast path for single ID value
if (this._selectorId !== undefined) {
// If you have non-zero skip and ask for a single id, you get nothing.
// This is so it matches the behavior of the '{_id: foo}' path.
if (applySkipLimit && this.skip) {
return results;
}
const selectedDoc = this.collection._docs.get(this._selectorId);
if (selectedDoc) {
if (options.ordered) {
results.push(selectedDoc);
} else {
results.set(this._selectorId, selectedDoc);
}
}
return results;
}
// slow path for arbitrary selector, sort, skip, limit
// in the observeChanges case, distances is actually part of the "query"
// (ie, live results set) object. in other cases, distances is only used
// inside this function.
let distances;
if (this.matcher.hasGeoQuery() && options.ordered) {
if (options.distances) {
distances = options.distances;
distances.clear();
} else {
distances = new LocalCollection._IdMap();
}
}
this.collection._docs.forEach((doc, id) => {
const matchResult = this.matcher.documentMatches(doc);
if (matchResult.result) {
if (options.ordered) {
results.push(doc);
if (distances && matchResult.distance !== undefined) {
distances.set(id, matchResult.distance);
}
} else {
results.set(id, doc);
}
}
// Override to ensure all docs are matched if ignoring skip & limit
if (!applySkipLimit) {
return true;
}
// Fast path for limited unsorted queries.
// XXX 'length' check here seems wrong for ordered
return (
!this.limit ||
this.skip ||
this.sorter ||
results.length !== this.limit
);
});
if (!options.ordered) {
return results;
}
if (this.sorter) {
results.sort(this.sorter.getComparator({distances}));
}
// Return the full set of results if there is no skip or limit or if we're
// ignoring them
if (!applySkipLimit || (!this.limit && !this.skip)) {
return results;
}
return results.slice(
this.skip,
this.limit ? this.limit + this.skip : results.length
);
}
_publishCursor(subscription) {
// XXX minimongo should not depend on mongo-livedata!
if (!Package.mongo) {
throw new Error(
'Can\'t publish from Minimongo without the `mongo` package.'
);
}
if (!this.collection.name) {
throw new Error(
'Can\'t publish a cursor from a collection without a name.'
);
}
return Package.mongo.Mongo.Collection._publishCursor(
this,
subscription,
this.collection.name
);
}
}