Refactored Cursor.

This commit is contained in:
Radosław Miernik
2017-07-12 18:39:01 +02:00
parent fe576f60ce
commit a686b93e06

View File

@@ -17,13 +17,12 @@ export default class Cursor {
this._selectorId = selector._id;
} else {
this._selectorId = undefined;
if (this.matcher.hasGeoQuery() || options.sort) {
this.sorter = new Minimongo.Sorter(options.sort || [],
{ matcher: this.matcher });
}
if (this.matcher.hasGeoQuery() || options.sort)
this.sorter = new Minimongo.Sorter(options.sort || [], {matcher: this.matcher});
}
this.skip = options.skip;
this.skip = options.skip || 0;
this.limit = options.limit;
this.fields = options.fields;
@@ -32,7 +31,8 @@ export default class Cursor {
this._transform = LocalCollection.wrapTransform(options.transform);
// by default, queries register w/ Tracker when it is available.
if (typeof Tracker !== 'undefined') {this.reactive = options.reactive === undefined ? true : options.reactive;}
if (typeof Tracker !== 'undefined')
this.reactive = options.reactive === undefined ? true : options.reactive;
}
/**
@@ -44,10 +44,8 @@ export default class Cursor {
* @returns {Number}
*/
count() {
if (this.reactive) {
this._depend({added: true, removed: true},
true /* allow the observe to be unordered */);
}
if (this.reactive)
this._depend({added: true, removed: true}, true /* allow the observe to be unordered */);
return this._getRawObjects({ordered: true}).length;
}
@@ -61,11 +59,13 @@ export default class Cursor {
* @returns {Object[]}
*/
fetch() {
const res = [];
const result = [];
this.forEach(doc => {
res.push(doc);
result.push(doc);
});
return res;
return result;
}
/**
@@ -93,12 +93,14 @@ export default class Cursor {
movedBefore: true});
}
objects.forEach((elt, i) => {
objects.forEach((element, i) => {
// This doubles as a clone operation.
elt = this._projectionFn(elt);
element = this._projectionFn(element);
if (this._transform) {elt = this._transform(elt);}
callback.call(thisArg, elt, i, this);
if (this._transform)
element = this._transform(element);
callback.call(thisArg, element, i, this);
});
}
@@ -116,11 +118,13 @@ export default class Cursor {
* @param {Any} [thisArg] An object which will be the value of `this` inside `callback`.
*/
map(callback, thisArg) {
const res = [];
this.forEach((doc, index) => {
res.push(callback.call(thisArg, doc, index, this));
const result = [];
this.forEach((doc, i) => {
result.push(callback.call(thisArg, doc, i, this));
});
return res;
return result;
}
// options to contain:
@@ -169,21 +173,23 @@ export default class Cursor {
// unordered observe. eg, update's EJSON.clone, and the "there are several"
// comment in _modifyAndNotify
// XXX allow skip/limit with unordered observe
if (!options._allow_unordered && !ordered && (this.skip || this.limit)) {throw new Error("must use ordered observe (ie, 'addedBefore' instead of 'added') with skip or limit");}
if (!options._allow_unordered && !ordered && (this.skip || this.limit))
throw new Error("must use ordered observe (ie, 'addedBefore' instead of 'added') with skip or limit");
if (this.fields && (this.fields._id === 0 || this.fields._id === false)) {throw Error('You may not observe a cursor with {fields: {_id: 0}}');}
if (this.fields && (this.fields._id === 0 || this.fields._id === false))
throw Error('You may not observe a cursor with {fields: {_id: 0}}');
const query = {
dirty: false,
matcher: this.matcher, // not fast pathed
sorter: ordered && this.sorter,
distances:
this.matcher.hasGeoQuery() && ordered && new LocalCollection._IdMap,
distances: this.matcher.hasGeoQuery() && ordered && new LocalCollection._IdMap,
resultsSnapshot: null,
ordered,
cursor: this,
projectionFn: this._projectionFn,
};
let qid;
// Non-reactive queries call added[Before] and then never call anything
@@ -192,9 +198,11 @@ export default class Cursor {
qid = this.collection.next_qid++;
this.collection.queries[qid] = query;
}
query.results = this._getRawObjects({
ordered, distances: query.distances});
if (this.collection.paused) {query.resultsSnapshot = ordered ? [] : new LocalCollection._IdMap;}
query.results = this._getRawObjects({ordered, distances: query.distances});
if (this.collection.paused)
query.resultsSnapshot = ordered ? [] : new LocalCollection._IdMap;
// wrap callbacks we were passed. callbacks only fire when not paused and
// are never undefined
@@ -203,45 +211,54 @@ export default class Cursor {
// furthermore, callbacks enqueue until the operation we're working on is
// done.
const wrapCallback = f => {
if (!f) {return () => {};}
const wrapCallback = fn => {
if (!fn)
return () => {};
const self = this;
return function(/* args*/) {
const args = arguments;
if (self.collection.paused) {return;}
if (self.collection.paused)
return;
self.collection._observeQueue.queueTask(() => {
f.apply(this, args);
fn.apply(this, args);
});
};
};
query.added = wrapCallback(options.added);
query.changed = wrapCallback(options.changed);
query.removed = wrapCallback(options.removed);
if (ordered) {
query.addedBefore = wrapCallback(options.addedBefore);
query.movedBefore = wrapCallback(options.movedBefore);
}
if (!options._suppress_initial && !this.collection.paused) {
const results = query.results._map || query.results;
const results = ordered ? query.results : query.results._map;
Object.keys(results).forEach(key => {
const doc = results[key];
const fields = EJSON.clone(doc);
delete fields._id;
if (ordered) {query.addedBefore(doc._id, this._projectionFn(fields), null);}
if (ordered)
query.addedBefore(doc._id, this._projectionFn(fields), null);
query.added(doc._id, this._projectionFn(fields));
});
}
const handle = new LocalCollection.ObserveHandle;
Object.assign(handle, {
const handle = Object.assign(new LocalCollection.ObserveHandle, {
collection: this.collection,
stop: () => {
if (this.reactive) {delete this.collection.queries[qid];}
},
if (this.reactive)
delete this.collection.queries[qid];
}
});
if (this.reactive && Tracker.active) {
@@ -254,6 +271,7 @@ export default class Cursor {
handle.stop();
});
}
// run the observe callbacks resulting from the initial contents
// before we leave the observe.
this.collection._observeQueue.drain();
@@ -271,16 +289,16 @@ export default class Cursor {
// anything changed.
_depend(changers, _allow_unordered) {
if (Tracker.active) {
const v = new Tracker.Dependency;
v.depend();
const notifyChange = v.changed.bind(v);
const dependency = new Tracker.Dependency;
const notify = dependency.changed.bind(dependency);
const options = {
_suppress_initial: true,
_allow_unordered,
};
['added', 'changed', 'removed', 'addedBefore', 'movedBefore'].forEach(fnName => {
if (changers[fnName]) {options[fnName] = notifyChange;}
dependency.depend();
const options = {_allow_unordered, _suppress_initial: true};
['added', 'addedBefore', 'changed', 'movedBefore', 'removed'].forEach(fn => {
if (changers[fn])
options[fn] = notify;
});
// observeChanges will stop() when this computation is invalidated
@@ -317,12 +335,18 @@ export default class Cursor {
// If you have non-zero skip and ask for a single id, you get
// nothing. This is so it matches the behavior of the '{_id: foo}'
// path.
if (this.skip) {return results;}
if (this.skip)
return results;
const selectedDoc = this.collection._docs.get(this._selectorId);
if (selectedDoc) {
if (options.ordered) {results.push(selectedDoc);} else {results.set(this._selectorId, selectedDoc);}
if (options.ordered)
results.push(selectedDoc);
else
results.set(this._selectorId, selectedDoc);
}
return results;
}
@@ -343,42 +367,43 @@ export default class Cursor {
this.collection._docs.forEach((doc, id) => {
const matchResult = this.matcher.documentMatches(doc);
if (matchResult.result) {
if (options.ordered) {
results.push(doc);
if (distances && matchResult.distance !== undefined) {distances.set(id, matchResult.distance);}
if (distances && matchResult.distance !== undefined)
distances.set(id, matchResult.distance);
} else {
results.set(id, doc);
}
}
// Fast path for limited unsorted queries.
// XXX 'length' check here seems wrong for ordered
if (this.limit && !this.skip && !this.sorter &&
results.length === this.limit) {return false;} // break
return true; // continue
return !this.limit || this.skip || this.sorter || results.length !== this.limit;
});
if (!options.ordered) {return results;}
if (!options.ordered)
return results;
if (this.sorter) {
const comparator = this.sorter.getComparator({distances});
results.sort(comparator);
}
if (this.sorter)
results.sort(this.sorter.getComparator({distances}));
const idx_start = this.skip || 0;
const idx_end = this.limit ? this.limit + idx_start : results.length;
return results.slice(idx_start, idx_end);
if (!this.limit && !this.skip)
return results;
return results.slice(this.skip, this.limit ? this.limit + this.skip : results.length);
}
_publishCursor(sub) {
if (! this.collection.name) {throw new Error("Can't publish a cursor from a collection without a name.");}
const collection = this.collection.name;
_publishCursor(subscription) {
// XXX minimongo should not depend on mongo-livedata!
if (! Package.mongo) {
throw new Error("Can't publish from Minimongo without the `mongo` package.");
}
if (!Package.mongo)
throw new Error('Can\'t publish from Minimongo without the `mongo` package.');
return Package.mongo.Mongo.Collection._publishCursor(this, sub, collection);
if (!this.collection.name)
throw new Error('Can\'t publish a cursor from a collection without a name.');
return Package.mongo.Mongo.Collection._publishCursor(this, subscription, this.collection.name);
}
}