Improving refactor of synchronous queue.

This commit is contained in:
Matheus Castro
2022-10-20 23:24:38 -03:00
parent e52001e7d3
commit fb5e207e43

View File

@@ -64,14 +64,16 @@ class AsynchronousQueue {
let resolver;
const returnValue = new Promise(r => resolver = r);
setImmediate(async () => {
await this._run();
setImmediate(() => {
Meteor._runAsync(async () => {
await this._run();
if (!resolver) {
throw new Error("Resolver not found for task");
}
if (!resolver) {
throw new Error("Resolver not found for task");
}
resolver();
resolver();
});
});
return returnValue;
@@ -98,7 +100,7 @@ class AsynchronousQueue {
// Soon, run the next task, if there is any.
this._runningOrRunScheduled = false;
this._scheduleRun();
await this._scheduleRun();
}
runTask(task) {
@@ -113,90 +115,68 @@ class AsynchronousQueue {
return this._scheduleRun();
}
async flush() {
await this.runTask(() => {});
flush() {
return this.runTask(() => {});
}
drain() {
async drain() {
if (this._draining)
return;
if (this._taskHandles.isEmpty())
return;
this._draining = true;
return Promise.all(this._taskHandles).finally(() => {
this._draining = false;
});
while (!this._taskHandles.isEmpty()) {
await this.flush();
}
this._draining = false;
}
}
Meteor._AsynchronousQueue = AsynchronousQueue;
const runWithFibers = (fn) => {
if (!Meteor._isFibersEnabled) return fn();
Meteor._SynchronousQueue = class extends AsynchronousQueue {
constructor() {
super();
// During the execution of a task, this is set to the fiber used to execute
// that task. We use this to throw an error rather than deadlocking if the
// user calls runTask from within a task on the same fiber.
this._currentTaskFiber = undefined;
}
Promise.await(fn());
};
runWithFibers(fn, args) {
if (!Meteor._isFibersEnabled) {
return fn.apply(this, args);
}
Meteor._SynchronousQueue = function () {
const self = this;
// During the execution of a task, this is set to the fiber used to execute
// that task. We use this to throw an error rather than deadlocking if the
// user calls runTask from within a task on the same fiber.
self._currentTaskFiber = undefined;
self._asyncQueue = new AsynchronousQueue();
};
return Promise.await(fn.apply(this, args));
}
var SQp = Meteor._SynchronousQueue.prototype;
runTask(task) {
this.runWithFibers(super.runTask, [task]);
}
SQp.runTask = function (task) {
var self = this;
runWithFibers(() => self._asyncQueue.runTask(task));
};
flush() {
this.runWithFibers(super.flush);
}
SQp.queueTask = function (task) {
var self = this;
self._asyncQueue.queueTask(task);
};
safeToRunTask() {
return Fiber.current && this._currentTaskFiber !== Fiber.current;
}
SQp.flush = function () {
var self = this;
runWithFibers(self._asyncQueue.flush);
};
drain() {
this.runWithFibers(super.drain);
}
SQp.safeToRunTask = function () {
var self = this;
return Fiber.current && self._currentTaskFiber !== Fiber.current;
};
SQp.drain = function () {
var self = this;
runWithFibers(self._asyncQueue.drain());
};
SQp._scheduleRun = function () {
var self = this;
self._asyncQueue._scheduleRun();
};
SQp._run = function () {
var self = this;
runWithFibers(self._asyncQueue._run);
_run() {
this.runWithFibers(super._run);
}
};
// Sleep. Mostly used for debugging (eg, inserting latency into server
// methods).
//
const _sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
Meteor._sleepForMs = function (ms) {
if (Meteor._isFibersEnabled) {
var fiber = Fiber.current;
setTimeout(function() {
fiber.run();
}, ms);
Fiber.yield();
return;
}
if (!Meteor._isFibersEnabled) return _sleep(ms);
return new Promise(resolve => setTimeout(() => resolve(), ms));
Promise.await(_sleep(ms));
};