fixing _scheduleRun: It was adding an infity amount of tasks.

fixing tests
This commit is contained in:
denihs
2022-12-06 18:16:35 -04:00
parent 7c4107c1f0
commit a308098633
5 changed files with 276 additions and 321 deletions

View File

@@ -15,13 +15,6 @@ import {
last,
} from "meteor/ddp-common/utils.js";
let Fiber;
let Future;
if (Meteor.isServer) {
Fiber = Npm.require('fibers');
Future = Npm.require('fibers/future');
}
class MongoIDMap extends IdMap {
constructor() {
super(MongoID.idStringify, MongoID.idParse);
@@ -494,30 +487,6 @@ export class Connection {
return handle;
}
// options:
// - onLateError {Function(error)} called if an error was received after the ready event.
// (errors received before ready cause an error to be thrown)
_subscribeAndWait(name, args, options) {
const self = this;
const f = new Future();
let ready = false;
args = args || [];
args.push({
onReady() {
ready = true;
f['return']();
},
onError(e) {
if (!ready) f['throw'](e);
else options && options.onLateError && options.onLateError(e);
}
});
const handle = self.subscribe.apply(self, [name].concat(args));
f.wait();
return handle;
}
methods(methods) {
Object.entries(methods).forEach(([name, func]) => {
if (typeof func !== 'function') {

File diff suppressed because it is too large Load Diff

View File

@@ -32,23 +32,23 @@ _.extend(StubStream.prototype, {
},
// Methods for tests
receive: function(data) {
receive: async function(data) {
const self = this;
if (typeof data === 'object') {
data = EJSON.stringify(data);
}
_.each(self.callbacks['message'], function(cb) {
cb(data);
});
for (const cb of self.callbacks['message']) {
await cb(data);
}
},
reset: function() {
reset: async function() {
const self = this;
_.each(self.callbacks['reset'], function(cb) {
cb();
});
for (const cb of self.callbacks['reset']) {
await cb();
}
},
// Provide a tag to detect stub streams.

View File

@@ -35,36 +35,22 @@ class AsynchronousQueue {
this._draining = false;
}
queueTask(task) {
async queueTask(task) {
this._taskHandles.push({
task: task,
name: task.name
});
return this._scheduleRun();
await this._scheduleRun();
}
_scheduleRun() {
async _scheduleRun() {
// Already running or scheduled? Do nothing.
if (this._runningOrRunScheduled)
return;
this._runningOrRunScheduled = true;
let resolver;
const returnValue = new Promise(r => resolver = r);
setImmediate(() => {
Meteor._runAsync(async () => {
await this._run();
if (!resolver) {
throw new Error("Resolver not found for task");
}
resolver();
});
});
return returnValue;
await this._run();
}
async _run() {
@@ -91,7 +77,7 @@ class AsynchronousQueue {
await this._scheduleRun();
}
runTask(task) {
async runTask(task) {
const handle = {
task: Meteor.bindEnvironment(task, function(e) {
Meteor._debug('Exception from task', e);
@@ -100,7 +86,7 @@ class AsynchronousQueue {
name: task.name
};
this._taskHandles.push(handle);
return this._scheduleRun();
await this._scheduleRun();
}
flush() {

View File

@@ -245,12 +245,12 @@ const bindEnvironmentAsync = (func, onException, _this) => {
async () => {
let ret;
try {
Meteor._updateAslStore(CURRENT_VALUE_KEY_NAME, dynamics);
if (currentSlot) {
Meteor._updateAslStore(CURRENT_VALUE_KEY_NAME, dynamics);
}
ret = await func.apply(_this, args);
} catch (e) {
onException(e);
} finally {
Meteor._updateAslStore(CURRENT_VALUE_KEY_NAME, undefined);
}
return ret;
},