refactor multiplexer to typescript

This commit is contained in:
Leonardo Venturini
2024-10-14 19:45:02 -04:00
parent ae516579f6
commit bad17a5ab5
8 changed files with 2421 additions and 234 deletions

2181
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -17,6 +17,8 @@
"@babel/eslint-parser": "^7.21.3",
"@babel/eslint-plugin": "^7.19.1",
"@babel/preset-react": "^7.18.6",
"@types/lodash.isempty": "^4.4.9",
"@types/meteor": "^2.9.8",
"@types/node": "^18.16.18",
"@typescript-eslint/eslint-plugin": "^5.56.0",
"@typescript-eslint/parser": "^5.56.0",

View File

@@ -24,6 +24,7 @@ import {
} from "meteor/minimongo/constants";
import { Meteor } from "meteor/meteor";
import { ObserveHandle } from './observe_handle';
import { ObserveMultiplexer } from './observe_multiplex';
MongoInternals = {};

View File

@@ -1,37 +1,46 @@
import { ObserveHandleCallback, ObserveMultiplexer } from './observe_multiplex';
let nextObserveHandleId = 1;
export type ObserveHandleCallbackInternal = '_added' | '_addedBefore' | '_changed' | '_movedBefore' | '_removed';
// When the callbacks do not mutate the arguments, we can skip a lot of data clones
export class ObserveHandle {
_stopped: boolean;
_id: number;
_multiplexer: any;
nonMutatingCallbacks: boolean;
_id: number;
_multiplexer: ObserveMultiplexer;
nonMutatingCallbacks: boolean;
_stopped: boolean;
constructor(multiplexer: any, callbacks: any, nonMutatingCallbacks: boolean) {
this._multiplexer = multiplexer;
_added?: (...args: any[]) => void;
_addedBefore?: (...args: any[]) => void;
_changed?: (...args: any[]) => void;
_movedBefore?: (...args: any[]) => void;
_removed?: (...args: any[]) => void;
multiplexer.callbackNames().forEach((name) => {
if (callbacks[name]) {
this['_' + name] = callbacks[name];
} else if (name === "addedBefore" && callbacks.added) {
this._addedBefore = async function (id, fields, before) {
await callbacks.added(id, fields);
};
}
});
constructor(multiplexer: any, callbacks: Record<ObserveHandleCallback, any>, nonMutatingCallbacks: boolean) {
this._multiplexer = multiplexer;
this._stopped = false;
this._id = nextObserveHandleId++;
this.nonMutatingCallbacks = nonMutatingCallbacks;
}
multiplexer.callbackNames().forEach((name: ObserveHandleCallback) => {
if (callbacks[name]) {
this[`_${name}` as ObserveHandleCallbackInternal] = callbacks[name];
return;
}
async stop() {
if (this._stopped) return;
this._stopped = true;
await this._multiplexer.removeHandle(this._id);
}
if (name === "addedBefore" && callbacks.added) {
this._addedBefore = async function (id, fields, before) {
await callbacks.added(id, fields);
};
}
});
_addedBefore(id: any, fields: any, before: any) {
throw new Error("Method not implemented.");
}
this._stopped = false;
this._id = nextObserveHandleId++;
this.nonMutatingCallbacks = nonMutatingCallbacks;
}
async stop() {
if (this._stopped) return;
this._stopped = true;
await this._multiplexer.removeHandle(this._id);
}
}

View File

@@ -1,206 +0,0 @@
import has from 'lodash.has';
import isEmpty from 'lodash.isempty';
ObserveMultiplexer = class {
constructor({ ordered, onStop = () => {} } = {}) {
if (ordered === undefined) throw Error("must specify ordered");
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"mongo-livedata", "observe-multiplexers", 1);
this._ordered = ordered;
this._onStop = onStop;
this._queue = new Meteor._AsynchronousQueue();
this._handles = {};
this._resolver = null;
this._readyPromise = new Promise(r => this._resolver = r).then(() => this._isReady = true);
this._cache = new LocalCollection._CachingChangeObserver({
ordered});
// Number of addHandleAndSendInitialAdds tasks scheduled but not yet
// running. removeHandle uses this to know if it's time to call the onStop
// callback.
this._addHandleTasksScheduledButNotPerformed = 0;
const self = this;
this.callbackNames().forEach(callbackName => {
this[callbackName] = function(/* ... */) {
self._applyCallback(callbackName, _.toArray(arguments));
};
});
}
addHandleAndSendInitialAdds(handle) {
return this._addHandleAndSendInitialAdds(handle);
}
async _addHandleAndSendInitialAdds(handle) {
++this._addHandleTasksScheduledButNotPerformed;
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"mongo-livedata", "observe-handles", 1);
const self = this;
await this._queue.runTask(async function () {
self._handles[handle._id] = handle;
// Send out whatever adds we have so far (whether the
// multiplexer is ready).
await self._sendAdds(handle);
--self._addHandleTasksScheduledButNotPerformed;
});
await this._readyPromise;
}
// Remove an observe handle. If it was the last observe handle, call the
// onStop callback; you cannot add any more observe handles after this.
//
// This is not synchronized with polls and handle additions: this means that
// you can safely call it from within an observe callback, but it also means
// that we have to be careful when we iterate over _handles.
async removeHandle(id) {
// This should not be possible: you can only call removeHandle by having
// access to the ObserveHandle, which isn't returned to user code until the
// multiplex is ready.
if (!this._ready())
throw new Error("Can't remove handles until the multiplex is ready");
delete this._handles[id];
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"mongo-livedata", "observe-handles", -1);
if (isEmpty(this._handles) &&
this._addHandleTasksScheduledButNotPerformed === 0) {
await this._stop();
}
}
async _stop(options) {
options = options || {};
// It shouldn't be possible for us to stop when all our handles still
// haven't been returned from observeChanges!
if (! this._ready() && ! options.fromQueryError)
throw Error("surprising _stop: not ready");
// Call stop callback (which kills the underlying process which sends us
// callbacks and removes us from the connection's dictionary).
await this._onStop();
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"mongo-livedata", "observe-multiplexers", -1);
// Cause future addHandleAndSendInitialAdds calls to throw (but the onStop
// callback should make our connection forget about us).
this._handles = null;
}
// Allows all addHandleAndSendInitialAdds calls to return, once all preceding
// adds have been processed. Does not block.
async ready() {
const self = this;
this._queue.queueTask(function () {
if (self._ready())
throw Error("can't make ObserveMultiplex ready twice!");
if (!self._resolver) {
throw new Error("Missing resolver");
}
self._resolver();
self._isReady = true;
});
}
// If trying to execute the query results in an error, call this. This is
// intended for permanent errors, not transient network errors that could be
// fixed. It should only be called before ready(), because if you called ready
// that meant that you managed to run the query once. It will stop this
// ObserveMultiplex and cause addHandleAndSendInitialAdds calls (and thus
// observeChanges calls) to throw the error.
async queryError(err) {
var self = this;
await this._queue.runTask(function () {
if (self._ready())
throw Error("can't claim query has an error after it worked!");
self._stop({fromQueryError: true});
throw err;
});
}
// Calls "cb" once the effects of all "ready", "addHandleAndSendInitialAdds"
// and observe callbacks which came before this call have been propagated to
// all handles. "ready" must have already been called on this multiplexer.
async onFlush(cb) {
var self = this;
await this._queue.queueTask(async function () {
if (!self._ready())
throw Error("only call onFlush on a multiplexer that will be ready");
await cb();
});
}
callbackNames() {
if (this._ordered)
return ["addedBefore", "changed", "movedBefore", "removed"];
else
return ["added", "changed", "removed"];
}
_ready() {
return !!this._isReady;
}
_applyCallback(callbackName, args) {
const self = this;
this._queue.queueTask(async function () {
// If we stopped in the meantime, do nothing.
if (!self._handles)
return;
// First, apply the change to the cache.
await self._cache.applyChange[callbackName].apply(null, args);
// If we haven't finished the initial adds, then we should only be getting
// adds.
if (!self._ready() &&
(callbackName !== 'added' && callbackName !== 'addedBefore')) {
throw new Error("Got " + callbackName + " during initial adds");
}
// Now multiplex the callbacks out to all observe handles. It's OK if
// these calls yield; since we're inside a task, no other use of our queue
// can continue until these are done. (But we do have to be careful to not
// use a handle that got removed, because removeHandle does not use the
// queue; thus, we iterate over an array of keys that we control.)
for (const handleId of Object.keys(self._handles)) {
var handle = self._handles && self._handles[handleId];
if (!handle) return;
var callback = handle['_' + callbackName];
// clone arguments so that callbacks can mutate their arguments
callback &&
(await callback.apply(
null,
handle.nonMutatingCallbacks ? args : EJSON.clone(args)
));
}
});
}
// Sends initial adds to a handle. It should only be called from within a task
// (the task that is processing the addHandleAndSendInitialAdds call). It
// synchronously invokes the handle's added or addedBefore; there's no need to
// flush the queue afterwards to ensure that the callbacks get out.
async _sendAdds(handle) {
var add = this._ordered ? handle._addedBefore : handle._added;
if (!add)
return;
// note: docs may be an _IdMap or an OrderedDict
await this._cache.docs.forEachAsync(async (doc, id) => {
if (!has(this._handles, handle._id))
throw Error("handle got removed before sending initial adds!");
const { _id, ...fields } = handle.nonMutatingCallbacks ? doc
: EJSON.clone(doc);
if (this._ordered)
await add(id, fields, null); // we're going in order, so add at end
else
await add(id, fields);
});
}
};

View File

@@ -0,0 +1,176 @@
import { ObserveHandle } from './observe_handle';
import isEmpty from 'lodash.isempty';
interface ObserveMultiplexerOptions {
ordered: boolean;
onStop?: () => void;
}
export type ObserveHandleCallback = 'added' | 'addedBefore' | 'changed' | 'movedBefore' | 'removed';
export class ObserveMultiplexer {
private readonly _ordered: boolean;
private readonly _onStop: () => void;
private _queue: any;
private _handles: { [key: string]: ObserveHandle };
private _resolver: ((value?: unknown) => void) | null;
private readonly _readyPromise: Promise<boolean | void>;
private _isReady: boolean;
private _cache: any;
private _addHandleTasksScheduledButNotPerformed: number;
constructor({ ordered, onStop = () => {} }: ObserveMultiplexerOptions) {
if (ordered === undefined) throw Error("must specify ordered");
// @ts-ignore
Package['facts-base'] && Package['facts-base']
.Facts.incrementServerFact("mongo-livedata", "observe-multiplexers", 1);
this._ordered = ordered;
this._onStop = onStop;
// @ts-ignore
this._queue = new Meteor._AsynchronousQueue();
this._handles = {};
this._resolver = null;
this._isReady = false;
this._readyPromise = new Promise(r => this._resolver = r).then(() => this._isReady = true);
// @ts-ignore
this._cache = new LocalCollection._CachingChangeObserver({ ordered });
this._addHandleTasksScheduledButNotPerformed = 0;
this.callbackNames().forEach(callbackName => {
(this as any)[callbackName] = (...args: any[]) => {
this._applyCallback(callbackName, args);
};
});
}
addHandleAndSendInitialAdds(handle: ObserveHandle): Promise<void> {
return this._addHandleAndSendInitialAdds(handle);
}
async _addHandleAndSendInitialAdds(handle: ObserveHandle): Promise<void> {
++this._addHandleTasksScheduledButNotPerformed;
// @ts-ignore
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"mongo-livedata", "observe-handles", 1);
await this._queue.runTask(async () => {
this._handles[handle._id] = handle;
await this._sendAdds(handle);
--this._addHandleTasksScheduledButNotPerformed;
});
await this._readyPromise;
}
async removeHandle(id: number): Promise<void> {
if (!this._ready())
throw new Error("Can't remove handles until the multiplex is ready");
delete this._handles[id];
// @ts-ignore
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"mongo-livedata", "observe-handles", -1);
if (isEmpty(this._handles) &&
this._addHandleTasksScheduledButNotPerformed === 0) {
await this._stop();
}
}
async _stop(options: { fromQueryError?: boolean } = {}): Promise<void> {
if (!this._ready() && !options.fromQueryError)
throw Error("surprising _stop: not ready");
await this._onStop();
// @ts-ignore
Package['facts-base'] && Package['facts-base']
.Facts.incrementServerFact("mongo-livedata", "observe-multiplexers", -1);
this._handles = {};
}
async ready(): Promise<void> {
await this._queue.queueTask(() => {
if (this._ready())
throw Error("can't make ObserveMultiplex ready twice!");
if (!this._resolver) {
throw new Error("Missing resolver");
}
this._resolver();
this._isReady = true;
});
}
async queryError(err: Error): Promise<void> {
await this._queue.runTask(() => {
if (this._ready())
throw Error("can't claim query has an error after it worked!");
this._stop({ fromQueryError: true });
throw err;
});
}
async onFlush(cb: () => void): Promise<void> {
await this._queue.queueTask(async () => {
if (!this._ready())
throw Error("only call onFlush on a multiplexer that will be ready");
await cb();
});
}
callbackNames(): ObserveHandleCallback[] {
return this._ordered
? ["addedBefore", "changed", "movedBefore", "removed"]
: ["added", "changed", "removed"];
}
_ready(): boolean {
return !!this._isReady;
}
_applyCallback(callbackName: string, args: any[]): void {
this._queue.queueTask(async () => {
if (!this._handles) return;
await this._cache.applyChange[callbackName].apply(null, args);
if (!this._ready() &&
(callbackName !== 'added' && callbackName !== 'addedBefore')) {
throw new Error(`Got ${callbackName} during initial adds`);
}
for (const handleId of Object.keys(this._handles)) {
const handle = this._handles && this._handles[handleId];
if (!handle) return;
const callback = (handle as any)[`_${callbackName}`];
callback && (await callback.apply(
null,
handle.nonMutatingCallbacks ? args : EJSON.clone(args)
));
}
});
}
async _sendAdds(handle: ObserveHandle): Promise<void> {
const add = this._ordered ? handle._addedBefore : handle._added;
if (!add) return;
await this._cache.docs.forEachAsync(async (doc: any, id: string) => {
if (!(handle._id in this._handles))
throw Error("handle got removed before sending initial adds!");
const { _id, ...fields } = handle.nonMutatingCallbacks ? doc : EJSON.clone(doc);
if (this._ordered)
await add(id, fields, null);
else
await add(id, fields);
});
}
}

View File

@@ -88,7 +88,7 @@ Package.onUse(function (api) {
[
"mongo_driver.js",
"oplog_tailing.js",
"observe_multiplex.js",
"observe_multiplex.ts",
"doc_fetcher.js",
"polling_observe_driver.js",
"oplog_observe_driver.js",

24
tsconfig.json Normal file
View File

@@ -0,0 +1,24 @@
{
"compilerOptions": {
"target": "es2020",
"module": "commonjs",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"outDir": "./dist",
"rootDir": "./src",
"typeRoots": [
"./node_modules/@types",
"./types"
]
},
"include": [
"src/**/*.ts",
"src/**/*.d.ts",
"types/**/*.d.ts"
],
"exclude": [
"node_modules"
]
}