diff --git a/packages/ddp-client/common/connection_stream_handlers.js b/packages/ddp-client/common/connection_stream_handlers.js index bfef109ff0..d4fa785df6 100644 --- a/packages/ddp-client/common/connection_stream_handlers.js +++ b/packages/ddp-client/common/connection_stream_handlers.js @@ -33,6 +33,11 @@ export class ConnectionStreamHandlers { return; } + // Track received message count for session resumption (excluding ping/pong) + if (!this._connection._ignoredMsgsForSessionOutOfDateCheck.includes(msg.msg)) { + this._connection._receivedCount++; + } + // Important: This was missing from previous version // We need to set the current version before routing the message if (msg.msg === 'connected') { @@ -139,6 +144,7 @@ export class ConnectionStreamHandlers { const msg = { msg: 'connect' }; if (this._connection._lastSessionId) { msg.session = this._connection._lastSessionId; + msg.receivedCount = this._connection._receivedCount; } msg.version = this._connection._versionSuggestion || this._connection._supportedDDPVersions[0]; this._connection._versionSuggestion = msg.version; diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 9755a8012a..3f508eea34 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -93,6 +93,10 @@ export class Connection { } self._lastSessionId = null; + // how many messages we've received (excluding ping/pong). + // when we try to reconnect to the server, it will check this against the number of messages it sent. + // if there is a mismatch, our info is out of date and we need a clean session. + self._receivedCount = 0; self._versionSuggestion = null; // The last proposed DDP version. self._version = null; // The DDP version agreed on by client and server. self._stores = Object.create(null); // name -> object with methods @@ -102,6 +106,7 @@ export class Connection { self._heartbeatInterval = options.heartbeatInterval; self._heartbeatTimeout = options.heartbeatTimeout; + self._ignoredMsgsForSessionOutOfDateCheck = ['ping', 'pong']; // Tracks methods which the user has tried to call but which have not yet // called their user callback (ie, they are waiting on their result or for all @@ -1081,11 +1086,13 @@ export class Connection { * @locus Client */ disconnect(...args) { + this._send({ msg: 'disconnect' }); return this._stream.disconnect(...args); } close() { - return this._stream.disconnect({ _permanent: true }); + // _permanent is used by the underlying stream to prevent reconnection attempts + return this.disconnect({ _permanent: true }); } /// diff --git a/packages/ddp-client/common/message_processors.js b/packages/ddp-client/common/message_processors.js index 09b13f742e..0fbe7ece98 100644 --- a/packages/ddp-client/common/message_processors.js +++ b/packages/ddp-client/common/message_processors.js @@ -43,10 +43,15 @@ export class MessageProcessors { if (reconnectedToPreviousSession) { // Successful reconnection -- pick up where we left off. + // Don't reset stores since we're continuing the same session. + self._resetStores = false; return; } // Server doesn't have our data anymore. Re-sync a new session. + // Reset the received count since we're starting a new session. + // Set to 1 because the 'connected' message itself counts. + self._receivedCount = 1; // Forget about messages we were buffering for unknown collections. They'll // be resent if still relevant. diff --git a/packages/ddp-client/test/livedata_connection_tests.js b/packages/ddp-client/test/livedata_connection_tests.js index 94994d3fbe..69a8ebce0c 100644 --- a/packages/ddp-client/test/livedata_connection_tests.js +++ b/packages/ddp-client/test/livedata_connection_tests.js @@ -20,14 +20,16 @@ const newConnection = function(stream, options) { ); }; -const makeConnectMessage = function(session) { +const makeConnectMessage = function(session, receivedCount) { const msg = { msg: 'connect', version: DDPCommon.SUPPORTED_DDP_VERSIONS[0], - support: DDPCommon.SUPPORTED_DDP_VERSIONS + support: DDPCommon.SUPPORTED_DDP_VERSIONS, }; if (session) msg.session = session; + if (receivedCount) msg.receivedCount = receivedCount; + return msg; }; @@ -869,7 +871,7 @@ Tinytest.addAsync('livedata stub - reconnect', async function(test, onComplete) // sub. The wait method still is blocked. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); testGotMessage(test, stream, methodMessage); testGotMessage(test, stream, subMessage); @@ -990,7 +992,7 @@ if (Meteor.isClient) { await stream.reset(); // verify that a reconnect message was sent. - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); // Make sure that the stream triggers connection. await stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); @@ -1114,7 +1116,7 @@ if (Meteor.isClient) { // in. Reconnect quiescence happens as soon as 'connected' is received because // there are no pending methods or subs in need of revival. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); // Still holding out hope for session resumption, so nothing updated yet. test.equal(coll.find().count(), 1); test.equal(await coll.findOneAsync(stubWrittenId), { @@ -1209,7 +1211,7 @@ if (Meteor.isClient) { // but slowMethod gets called via onReconnect. Reconnect quiescence is now // blocking on slowMethod. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID + 1)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID + 1, conn._receivedCount)); const slowMethodId = testGotMessage(test, stream, { msg: 'method', method: 'slowMethod', @@ -1330,7 +1332,7 @@ Tinytest.addAsync('livedata stub - reconnect method which only got data', async // Reset stream. Method gets resent (with same ID), and blocks reconnect // quiescence. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); testGotMessage(test, stream, { msg: 'method', method: 'doLittle', @@ -1807,7 +1809,7 @@ addReconnectTests( // reconnect stream.sent = []; await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId)); + testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId, conn._receivedCount)); // Test that we sent what we expect to send, and we're blocked on // what we expect to be blocked. The subsequent logic to correctly @@ -2033,7 +2035,7 @@ addReconnectTests( // reconnect stream.sent = []; await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId)); + testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId, conn._receivedCount)); // Test that we sent what we expect to send, and we're blocked on // what we expect to be blocked. The subsequent logic to correctly @@ -2084,7 +2086,7 @@ addReconnectTests( // initial connect stream.sent = []; await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId)); + testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId, conn._receivedCount)); // Test that we sent just the login message. const loginId = testGotMessage(test, stream, { @@ -2152,7 +2154,7 @@ addReconnectTests('livedata stub - reconnect double wait method', async function // Reset stream. halfwayMethod does NOT get resent, but reconnectMethod does! // Reconnect quiescence happens when reconnectMethod is done. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); const reconnectId = testGotMessage(test, stream, { msg: 'method', method: 'reconnectMethod', @@ -2257,7 +2259,7 @@ Tinytest.addAsync('livedata stub - subscribe errors', async function(test) { // stream reset: reconnect! await stream.reset(); // We send a connect. - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); // We should NOT re-sub to the sub, because we processed the error. test.length(stream.sent, 0); test.isFalse(onReadyFired); @@ -2376,7 +2378,7 @@ if (Meteor.isClient) { // Initiate reconnect. await stream.reset(); - testGotMessage(test, stream, makeConnectMessage(SESSION_ID)); + testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount)); testGotMessage(test, stream, subMessage); await stream.receive({ msg: 'connected', session: SESSION_ID + 1 }); @@ -2559,8 +2561,137 @@ if (Meteor.isClient) { ); } +// ============================================================================ +// DDP Session Resumption Tests (Client-side) +// ============================================================================ + +Tinytest.addAsync('livedata connection - receivedCount tracking', async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + // Initially receivedCount should be 0 + test.equal(conn._receivedCount, 0); + + await startAndConnect(test, stream); + + // After receiving 'connected', receivedCount should be 1 + // (the 'connected' message itself is counted) + test.equal(conn._receivedCount, 1); + + // Receive some data messages + await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: { a: 1 } }); + test.equal(conn._receivedCount, 2); + + await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: { b: 2 } }); + test.equal(conn._receivedCount, 3); + + // Ping/pong should NOT increment receivedCount + await stream.receive({ msg: 'ping', id: 'ping1' }); + test.equal(conn._receivedCount, 3, "ping should not increment receivedCount"); + + await stream.receive({ msg: 'pong', id: 'pong1' }); + test.equal(conn._receivedCount, 3, "pong should not increment receivedCount"); + + // More data messages should continue incrementing + await stream.receive({ msg: 'changed', collection: 'test', id: '1', fields: { a: 2 } }); + test.equal(conn._receivedCount, 4); +}); + +Tinytest.addAsync('livedata connection - receivedCount sent on reconnect', async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + // Receive some messages to build up receivedCount + await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: {} }); + await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: {} }); + await stream.receive({ msg: 'ready', subs: ['sub1'] }); + + const expectedReceivedCount = conn._receivedCount; + test.equal(expectedReceivedCount, 4); // connected + 3 messages + + // Simulate disconnect and reconnect + await stream.reset(); + + // The connect message should include the receivedCount + const connectMsg = JSON.parse(stream.sent.shift()); + test.equal(connectMsg.msg, 'connect'); + test.equal(connectMsg.session, SESSION_ID); + test.equal(connectMsg.receivedCount, expectedReceivedCount, + "Connect message should include receivedCount for session resumption"); +}); + +Tinytest.addAsync('livedata connection - receivedCount reset on new session', async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + // Build up some receivedCount + await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: {} }); + await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: {} }); + test.equal(conn._receivedCount, 3); + + // Simulate reconnect + await stream.reset(); + stream.sent.shift(); // consume connect message + + // Server responds with a DIFFERENT session (new session, not resumed) + const newSessionId = SESSION_ID + '_new'; + await stream.receive({ msg: 'connected', session: newSessionId }); + + // receivedCount should be reset to 1 (counting the new connected message) + test.equal(conn._receivedCount, 1, + "receivedCount should be reset to 1 when getting a new session"); + test.equal(conn._lastSessionId, newSessionId); +}); + +Tinytest.addAsync('livedata connection - receivedCount preserved on session resume', async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + // Build up some receivedCount + await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: {} }); + await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: {} }); + const countBeforeDisconnect = conn._receivedCount; + test.equal(countBeforeDisconnect, 3); + + // Simulate reconnect + await stream.reset(); + stream.sent.shift(); // consume connect message + + // Server responds with the SAME session (resumed) + await stream.receive({ msg: 'connected', session: SESSION_ID }); + + // receivedCount should continue from where it was (plus the connected message) + test.equal(conn._receivedCount, countBeforeDisconnect + 1, + "receivedCount should continue incrementing on session resume"); + test.equal(conn._lastSessionId, SESSION_ID); +}); + +Tinytest.addAsync('livedata connection - disconnect sends disconnect message', async function(test) { + const stream = new StubStream(); + const conn = newConnection(stream); + + await startAndConnect(test, stream); + + // Clear any pending messages + stream.sent.length = 0; + + // Call disconnect + conn.disconnect(); + + // Should have sent a disconnect message + test.isTrue(stream.sent.length > 0, "Should have sent at least one message"); + const disconnectMsg = JSON.parse(stream.sent.shift()); + test.equal(disconnectMsg.msg, 'disconnect', + "disconnect() should send a disconnect message to the server"); +}); + // XXX also test: -// - reconnect, with session resume. // - restart on update flag // - on_update event // - reloading when the app changes, including session migration \ No newline at end of file diff --git a/packages/ddp-client/test/stub_stream.js b/packages/ddp-client/test/stub_stream.js index 43c05127fa..3e57b5ed4f 100644 --- a/packages/ddp-client/test/stub_stream.js +++ b/packages/ddp-client/test/stub_stream.js @@ -27,6 +27,10 @@ Object.assign(StubStream.prototype, { // no-op }, + disconnect: function() { + // no-op - for testing Connection.disconnect() + }, + _lostConnection: function() { // no-op }, diff --git a/packages/ddp-server/livedata_server.js b/packages/ddp-server/livedata_server.js index 3f5efceafb..2ffb63a223 100644 --- a/packages/ddp-server/livedata_server.js +++ b/packages/ddp-server/livedata_server.js @@ -81,11 +81,16 @@ var Session = function (server, version, socket, options) { var self = this; self.id = Random.id(); + // how many messages we've actually sent (not queued to send) excluding ping/pong + // we'll use this to detect mismatch of data on reconnect. + self.sentCount = 0; + self.server = server; self.version = version; self.initialized = false; self.socket = socket; + self.options = options; // Set to null when the session is destroyed. Multiple places below // use this to determine if the session is alive or not. @@ -134,6 +139,8 @@ var Session = function (server, version, socket, options) { self.connectionHandle = { id: self.id, close: function () { + // Server-initiated close should not be resumable + self._expectingDisconnect = true; self.close(); }, onClose: function (fn) { @@ -175,6 +182,8 @@ var Session = function (server, version, socket, options) { "livedata", "sessions", 1); }; +const ignoredMsgsForSessionOutOfDateCheck = ['ping', 'pong']; + Object.assign(Session.prototype, { sendReady: function (subscriptionIds) { var self = this; @@ -269,77 +278,103 @@ Object.assign(Session.prototype, { }, startUniversalSubs: function () { - var self = this; + const self = this; // Make a shallow copy of the set of universal handlers and start them. If // additional universal publishers start while we're running them (due to // yielding), they will run separately as part of Server.publish. - var handlers = [...self.server.universal_publish_handlers]; - handlers.forEach(function (handler) { + for (const handler of [...self.server.universal_publish_handlers]) { self._startSubscription(handler); - }); + } + }, + + // Stop heartbeat if running + _stopHeartbeat: function () { + if (this.heartbeat) { + this.heartbeat.stop(); + this.heartbeat = null; + } }, // Destroy this session and unregister it at the server. close: function () { - var self = this; + const self = this; // Destroy this session, even if it's not registered at the // server. Stop all processing and tear everything down. If a socket // was attached, close it. - // Already destroyed. - if (! self.inQueue) + // Already closing or closed - prevent multiple close() calls + if (self._isClosing) { return; + } + self._isClosing = true; - // Drop the merge box data immediately. - self.inQueue = null; - self.collectionViews = new Map(); - - if (self.heartbeat) { - self.heartbeat.stop(); - self.heartbeat = null; + if (self._removeTimeoutHandle) { + Meteor.clearTimeout(self._removeTimeoutHandle); + self._removeTimeoutHandle = null; } if (self.socket) { self.socket.close(); self.socket._meteorSession = null; + self.socket = null; } - Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact( - "livedata", "sessions", -1); + // Stop heartbeat immediately - we don't need it during the grace period + // since we have no socket to send pings on anyway. + self._stopHeartbeat(); - Meteor.defer(function () { - // Stop callbacks can yield, so we defer this on close. - // sub._isDeactivated() detects that we set inQueue to null and - // treats it as semi-deactivated (it will ignore incoming callbacks, etc). - self._deactivateAllSubscriptions(); + self.server._removeSession(self, () => { + Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact( + "livedata", "sessions", -1); - // Defer calling the close callbacks, so that the caller closing - // the session isn't waiting for all the callbacks to complete. - self._closeCallbacks.forEach(function (callback) { - callback(); + self.inQueue = null; + self.collectionViews = new Map(); + + self._stopHeartbeat(); + + Meteor.defer(function () { + // stop callbacks can yield, so we defer this on close. + // sub._isDeactivated() detects that we set inQueue to null and + // treats it as semi-deactivated (it will ignore incoming callbacks, etc). + self._deactivateAllSubscriptions(); + + // Defer calling the close callbacks, so that the caller closing + // the session isn't waiting for all the callbacks to complete. + self._closeCallbacks.forEach(callback => { + callback(); + }); }); }); - - // Unregister the session. - self.server._removeSession(self); }, // Send a message (doing nothing if no socket is connected right now). // It should be a JSON object (it will be stringified). send: function (msg) { const self = this; + const isIgnoredMsg = ignoredMsgsForSessionOutOfDateCheck.includes(msg.msg); + if (self.messageQueue && !isIgnoredMsg) { + self.messageQueue.push(msg); + if (self.messageQueue.length > self.options.maxMessageQueueLength) { + Meteor.clearTimeout(self._removeTimeoutHandle); + self._pendingRemoveFunction(); + } + return; + } if (self.socket) { if (Meteor._printSentDDP) Meteor._debug("Sent DDP", DDPCommon.stringifyDDP(msg)); + if (!isIgnoredMsg) { + self.sentCount++; + } self.socket.send(DDPCommon.stringifyDDP(msg)); } }, // Send a connection error. sendError: function (reason, offendingMessage) { - var self = this; - var msg = {msg: 'error', reason: reason}; + const self = this; + const msg = {msg: 'error', reason: reason}; if (offendingMessage) msg.offendingMessage = offendingMessage; self.send(msg); @@ -379,7 +414,7 @@ Object.assign(Session.prototype, { // the client is still alive. if (self.heartbeat) { self.heartbeat.messageReceived(); - }; + } if (self.version !== 'pre1' && msg_in.msg === 'ping') { if (self._respondToPings) @@ -391,6 +426,11 @@ Object.assign(Session.prototype, { return; } + if (msg_in.msg === 'disconnect') { + // Pre-empt the queue - a disconnect is imminent. + return self.protocol_handlers.disconnect.call(self, msg_in, () => {}); + } + self.inQueue.push(msg_in); if (self.workerRunning) return; @@ -444,6 +484,9 @@ Object.assign(Session.prototype, { }, protocol_handlers: { + disconnect: function(msg) { + this._expectingDisconnect = true; + }, sub: async function (msg, unblock) { var self = this; @@ -1252,6 +1295,19 @@ Server = function (options = {}) { // For testing, allow responding to pings to be disabled. respondToPings: true, defaultPublicationStrategy: publicationStrategies.SERVER_MERGE, + /** + * @summary How many messages should we queue during a non-graceful disconnect before we destroy the session, to help prevent memory leaks. + * @type {Number} + * @locus Server + */ + maxMessageQueueLength: 100, + /** + * @summary How long we should maintain a session for after a non-graceful disconnect before killing it + * sessions that reconnect within this time will be resumed with minimal performance impact. + * @type {Number} + * @locus Server + */ + disconnectGracePeriod: 15000, ...options, }; @@ -1421,16 +1477,66 @@ Object.assign(Server.prototype, { return; } - // Yay, version matches! Create a new session. + // Yay, version matches! Resume existing session if possible, otherwise create a new one. // Note: Troposphere depends on the ability to mutate // Meteor.server.options.heartbeatTimeout! This is a hack, but it's life. - socket._meteorSession = new Session(self, version, socket, self.options); - self.sessions.set(socket._meteorSession.id, socket._meteorSession); - self.onConnectionHook.each(function (callback) { - if (socket._meteorSession) - callback(socket._meteorSession.connectionHandle); - return true; - }); + const existingSession = self.sessions.get(msg.session); + + // we've found a session with: + // the right ID + // a matching sent/received count + // was disconnected and hasn't been reconnected to yet. + if (existingSession && existingSession.sentCount === msg.receivedCount && existingSession._removeTimeoutHandle) { + Meteor.clearTimeout(existingSession._removeTimeoutHandle); + existingSession._removeTimeoutHandle = undefined; + existingSession._pendingRemoveFunction = undefined; + existingSession._isClosing = false; // Reset so session can be closed again later + socket._meteorSession = existingSession; + const messageQueue = existingSession.messageQueue; + existingSession.messageQueue = undefined; + existingSession.socket = socket; + + // Restart heartbeat for the resumed session + if (existingSession.version !== 'pre1' && self.options.heartbeatInterval !== 0) { + socket.setWebsocketTimeout(0); + existingSession.heartbeat = new DDPCommon.Heartbeat({ + heartbeatInterval: self.options.heartbeatInterval, + heartbeatTimeout: self.options.heartbeatTimeout, + onTimeout: function () { + existingSession.close(); + }, + sendPing: function () { + existingSession.send({msg: 'ping'}); + } + }); + existingSession.heartbeat.start(); + } + + // Send connected message so client can restart heartbeat and confirm resumption + existingSession.send({ msg: 'connected', session: existingSession.id }); + if (messageQueue) { + Meteor.defer(() => { + messageQueue.forEach(msg => existingSession.send(msg)); + }); + } + // Note: onConnectionHook is NOT called on session resume - the connection + // is considered to be the same logical connection as before. + } + else { + // immediately remove the old session since we're out of date. + if (existingSession && existingSession._pendingRemoveFunction) { + Meteor.clearTimeout(existingSession._removeTimeoutHandle); + existingSession._pendingRemoveFunction(); + } + socket._meteorSession = new Session(self, version, socket, self.options); + self.sessions.set(socket._meteorSession.id, socket._meteorSession); + + self.onConnectionHook.each(function (callback) { + if (socket._meteorSession) + callback(socket._meteorSession.connectionHandle); + return true; + }); + } }, /** * Register a publish handler function. @@ -1520,9 +1626,31 @@ Object.assign(Server.prototype, { } }, - _removeSession: function (session) { + _removeSession: function (session, callback = () => {}) { var self = this; - self.sessions.delete(session.id); + const sessionRemoveFunction = () => { + // Guard against being called multiple times (e.g., from both overflow and timeout) + if (!self.sessions.has(session.id)) { + return; + } + // Clear timeout handle if it exists to prevent double execution + if (session._removeTimeoutHandle) { + Meteor.clearTimeout(session._removeTimeoutHandle); + session._removeTimeoutHandle = null; + } + session._pendingRemoveFunction = null; + self.sessions.delete(session.id); + callback(); + }; + if (session._expectingDisconnect) { + return sessionRemoveFunction(); + } + session.messageQueue = []; + session._pendingRemoveFunction = sessionRemoveFunction; + if (session._removeTimeoutHandle) { + Meteor.clearTimeout(session._removeTimeoutHandle); + } + session._removeTimeoutHandle = Meteor.setTimeout(sessionRemoveFunction, self.options.disconnectGracePeriod); }, /** diff --git a/packages/ddp-server/livedata_server_async_tests.js b/packages/ddp-server/livedata_server_async_tests.js index 4ca4ca0864..3606b66044 100644 --- a/packages/ddp-server/livedata_server_async_tests.js +++ b/packages/ddp-server/livedata_server_async_tests.js @@ -168,9 +168,24 @@ Tinytest.addAsync('livedata server - async publish cursor', function( connection: clientConn, }); clientConn.subscribe('asyncPublishCursor', async () => { - const actual = await remoteCollection.find().fetch(); - test.equal(actual[0].name, 'async'); - onComplete(); + // Wait for data to arrive - the subscription is ready but data may still be in transit + // This can happen when a previous test run was interrupted (page reload) and the + // server is still processing the old session's grace period + let attempts = 0; + const maxAttempts = 50; // 5 seconds max wait + const checkData = async () => { + const actual = await remoteCollection.find().fetch(); + if (actual.length > 0) { + test.equal(actual[0].name, 'async'); + onComplete(); + } else if (attempts++ < maxAttempts) { + setTimeout(checkData, 100); + } else { + test.fail('Timed out waiting for data in async publish cursor test'); + onComplete(); + } + }; + await checkData(); }); }); }); diff --git a/packages/ddp-server/livedata_server_tests.js b/packages/ddp-server/livedata_server_tests.js index 15b0349e87..48313dcbcb 100644 --- a/packages/ddp-server/livedata_server_tests.js +++ b/packages/ddp-server/livedata_server_tests.js @@ -1,3 +1,38 @@ +// Helper to temporarily set disconnectGracePeriod for DDP resumption tests +// This ensures test isolation - other tests run with the default grace period +const DEFAULT_GRACE_PERIOD = Meteor.server.options.disconnectGracePeriod; +const TEST_GRACE_PERIOD = 5000; // Short grace period for fast tests (ms) +// Derived timing constants to avoid hardcoding throughout tests +const WITHIN_GRACE_PERIOD_MS = Math.floor(TEST_GRACE_PERIOD / 4); // Well within grace period +const AFTER_GRACE_PERIOD_MS = Math.ceil(TEST_GRACE_PERIOD * 1.5); // After grace period expires +const POLL_TIMEOUT_MS = TEST_GRACE_PERIOD * 2; // Max time to wait for async operations before failing + +async function withTestGracePeriod(fn) { + const previous = Meteor.server.options.disconnectGracePeriod; + Meteor.server.options.disconnectGracePeriod = TEST_GRACE_PERIOD; + try { + await fn(); + } finally { + Meteor.server.options.disconnectGracePeriod = previous ?? DEFAULT_GRACE_PERIOD; + } +} + +// Helper to poll for a condition with timeout to prevent hanging tests +function pollUntil(conditionFn, timeoutMs = POLL_TIMEOUT_MS) { + return new Promise((resolve, reject) => { + const startTime = Date.now(); + const interval = setInterval(() => { + if (conditionFn()) { + clearInterval(interval); + resolve(); + } else if (Date.now() - startTime > timeoutMs) { + clearInterval(interval); + reject(new Error(`Timed out after ${timeoutMs}ms waiting for condition`)); + } + }, 10); + }); +} + Tinytest.addAsync( "livedata server - connectionHandle.onClose()", function (test, onComplete) { @@ -593,4 +628,344 @@ function getTestConnections(test) { function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); -} \ No newline at end of file +} + +// ============================================================================ +// DDP Session Resumption Tests +// ============================================================================ + +// Test that unexpected disconnects allow session resumption within grace period +Tinytest.addAsync( + "livedata server - DDP resumption: unexpected disconnect preserves session", + async function (test) { + await withTestGracePeriod(async () => { + const { clientConn, serverConn } = await getTestConnections(test); + const originalSessionId = serverConn.id; + + // Verify the session exists + test.isTrue(Meteor.server.sessions.has(originalSessionId)); + + // Simulate unexpected disconnect by forcing the stream to close + // without sending a disconnect message + clientConn._stream._lostConnection(); + + // Wait a bit but less than the grace period + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should still exist during grace period + test.isTrue( + Meteor.server.sessions.has(originalSessionId), + "Session should be preserved during grace period" + ); + + // Wait for grace period to expire + await sleep(AFTER_GRACE_PERIOD_MS); + + // Session should be removed after grace period + test.isFalse( + Meteor.server.sessions.has(originalSessionId), + "Session should be removed after grace period expires" + ); + }); + } +); + +// Test that graceful disconnects (client sends disconnect message) remove session immediately +Tinytest.addAsync( + "livedata server - DDP resumption: graceful disconnect removes session immediately", + async function (test) { + await withTestGracePeriod(async () => { + const { clientConn, serverConn } = await getTestConnections(test); + const originalSessionId = serverConn.id; + + // Verify the session exists + test.isTrue(Meteor.server.sessions.has(originalSessionId)); + + // Graceful disconnect - this sends the disconnect message + clientConn.disconnect(); + + // Wait a moment for the disconnect to process + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should be removed immediately (not waiting for grace period) + test.isFalse( + Meteor.server.sessions.has(originalSessionId), + "Session should be removed immediately after graceful disconnect" + ); + }); + } +); + +// Test that server-initiated close removes session immediately (not resumable) +Tinytest.addAsync( + "livedata server - DDP resumption: server-initiated close removes session immediately", + async function (test) { + await withTestGracePeriod(async () => { + const { serverConn } = await getTestConnections(test); + const originalSessionId = serverConn.id; + + // Verify the session exists + test.isTrue(Meteor.server.sessions.has(originalSessionId)); + + // Server-initiated close via connectionHandle.close() + serverConn.close(); + + // Wait a moment for the close to process + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should be removed immediately (server kicks should not be resumable) + test.isFalse( + Meteor.server.sessions.has(originalSessionId), + "Session should be removed immediately after server-initiated close" + ); + }); + } +); + +// Test that onConnection hook is NOT called on session resume +Tinytest.addAsync( + "livedata server - DDP resumption: onConnection not called on resume", + async function (test) { + await withTestGracePeriod(async () => { + let onConnectionCallCount = 0; + let lastConnectionId = null; + + const handle = Meteor.onConnection(function (conn) { + onConnectionCallCount++; + lastConnectionId = conn.id; + }); + + // Create initial connection + const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: false }); + + // Wait for connection with timeout + await pollUntil(() => clientConn._lastSessionId); + + const originalSessionId = clientConn._lastSessionId; + test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect"); + test.equal(lastConnectionId, originalSessionId); + + // Get the server session and verify it exists + const serverSession = Meteor.server.sessions.get(originalSessionId); + test.isTrue(serverSession, "Server session should exist"); + + // Simulate unexpected disconnect + clientConn._stream._lostConnection(); + + // Wait a bit (less than grace period) + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should still exist + test.isTrue( + Meteor.server.sessions.has(originalSessionId), + "Session should still exist during grace period" + ); + + // Reconnect - this should resume the session + clientConn._stream.reconnect(); + + // Wait for reconnection with timeout + await pollUntil(() => clientConn.status().connected); + + // Give it a moment to process + await sleep(WITHIN_GRACE_PERIOD_MS); + + // IMPORTANT: Assert that session was actually resumed (same session ID) + // If this fails, the test is not actually testing resumption + test.equal( + clientConn._lastSessionId, + originalSessionId, + "Session should be resumed with same session ID" + ); + + // onConnection should NOT have been called again for a resumed session + test.equal( + onConnectionCallCount, + 1, + "onConnection should not be called again on session resume" + ); + + handle.stop(); + clientConn.disconnect(); + }); + } +); + +// Test that server-initiated close prevents session resumption +Tinytest.addAsync( + "livedata server - DDP resumption: server close prevents resumption", + async function (test) { + await withTestGracePeriod(async () => { + let onConnectionCallCount = 0; + + const handle = Meteor.onConnection(function (conn) { + onConnectionCallCount++; + }); + + // Create initial connection + const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: true }); + + // Wait for connection with timeout + await pollUntil(() => clientConn._lastSessionId); + + const originalSessionId = clientConn._lastSessionId; + test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect"); + + // Get the server session + const serverSession = Meteor.server.sessions.get(originalSessionId); + test.isTrue(serverSession, "Server session should exist"); + + // Server-initiated close (kick the client) + serverSession.connectionHandle.close(); + + // Wait for client to reconnect with new session (retry is enabled) + await pollUntil(() => + clientConn.status().connected && clientConn._lastSessionId !== originalSessionId + ); + + // Should have a NEW session (not resumed) + test.notEqual( + clientConn._lastSessionId, + originalSessionId, + "Should have a new session ID after server-initiated close" + ); + + // onConnection should have been called again (new session, not resumed) + test.equal( + onConnectionCallCount, + 2, + "onConnection should be called again after server-initiated close" + ); + + handle.stop(); + clientConn.disconnect(); + }); + } +); + +// Test that graceful client disconnect prevents session resumption +Tinytest.addAsync( + "livedata server - DDP resumption: graceful disconnect prevents resumption", + async function (test) { + await withTestGracePeriod(async () => { + let onConnectionCallCount = 0; + + const handle = Meteor.onConnection(function (conn) { + onConnectionCallCount++; + }); + + // Create initial connection with retry enabled + const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: true }); + + // Wait for connection with timeout + await pollUntil(() => clientConn._lastSessionId); + + const originalSessionId = clientConn._lastSessionId; + test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect"); + + // Graceful disconnect (sends disconnect message) + clientConn.disconnect(); + + // Wait for session to be removed + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should be removed immediately + test.isFalse( + Meteor.server.sessions.has(originalSessionId), + "Session should be removed after graceful disconnect" + ); + + // Reconnect + clientConn.reconnect(); + + // Wait for reconnection with timeout + await pollUntil(() => clientConn.status().connected); + + // Should have a NEW session (not resumed, because we gracefully disconnected) + test.notEqual( + clientConn._lastSessionId, + originalSessionId, + "Should have a new session ID after graceful disconnect and reconnect" + ); + + // onConnection should have been called again + test.equal( + onConnectionCallCount, + 2, + "onConnection should be called again after graceful disconnect" + ); + + handle.stop(); + clientConn.disconnect(); + }); + } +); + +// Test that receivedCount mismatch causes new session (not resume) +Tinytest.addAsync( + "livedata server - DDP resumption: count mismatch creates new session", + async function (test) { + await withTestGracePeriod(async () => { + let onConnectionCallCount = 0; + + const handle = Meteor.onConnection(function (conn) { + onConnectionCallCount++; + }); + + // Create initial connection + const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: false }); + + // Wait for connection with timeout + await pollUntil(() => clientConn._lastSessionId); + + const originalSessionId = clientConn._lastSessionId; + test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect"); + + // Get the server session + const serverSession = Meteor.server.sessions.get(originalSessionId); + test.isTrue(serverSession, "Server session should exist"); + + // Artificially increment sentCount to create a mismatch + // This simulates messages sent by server that client didn't receive + serverSession.sentCount += 5; + + // Simulate unexpected disconnect + clientConn._stream._lostConnection(); + + // Wait a bit (less than grace period) + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Session should still exist during grace period + test.isTrue( + Meteor.server.sessions.has(originalSessionId), + "Session should still exist during grace period" + ); + + // Reconnect - this should NOT resume due to count mismatch + clientConn._stream.reconnect(); + + // Wait for reconnection with timeout + await pollUntil(() => clientConn.status().connected); + + // Give it a moment to process + await sleep(WITHIN_GRACE_PERIOD_MS); + + // Should have a NEW session (counts didn't match) + test.notEqual( + clientConn._lastSessionId, + originalSessionId, + "Should have a new session ID when counts mismatch" + ); + + // onConnection should have been called again (new session) + test.equal( + onConnectionCallCount, + 2, + "onConnection should be called again when counts mismatch" + ); + + handle.stop(); + clientConn.disconnect(); + }); + } +); \ No newline at end of file diff --git a/v3-docs/docs/api/meteor.md b/v3-docs/docs/api/meteor.md index 68f37fd31a..9cb36da796 100644 --- a/v3-docs/docs/api/meteor.md +++ b/v3-docs/docs/api/meteor.md @@ -993,16 +993,19 @@ contains the following fields: security risk for this transport. For details and alternatives, see the [SockJS documentation](https://github.com/sockjs/sockjs-node#authorisation). -> Currently when a client reconnects to the server (such as after -> temporarily losing its Internet connection), it will get a new -> connection each time. The `onConnection` callbacks will be called -> again, and the new connection will have a new connection `id`. +## Reconnection -> In the future, when client reconnection is fully implemented, -> reconnecting from the client will reconnect to the same connection on -> the server: the `onConnection` callback won't be called for that -> connection again, and the connection will still have the same -> connection `id`. +Meteor 3.5+ supports [DDP session resumption](https://github.com/meteor/meteor/pull/14051), allowing clients to automatically resume their previous connection after a temporary network disconnect. When a client reconnects within the grace period, the `onConnection` callback is not called again and the connection retains its original `id`. + +This behavior is controlled by the following server options: + +### Meteor.server.options.disconnectGracePeriod + +Defines how long (in milliseconds) we should maintain a session for after a non-graceful disconnect before destroying it. Sessions that reconnect within this time will be resumed with minimal performance impact. Defaults to `15000`. + +### Meteor.server.options.maxMessageQueueLength + +Determines how many messages we should queue during a non-graceful disconnect before we destroy the session, to help prevent memory leaks. Defaults to `100`.