made observe sync

This commit is contained in:
Gabriel Grubba
2023-03-27 10:56:30 -03:00
parent c6e386b2f2
commit 00fa5cd793

View File

@@ -237,19 +237,16 @@ export default class Cursor {
if (!options._allow_unordered && !ordered && (this.skip || this.limit)) {
throw new Error(
"Must use an ordered observe with skip or limit (i.e. 'addedBefore' " +
"for observeChanges or 'addedAt' for observe, instead of 'added')."
"for observeChanges or 'addedAt' for observe, instead of 'added')."
);
}
if (this.fields && (this.fields._id === 0 || this.fields._id === false)) {
throw Error('You may not observe a cursor with {fields: {_id: 0}}');
throw Error("You may not observe a cursor with {fields: {_id: 0}}");
}
const distances = (
this.matcher.hasGeoQuery() &&
ordered &&
new LocalCollection._IdMap
);
const distances =
this.matcher.hasGeoQuery() && ordered && new LocalCollection._IdMap();
const query = {
cursor: this,
@@ -259,7 +256,7 @@ export default class Cursor {
ordered,
projectionFn: this._projectionFn,
resultsSnapshot: null,
sorter: ordered && this.sorter
sorter: ordered && this.sorter,
};
let qid;
@@ -271,10 +268,13 @@ export default class Cursor {
this.collection.queries[qid] = query;
}
query.results = this._getRawObjects({ordered, distances: query.distances});
query.results = this._getRawObjects({
ordered,
distances: query.distances,
});
if (this.collection.paused) {
query.resultsSnapshot = ordered ? [] : new LocalCollection._IdMap;
query.resultsSnapshot = ordered ? [] : new LocalCollection._IdMap();
}
// wrap callbacks we were passed. callbacks only fire when not paused and
@@ -284,43 +284,23 @@ export default class Cursor {
// furthermore, callbacks enqueue until the operation we're working on is
// done.
const wrapCallback = fn => {
const wrapCallback = (fn) => {
if (!fn) {
return () => {};
}
const self = this;
if (Meteor.isClient) {
return function(/* args*/) {
if (self.collection.paused) {
return;
}
const args = arguments;
self.collection._observeQueue.queueTask(() => {
fn.apply(this, args);
});
};
}
return function(/* args*/) {
return function (/* args*/) {
if (self.collection.paused) {
return;
}
let resolve;
const promise = new Promise(r => resolve = r);
const args = arguments;
self.collection._observeQueue.queueTask(() => {
fn.apply(this, args);
resolve();
});
return promise;
};
};
@@ -333,39 +313,31 @@ export default class Cursor {
query.movedBefore = wrapCallback(options.movedBefore);
}
const isReadyPromise = (async () => {
if (!options._suppress_initial && !this.collection.paused) {
const handler = async (doc) => {
const fields = EJSON.clone(doc);
if (!options._suppress_initial && !this.collection.paused) {
const handler = (doc) => {
const fields = EJSON.clone(doc);
delete fields._id;
delete fields._id;
if (ordered) {
await query.addedBefore(doc._id, this._projectionFn(fields), null);
}
await query.added(doc._id, this._projectionFn(fields));
};
// it means it's just an array
if (query.results.length) {
for (const doc of query.results) {
await handler(doc);
}
if (ordered) {
query.addedBefore(doc._id, this._projectionFn(fields), null);
}
// it means it's an id map
if (query.results?.size?.()) {
await query.results.forEachAsync(handler);
query.added(doc._id, this._projectionFn(fields));
};
// it means it's just an array
if (query.results.length) {
for (const doc of query.results) {
handler(doc);
}
}
// it means it's an id map
if (query.results?.size?.()) {
query.results.forEachAsync(handler);
}
}
// run the observe callbacks resulting from the initial contents
// before we leave the observe.
await this.collection._observeQueue.drain();
handle.isReady = true;
})();
const handle = Object.assign(new LocalCollection.ObserveHandle, {
const handle = Object.assign(new LocalCollection.ObserveHandle(), {
collection: this.collection,
stop: () => {
if (this.reactive) {
@@ -373,7 +345,7 @@ export default class Cursor {
}
},
isReady: false,
isReadyPromise,
isReadyPromise: null,
});
if (this.reactive && Tracker.active) {
@@ -387,6 +359,17 @@ export default class Cursor {
});
}
// run the observe callbacks resulting from the initial contents
// before we leave the observe.
const isReadyPromise = this.collection._observeQueue.drain();
if (Meteor.isClient) handle.isReady = true;
else isReadyPromise.then(() => (handle.isReady = true));
handle.isReadyPromise = Meteor.isClient
? Promise.resolve()
: isReadyPromise;
return handle;
}