From fb5e207e431e1132f361fb2e8f2b03a2f5347c21 Mon Sep 17 00:00:00 2001 From: Matheus Castro Date: Thu, 20 Oct 2022 23:24:38 -0300 Subject: [PATCH] Improving refactor of synchronous queue. --- packages/meteor/fiber_helpers.js | 116 +++++++++++++------------------ 1 file changed, 48 insertions(+), 68 deletions(-) diff --git a/packages/meteor/fiber_helpers.js b/packages/meteor/fiber_helpers.js index d4bb486551..431f3543d7 100644 --- a/packages/meteor/fiber_helpers.js +++ b/packages/meteor/fiber_helpers.js @@ -64,14 +64,16 @@ class AsynchronousQueue { let resolver; const returnValue = new Promise(r => resolver = r); - setImmediate(async () => { - await this._run(); + setImmediate(() => { + Meteor._runAsync(async () => { + await this._run(); - if (!resolver) { - throw new Error("Resolver not found for task"); - } + if (!resolver) { + throw new Error("Resolver not found for task"); + } - resolver(); + resolver(); + }); }); return returnValue; @@ -98,7 +100,7 @@ class AsynchronousQueue { // Soon, run the next task, if there is any. this._runningOrRunScheduled = false; - this._scheduleRun(); + await this._scheduleRun(); } runTask(task) { @@ -113,90 +115,68 @@ class AsynchronousQueue { return this._scheduleRun(); } - async flush() { - await this.runTask(() => {}); + flush() { + return this.runTask(() => {}); } - drain() { + async drain() { if (this._draining) return; - if (this._taskHandles.isEmpty()) - return; this._draining = true; - - return Promise.all(this._taskHandles).finally(() => { - this._draining = false; - }); + while (!this._taskHandles.isEmpty()) { + await this.flush(); + } + this._draining = false; } } Meteor._AsynchronousQueue = AsynchronousQueue; -const runWithFibers = (fn) => { - if (!Meteor._isFibersEnabled) return fn(); +Meteor._SynchronousQueue = class extends AsynchronousQueue { + constructor() { + super(); + // During the execution of a task, this is set to the fiber used to execute + // that task. We use this to throw an error rather than deadlocking if the + // user calls runTask from within a task on the same fiber. + this._currentTaskFiber = undefined; + } - Promise.await(fn()); -}; + runWithFibers(fn, args) { + if (!Meteor._isFibersEnabled) { + return fn.apply(this, args); + } -Meteor._SynchronousQueue = function () { - const self = this; - // During the execution of a task, this is set to the fiber used to execute - // that task. We use this to throw an error rather than deadlocking if the - // user calls runTask from within a task on the same fiber. - self._currentTaskFiber = undefined; - self._asyncQueue = new AsynchronousQueue(); -}; + return Promise.await(fn.apply(this, args)); + } -var SQp = Meteor._SynchronousQueue.prototype; + runTask(task) { + this.runWithFibers(super.runTask, [task]); + } -SQp.runTask = function (task) { - var self = this; - runWithFibers(() => self._asyncQueue.runTask(task)); -}; + flush() { + this.runWithFibers(super.flush); + } -SQp.queueTask = function (task) { - var self = this; - self._asyncQueue.queueTask(task); -}; + safeToRunTask() { + return Fiber.current && this._currentTaskFiber !== Fiber.current; + } -SQp.flush = function () { - var self = this; - runWithFibers(self._asyncQueue.flush); -}; + drain() { + this.runWithFibers(super.drain); + } -SQp.safeToRunTask = function () { - var self = this; - return Fiber.current && self._currentTaskFiber !== Fiber.current; -}; - -SQp.drain = function () { - var self = this; - runWithFibers(self._asyncQueue.drain()); -}; - -SQp._scheduleRun = function () { - var self = this; - self._asyncQueue._scheduleRun(); -}; - -SQp._run = function () { - var self = this; - runWithFibers(self._asyncQueue._run); + _run() { + this.runWithFibers(super._run); + } }; // Sleep. Mostly used for debugging (eg, inserting latency into server // methods). // +const _sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms)); Meteor._sleepForMs = function (ms) { - if (Meteor._isFibersEnabled) { - var fiber = Fiber.current; - setTimeout(function() { - fiber.run(); - }, ms); - Fiber.yield(); - return; - } + if (!Meteor._isFibersEnabled) return _sleep(ms); - return new Promise(resolve => setTimeout(() => resolve(), ms)); + Promise.await(_sleep(ms)); };