From 53ff57b492d6ec9c5faa77af0175fcdc55b43056 Mon Sep 17 00:00:00 2001 From: Matheus Castro Date: Thu, 20 Oct 2022 23:55:58 -0300 Subject: [PATCH] Improving write fence. --- packages/ddp-server/writefence.js | 254 +++++++++++++----------------- 1 file changed, 113 insertions(+), 141 deletions(-) diff --git a/packages/ddp-server/writefence.js b/packages/ddp-server/writefence.js index f911ae71b7..d85f028ff8 100644 --- a/packages/ddp-server/writefence.js +++ b/packages/ddp-server/writefence.js @@ -1,18 +1,121 @@ -var Future = Npm.require('fibers/future'); - // A write fence collects a group of writes, and provides a callback // when all of the writes are fully committed and propagated (all // observers have been notified of the write and acknowledged it.) // -DDPServer._WriteFence = function () { - var self = this; +DDPServer._WriteFence = class { + constructor() { + this.armed = false; + this.fired = false; + this.retired = false; + this.outstanding_writes = 0; + this.before_fire_callbacks = []; + this.completion_callbacks = []; + } - self.armed = false; - self.fired = false; - self.retired = false; - self.outstanding_writes = 0; - self.before_fire_callbacks = []; - self.completion_callbacks = []; + // Start tracking a write, and return an object to represent it. The + // object has a single method, committed(). This method should be + // called when the write is fully committed and propagated. You can + // continue to add writes to the WriteFence up until it is triggered + // (calls its callbacks because all writes have committed.) + beginWrite() { + if (this.retired) + return { committed: function () {} }; + + if (this.fired) + throw new Error("fence has already activated -- too late to add writes"); + + this.outstanding_writes++; + let committed = false; + const _committedFn = async () => { + if (committed) + throw new Error("committed called twice on the same write"); + committed = true; + this.outstanding_writes--; + await this._maybeFire(); + }; + + const self = this; + return { + committed: Meteor._isFibersEnabled ? () => Promise.await(_committedFn.apply(self)) : _committedFn, + }; + } + + // Arm the fence. Once the fence is armed, and there are no more + // uncommitted writes, it will activate. + arm() { + if (this === DDPServer._CurrentWriteFence.get()) + throw Error("Can't arm the current fence"); + this.armed = true; + return Meteor._isFibersEnabled ? Promise.await(this._maybeFire()) : this._maybeFire(); + } + + // Register a function to be called once before firing the fence. + // Callback function can add new writes to the fence, in which case + // it won't fire until those writes are done as well. + onBeforeFire(func) { + if (this.fired) + throw new Error("fence has already activated -- too late to " + + "add a callback"); + this.before_fire_callbacks.push(func); + } + + // Register a function to be called when the fence fires. + onAllCommitted(func) { + if (this.fired) + throw new Error("fence has already activated -- too late to " + + "add a callback"); + this.completion_callbacks.push(func); + } + + _armAndWait() { + let resolver; + const returnValue = new Promise(r => resolver = r); + this.onAllCommitted(resolver); + this.arm(); + + return returnValue; + } + // Convenience function. Arms the fence, then blocks until it fires. + armAndWait() { + return Meteor._isFibersEnabled ? Promise.await(this._armAndWait()) : this._armAndWait(); + } + + async _maybeFire() { + if (this.fired) + throw new Error("write fence already activated?"); + if (this.armed && !this.outstanding_writes) { + const invokeCallback = async (func) => { + try { + await func(this); + } catch (err) { + Meteor._debug("exception in write fence callback:", err); + } + }; + + this.outstanding_writes++; + while (this.before_fire_callbacks.length > 0) { + const cb = this.before_fire_callbacks.shift(); + await invokeCallback(cb); + } + this.outstanding_writes--; + + if (!this.outstanding_writes) { + this.fired = true; + while (this.completion_callbacks.length > 0) { + const cb = this.completion_callbacks.shift(); + await invokeCallback(cb); + } + } + } + } + + // Deactivate this fence so that adding more writes has no effect. + // The fence must have already fired. + retire() { + if (!this.fired) + throw new Error("Can't retire a fence that hasn't fired."); + this.retired = true; + } }; // The current write fence. When there is a current write fence, code @@ -20,134 +123,3 @@ DDPServer._WriteFence = function () { // beginWrite(). // DDPServer._CurrentWriteFence = new Meteor.EnvironmentVariable; - -_.extend(DDPServer._WriteFence.prototype, { - // Start tracking a write, and return an object to represent it. The - // object has a single method, committed(). This method should be - // called when the write is fully committed and propagated. You can - // continue to add writes to the WriteFence up until it is triggered - // (calls its callbacks because all writes have committed.) - beginWrite: function () { - var self = this; - - if (self.retired) - return { committed: function () {} }; - - if (self.fired) - throw new Error("fence has already activated -- too late to add writes"); - - self.outstanding_writes++; - var committed = false; - return { - committed: function () { - if (committed) - throw new Error("committed called twice on the same write"); - committed = true; - self.outstanding_writes--; - self._maybeFire(); - } - }; - }, - - // Arm the fence. Once the fence is armed, and there are no more - // uncommitted writes, it will activate. - arm: function () { - var self = this; - if (self === DDPServer._CurrentWriteFence.get()) - throw Error("Can't arm the current fence"); - self.armed = true; - self._maybeFire(); - }, - - // Register a function to be called once before firing the fence. - // Callback function can add new writes to the fence, in which case - // it won't fire until those writes are done as well. - onBeforeFire: function (func) { - var self = this; - if (self.fired) - throw new Error("fence has already activated -- too late to " + - "add a callback"); - self.before_fire_callbacks.push(func); - }, - - // Register a function to be called when the fence fires. - onAllCommitted: function (func) { - var self = this; - if (self.fired) - throw new Error("fence has already activated -- too late to " + - "add a callback"); - self.completion_callbacks.push(func); - }, - - _armAndWaitFibers: function () { - var self = this; - var future = new Future; - self.onAllCommitted(function () { - future['return'](); - }); - self.arm(); - future.wait(); - }, - _armAndWaitNoFibers: function () { - var self = this; - - let _resolver; - self.onAllCommitted(function () { - if (!_resolver) { - console.warn("oops, no resolver"); - return; - } - - _resolver(); - }); - - return new Promise((r) => { - _resolver = r; - self.arm(); - } ); - }, - - // Convenience function. Arms the fence, then blocks until it fires. - armAndWait: function () { - return Meteor._isFibersEnabled ? this._armAndWaitFibers() : this._armAndWaitNoFibers(); - }, - - _maybeFire: function () { - var self = this; - if (self.fired) - throw new Error("write fence already activated?"); - if (self.armed && !self.outstanding_writes) { - function invokeCallback (func) { - try { - func(self); - } catch (err) { - Meteor._debug("exception in write fence callback", err); - } - } - - self.outstanding_writes++; - while (self.before_fire_callbacks.length > 0) { - var callbacks = self.before_fire_callbacks; - self.before_fire_callbacks = []; - _.each(callbacks, invokeCallback); - } - self.outstanding_writes--; - - if (!self.outstanding_writes) { - self.fired = true; - var callbacks = self.completion_callbacks; - self.completion_callbacks = []; - _.each(callbacks, invokeCallback); - } - } - }, - - // Deactivate this fence so that adding more writes has no effect. - // The fence must have already fired. - retire: function () { - var self = this; - if (! self.fired) - throw new Error("Can't retire a fence that hasn't fired."); - self.retired = true; - } -});