diff --git a/packages/ddp-server/livedata_server.js b/packages/ddp-server/livedata_server.js index 8d8f0ed451..ba95cc1492 100644 --- a/packages/ddp-server/livedata_server.js +++ b/packages/ddp-server/livedata_server.js @@ -1,7 +1,5 @@ DDPServer = {}; -var Fiber = Npm.require('fibers'); -const ASL = global.asyncLocalStorage; // Publication strategies define how we handle data from published cursors at the collection level // This allows someone to: // - Choose a trade-off between client-server bandwidth and server memory usage @@ -330,15 +328,9 @@ var Session = function (server, version, socket, options) { self.send({ msg: 'connected', session: self.id }); // On initial connect, spin up all the universal publishers. - if (Meteor._isFibersEnabled) { - Fiber(function() { - self.startUniversalSubs(); - }).run(); - } else { - ASL.run(Meteor._getAslStore, function () { - self.startUniversalSubs(); - }); - } + Meteor._runAsync(function() { + self.startUniversalSubs(); + }); if (version !== 'pre1' && options.heartbeatInterval !== 0) { // We no longer need the low level timeout because we have heartbeats. @@ -562,15 +554,11 @@ Object.assign(Session.prototype, { // // Any message counts as receiving a pong, as it demonstrates that // the client is still alive. - if (Meteor._isFibersEnabled && self.heartbeat) { - Fiber(function() { - self.heartbeat.messageReceived(); - }).run(); - } else if (self.heartbeat) { - ASL.run(Meteor._getAslStore, function () { + if (self.heartbeat) { + Meteor._runAsync(function() { self.heartbeat.messageReceived(); }); - } + }; if (self.version !== 'pre1' && msg_in.msg === 'ping') { if (self._respondToPings) @@ -616,12 +604,7 @@ Object.assign(Session.prototype, { unblock(); // in case the handler didn't already do it } - if (Meteor._isFibersEnabled) { - Fiber(runHandlers).run(); - return; - } - - ASL.run(Meteor._getAslStore, runHandlers); + Meteor._runAsync(runHandlers); }; processNext(); @@ -1505,15 +1488,9 @@ Server = function (options = {}) { return; } - if (Meteor._isFibersEnabled) { - Fiber(function() { - self._handleConnect(socket, msg); - }).run(); - } else { - ASL.run(Meteor._getAslStore, function () { - self._handleConnect(socket, msg); - }); - } + Meteor._runAsync(function() { + self._handleConnect(socket, msg); + }) return; } @@ -1531,14 +1508,7 @@ Server = function (options = {}) { socket.on('close', function () { if (socket._meteorSession) { - if (Meteor._isFibersEnabled) { - Fiber(function() { - socket._meteorSession.close() - }).run(); - return; - } - - ASL.run(Meteor._getAslStore, function () { + Meteor._runAsync(function() { socket._meteorSession.close(); }); } @@ -1718,14 +1688,7 @@ Object.assign(Server.prototype, { // self.sessions to change while we're running this loop. self.sessions.forEach(function (session) { if (!session._dontStartNewUniversalSubs) { - if (Meteor._isFibersEnabled) { - Fiber(function() { - session._startSubscription(handler); - }).run(); - return; - } - - ASL.run(Meteor._getAslStore, function() { + Meteor._runAsync(function() { session._startSubscription(handler); }); } diff --git a/packages/meteor/asl-helpers.js b/packages/meteor/asl-helpers.js index 77214cfc32..27d9b227cb 100644 --- a/packages/meteor/asl-helpers.js +++ b/packages/meteor/asl-helpers.js @@ -6,3 +6,17 @@ Meteor._isFibersEnabled = !process.env.DISABLE_FIBERS && Meteor.isServer; Meteor._getAslStore = getAslStore; Meteor._getValueFromAslStore = getValueFromAslStore; Meteor._updateAslStore = updateAslStore; + +Meteor._runAsync = (fn, ctx) => { + if (Meteor._isFibersEnabled) { + const Fiber = Npm.require('fibers'); + + return Fiber(() => { + fn.call(ctx); + }).run(); + } + + global.asyncLocalStorage.run(Meteor._getAslStore(), () => { + fn.call(ctx); + }); +};