diff --git a/packages/ddp-server/writefence.js b/packages/ddp-server/writefence.js index e9310c9f7f..f911ae71b7 100644 --- a/packages/ddp-server/writefence.js +++ b/packages/ddp-server/writefence.js @@ -79,8 +79,7 @@ _.extend(DDPServer._WriteFence.prototype, { self.completion_callbacks.push(func); }, - // Convenience function. Arms the fence, then blocks until it fires. - armAndWait: function () { + _armAndWaitFibers: function () { var self = this; var future = new Future; self.onAllCommitted(function () { @@ -89,6 +88,29 @@ _.extend(DDPServer._WriteFence.prototype, { self.arm(); future.wait(); }, + _armAndWaitNoFibers: function () { + var self = this; + + let _resolver; + self.onAllCommitted(function () { + if (!_resolver) { + console.warn("oops, no resolver"); + return; + } + + _resolver(); + }); + + return new Promise((r) => { + _resolver = r; + self.arm(); + } ); + }, + + // Convenience function. Arms the fence, then blocks until it fires. + armAndWait: function () { + return Meteor._isFibersEnabled ? this._armAndWaitFibers() : this._armAndWaitNoFibers(); + }, _maybeFire: function () { var self = this; diff --git a/packages/meteor/dynamics_nodejs.js b/packages/meteor/dynamics_nodejs.js index b204bb5765..6fe1200df8 100644 --- a/packages/meteor/dynamics_nodejs.js +++ b/packages/meteor/dynamics_nodejs.js @@ -97,15 +97,15 @@ EVp.withValue = function (value, func) { let meteorDynamics = Meteor._getValueFromAslStore('_meteor_dynamics'); if (!meteorDynamics) { meteorDynamics = []; - Meteor._updateAslStore('_meteor_dynamics', []); } const saved = meteorDynamics[this.slot]; try { - Meteor._updateAslStore('_meteor_dynamics', value); + meteorDynamics[this.slot] = value; return func(); } finally { - Meteor._updateAslStore('_meteor_dynamics', saved); + meteorDynamics[this.slot] = saved; + Meteor._updateAslStore('_meteor_dynamics', meteorDynamics); } }; diff --git a/packages/meteor/fiber_helpers.js b/packages/meteor/fiber_helpers.js index d16cb9d787..a976d59616 100644 --- a/packages/meteor/fiber_helpers.js +++ b/packages/meteor/fiber_helpers.js @@ -162,12 +162,12 @@ SQp._scheduleRun = function () { */ if (Meteor._isFibersEnabled) { setImmediate(function() { - Fiber(function() { + Meteor._runAsync(function() { self._run(); - }).run(); + }); }); } else { - global.asyncLocalStorage.run(Meteor._getAslStore(), () => { + Meteor._runAsync(() => { self._run(); }); } diff --git a/packages/minimongo/local_collection.js b/packages/minimongo/local_collection.js index e3668eeb03..1466e0a783 100644 --- a/packages/minimongo/local_collection.js +++ b/packages/minimongo/local_collection.js @@ -689,8 +689,6 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver { this.docs.putBefore(id, doc, before || null); }, movedBefore: (id, before) => { - const doc = this.docs.get(id); - if (callbacks.movedBefore) { callbacks.movedBefore.call(this, id, before); } @@ -1201,7 +1199,7 @@ LocalCollection._modify = (doc, modifier, options = {}) => { }); }; -LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => { +LocalCollection._observeFromObserveChangesFibers = (cursor, observeCallbacks) => { const transform = cursor.getTransform() || (doc => doc); let suppressed = !!observeCallbacks._suppress_initial; @@ -1344,6 +1342,155 @@ LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => { return handle; }; +LocalCollection._observeFromObserveChangesNoFibers = async (cursor, observeCallbacks) => { + const transform = cursor.getTransform() || (doc => doc); + let suppressed = !!observeCallbacks._suppress_initial; + + let observeChangesCallbacks; + if (LocalCollection._observeCallbacksAreOrdered(observeCallbacks)) { + // The "_no_indices" option sets all index arguments to -1 and skips the + // linear scans required to generate them. This lets observers that don't + // need absolute indices benefit from the other features of this API -- + // relative order, transforms, and applyChanges -- without the speed hit. + const indices = !observeCallbacks._no_indices; + + observeChangesCallbacks = { + addedBefore(id, fields, before) { + if (suppressed || !(observeCallbacks.addedAt || observeCallbacks.added)) { + return; + } + + const doc = transform(Object.assign(fields, {_id: id})); + + if (observeCallbacks.addedAt) { + observeCallbacks.addedAt( + doc, + indices + ? before + ? this.docs.indexOf(before) + : this.docs.size() + : -1, + before + ); + } else { + observeCallbacks.added(doc); + } + }, + changed(id, fields) { + if (!(observeCallbacks.changedAt || observeCallbacks.changed)) { + return; + } + + let doc = EJSON.clone(this.docs.get(id)); + if (!doc) { + throw new Error(`Unknown id for changed: ${id}`); + } + + const oldDoc = transform(EJSON.clone(doc)); + + DiffSequence.applyChanges(doc, fields); + + if (observeCallbacks.changedAt) { + observeCallbacks.changedAt( + transform(doc), + oldDoc, + indices ? this.docs.indexOf(id) : -1 + ); + } else { + observeCallbacks.changed(transform(doc), oldDoc); + } + }, + movedBefore(id, before) { + if (!observeCallbacks.movedTo) { + return; + } + + const from = indices ? this.docs.indexOf(id) : -1; + let to = indices + ? before + ? this.docs.indexOf(before) + : this.docs.size() + : -1; + + // When not moving backwards, adjust for the fact that removing the + // document slides everything back one slot. + if (to > from) { + --to; + } + + observeCallbacks.movedTo( + transform(EJSON.clone(this.docs.get(id))), + from, + to, + before || null + ); + }, + removed(id) { + if (!(observeCallbacks.removedAt || observeCallbacks.removed)) { + return; + } + + // technically maybe there should be an EJSON.clone here, but it's about + // to be removed from this.docs! + const doc = transform(this.docs.get(id)); + + if (observeCallbacks.removedAt) { + observeCallbacks.removedAt(doc, indices ? this.docs.indexOf(id) : -1); + } else { + observeCallbacks.removed(doc); + } + }, + }; + } else { + observeChangesCallbacks = { + added(id, fields) { + if (!suppressed && observeCallbacks.added) { + observeCallbacks.added(transform(Object.assign(fields, {_id: id}))); + } + }, + changed(id, fields) { + if (observeCallbacks.changed) { + const oldDoc = this.docs.get(id); + const doc = EJSON.clone(oldDoc); + + DiffSequence.applyChanges(doc, fields); + + observeCallbacks.changed( + transform(doc), + transform(EJSON.clone(oldDoc)) + ); + } + }, + removed(id) { + if (observeCallbacks.removed) { + observeCallbacks.removed(transform(this.docs.get(id))); + } + }, + }; + } + + const changeObserver = new LocalCollection._CachingChangeObserver({ + callbacks: observeChangesCallbacks + }); + + // CachingChangeObserver clones all received input on its callbacks + // So we can mark it as safe to reduce the ejson clones. + // This is tested by the `mongo-livedata - (extended) scribbling` tests + changeObserver.applyChange._fromObserve = true; + const handle = await cursor.observeChanges(changeObserver.applyChange, + { nonMutatingCallbacks: true }); + + suppressed = false; + + return handle; +}; + +LocalCollection._observeFromObserveChanges = (cursor, observeCallbacks) => { + return Meteor._isFibersEnabled + ? LocalCollection._observeFromObserveChangesFibers(cursor, observeCallbacks) + : LocalCollection._observeFromObserveChangesNoFibers(cursor, observeCallbacks); +}; + LocalCollection._observeCallbacksAreOrdered = callbacks => { if (callbacks.added && callbacks.addedAt) { throw new Error('Please specify only one of added() and addedAt()'); diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 0f6396b505..a6ff3afe79 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -1125,23 +1125,23 @@ class AsynchronousCursor { }); } - forEach(callback, thisArg) { + async forEach(callback, thisArg) { // Get back to the beginning. this._rewind(); - return this._cursor.forEach((doc, index) => { - callback.call(thisArg, doc, index) - }, this._selfForIteration); + let idx = 0; + while (true) { + const doc = await this._nextObjectPromise(); + if (!doc) return; + await callback.call(thisArg, doc, idx++, this._selfForIteration); + } } async map(callback, thisArg) { const results = []; - - let idx = 0; - for await (const doc of this._cursor) { - results.push(await callback.call(thisArg, doc, idx, this._selfForIteration)) - idx++; - } + await this.forEach(async (doc, index) => { + results.push(await callback.call(thisArg, doc, index, this._selfForIteration)); + }); return results; } @@ -1159,7 +1159,7 @@ class AsynchronousCursor { } fetch() { - return this._cursor.toArray(); + return this.map(_.identity); } /** @@ -1172,13 +1172,13 @@ class AsynchronousCursor { } // This method is NOT wrapped in Cursor. - getRawObjects(ordered) { + async getRawObjects(ordered) { var self = this; if (ordered) { return self.fetch(); } else { var results = new LocalCollection._IdMap; - self.forEach(function (doc) { + await self.forEach(function (doc) { results.set(doc._id, doc); }); return results; @@ -1590,6 +1590,10 @@ Object.assign(MongoConnection.prototype, { _testOnlyPollCallback: callbacks._testOnlyPollCallback }); + if (observeDriver._init) { + await observeDriver._init(); + } + // This field is only set for use in tests. multiplexer._observeDriver = observeDriver; } @@ -1697,6 +1701,10 @@ Object.assign(MongoConnection.prototype, { _testOnlyPollCallback: callbacks._testOnlyPollCallback }); + if (observeDriver._init) { + observeDriver._init(); + } + // This field is only set for use in tests. multiplexer._observeDriver = observeDriver; } diff --git a/packages/mongo/polling_observe_driver.js b/packages/mongo/polling_observe_driver.js index fb3f692fef..32744abeba 100644 --- a/packages/mongo/polling_observe_driver.js +++ b/packages/mongo/polling_observe_driver.js @@ -11,7 +11,7 @@ PollingObserveDriver = function (options) { self._stopCallbacks = []; self._stopped = false; - self._synchronousCursor = self._mongoHandle._createSynchronousCursor( + self._cursor = self._mongoHandle._createSynchronousCursor( self._cursorDescription); // previous results snapshot. on each poll cycle, diffs against @@ -74,15 +74,28 @@ PollingObserveDriver = function (options) { Meteor.clearInterval(intervalHandle); }); } - - // Make sure we actually poll soon! - self._unthrottledEnsurePollIsScheduled(); - - Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact( - "mongo-livedata", "observe-drivers-polling", 1); }; _.extend(PollingObserveDriver.prototype, { + _initAsync: async function () { + // Make sure we actually poll soon! + await this._unthrottledEnsurePollIsScheduled(); + + Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact( + "mongo-livedata", "observe-drivers-polling", 1); + }, + _init() { + if (!Meteor._isFibersEnabled) { + return this._initAsync(); + } + + var self = this; + // Make sure we actually poll soon! + self._unthrottledEnsurePollIsScheduled(); + + Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact( + "mongo-livedata", "observe-drivers-polling", 1); + }, // This is always called through _.throttle (except once at startup). _unthrottledEnsurePollIsScheduled: function () { var self = this; @@ -129,7 +142,7 @@ _.extend(PollingObserveDriver.prototype, { }); }, - _pollMongo: function () { + _pollMongoFibers: function () { var self = this; --self._pollsScheduledButNotStarted; @@ -153,7 +166,7 @@ _.extend(PollingObserveDriver.prototype, { // Get the new query results. (This yields.) try { - newResults = self._synchronousCursor.getRawObjects(self._ordered); + newResults = self._cursor.getRawObjects(self._ordered); } catch (e) { if (first && typeof(e.code) === 'number') { // This is an error document sent to us by mongod, not a connection @@ -208,6 +221,89 @@ _.extend(PollingObserveDriver.prototype, { }); }, + async _pollMongoNoFibers() { + var self = this; + --self._pollsScheduledButNotStarted; + + if (self._stopped) + return; + + var first = false; + var newResults; + var oldResults = self._results; + if (!oldResults) { + first = true; + // XXX maybe use OrderedDict instead? + oldResults = self._ordered ? [] : new LocalCollection._IdMap; + } + + self._testOnlyPollCallback && self._testOnlyPollCallback(); + + // Save the list of pending writes which this round will commit. + var writesForCycle = self._pendingWrites; + self._pendingWrites = []; + + // Get the new query results. (This yields.) + try { + newResults = await self._cursor.getRawObjects(self._ordered); + } catch (e) { + if (first && typeof(e.code) === 'number') { + // This is an error document sent to us by mongod, not a connection + // error generated by the client. And we've never seen this query work + // successfully. Probably it's a bad selector or something, so we should + // NOT retry. Instead, we should halt the observe (which ends up calling + // `stop` on us). + self._multiplexer.queryError( + new Error( + "Exception while polling query " + + JSON.stringify(self._cursorDescription) + ": " + e.message)); + return; + } + + // getRawObjects can throw if we're having trouble talking to the + // database. That's fine --- we will repoll later anyway. But we should + // make sure not to lose track of this cycle's writes. + // (It also can throw if there's just something invalid about this query; + // unfortunately the ObserveDriver API doesn't provide a good way to + // "cancel" the observe from the inside in this case. + Array.prototype.push.apply(self._pendingWrites, writesForCycle); + Meteor._debug("Exception while polling query " + + JSON.stringify(self._cursorDescription), e); + return; + } + + // Run diffs. + if (!self._stopped) { + LocalCollection._diffQueryChanges( + self._ordered, oldResults, newResults, self._multiplexer); + } + + // Signals the multiplexer to allow all observeChanges calls that share this + // multiplexer to return. (This happens asynchronously, via the + // multiplexer's queue.) + if (first) + self._multiplexer.ready(); + + // Replace self._results atomically. (This assignment is what makes `first` + // stay through on the next cycle, so we've waited until after we've + // committed to ready-ing the multiplexer.) + self._results = newResults; + + // Once the ObserveMultiplexer has processed everything we've done in this + // round, mark all the writes which existed before this call as + // commmitted. (If new writes have shown up in the meantime, there'll + // already be another _pollMongo task scheduled.) + self._multiplexer.onFlush(function () { + _.each(writesForCycle, function (w) { + w.committed(); + }); + }); + }, + + _pollMongo: function () { + return Meteor._isFibersEnabled ? this._pollMongoFibers() : this._pollMongoNoFibers(); + }, + stop: function () { var self = this; self._stopped = true;