Files
meteor/packages/minimongo/cursor.js
Jan Dvorak b5b7306bed Remove deprecated code from before v1.0
Remove deprecated code from before Meteor 1.0 that  was documented via XXX COMPAT in comments. Move deprecated packages into the deprecated folder (deps, livedata, srp).
2021-04-23 17:56:05 +02:00

510 lines
15 KiB
JavaScript

import LocalCollection from './local_collection.js';
import { hasOwn } from './common.js';
// Cursor: a specification for a particular subset of documents, w/ a defined
// order, limit, and offset. creating a Cursor with LocalCollection.find(),
export default class Cursor {
// don't call this ctor directly. use LocalCollection.find().
constructor(collection, selector, options = {}) {
this.collection = collection;
this.sorter = null;
this.matcher = new Minimongo.Matcher(selector);
if (LocalCollection._selectorIsIdPerhapsAsObject(selector)) {
// stash for fast _id and { _id }
this._selectorId = hasOwn.call(selector, '_id')
? selector._id
: selector;
} else {
this._selectorId = undefined;
if (this.matcher.hasGeoQuery() || options.sort) {
this.sorter = new Minimongo.Sorter(options.sort || []);
}
}
this.skip = options.skip || 0;
this.limit = options.limit;
this.fields = options.fields;
this._projectionFn = LocalCollection._compileProjection(this.fields || {});
this._transform = LocalCollection.wrapTransform(options.transform);
// by default, queries register w/ Tracker when it is available.
if (typeof Tracker !== 'undefined') {
this.reactive = options.reactive === undefined ? true : options.reactive;
}
}
/**
* @summary Returns the number of documents that match a query.
* @memberOf Mongo.Cursor
* @method count
* @param {boolean} [applySkipLimit=true] If set to `false`, the value
* returned will reflect the total
* number of matching documents,
* ignoring any value supplied for
* limit
* @instance
* @locus Anywhere
* @returns {Number}
*/
count(applySkipLimit = true) {
if (this.reactive) {
// allow the observe to be unordered
this._depend({added: true, removed: true}, true);
}
return this._getRawObjects({
ordered: true,
applySkipLimit
}).length;
}
/**
* @summary Return all matching documents as an Array.
* @memberOf Mongo.Cursor
* @method fetch
* @instance
* @locus Anywhere
* @returns {Object[]}
*/
fetch() {
const result = [];
this.forEach(doc => {
result.push(doc);
});
return result;
}
[Symbol.iterator]() {
if (this.reactive) {
this._depend({
addedBefore: true,
removed: true,
changed: true,
movedBefore: true});
}
let index = 0;
const objects = this._getRawObjects({ordered: true});
return {
next: () => {
if (index < objects.length) {
// This doubles as a clone operation.
let element = this._projectionFn(objects[index++]);
if (this._transform)
element = this._transform(element);
return {value: element};
}
return {done: true};
}
};
}
/**
* @callback IterationCallback
* @param {Object} doc
* @param {Number} index
*/
/**
* @summary Call `callback` once for each matching document, sequentially and
* synchronously.
* @locus Anywhere
* @method forEach
* @instance
* @memberOf Mongo.Cursor
* @param {IterationCallback} callback Function to call. It will be called
* with three arguments: the document, a
* 0-based index, and <em>cursor</em>
* itself.
* @param {Any} [thisArg] An object which will be the value of `this` inside
* `callback`.
*/
forEach(callback, thisArg) {
if (this.reactive) {
this._depend({
addedBefore: true,
removed: true,
changed: true,
movedBefore: true});
}
this._getRawObjects({ordered: true}).forEach((element, i) => {
// This doubles as a clone operation.
element = this._projectionFn(element);
if (this._transform) {
element = this._transform(element);
}
callback.call(thisArg, element, i, this);
});
}
getTransform() {
return this._transform;
}
/**
* @summary Map callback over all matching documents. Returns an Array.
* @locus Anywhere
* @method map
* @instance
* @memberOf Mongo.Cursor
* @param {IterationCallback} callback Function to call. It will be called
* with three arguments: the document, a
* 0-based index, and <em>cursor</em>
* itself.
* @param {Any} [thisArg] An object which will be the value of `this` inside
* `callback`.
*/
map(callback, thisArg) {
const result = [];
this.forEach((doc, i) => {
result.push(callback.call(thisArg, doc, i, this));
});
return result;
}
// options to contain:
// * callbacks for observe():
// - addedAt (document, atIndex)
// - added (document)
// - changedAt (newDocument, oldDocument, atIndex)
// - changed (newDocument, oldDocument)
// - removedAt (document, atIndex)
// - removed (document)
// - movedTo (document, oldIndex, newIndex)
//
// attributes available on returned query handle:
// * stop(): end updates
// * collection: the collection this query is querying
//
// iff x is a returned query handle, (x instanceof
// LocalCollection.ObserveHandle) is true
//
// initial results delivered through added callback
// XXX maybe callbacks should take a list of objects, to expose transactions?
// XXX maybe support field limiting (to limit what you're notified on)
/**
* @summary Watch a query. Receive callbacks as the result set changes.
* @locus Anywhere
* @memberOf Mongo.Cursor
* @instance
* @param {Object} callbacks Functions to call to deliver the result set as it
* changes
*/
observe(options) {
return LocalCollection._observeFromObserveChanges(this, options);
}
/**
* @summary Watch a query. Receive callbacks as the result set changes. Only
* the differences between the old and new documents are passed to
* the callbacks.
* @locus Anywhere
* @memberOf Mongo.Cursor
* @instance
* @param {Object} callbacks Functions to call to deliver the result set as it
* changes
*/
observeChanges(options) {
const ordered = LocalCollection._observeChangesCallbacksAreOrdered(options);
// there are several places that assume you aren't combining skip/limit with
// unordered observe. eg, update's EJSON.clone, and the "there are several"
// comment in _modifyAndNotify
// XXX allow skip/limit with unordered observe
if (!options._allow_unordered && !ordered && (this.skip || this.limit)) {
throw new Error(
"Must use an ordered observe with skip or limit (i.e. 'addedBefore' " +
"for observeChanges or 'addedAt' for observe, instead of 'added')."
);
}
if (this.fields && (this.fields._id === 0 || this.fields._id === false)) {
throw Error('You may not observe a cursor with {fields: {_id: 0}}');
}
const distances = (
this.matcher.hasGeoQuery() &&
ordered &&
new LocalCollection._IdMap
);
const query = {
cursor: this,
dirty: false,
distances,
matcher: this.matcher, // not fast pathed
ordered,
projectionFn: this._projectionFn,
resultsSnapshot: null,
sorter: ordered && this.sorter
};
let qid;
// Non-reactive queries call added[Before] and then never call anything
// else.
if (this.reactive) {
qid = this.collection.next_qid++;
this.collection.queries[qid] = query;
}
query.results = this._getRawObjects({ordered, distances: query.distances});
if (this.collection.paused) {
query.resultsSnapshot = ordered ? [] : new LocalCollection._IdMap;
}
// wrap callbacks we were passed. callbacks only fire when not paused and
// are never undefined
// Filters out blacklisted fields according to cursor's projection.
// XXX wrong place for this?
// furthermore, callbacks enqueue until the operation we're working on is
// done.
const wrapCallback = fn => {
if (!fn) {
return () => {};
}
const self = this;
return function(/* args*/) {
if (self.collection.paused) {
return;
}
const args = arguments;
self.collection._observeQueue.queueTask(() => {
fn.apply(this, args);
});
};
};
query.added = wrapCallback(options.added);
query.changed = wrapCallback(options.changed);
query.removed = wrapCallback(options.removed);
if (ordered) {
query.addedBefore = wrapCallback(options.addedBefore);
query.movedBefore = wrapCallback(options.movedBefore);
}
if (!options._suppress_initial && !this.collection.paused) {
query.results.forEach(doc => {
const fields = EJSON.clone(doc);
delete fields._id;
if (ordered) {
query.addedBefore(doc._id, this._projectionFn(fields), null);
}
query.added(doc._id, this._projectionFn(fields));
});
}
const handle = Object.assign(new LocalCollection.ObserveHandle, {
collection: this.collection,
stop: () => {
if (this.reactive) {
delete this.collection.queries[qid];
}
}
});
if (this.reactive && Tracker.active) {
// XXX in many cases, the same observe will be recreated when
// the current autorun is rerun. we could save work by
// letting it linger across rerun and potentially get
// repurposed if the same observe is performed, using logic
// similar to that of Meteor.subscribe.
Tracker.onInvalidate(() => {
handle.stop();
});
}
// run the observe callbacks resulting from the initial contents
// before we leave the observe.
this.collection._observeQueue.drain();
return handle;
}
// XXX Maybe we need a version of observe that just calls a callback if
// anything changed.
_depend(changers, _allow_unordered) {
if (Tracker.active) {
const dependency = new Tracker.Dependency;
const notify = dependency.changed.bind(dependency);
dependency.depend();
const options = {_allow_unordered, _suppress_initial: true};
['added', 'addedBefore', 'changed', 'movedBefore', 'removed']
.forEach(fn => {
if (changers[fn]) {
options[fn] = notify;
}
});
// observeChanges will stop() when this computation is invalidated
this.observeChanges(options);
}
}
_getCollectionName() {
return this.collection.name;
}
// Returns a collection of matching objects, but doesn't deep copy them.
//
// If ordered is set, returns a sorted array, respecting sorter, skip, and
// limit properties of the query provided that options.applySkipLimit is
// not set to false (#1201). If sorter is falsey, no sort -- you get the
// natural order.
//
// If ordered is not set, returns an object mapping from ID to doc (sorter,
// skip and limit should not be set).
//
// If ordered is set and this cursor is a $near geoquery, then this function
// will use an _IdMap to track each distance from the $near argument point in
// order to use it as a sort key. If an _IdMap is passed in the 'distances'
// argument, this function will clear it and use it for this purpose
// (otherwise it will just create its own _IdMap). The observeChanges
// implementation uses this to remember the distances after this function
// returns.
_getRawObjects(options = {}) {
// By default this method will respect skip and limit because .fetch(),
// .forEach() etc... expect this behaviour. It can be forced to ignore
// skip and limit by setting applySkipLimit to false (.count() does this,
// for example)
const applySkipLimit = options.applySkipLimit !== false;
// XXX use OrderedDict instead of array, and make IdMap and OrderedDict
// compatible
const results = options.ordered ? [] : new LocalCollection._IdMap;
// fast path for single ID value
if (this._selectorId !== undefined) {
// If you have non-zero skip and ask for a single id, you get nothing.
// This is so it matches the behavior of the '{_id: foo}' path.
if (applySkipLimit && this.skip) {
return results;
}
const selectedDoc = this.collection._docs.get(this._selectorId);
if (selectedDoc) {
if (options.ordered) {
results.push(selectedDoc);
} else {
results.set(this._selectorId, selectedDoc);
}
}
return results;
}
// slow path for arbitrary selector, sort, skip, limit
// in the observeChanges case, distances is actually part of the "query"
// (ie, live results set) object. in other cases, distances is only used
// inside this function.
let distances;
if (this.matcher.hasGeoQuery() && options.ordered) {
if (options.distances) {
distances = options.distances;
distances.clear();
} else {
distances = new LocalCollection._IdMap();
}
}
this.collection._docs.forEach((doc, id) => {
const matchResult = this.matcher.documentMatches(doc);
if (matchResult.result) {
if (options.ordered) {
results.push(doc);
if (distances && matchResult.distance !== undefined) {
distances.set(id, matchResult.distance);
}
} else {
results.set(id, doc);
}
}
// Override to ensure all docs are matched if ignoring skip & limit
if (!applySkipLimit) {
return true;
}
// Fast path for limited unsorted queries.
// XXX 'length' check here seems wrong for ordered
return (
!this.limit ||
this.skip ||
this.sorter ||
results.length !== this.limit
);
});
if (!options.ordered) {
return results;
}
if (this.sorter) {
results.sort(this.sorter.getComparator({distances}));
}
// Return the full set of results if there is no skip or limit or if we're
// ignoring them
if (!applySkipLimit || (!this.limit && !this.skip)) {
return results;
}
return results.slice(
this.skip,
this.limit ? this.limit + this.skip : results.length
);
}
_publishCursor(subscription) {
// XXX minimongo should not depend on mongo-livedata!
if (!Package.mongo) {
throw new Error(
'Can\'t publish from Minimongo without the `mongo` package.'
);
}
if (!this.collection.name) {
throw new Error(
'Can\'t publish a cursor from a collection without a name.'
);
}
return Package.mongo.Mongo.Collection._publishCursor(
this,
subscription,
this.collection.name
);
}
}