Improving write fence.

This commit is contained in:
Matheus Castro
2022-10-20 23:55:58 -03:00
parent fb5e207e43
commit 53ff57b492

View File

@@ -1,18 +1,121 @@
var Future = Npm.require('fibers/future');
// A write fence collects a group of writes, and provides a callback
// when all of the writes are fully committed and propagated (all
// observers have been notified of the write and acknowledged it.)
//
DDPServer._WriteFence = function () {
var self = this;
DDPServer._WriteFence = class {
constructor() {
this.armed = false;
this.fired = false;
this.retired = false;
this.outstanding_writes = 0;
this.before_fire_callbacks = [];
this.completion_callbacks = [];
}
self.armed = false;
self.fired = false;
self.retired = false;
self.outstanding_writes = 0;
self.before_fire_callbacks = [];
self.completion_callbacks = [];
// Start tracking a write, and return an object to represent it. The
// object has a single method, committed(). This method should be
// called when the write is fully committed and propagated. You can
// continue to add writes to the WriteFence up until it is triggered
// (calls its callbacks because all writes have committed.)
beginWrite() {
if (this.retired)
return { committed: function () {} };
if (this.fired)
throw new Error("fence has already activated -- too late to add writes");
this.outstanding_writes++;
let committed = false;
const _committedFn = async () => {
if (committed)
throw new Error("committed called twice on the same write");
committed = true;
this.outstanding_writes--;
await this._maybeFire();
};
const self = this;
return {
committed: Meteor._isFibersEnabled ? () => Promise.await(_committedFn.apply(self)) : _committedFn,
};
}
// Arm the fence. Once the fence is armed, and there are no more
// uncommitted writes, it will activate.
arm() {
if (this === DDPServer._CurrentWriteFence.get())
throw Error("Can't arm the current fence");
this.armed = true;
return Meteor._isFibersEnabled ? Promise.await(this._maybeFire()) : this._maybeFire();
}
// Register a function to be called once before firing the fence.
// Callback function can add new writes to the fence, in which case
// it won't fire until those writes are done as well.
onBeforeFire(func) {
if (this.fired)
throw new Error("fence has already activated -- too late to " +
"add a callback");
this.before_fire_callbacks.push(func);
}
// Register a function to be called when the fence fires.
onAllCommitted(func) {
if (this.fired)
throw new Error("fence has already activated -- too late to " +
"add a callback");
this.completion_callbacks.push(func);
}
_armAndWait() {
let resolver;
const returnValue = new Promise(r => resolver = r);
this.onAllCommitted(resolver);
this.arm();
return returnValue;
}
// Convenience function. Arms the fence, then blocks until it fires.
armAndWait() {
return Meteor._isFibersEnabled ? Promise.await(this._armAndWait()) : this._armAndWait();
}
async _maybeFire() {
if (this.fired)
throw new Error("write fence already activated?");
if (this.armed && !this.outstanding_writes) {
const invokeCallback = async (func) => {
try {
await func(this);
} catch (err) {
Meteor._debug("exception in write fence callback:", err);
}
};
this.outstanding_writes++;
while (this.before_fire_callbacks.length > 0) {
const cb = this.before_fire_callbacks.shift();
await invokeCallback(cb);
}
this.outstanding_writes--;
if (!this.outstanding_writes) {
this.fired = true;
while (this.completion_callbacks.length > 0) {
const cb = this.completion_callbacks.shift();
await invokeCallback(cb);
}
}
}
}
// Deactivate this fence so that adding more writes has no effect.
// The fence must have already fired.
retire() {
if (!this.fired)
throw new Error("Can't retire a fence that hasn't fired.");
this.retired = true;
}
};
// The current write fence. When there is a current write fence, code
@@ -20,134 +123,3 @@ DDPServer._WriteFence = function () {
// beginWrite().
//
DDPServer._CurrentWriteFence = new Meteor.EnvironmentVariable;
_.extend(DDPServer._WriteFence.prototype, {
// Start tracking a write, and return an object to represent it. The
// object has a single method, committed(). This method should be
// called when the write is fully committed and propagated. You can
// continue to add writes to the WriteFence up until it is triggered
// (calls its callbacks because all writes have committed.)
beginWrite: function () {
var self = this;
if (self.retired)
return { committed: function () {} };
if (self.fired)
throw new Error("fence has already activated -- too late to add writes");
self.outstanding_writes++;
var committed = false;
return {
committed: function () {
if (committed)
throw new Error("committed called twice on the same write");
committed = true;
self.outstanding_writes--;
self._maybeFire();
}
};
},
// Arm the fence. Once the fence is armed, and there are no more
// uncommitted writes, it will activate.
arm: function () {
var self = this;
if (self === DDPServer._CurrentWriteFence.get())
throw Error("Can't arm the current fence");
self.armed = true;
self._maybeFire();
},
// Register a function to be called once before firing the fence.
// Callback function can add new writes to the fence, in which case
// it won't fire until those writes are done as well.
onBeforeFire: function (func) {
var self = this;
if (self.fired)
throw new Error("fence has already activated -- too late to " +
"add a callback");
self.before_fire_callbacks.push(func);
},
// Register a function to be called when the fence fires.
onAllCommitted: function (func) {
var self = this;
if (self.fired)
throw new Error("fence has already activated -- too late to " +
"add a callback");
self.completion_callbacks.push(func);
},
_armAndWaitFibers: function () {
var self = this;
var future = new Future;
self.onAllCommitted(function () {
future['return']();
});
self.arm();
future.wait();
},
_armAndWaitNoFibers: function () {
var self = this;
let _resolver;
self.onAllCommitted(function () {
if (!_resolver) {
console.warn("oops, no resolver");
return;
}
_resolver();
});
return new Promise((r) => {
_resolver = r;
self.arm();
} );
},
// Convenience function. Arms the fence, then blocks until it fires.
armAndWait: function () {
return Meteor._isFibersEnabled ? this._armAndWaitFibers() : this._armAndWaitNoFibers();
},
_maybeFire: function () {
var self = this;
if (self.fired)
throw new Error("write fence already activated?");
if (self.armed && !self.outstanding_writes) {
function invokeCallback (func) {
try {
func(self);
} catch (err) {
Meteor._debug("exception in write fence callback", err);
}
}
self.outstanding_writes++;
while (self.before_fire_callbacks.length > 0) {
var callbacks = self.before_fire_callbacks;
self.before_fire_callbacks = [];
_.each(callbacks, invokeCallback);
}
self.outstanding_writes--;
if (!self.outstanding_writes) {
self.fired = true;
var callbacks = self.completion_callbacks;
self.completion_callbacks = [];
_.each(callbacks, invokeCallback);
}
}
},
// Deactivate this fence so that adding more writes has no effect.
// The fence must have already fired.
retire: function () {
var self = this;
if (! self.fired)
throw new Error("Can't retire a fence that hasn't fired.");
self.retired = true;
}
});