ddp-server: clear subscription references after async onStop callbacks

Minimal fix: only _callStopCallbacks becomes async, _deactivate uses
.then() to clear _session and _documents after callbacks complete.
No async propagation up the call stack.

Addresses review feedback: avoid async/await snowball, use this instead
of self, keep changes contained to the subscription cleanup level.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
dupontbertrand
2026-03-19 23:03:24 +01:00
parent b700e0aa53
commit eee349ebcb
2 changed files with 82 additions and 14 deletions

View File

@@ -1095,23 +1095,33 @@ Object.assign(Subscription.prototype, {
// removed messages for the published objects; if that is necessary, call
// _removeAllDocuments first.
_deactivate: function() {
var self = this;
if (self._deactivated)
if (this._deactivated)
return;
self._deactivated = true;
self._callStopCallbacks();
this._deactivated = true;
this._callStopCallbacks().then(() => {
// Break reference chains to allow GC of the Session and its data.
// Without this, deactivated subscriptions retain live references
// to the (now-closed) session indefinitely.
this._session = null;
this._documents = new Map();
});
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"livedata", "subscriptions", -1);
},
_callStopCallbacks: function () {
var self = this;
// Tell listeners, so they can clean up
var callbacks = self._stopCallbacks;
self._stopCallbacks = [];
callbacks.forEach(function (callback) {
callback();
});
_callStopCallbacks: async function () {
// In Meteor 3, onStop callbacks can be async (e.g. observeHandle.stop()
// returns a Promise). We must await each one so that observer teardown
// completes before the subscription is considered fully deactivated.
const callbacks = this._stopCallbacks;
this._stopCallbacks = [];
for (const callback of callbacks) {
try {
await callback();
} catch (e) {
Meteor._debug("Exception in onStop callback:", e);
}
}
},
// Send remove messages for every document.
@@ -1190,8 +1200,7 @@ Object.assign(Subscription.prototype, {
// destroyed but the deferred call to _deactivateAllSubscriptions hasn't
// happened yet.
_isDeactivated: function () {
var self = this;
return self._deactivated || self._session.inQueue === null;
return this._deactivated || !this._session || this._session.inQueue === null;
},
/**

View File

@@ -968,4 +968,63 @@ Tinytest.addAsync(
clientConn.disconnect();
});
}
);
// ============================================================================
// Async onStop cleanup tests (memory leak fix)
// ============================================================================
const asyncCleanupTracker = {};
Meteor.publish('test_async_onstop_cleanup', function (trackerId) {
this.onStop(async function () {
await new Promise(resolve => setTimeout(resolve, 50));
asyncCleanupTracker[trackerId] = true;
});
this.ready();
});
Tinytest.addAsync(
'livedata server - async onStop callbacks complete on unsubscribe',
async function (test) {
const trackerId = Random.id();
asyncCleanupTracker[trackerId] = false;
const { clientConn } = await getTestConnections(test);
const sub = clientConn.subscribe('test_async_onstop_cleanup', trackerId);
await sleep(100);
sub.stop();
await sleep(200);
test.isTrue(
asyncCleanupTracker[trackerId],
'Async onStop callback should have completed'
);
clientConn.disconnect();
delete asyncCleanupTracker[trackerId];
}
);
Tinytest.addAsync(
'livedata server - async onStop callbacks complete on disconnect',
async function (test) {
const trackerId = Random.id();
asyncCleanupTracker[trackerId] = false;
const { clientConn } = await getTestConnections(test);
clientConn.subscribe('test_async_onstop_cleanup', trackerId);
await sleep(100);
clientConn.disconnect();
await sleep(300);
test.isTrue(
asyncCleanupTracker[trackerId],
'Async onStop callback should have completed on disconnect'
);
delete asyncCleanupTracker[trackerId];
}
);