From 00ff9c17b55408134d218a7820e6e71d998a6848 Mon Sep 17 00:00:00 2001 From: Vlad Lasky Date: Thu, 11 Dec 2025 22:48:59 +1100 Subject: [PATCH 1/3] Implement DDP session resumption for graceful reconnects This feature allows Meteor clients to resume their DDP connections after temporary disconnections without re-establishing full session state. Key changes: - Server tracks sent message count per session (sentCount) - Client tracks received message count (receivedCount) - On reconnect, client sends receivedCount with session ID - Server validates counts match to allow session resumption - New disconnect message allows graceful disconnects - Grace period (default 15s) keeps session alive after disconnect - Message queue (max 100) buffers messages during disconnect Server options: - Meteor.server.options.disconnectGracePeriod (default: 15000ms) - Meteor.server.options.maxMessageQueueLength (default: 100) This significantly reduces server CPU spikes when clients reconnect (e.g., after Google Cloud Run timeouts) by avoiding full session recreation and data re-fetch. Based on PR #13378. Rebased onto current devel branch with refactored ddp-client handlers. Co-authored-by: Jan Dvorak Co-authored-by: Valentin Slatineanu Co-authored-by: zodern --- .../common/connection_stream_handlers.js | 6 + .../ddp-client/common/livedata_connection.js | 8 +- .../ddp-client/common/message_processors.js | 5 + .../test/livedata_connection_tests.js | 159 +++++++- packages/ddp-client/test/stub_stream.js | 4 + packages/ddp-server/livedata_server.js | 208 ++++++++-- packages/ddp-server/livedata_server_tests.js | 377 +++++++++++++++++- v3-docs/docs/api/meteor.md | 19 +- 8 files changed, 721 insertions(+), 65 deletions(-) diff --git a/packages/ddp-client/common/connection_stream_handlers.js b/packages/ddp-client/common/connection_stream_handlers.js index bfef109ff0..d4fa785df6 100644 --- a/packages/ddp-client/common/connection_stream_handlers.js +++ b/packages/ddp-client/common/connection_stream_handlers.js @@ -33,6 +33,11 @@ export class ConnectionStreamHandlers { return; } + // Track received message count for session resumption (excluding ping/pong) + if (!this._connection._ignoredMsgsForSessionOutOfDateCheck.includes(msg.msg)) { + this._connection._receivedCount++; + } + // Important: This was missing from previous version // We need to set the current version before routing the message if (msg.msg === 'connected') { @@ -139,6 +144,7 @@ export class ConnectionStreamHandlers { const msg = { msg: 'connect' }; if (this._connection._lastSessionId) { msg.session = this._connection._lastSessionId; + msg.receivedCount = this._connection._receivedCount; } msg.version = this._connection._versionSuggestion || this._connection._supportedDDPVersions[0]; this._connection._versionSuggestion = msg.version; diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 9755a8012a..9cbbe0ee77 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -93,6 +93,10 @@ export class Connection { } self._lastSessionId = null; + // how many messages we've received (excluding ping/pong). + // when we try to reconnect to the server, it will check this against the number of messages it sent. + // if there is a mismatch, our info is out of date and we need a clean session. + self._receivedCount = 0; self._versionSuggestion = null; // The last proposed DDP version. self._version = null; // The DDP version agreed on by client and server. self._stores = Object.create(null); // name -> object with methods @@ -102,6 +106,7 @@ export class Connection { self._heartbeatInterval = options.heartbeatInterval; self._heartbeatTimeout = options.heartbeatTimeout; + self._ignoredMsgsForSessionOutOfDateCheck = ['ping', 'pong']; // Tracks methods which the user has tried to call but which have not yet // called their user callback (ie, they are waiting on their result or for all @@ -1081,11 +1086,12 @@ export class Connection { * @locus Client */ disconnect(...args) { + this._send({ msg: 'disconnect' }); return this._stream.disconnect(...args); } close() { - return this._stream.disconnect({ _permanent: true }); + return this.disconnect({ _permanent: true }); } /// diff --git a/packages/ddp-client/common/message_processors.js b/packages/ddp-client/common/message_processors.js index 09b13f742e..0fbe7ece98 100644 --- a/packages/ddp-client/common/message_processors.js +++ b/packages/ddp-client/common/message_processors.js @@ -43,10 +43,15 @@ export class MessageProcessors { if (reconnectedToPreviousSession) { // Successful reconnection -- pick up where we left off. + // Don't reset stores since we're continuing the same session. + self._resetStores = false; return; } // Server doesn't have our data anymore. Re-sync a new session. + // Reset the received count since we're starting a new session. + // Set to 1 because the 'connected' message itself counts. + self._receivedCount = 1; // Forget about messages we were buffering for unknown collections. They'll // be resent if still relevant. diff --git a/packages/ddp-client/test/livedata_connection_tests.js b/packages/ddp-client/test/livedata_connection_tests.js index 94994d3fbe..69a8ebce0c 100644 --- a/packages/ddp-client/test/livedata_connection_tests.js +++ b/packages/ddp-client/test/livedata_connection_tests.js @@ -20,14 +20,16 @@ const newConnection = function(stream, options) { ); }; -const makeConnectMessage = function(session) { +const makeConnectMessage = function(session, receivedCount) { const msg = { msg: 'connect', version: DDPCommon.SUPPORTED_DDP_VERSIONS[0], - support: DDPCommon.SUPPORTED_DDP_VERSIONS + support: DDPCommon.SUPPORTED_DDP_VERSIONS, }; if (session) msg.session = session; + if (receivedCount) msg.receivedCount = receivedCount; + return msg; }; @@ -869,7 +871,7 @@ Tinytest.addAsync('livedata stub - reconnect', async function(test, onComplete) // sub. The wait method still is blocked. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); testGotMessage(test, stream, methodMessage); testGotMessage(test, stream, subMessage); @@ -990,7 +992,7 @@ if (Meteor.isClient) { await stream.reset(); // verify that a reconnect message was sent. - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); // Make sure that the stream triggers connection. await stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); @@ -1114,7 +1116,7 @@ if (Meteor.isClient) { // in. Reconnect quiescence happens as soon as 'connected' is received because // there are no pending methods or subs in need of revival. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); // Still holding out hope for session resumption, so nothing updated yet. test.equal(coll.find().count(), 1); test.equal(await coll.findOneAsync(stubWrittenId), { @@ -1209,7 +1211,7 @@ if (Meteor.isClient) { // but slowMethod gets called via onReconnect. Reconnect quiescence is now // blocking on slowMethod. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID + 1)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID + 1, conn._receivedCount)); const slowMethodId = testGotMessage(test, stream, { msg: 'method', method: 'slowMethod', @@ -1330,7 +1332,7 @@ Tinytest.addAsync('livedata stub - reconnect method which only got data', async // Reset stream. Method gets resent (with same ID), and blocks reconnect // quiescence. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); testGotMessage(test, stream, { msg: 'method', method: 'doLittle', @@ -1807,7 +1809,7 @@ addReconnectTests( // reconnect stream.sent = []; await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId)); + testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId, conn._receivedCount)); // Test that we sent what we expect to send, and we're blocked on // what we expect to be blocked. The subsequent logic to correctly @@ -2033,7 +2035,7 @@ addReconnectTests( // reconnect stream.sent = []; await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId)); + testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId, conn._receivedCount)); // Test that we sent what we expect to send, and we're blocked on // what we expect to be blocked. The subsequent logic to correctly @@ -2084,7 +2086,7 @@ addReconnectTests( // initial connect stream.sent = []; await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId)); + testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId, conn._receivedCount)); // Test that we sent just the login message. const loginId = testGotMessage(test, stream, { @@ -2152,7 +2154,7 @@ addReconnectTests('livedata stub - reconnect double wait method', async function // Reset stream. halfwayMethod does NOT get resent, but reconnectMethod does! // Reconnect quiescence happens when reconnectMethod is done. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); const reconnectId = testGotMessage(test, stream, { msg: 'method', method: 'reconnectMethod', @@ -2257,7 +2259,7 @@ Tinytest.addAsync('livedata stub - subscribe errors', async function(test) { // stream reset: reconnect! await stream.reset(); // We send a connect. - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); // We should NOT re-sub to the sub, because we processed the error. test.length(stream.sent, 0); test.isFalse(onReadyFired); @@ -2376,7 +2378,7 @@ if (Meteor.isClient) { // Initiate reconnect. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); testGotMessage(test, stream, subMessage); await stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); @@ -2559,8 +2561,137 @@ if (Meteor.isClient) { ); } +// ============================================================================ +// DDP Session Resumption Tests (Client-side) +// ============================================================================ + +Tinytest.addAsync('livedata connection - receivedCount tracking', async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + // Initially receivedCount should be 0 + test.equal(conn._receivedCount, 0); + + await startAndConnect(test, stream); + + // After receiving 'connected', receivedCount should be 1 + // (the 'connected' message itself is counted) + test.equal(conn._receivedCount, 1); + + // Receive some data messages + await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: { a: 1 } }); + test.equal(conn._receivedCount, 2); + + await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: { b: 2 } }); + test.equal(conn._receivedCount, 3); + + // Ping/pong should NOT increment receivedCount + await stream.receive({ msg: 'ping', id: 'ping1' }); + test.equal(conn._receivedCount, 3, "ping should not increment receivedCount"); + + await stream.receive({ msg: 'pong', id: 'pong1' }); + test.equal(conn._receivedCount, 3, "pong should not increment receivedCount"); + + // More data messages should continue incrementing + await stream.receive({ msg: 'changed', collection: 'test', id: '1', fields: { a: 2 } }); + test.equal(conn._receivedCount, 4); +}); + +Tinytest.addAsync('livedata connection - receivedCount sent on reconnect', async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + // Receive some messages to build up receivedCount + await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: {} }); + await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: {} }); + await stream.receive({ msg: 'ready', subs: ['sub1'] }); + + const expectedReceivedCount = conn._receivedCount; + test.equal(expectedReceivedCount, 4); // connected + 3 messages + + // Simulate disconnect and reconnect + await stream.reset(); + + // The connect message should include the receivedCount + const connectMsg = JSON.parse(stream.sent.shift()); + test.equal(connectMsg.msg, 'connect'); + test.equal(connectMsg.session, SESSION_ID); + test.equal(connectMsg.receivedCount, expectedReceivedCount, + "Connect message should include receivedCount for session resumption"); +}); + +Tinytest.addAsync('livedata connection - receivedCount reset on new session', async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + // Build up some receivedCount + await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: {} }); + await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: {} }); + test.equal(conn._receivedCount, 3); + + // Simulate reconnect + await stream.reset(); + stream.sent.shift(); // consume connect message + + // Server responds with a DIFFERENT session (new session, not resumed) + const newSessionId = SESSION_ID + '_new'; + await stream.receive({ msg: 'connected', session: newSessionId }); + + // receivedCount should be reset to 1 (counting the new connected message) + test.equal(conn._receivedCount, 1, + "receivedCount should be reset to 1 when getting a new session"); + test.equal(conn._lastSessionId, newSessionId); +}); + +Tinytest.addAsync('livedata connection - receivedCount preserved on session resume', async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + // Build up some receivedCount + await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: {} }); + await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: {} }); + const countBeforeDisconnect = conn._receivedCount; + test.equal(countBeforeDisconnect, 3); + + // Simulate reconnect + await stream.reset(); + stream.sent.shift(); // consume connect message + + // Server responds with the SAME session (resumed) + await stream.receive({ msg: 'connected', session: SESSION_ID }); + + // receivedCount should continue from where it was (plus the connected message) + test.equal(conn._receivedCount, countBeforeDisconnect + 1, + "receivedCount should continue incrementing on session resume"); + test.equal(conn._lastSessionId, SESSION_ID); +}); + +Tinytest.addAsync('livedata connection - disconnect sends disconnect message', async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + // Clear any pending messages + stream.sent.length = 0; + + // Call disconnect + conn.disconnect(); + + // Should have sent a disconnect message + test.isTrue(stream.sent.length > 0, "Should have sent at least one message"); + const disconnectMsg = JSON.parse(stream.sent.shift()); + test.equal(disconnectMsg.msg, 'disconnect', + "disconnect() should send a disconnect message to the server"); +}); + // XXX also test: -// - reconnect, with session resume. // - restart on update flag // - on_update event // - reloading when the app changes, including session migration \ No newline at end of file diff --git a/packages/ddp-client/test/stub_stream.js b/packages/ddp-client/test/stub_stream.js index 43c05127fa..3e57b5ed4f 100644 --- a/packages/ddp-client/test/stub_stream.js +++ b/packages/ddp-client/test/stub_stream.js @@ -27,6 +27,10 @@ Object.assign(StubStream.prototype, { // no-op }, + disconnect: function() { + // no-op - for testing Connection.disconnect() + }, + _lostConnection: function() { // no-op }, diff --git a/packages/ddp-server/livedata_server.js b/packages/ddp-server/livedata_server.js index 3f5efceafb..d3cba79ad7 100644 --- a/packages/ddp-server/livedata_server.js +++ b/packages/ddp-server/livedata_server.js @@ -81,11 +81,16 @@ var Session = function (server, version, socket, options) { var self = this; self.id = Random.id(); + // how many messages we've actually sent (not queued to send) excluding ping/pong + // we'll use this to detect mismatch of data on reconnect. + self.sentCount = 0; + self.server = server; self.version = version; self.initialized = false; self.socket = socket; + self.options = options; // Set to null when the session is destroyed. Multiple places below // use this to determine if the session is alive or not. @@ -134,6 +139,8 @@ var Session = function (server, version, socket, options) { self.connectionHandle = { id: self.id, close: function () { + // Server-initiated close should not be resumable + self._expectingDisconnect = true; self.close(); }, onClose: function (fn) { @@ -175,6 +182,8 @@ var Session = function (server, version, socket, options) { "livedata", "sessions", 1); }; +const ignoredMsgsForSessionOutOfDateCheck = ['ping', 'pong']; + Object.assign(Session.prototype, { sendReady: function (subscriptionIds) { var self = this; @@ -269,77 +278,101 @@ Object.assign(Session.prototype, { }, startUniversalSubs: function () { - var self = this; + const self = this; // Make a shallow copy of the set of universal handlers and start them. If // additional universal publishers start while we're running them (due to // yielding), they will run separately as part of Server.publish. - var handlers = [...self.server.universal_publish_handlers]; - handlers.forEach(function (handler) { + for (const handler of [...self.server.universal_publish_handlers]) { self._startSubscription(handler); - }); + } }, // Destroy this session and unregister it at the server. close: function () { - var self = this; + const self = this; // Destroy this session, even if it's not registered at the // server. Stop all processing and tear everything down. If a socket // was attached, close it. - // Already destroyed. - if (! self.inQueue) + // Already closing or closed - prevent multiple close() calls + if (self._isClosing) { return; + } + self._isClosing = true; - // Drop the merge box data immediately. - self.inQueue = null; - self.collectionViews = new Map(); - - if (self.heartbeat) { - self.heartbeat.stop(); - self.heartbeat = null; + if (self._removeTimeoutHandle) { + Meteor.clearTimeout(self._removeTimeoutHandle); + self._removeTimeoutHandle = null; } if (self.socket) { self.socket.close(); self.socket._meteorSession = null; + self.socket = null; } - Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact( - "livedata", "sessions", -1); + // Stop heartbeat immediately - we don't need it during the grace period + // since we have no socket to send pings on anyway. + if (self.heartbeat) { + self.heartbeat.stop(); + self.heartbeat = null; + } - Meteor.defer(function () { - // Stop callbacks can yield, so we defer this on close. - // sub._isDeactivated() detects that we set inQueue to null and - // treats it as semi-deactivated (it will ignore incoming callbacks, etc). - self._deactivateAllSubscriptions(); + self.server._removeSession(self, () => { + Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact( + "livedata", "sessions", -1); - // Defer calling the close callbacks, so that the caller closing - // the session isn't waiting for all the callbacks to complete. - self._closeCallbacks.forEach(function (callback) { - callback(); + self.inQueue = null; + self.collectionViews = new Map(); + + if (self.heartbeat) { + self.heartbeat.stop(); + self.heartbeat = null; + } + + Meteor.defer(function () { + // stop callbacks can yield, so we defer this on close. + // sub._isDeactivated() detects that we set inQueue to null and + // treats it as semi-deactivated (it will ignore incoming callbacks, etc). + self._deactivateAllSubscriptions(); + + // Defer calling the close callbacks, so that the caller closing + // the session isn't waiting for all the callbacks to complete. + self._closeCallbacks.forEach(callback => { + callback(); + }); }); }); - - // Unregister the session. - self.server._removeSession(self); }, // Send a message (doing nothing if no socket is connected right now). // It should be a JSON object (it will be stringified). send: function (msg) { const self = this; + const isIgnoredMsg = ignoredMsgsForSessionOutOfDateCheck.includes(msg.msg); + if (self.messageQueue && !isIgnoredMsg) { + self.messageQueue.push(msg); + if (self.messageQueue.length > self.options.maxMessageQueueLength) { + Meteor.clearTimeout(self._removeTimeoutHandle); + self._pendingRemoveFunction(); + } + return; + } if (self.socket) { if (Meteor._printSentDDP) Meteor._debug("Sent DDP", DDPCommon.stringifyDDP(msg)); + if (!isIgnoredMsg) { + self.sentCount++; + } self.socket.send(DDPCommon.stringifyDDP(msg)); } }, // Send a connection error. sendError: function (reason, offendingMessage) { - var self = this; - var msg = {msg: 'error', reason: reason}; + const self = this; + const msg = {msg: 'error', reason: reason}; if (offendingMessage) msg.offendingMessage = offendingMessage; self.send(msg); @@ -379,7 +412,7 @@ Object.assign(Session.prototype, { // the client is still alive. if (self.heartbeat) { self.heartbeat.messageReceived(); - }; + } if (self.version !== 'pre1' && msg_in.msg === 'ping') { if (self._respondToPings) @@ -391,6 +424,13 @@ Object.assign(Session.prototype, { return; } + if (msg_in.msg === 'disconnect') { + if (msg_in.msg in self.protocol_handlers) { + // we want to pre-empt the queue - a disconnect is imminent. + return self.protocol_handlers[msg_in.msg].call(self, msg_in, () => {}); + } + } + self.inQueue.push(msg_in); if (self.workerRunning) return; @@ -444,6 +484,9 @@ Object.assign(Session.prototype, { }, protocol_handlers: { + disconnect: function(msg) { + this._expectingDisconnect = true; + }, sub: async function (msg, unblock) { var self = this; @@ -1252,6 +1295,19 @@ Server = function (options = {}) { // For testing, allow responding to pings to be disabled. respondToPings: true, defaultPublicationStrategy: publicationStrategies.SERVER_MERGE, + /** + * @summary How many messages should we queue during a non-graceful disconnect before we kill the session (to insure against memory leaks). + * @type {Number} + * @locus Server + */ + maxMessageQueueLength: 100, + /** + * @summary How long we should maintain a session for after a non-graceful disconnect before killing it + * sessions that reconnect within this time will be resumed with minimal performance impact. + * @type {Number} + * @locus Server + */ + disconnectGracePeriod: 15000, ...options, }; @@ -1424,13 +1480,63 @@ Object.assign(Server.prototype, { // Yay, version matches! Create a new session. // Note: Troposphere depends on the ability to mutate // Meteor.server.options.heartbeatTimeout! This is a hack, but it's life. - socket._meteorSession = new Session(self, version, socket, self.options); - self.sessions.set(socket._meteorSession.id, socket._meteorSession); - self.onConnectionHook.each(function (callback) { - if (socket._meteorSession) - callback(socket._meteorSession.connectionHandle); - return true; - }); + const existingSession = self.sessions.get(msg.session); + + // we've found a session with: + // the right ID + // a matching sent/received count + // was disconnected and hasn't been reconnected to yet. + if (existingSession && existingSession.sentCount === msg.receivedCount && existingSession._removeTimeoutHandle) { + Meteor.clearTimeout(existingSession._removeTimeoutHandle); + delete existingSession._removeTimeoutHandle; + delete existingSession._pendingRemoveFunction; + existingSession._isClosing = false; // Reset so session can be closed again later + socket._meteorSession = existingSession; + const messageQueue = existingSession.messageQueue; + delete existingSession.messageQueue; + existingSession.socket = socket; + + // Restart heartbeat for the resumed session + if (existingSession.version !== 'pre1' && self.options.heartbeatInterval !== 0) { + socket.setWebsocketTimeout(0); + existingSession.heartbeat = new DDPCommon.Heartbeat({ + heartbeatInterval: self.options.heartbeatInterval, + heartbeatTimeout: self.options.heartbeatTimeout, + onTimeout: function () { + existingSession.close(); + }, + sendPing: function () { + existingSession.send({msg: 'ping'}); + } + }); + existingSession.heartbeat.start(); + } + + // Send connected message so client can restart heartbeat and confirm resumption + existingSession.send({ msg: 'connected', session: existingSession.id }); + if (messageQueue) { + Meteor.defer(() => { + messageQueue.forEach(msg => existingSession.send(msg)); + }); + } + // Note: onConnectionHook is NOT called on session resume - the connection + // is considered to be the same logical connection as before. + } + else { + // immediately remove the old session since we're out of date. + if (existingSession && existingSession._pendingRemoveFunction) { + Meteor.clearTimeout(existingSession._removeTimeoutHandle); + existingSession._pendingRemoveFunction(); + } + socket._meteorSession = new Session(self, version, socket, self.options); + self.sessions.set(socket._meteorSession.id, socket._meteorSession); + + self.onConnectionHook.each(function (callback) { + if (socket._meteorSession) + callback(socket._meteorSession.connectionHandle); + return true; + }); + } }, /** * Register a publish handler function. @@ -1520,9 +1626,31 @@ Object.assign(Server.prototype, { } }, - _removeSession: function (session) { + _removeSession: function (session, callback = () => {}) { var self = this; - self.sessions.delete(session.id); + const sessionRemoveFunction = () => { + // Guard against being called multiple times (e.g., from both overflow and timeout) + if (!self.sessions.has(session.id)) { + return; + } + // Clear timeout handle if it exists to prevent double execution + if (session._removeTimeoutHandle) { + Meteor.clearTimeout(session._removeTimeoutHandle); + session._removeTimeoutHandle = null; + } + session._pendingRemoveFunction = null; + self.sessions.delete(session.id); + callback(); + }; + if (session._expectingDisconnect) { + return sessionRemoveFunction(); + } + session.messageQueue = []; + session._pendingRemoveFunction = sessionRemoveFunction; + if (session._removeTimeoutHandle) { + Meteor.clearTimeout(session._removeTimeoutHandle); + } + session._removeTimeoutHandle = Meteor.setTimeout(sessionRemoveFunction, self.options.disconnectGracePeriod); }, /** diff --git a/packages/ddp-server/livedata_server_tests.js b/packages/ddp-server/livedata_server_tests.js index 15b0349e87..d903e5a74c 100644 --- a/packages/ddp-server/livedata_server_tests.js +++ b/packages/ddp-server/livedata_server_tests.js @@ -1,3 +1,38 @@ +// Helper to temporarily set disconnectGracePeriod for DDP resumption tests +// This ensures test isolation - other tests run with the default grace period +const DEFAULT_GRACE_PERIOD = Meteor.server.options.disconnectGracePeriod; +const TEST_GRACE_PERIOD = 5000; // Short grace period for fast tests (ms) +// Derived timing constants to avoid hardcoding throughout tests +const WITHIN_GRACE_PERIOD_MS = Math.floor(TEST_GRACE_PERIOD / 4); // Well within grace period +const AFTER_GRACE_PERIOD_MS = Math.ceil(TEST_GRACE_PERIOD * 1.5); // After grace period expires +const POLL_TIMEOUT_MS = TEST_GRACE_PERIOD * 2; // Max time to wait for async operations before failing + +async function withTestGracePeriod(fn) { + const previous = Meteor.server.options.disconnectGracePeriod; + Meteor.server.options.disconnectGracePeriod = TEST_GRACE_PERIOD; + try { + await fn(); + } finally { + Meteor.server.options.disconnectGracePeriod = previous ?? DEFAULT_GRACE_PERIOD; + } +} + +// Helper to poll for a condition with timeout to prevent hanging tests +function pollUntil(conditionFn, timeoutMs = POLL_TIMEOUT_MS) { + return new Promise((resolve, reject) => { + const startTime = Date.now(); + const interval = setInterval(() => { + if (conditionFn()) { + clearInterval(interval); + resolve(); + } else if (Date.now() - startTime > timeoutMs) { + clearInterval(interval); + reject(new Error(`Timed out after ${timeoutMs}ms waiting for condition`)); + } + }, 10); + }); +} + Tinytest.addAsync( "livedata server - connectionHandle.onClose()", function (test, onComplete) { @@ -593,4 +628,344 @@ function getTestConnections(test) { function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); -} \ No newline at end of file +} + +// ============================================================================ +// DDP Session Resumption Tests +// ============================================================================ + +// Test that unexpected disconnects allow session resumption within grace period +Tinytest.addAsync( + "livedata server - DDP resumption: unexpected disconnect preserves session", + async function (test) { + await withTestGracePeriod(async () => { + const { clientConn, serverConn } = await getTestConnections(test); + const originalSessionId = serverConn.id; + + // Verify the session exists + test.isTrue(Meteor.server.sessions.has(originalSessionId)); + + // Simulate unexpected disconnect by forcing the stream to close + // without sending a disconnect message + clientConn._stream._lostConnection(); + + // Wait a bit but less than the grace period + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should still exist during grace period + test.isTrue( + Meteor.server.sessions.has(originalSessionId), + "Session should be preserved during grace period" + ); + + // Wait for grace period to expire + await sleep(AFTER_GRACE_PERIOD_MS); + + // Session should be removed after grace period + test.isFalse( + Meteor.server.sessions.has(originalSessionId), + "Session should be removed after grace period expires" + ); + }); + } +); + +// Test that graceful disconnects (client sends disconnect message) remove session immediately +Tinytest.addAsync( + "livedata server - DDP resumption: graceful disconnect removes session immediately", + async function (test) { + await withTestGracePeriod(async () => { + const { clientConn, serverConn } = await getTestConnections(test); + const originalSessionId = serverConn.id; + + // Verify the session exists + test.isTrue(Meteor.server.sessions.has(originalSessionId)); + + // Graceful disconnect - this sends the disconnect message + clientConn.disconnect(); + + // Wait a moment for the disconnect to process + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should be removed immediately (not waiting for grace period) + test.isFalse( + Meteor.server.sessions.has(originalSessionId), + "Session should be removed immediately after graceful disconnect" + ); + }); + } +); + +// Test that server-initiated close removes session immediately (not resumable) +Tinytest.addAsync( + "livedata server - DDP resumption: server-initiated close removes session immediately", + async function (test) { + await withTestGracePeriod(async () => { + const { clientConn, serverConn } = await getTestConnections(test); + const originalSessionId = serverConn.id; + + // Verify the session exists + test.isTrue(Meteor.server.sessions.has(originalSessionId)); + + // Server-initiated close via connectionHandle.close() + serverConn.close(); + + // Wait a moment for the close to process + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should be removed immediately (server kicks should not be resumable) + test.isFalse( + Meteor.server.sessions.has(originalSessionId), + "Session should be removed immediately after server-initiated close" + ); + }); + } +); + +// Test that onConnection hook is NOT called on session resume +Tinytest.addAsync( + "livedata server - DDP resumption: onConnection not called on resume", + async function (test) { + await withTestGracePeriod(async () => { + let onConnectionCallCount = 0; + let lastConnectionId = null; + + const handle = Meteor.onConnection(function (conn) { + onConnectionCallCount++; + lastConnectionId = conn.id; + }); + + // Create initial connection + const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: false }); + + // Wait for connection with timeout + await pollUntil(() => clientConn._lastSessionId); + + const originalSessionId = clientConn._lastSessionId; + test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect"); + test.equal(lastConnectionId, originalSessionId); + + // Get the server session and verify it exists + const serverSession = Meteor.server.sessions.get(originalSessionId); + test.isTrue(serverSession, "Server session should exist"); + + // Simulate unexpected disconnect + clientConn._stream._lostConnection(); + + // Wait a bit (less than grace period) + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should still exist + test.isTrue( + Meteor.server.sessions.has(originalSessionId), + "Session should still exist during grace period" + ); + + // Reconnect - this should resume the session + clientConn._stream.reconnect(); + + // Wait for reconnection with timeout + await pollUntil(() => clientConn.status().connected); + + // Give it a moment to process + await sleep(WITHIN_GRACE_PERIOD_MS); + + // IMPORTANT: Assert that session was actually resumed (same session ID) + // If this fails, the test is not actually testing resumption + test.equal( + clientConn._lastSessionId, + originalSessionId, + "Session should be resumed with same session ID" + ); + + // onConnection should NOT have been called again for a resumed session + test.equal( + onConnectionCallCount, + 1, + "onConnection should not be called again on session resume" + ); + + handle.stop(); + clientConn.disconnect(); + }); + } +); + +// Test that server-initiated close prevents session resumption +Tinytest.addAsync( + "livedata server - DDP resumption: server close prevents resumption", + async function (test) { + await withTestGracePeriod(async () => { + let onConnectionCallCount = 0; + + const handle = Meteor.onConnection(function (conn) { + onConnectionCallCount++; + }); + + // Create initial connection + const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: true }); + + // Wait for connection with timeout + await pollUntil(() => clientConn._lastSessionId); + + const originalSessionId = clientConn._lastSessionId; + test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect"); + + // Get the server session + const serverSession = Meteor.server.sessions.get(originalSessionId); + test.isTrue(serverSession, "Server session should exist"); + + // Server-initiated close (kick the client) + serverSession.connectionHandle.close(); + + // Wait for client to reconnect with new session (retry is enabled) + await pollUntil(() => + clientConn.status().connected && clientConn._lastSessionId !== originalSessionId + ); + + // Should have a NEW session (not resumed) + test.notEqual( + clientConn._lastSessionId, + originalSessionId, + "Should have a new session ID after server-initiated close" + ); + + // onConnection should have been called again (new session, not resumed) + test.equal( + onConnectionCallCount, + 2, + "onConnection should be called again after server-initiated close" + ); + + handle.stop(); + clientConn.disconnect(); + }); + } +); + +// Test that graceful client disconnect prevents session resumption +Tinytest.addAsync( + "livedata server - DDP resumption: graceful disconnect prevents resumption", + async function (test) { + await withTestGracePeriod(async () => { + let onConnectionCallCount = 0; + + const handle = Meteor.onConnection(function (conn) { + onConnectionCallCount++; + }); + + // Create initial connection with retry enabled + const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: true }); + + // Wait for connection with timeout + await pollUntil(() => clientConn._lastSessionId); + + const originalSessionId = clientConn._lastSessionId; + test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect"); + + // Graceful disconnect (sends disconnect message) + clientConn.disconnect(); + + // Wait for session to be removed + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should be removed immediately + test.isFalse( + Meteor.server.sessions.has(originalSessionId), + "Session should be removed after graceful disconnect" + ); + + // Reconnect + clientConn.reconnect(); + + // Wait for reconnection with timeout + await pollUntil(() => clientConn.status().connected); + + // Should have a NEW session (not resumed, because we gracefully disconnected) + test.notEqual( + clientConn._lastSessionId, + originalSessionId, + "Should have a new session ID after graceful disconnect and reconnect" + ); + + // onConnection should have been called again + test.equal( + onConnectionCallCount, + 2, + "onConnection should be called again after graceful disconnect" + ); + + handle.stop(); + clientConn.disconnect(); + }); + } +); + +// Test that receivedCount mismatch causes new session (not resume) +Tinytest.addAsync( + "livedata server - DDP resumption: count mismatch creates new session", + async function (test) { + await withTestGracePeriod(async () => { + let onConnectionCallCount = 0; + + const handle = Meteor.onConnection(function (conn) { + onConnectionCallCount++; + }); + + // Create initial connection + const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: false }); + + // Wait for connection with timeout + await pollUntil(() => clientConn._lastSessionId); + + const originalSessionId = clientConn._lastSessionId; + test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect"); + + // Get the server session + const serverSession = Meteor.server.sessions.get(originalSessionId); + test.isTrue(serverSession, "Server session should exist"); + + // Artificially increment sentCount to create a mismatch + // This simulates messages sent by server that client didn't receive + serverSession.sentCount += 5; + + // Simulate unexpected disconnect + clientConn._stream._lostConnection(); + + // Wait a bit (less than grace period) + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should still exist during grace period + test.isTrue( + Meteor.server.sessions.has(originalSessionId), + "Session should still exist during grace period" + ); + + // Reconnect - this should NOT resume due to count mismatch + clientConn._stream.reconnect(); + + // Wait for reconnection with timeout + await pollUntil(() => clientConn.status().connected); + + // Give it a moment to process + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Should have a NEW session (counts didn't match) + test.notEqual( + clientConn._lastSessionId, + originalSessionId, + "Should have a new session ID when counts mismatch" + ); + + // onConnection should have been called again (new session) + test.equal( + onConnectionCallCount, + 2, + "onConnection should be called again when counts mismatch" + ); + + handle.stop(); + clientConn.disconnect(); + }); + } +); \ No newline at end of file diff --git a/v3-docs/docs/api/meteor.md b/v3-docs/docs/api/meteor.md index 01d7010f93..d330333177 100644 --- a/v3-docs/docs/api/meteor.md +++ b/v3-docs/docs/api/meteor.md @@ -910,16 +910,17 @@ contains the following fields: security risk for this transport. For details and alternatives, see the [SockJS documentation](https://github.com/sockjs/sockjs-node#authorisation). -> Currently when a client reconnects to the server (such as after -> temporarily losing its Internet connection), it will get a new -> connection each time. The `onConnection` callbacks will be called -> again, and the new connection will have a new connection `id`. +> In previous versions of Meteor, when a client reconnects to the server (such as after temporarily losing its Internet connection), it will get a new connection each time. The `onConnection` callbacks will be called again, and the new connection will have a new connection `id`. -> In the future, when client reconnection is fully implemented, -> reconnecting from the client will reconnect to the same connection on -> the server: the `onConnection` callback won't be called for that -> connection again, and the connection will still have the same -> connection `id`. +> With the new client reconnection feature ([DDP resumption](https://github.com/meteor/meteor/pull/13378)) introduced in Meteor version 3.4, the client will attempt to automatically resume the previous connection to the server without calling the `onConnection` callback again and the connection will still keep the previous connection `id`. This functionality is controlled by the following new server options: + +### Meteor.server.options.disconnectGracePeriod + +Defines how long (in milliseconds) we should maintain a session for after a non-graceful disconnect before destroying it. Sessions that reconnect within this time will be resumed with minimal performance impact. Defaults to `15000`. + +### Meteor.server.options.maxMessageQueueLength + +Determines how many messages we should queue during a non-graceful disconnect before we destroy the session, to insure against memory leaks. Defaults to `100`. From c5b725020b1c815fea0d064b3a2c96cae9fe8753 Mon Sep 17 00:00:00 2001 From: Vlad Lasky Date: Fri, 12 Dec 2025 12:12:13 +1100 Subject: [PATCH 2/3] Fix flaky async publish cursor test on page reload The test was failing when the browser page was reloaded during test runs because the subscription ready callback could fire before the added messages arrived. This can happen when a previous test run was interrupted and the server is still processing the old session. The fix adds a polling mechanism to wait for data to arrive before asserting, with a 5-second timeout. --- .../ddp-server/livedata_server_async_tests.js | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/packages/ddp-server/livedata_server_async_tests.js b/packages/ddp-server/livedata_server_async_tests.js index 4ca4ca0864..3606b66044 100644 --- a/packages/ddp-server/livedata_server_async_tests.js +++ b/packages/ddp-server/livedata_server_async_tests.js @@ -168,9 +168,24 @@ Tinytest.addAsync('livedata server - async publish cursor', function( connection: clientConn, }); clientConn.subscribe('asyncPublishCursor', async () => { - const actual = await remoteCollection.find().fetch(); - test.equal(actual[0].name, 'async'); - onComplete(); + // Wait for data to arrive - the subscription is ready but data may still be in transit + // This can happen when a previous test run was interrupted (page reload) and the + // server is still processing the old session's grace period + let attempts = 0; + const maxAttempts = 50; // 5 seconds max wait + const checkData = async () => { + const actual = await remoteCollection.find().fetch(); + if (actual.length > 0) { + test.equal(actual[0].name, 'async'); + onComplete(); + } else if (attempts++ < maxAttempts) { + setTimeout(checkData, 100); + } else { + test.fail('Timed out waiting for data in async publish cursor test'); + onComplete(); + } + }; + await checkData(); }); }); }); From 4984d3bf1e9c45e9e4d270e26f51457eded404ec Mon Sep 17 00:00:00 2001 From: Vlad Lasky Date: Tue, 23 Dec 2025 17:01:41 +1100 Subject: [PATCH 3/3] Address PR review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix wording: "insure" → "to help prevent" in docs and JSDoc - Extract heartbeat cleanup to _stopHeartbeat() helper method - Simplify disconnect handling: remove redundant condition check - Update documentation: add Reconnection section, use "Meteor 3.5+" instead of time-sensitive language, update PR link to #14051 - Update comment at line 1480 to reflect session resume/create behavior - Replace delete statements with = undefined for V8 JIT optimization - Remove unused variable clientConn in test - Add comment clarifying _permanent flag purpose in close() --- .../ddp-client/common/livedata_connection.js | 1 + packages/ddp-server/livedata_server.js | 34 +++++++++---------- packages/ddp-server/livedata_server_tests.js | 2 +- v3-docs/docs/api/meteor.md | 8 +++-- 4 files changed, 24 insertions(+), 21 deletions(-) diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 9cbbe0ee77..3f508eea34 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -1091,6 +1091,7 @@ export class Connection { } close() { + // _permanent is used by the underlying stream to prevent reconnection attempts return this.disconnect({ _permanent: true }); } diff --git a/packages/ddp-server/livedata_server.js b/packages/ddp-server/livedata_server.js index d3cba79ad7..2ffb63a223 100644 --- a/packages/ddp-server/livedata_server.js +++ b/packages/ddp-server/livedata_server.js @@ -287,6 +287,14 @@ Object.assign(Session.prototype, { } }, + // Stop heartbeat if running + _stopHeartbeat: function () { + if (this.heartbeat) { + this.heartbeat.stop(); + this.heartbeat = null; + } + }, + // Destroy this session and unregister it at the server. close: function () { const self = this; @@ -314,10 +322,7 @@ Object.assign(Session.prototype, { // Stop heartbeat immediately - we don't need it during the grace period // since we have no socket to send pings on anyway. - if (self.heartbeat) { - self.heartbeat.stop(); - self.heartbeat = null; - } + self._stopHeartbeat(); self.server._removeSession(self, () => { Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact( @@ -326,10 +331,7 @@ Object.assign(Session.prototype, { self.inQueue = null; self.collectionViews = new Map(); - if (self.heartbeat) { - self.heartbeat.stop(); - self.heartbeat = null; - } + self._stopHeartbeat(); Meteor.defer(function () { // stop callbacks can yield, so we defer this on close. @@ -425,10 +427,8 @@ Object.assign(Session.prototype, { } if (msg_in.msg === 'disconnect') { - if (msg_in.msg in self.protocol_handlers) { - // we want to pre-empt the queue - a disconnect is imminent. - return self.protocol_handlers[msg_in.msg].call(self, msg_in, () => {}); - } + // Pre-empt the queue - a disconnect is imminent. + return self.protocol_handlers.disconnect.call(self, msg_in, () => {}); } self.inQueue.push(msg_in); @@ -1296,7 +1296,7 @@ Server = function (options = {}) { respondToPings: true, defaultPublicationStrategy: publicationStrategies.SERVER_MERGE, /** - * @summary How many messages should we queue during a non-graceful disconnect before we kill the session (to insure against memory leaks). + * @summary How many messages should we queue during a non-graceful disconnect before we destroy the session, to help prevent memory leaks. * @type {Number} * @locus Server */ @@ -1477,7 +1477,7 @@ Object.assign(Server.prototype, { return; } - // Yay, version matches! Create a new session. + // Yay, version matches! Resume existing session if possible, otherwise create a new one. // Note: Troposphere depends on the ability to mutate // Meteor.server.options.heartbeatTimeout! This is a hack, but it's life. const existingSession = self.sessions.get(msg.session); @@ -1488,12 +1488,12 @@ Object.assign(Server.prototype, { // was disconnected and hasn't been reconnected to yet. if (existingSession && existingSession.sentCount === msg.receivedCount && existingSession._removeTimeoutHandle) { Meteor.clearTimeout(existingSession._removeTimeoutHandle); - delete existingSession._removeTimeoutHandle; - delete existingSession._pendingRemoveFunction; + existingSession._removeTimeoutHandle = undefined; + existingSession._pendingRemoveFunction = undefined; existingSession._isClosing = false; // Reset so session can be closed again later socket._meteorSession = existingSession; const messageQueue = existingSession.messageQueue; - delete existingSession.messageQueue; + existingSession.messageQueue = undefined; existingSession.socket = socket; // Restart heartbeat for the resumed session diff --git a/packages/ddp-server/livedata_server_tests.js b/packages/ddp-server/livedata_server_tests.js index d903e5a74c..48313dcbcb 100644 --- a/packages/ddp-server/livedata_server_tests.js +++ b/packages/ddp-server/livedata_server_tests.js @@ -701,7 +701,7 @@ Tinytest.addAsync( "livedata server - DDP resumption: server-initiated close removes session immediately", async function (test) { await withTestGracePeriod(async () => { - const { clientConn, serverConn } = await getTestConnections(test); + const { serverConn } = await getTestConnections(test); const originalSessionId = serverConn.id; // Verify the session exists diff --git a/v3-docs/docs/api/meteor.md b/v3-docs/docs/api/meteor.md index d330333177..62d85b2754 100644 --- a/v3-docs/docs/api/meteor.md +++ b/v3-docs/docs/api/meteor.md @@ -910,9 +910,11 @@ contains the following fields: security risk for this transport. For details and alternatives, see the [SockJS documentation](https://github.com/sockjs/sockjs-node#authorisation). -> In previous versions of Meteor, when a client reconnects to the server (such as after temporarily losing its Internet connection), it will get a new connection each time. The `onConnection` callbacks will be called again, and the new connection will have a new connection `id`. +## Reconnection -> With the new client reconnection feature ([DDP resumption](https://github.com/meteor/meteor/pull/13378)) introduced in Meteor version 3.4, the client will attempt to automatically resume the previous connection to the server without calling the `onConnection` callback again and the connection will still keep the previous connection `id`. This functionality is controlled by the following new server options: +Meteor 3.5+ supports [DDP session resumption](https://github.com/meteor/meteor/pull/14051), allowing clients to automatically resume their previous connection after a temporary network disconnect. When a client reconnects within the grace period, the `onConnection` callback is not called again and the connection retains its original `id`. + +This behavior is controlled by the following server options: ### Meteor.server.options.disconnectGracePeriod @@ -920,7 +922,7 @@ Defines how long (in milliseconds) we should maintain a session for after a non- ### Meteor.server.options.maxMessageQueueLength -Determines how many messages we should queue during a non-graceful disconnect before we destroy the session, to insure against memory leaks. Defaults to `100`. +Determines how many messages we should queue during a non-graceful disconnect before we destroy the session, to help prevent memory leaks. Defaults to `100`.