diff --git a/packages/ddp-server/livedata_server.js b/packages/ddp-server/livedata_server.js index 6ec03e8dda..2a0ec3e39a 100644 --- a/packages/ddp-server/livedata_server.js +++ b/packages/ddp-server/livedata_server.js @@ -1095,23 +1095,33 @@ Object.assign(Subscription.prototype, { // removed messages for the published objects; if that is necessary, call // _removeAllDocuments first. _deactivate: function() { - var self = this; - if (self._deactivated) + if (this._deactivated) return; - self._deactivated = true; - self._callStopCallbacks(); + this._deactivated = true; + this._callStopCallbacks().then(() => { + // Break reference chains to allow GC of the Session and its data. + // Without this, deactivated subscriptions retain live references + // to the (now-closed) session indefinitely. + this._session = null; + this._documents = new Map(); + }); Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact( "livedata", "subscriptions", -1); }, - _callStopCallbacks: function () { - var self = this; - // Tell listeners, so they can clean up - var callbacks = self._stopCallbacks; - self._stopCallbacks = []; - callbacks.forEach(function (callback) { - callback(); - }); + _callStopCallbacks: async function () { + // In Meteor 3, onStop callbacks can be async (e.g. observeHandle.stop() + // returns a Promise). We must await each one so that observer teardown + // completes before the subscription is considered fully deactivated. + const callbacks = this._stopCallbacks; + this._stopCallbacks = []; + for (const callback of callbacks) { + try { + await callback(); + } catch (e) { + Meteor._debug("Exception in onStop callback:", e); + } + } }, // Send remove messages for every document. @@ -1190,8 +1200,7 @@ Object.assign(Subscription.prototype, { // destroyed but the deferred call to _deactivateAllSubscriptions hasn't // happened yet. _isDeactivated: function () { - var self = this; - return self._deactivated || self._session.inQueue === null; + return this._deactivated || !this._session || this._session.inQueue === null; }, /** diff --git a/packages/ddp-server/livedata_server_tests.js b/packages/ddp-server/livedata_server_tests.js index 48313dcbcb..6dbb189d05 100644 --- a/packages/ddp-server/livedata_server_tests.js +++ b/packages/ddp-server/livedata_server_tests.js @@ -968,4 +968,63 @@ Tinytest.addAsync( clientConn.disconnect(); }); } +); + +// ============================================================================ +// Async onStop cleanup tests (memory leak fix) +// ============================================================================ + +const asyncCleanupTracker = {}; + +Meteor.publish('test_async_onstop_cleanup', function (trackerId) { + this.onStop(async function () { + await new Promise(resolve => setTimeout(resolve, 50)); + asyncCleanupTracker[trackerId] = true; + }); + this.ready(); +}); + +Tinytest.addAsync( + 'livedata server - async onStop callbacks complete on unsubscribe', + async function (test) { + const trackerId = Random.id(); + asyncCleanupTracker[trackerId] = false; + + const { clientConn } = await getTestConnections(test); + const sub = clientConn.subscribe('test_async_onstop_cleanup', trackerId); + await sleep(100); + + sub.stop(); + await sleep(200); + + test.isTrue( + asyncCleanupTracker[trackerId], + 'Async onStop callback should have completed' + ); + + clientConn.disconnect(); + delete asyncCleanupTracker[trackerId]; + } +); + +Tinytest.addAsync( + 'livedata server - async onStop callbacks complete on disconnect', + async function (test) { + const trackerId = Random.id(); + asyncCleanupTracker[trackerId] = false; + + const { clientConn } = await getTestConnections(test); + clientConn.subscribe('test_async_onstop_cleanup', trackerId); + await sleep(100); + + clientConn.disconnect(); + await sleep(300); + + test.isTrue( + asyncCleanupTracker[trackerId], + 'Async onStop callback should have completed on disconnect' + ); + + delete asyncCleanupTracker[trackerId]; + } ); \ No newline at end of file