Merge pull request #14051 from vlasky/feature/ddp-resumptions-rebased

Implement DDP session resumption for graceful reconnects
This commit is contained in:
Italo José
2026-03-06 16:43:09 -03:00
committed by GitHub
9 changed files with 743 additions and 69 deletions

View File

@@ -33,6 +33,11 @@ export class ConnectionStreamHandlers {
return;
}
// Track received message count for session resumption (excluding ping/pong)
if (!this._connection._ignoredMsgsForSessionOutOfDateCheck.includes(msg.msg)) {
this._connection._receivedCount++;
}
// Important: This was missing from previous version
// We need to set the current version before routing the message
if (msg.msg === 'connected') {
@@ -139,6 +144,7 @@ export class ConnectionStreamHandlers {
const msg = { msg: 'connect' };
if (this._connection._lastSessionId) {
msg.session = this._connection._lastSessionId;
msg.receivedCount = this._connection._receivedCount;
}
msg.version = this._connection._versionSuggestion || this._connection._supportedDDPVersions[0];
this._connection._versionSuggestion = msg.version;

View File

@@ -93,6 +93,10 @@ export class Connection {
}
self._lastSessionId = null;
// how many messages we've received (excluding ping/pong).
// when we try to reconnect to the server, it will check this against the number of messages it sent.
// if there is a mismatch, our info is out of date and we need a clean session.
self._receivedCount = 0;
self._versionSuggestion = null; // The last proposed DDP version.
self._version = null; // The DDP version agreed on by client and server.
self._stores = Object.create(null); // name -> object with methods
@@ -102,6 +106,7 @@ export class Connection {
self._heartbeatInterval = options.heartbeatInterval;
self._heartbeatTimeout = options.heartbeatTimeout;
self._ignoredMsgsForSessionOutOfDateCheck = ['ping', 'pong'];
// Tracks methods which the user has tried to call but which have not yet
// called their user callback (ie, they are waiting on their result or for all
@@ -1081,11 +1086,13 @@ export class Connection {
* @locus Client
*/
disconnect(...args) {
this._send({ msg: 'disconnect' });
return this._stream.disconnect(...args);
}
close() {
return this._stream.disconnect({ _permanent: true });
// _permanent is used by the underlying stream to prevent reconnection attempts
return this.disconnect({ _permanent: true });
}
///

View File

@@ -43,10 +43,15 @@ export class MessageProcessors {
if (reconnectedToPreviousSession) {
// Successful reconnection -- pick up where we left off.
// Don't reset stores since we're continuing the same session.
self._resetStores = false;
return;
}
// Server doesn't have our data anymore. Re-sync a new session.
// Reset the received count since we're starting a new session.
// Set to 1 because the 'connected' message itself counts.
self._receivedCount = 1;
// Forget about messages we were buffering for unknown collections. They'll
// be resent if still relevant.

View File

@@ -20,14 +20,16 @@ const newConnection = function(stream, options) {
);
};
const makeConnectMessage = function(session) {
const makeConnectMessage = function(session, receivedCount) {
const msg = {
msg: 'connect',
version: DDPCommon.SUPPORTED_DDP_VERSIONS[0],
support: DDPCommon.SUPPORTED_DDP_VERSIONS
support: DDPCommon.SUPPORTED_DDP_VERSIONS,
};
if (session) msg.session = session;
if (receivedCount) msg.receivedCount = receivedCount;
return msg;
};
@@ -869,7 +871,7 @@ Tinytest.addAsync('livedata stub - reconnect', async function(test, onComplete)
// sub. The wait method still is blocked.
await stream.reset();
testGotMessage(test, stream, makeConnectMessage(SESSION_ID));
testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount));
testGotMessage(test, stream, methodMessage);
testGotMessage(test, stream, subMessage);
@@ -990,7 +992,7 @@ if (Meteor.isClient) {
await stream.reset();
// verify that a reconnect message was sent.
testGotMessage(test, stream, makeConnectMessage(SESSION_ID));
testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount));
// Make sure that the stream triggers connection.
await stream.receive({ msg: 'connected', session: SESSION_ID + 1 });
@@ -1114,7 +1116,7 @@ if (Meteor.isClient) {
// in. Reconnect quiescence happens as soon as 'connected' is received because
// there are no pending methods or subs in need of revival.
await stream.reset();
testGotMessage(test, stream, makeConnectMessage(SESSION_ID));
testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount));
// Still holding out hope for session resumption, so nothing updated yet.
test.equal(coll.find().count(), 1);
test.equal(await coll.findOneAsync(stubWrittenId), {
@@ -1209,7 +1211,7 @@ if (Meteor.isClient) {
// but slowMethod gets called via onReconnect. Reconnect quiescence is now
// blocking on slowMethod.
await stream.reset();
testGotMessage(test, stream, makeConnectMessage(SESSION_ID + 1));
testGotMessage(test, stream, makeConnectMessage(SESSION_ID + 1, conn._receivedCount));
const slowMethodId = testGotMessage(test, stream, {
msg: 'method',
method: 'slowMethod',
@@ -1330,7 +1332,7 @@ Tinytest.addAsync('livedata stub - reconnect method which only got data', async
// Reset stream. Method gets resent (with same ID), and blocks reconnect
// quiescence.
await stream.reset();
testGotMessage(test, stream, makeConnectMessage(SESSION_ID));
testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount));
testGotMessage(test, stream, {
msg: 'method',
method: 'doLittle',
@@ -1807,7 +1809,7 @@ addReconnectTests(
// reconnect
stream.sent = [];
await stream.reset();
testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId));
testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId, conn._receivedCount));
// Test that we sent what we expect to send, and we're blocked on
// what we expect to be blocked. The subsequent logic to correctly
@@ -2033,7 +2035,7 @@ addReconnectTests(
// reconnect
stream.sent = [];
await stream.reset();
testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId));
testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId, conn._receivedCount));
// Test that we sent what we expect to send, and we're blocked on
// what we expect to be blocked. The subsequent logic to correctly
@@ -2084,7 +2086,7 @@ addReconnectTests(
// initial connect
stream.sent = [];
await stream.reset();
testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId));
testGotMessage(test, stream, makeConnectMessage(conn._lastSessionId, conn._receivedCount));
// Test that we sent just the login message.
const loginId = testGotMessage(test, stream, {
@@ -2152,7 +2154,7 @@ addReconnectTests('livedata stub - reconnect double wait method', async function
// Reset stream. halfwayMethod does NOT get resent, but reconnectMethod does!
// Reconnect quiescence happens when reconnectMethod is done.
await stream.reset();
testGotMessage(test, stream, makeConnectMessage(SESSION_ID));
testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount));
const reconnectId = testGotMessage(test, stream, {
msg: 'method',
method: 'reconnectMethod',
@@ -2257,7 +2259,7 @@ Tinytest.addAsync('livedata stub - subscribe errors', async function(test) {
// stream reset: reconnect!
await stream.reset();
// We send a connect.
testGotMessage(test, stream, makeConnectMessage(SESSION_ID));
testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount));
// We should NOT re-sub to the sub, because we processed the error.
test.length(stream.sent, 0);
test.isFalse(onReadyFired);
@@ -2376,7 +2378,7 @@ if (Meteor.isClient) {
// Initiate reconnect.
await stream.reset();
testGotMessage(test, stream, makeConnectMessage(SESSION_ID));
testGotMessage(test, stream, makeConnectMessage(SESSION_ID, conn._receivedCount));
testGotMessage(test, stream, subMessage);
await stream.receive({ msg: 'connected', session: SESSION_ID + 1 });
@@ -2559,8 +2561,137 @@ if (Meteor.isClient) {
);
}
// ============================================================================
// DDP Session Resumption Tests (Client-side)
// ============================================================================
Tinytest.addAsync('livedata connection - receivedCount tracking', async function(test) {
const stream = new StubStream();
const conn = newConnection(stream);
// Initially receivedCount should be 0
test.equal(conn._receivedCount, 0);
await startAndConnect(test, stream);
// After receiving 'connected', receivedCount should be 1
// (the 'connected' message itself is counted)
test.equal(conn._receivedCount, 1);
// Receive some data messages
await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: { a: 1 } });
test.equal(conn._receivedCount, 2);
await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: { b: 2 } });
test.equal(conn._receivedCount, 3);
// Ping/pong should NOT increment receivedCount
await stream.receive({ msg: 'ping', id: 'ping1' });
test.equal(conn._receivedCount, 3, "ping should not increment receivedCount");
await stream.receive({ msg: 'pong', id: 'pong1' });
test.equal(conn._receivedCount, 3, "pong should not increment receivedCount");
// More data messages should continue incrementing
await stream.receive({ msg: 'changed', collection: 'test', id: '1', fields: { a: 2 } });
test.equal(conn._receivedCount, 4);
});
Tinytest.addAsync('livedata connection - receivedCount sent on reconnect', async function(test) {
const stream = new StubStream();
const conn = newConnection(stream);
await startAndConnect(test, stream);
// Receive some messages to build up receivedCount
await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: {} });
await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: {} });
await stream.receive({ msg: 'ready', subs: ['sub1'] });
const expectedReceivedCount = conn._receivedCount;
test.equal(expectedReceivedCount, 4); // connected + 3 messages
// Simulate disconnect and reconnect
await stream.reset();
// The connect message should include the receivedCount
const connectMsg = JSON.parse(stream.sent.shift());
test.equal(connectMsg.msg, 'connect');
test.equal(connectMsg.session, SESSION_ID);
test.equal(connectMsg.receivedCount, expectedReceivedCount,
"Connect message should include receivedCount for session resumption");
});
Tinytest.addAsync('livedata connection - receivedCount reset on new session', async function(test) {
const stream = new StubStream();
const conn = newConnection(stream);
await startAndConnect(test, stream);
// Build up some receivedCount
await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: {} });
await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: {} });
test.equal(conn._receivedCount, 3);
// Simulate reconnect
await stream.reset();
stream.sent.shift(); // consume connect message
// Server responds with a DIFFERENT session (new session, not resumed)
const newSessionId = SESSION_ID + '_new';
await stream.receive({ msg: 'connected', session: newSessionId });
// receivedCount should be reset to 1 (counting the new connected message)
test.equal(conn._receivedCount, 1,
"receivedCount should be reset to 1 when getting a new session");
test.equal(conn._lastSessionId, newSessionId);
});
Tinytest.addAsync('livedata connection - receivedCount preserved on session resume', async function(test) {
const stream = new StubStream();
const conn = newConnection(stream);
await startAndConnect(test, stream);
// Build up some receivedCount
await stream.receive({ msg: 'added', collection: 'test', id: '1', fields: {} });
await stream.receive({ msg: 'added', collection: 'test', id: '2', fields: {} });
const countBeforeDisconnect = conn._receivedCount;
test.equal(countBeforeDisconnect, 3);
// Simulate reconnect
await stream.reset();
stream.sent.shift(); // consume connect message
// Server responds with the SAME session (resumed)
await stream.receive({ msg: 'connected', session: SESSION_ID });
// receivedCount should continue from where it was (plus the connected message)
test.equal(conn._receivedCount, countBeforeDisconnect + 1,
"receivedCount should continue incrementing on session resume");
test.equal(conn._lastSessionId, SESSION_ID);
});
Tinytest.addAsync('livedata connection - disconnect sends disconnect message', async function(test) {
const stream = new StubStream();
const conn = newConnection(stream);
await startAndConnect(test, stream);
// Clear any pending messages
stream.sent.length = 0;
// Call disconnect
conn.disconnect();
// Should have sent a disconnect message
test.isTrue(stream.sent.length > 0, "Should have sent at least one message");
const disconnectMsg = JSON.parse(stream.sent.shift());
test.equal(disconnectMsg.msg, 'disconnect',
"disconnect() should send a disconnect message to the server");
});
// XXX also test:
// - reconnect, with session resume.
// - restart on update flag
// - on_update event
// - reloading when the app changes, including session migration

View File

@@ -27,6 +27,10 @@ Object.assign(StubStream.prototype, {
// no-op
},
disconnect: function() {
// no-op - for testing Connection.disconnect()
},
_lostConnection: function() {
// no-op
},

View File

@@ -81,11 +81,16 @@ var Session = function (server, version, socket, options) {
var self = this;
self.id = Random.id();
// how many messages we've actually sent (not queued to send) excluding ping/pong
// we'll use this to detect mismatch of data on reconnect.
self.sentCount = 0;
self.server = server;
self.version = version;
self.initialized = false;
self.socket = socket;
self.options = options;
// Set to null when the session is destroyed. Multiple places below
// use this to determine if the session is alive or not.
@@ -134,6 +139,8 @@ var Session = function (server, version, socket, options) {
self.connectionHandle = {
id: self.id,
close: function () {
// Server-initiated close should not be resumable
self._expectingDisconnect = true;
self.close();
},
onClose: function (fn) {
@@ -175,6 +182,8 @@ var Session = function (server, version, socket, options) {
"livedata", "sessions", 1);
};
const ignoredMsgsForSessionOutOfDateCheck = ['ping', 'pong'];
Object.assign(Session.prototype, {
sendReady: function (subscriptionIds) {
var self = this;
@@ -269,77 +278,103 @@ Object.assign(Session.prototype, {
},
startUniversalSubs: function () {
var self = this;
const self = this;
// Make a shallow copy of the set of universal handlers and start them. If
// additional universal publishers start while we're running them (due to
// yielding), they will run separately as part of Server.publish.
var handlers = [...self.server.universal_publish_handlers];
handlers.forEach(function (handler) {
for (const handler of [...self.server.universal_publish_handlers]) {
self._startSubscription(handler);
});
}
},
// Stop heartbeat if running
_stopHeartbeat: function () {
if (this.heartbeat) {
this.heartbeat.stop();
this.heartbeat = null;
}
},
// Destroy this session and unregister it at the server.
close: function () {
var self = this;
const self = this;
// Destroy this session, even if it's not registered at the
// server. Stop all processing and tear everything down. If a socket
// was attached, close it.
// Already destroyed.
if (! self.inQueue)
// Already closing or closed - prevent multiple close() calls
if (self._isClosing) {
return;
}
self._isClosing = true;
// Drop the merge box data immediately.
self.inQueue = null;
self.collectionViews = new Map();
if (self.heartbeat) {
self.heartbeat.stop();
self.heartbeat = null;
if (self._removeTimeoutHandle) {
Meteor.clearTimeout(self._removeTimeoutHandle);
self._removeTimeoutHandle = null;
}
if (self.socket) {
self.socket.close();
self.socket._meteorSession = null;
self.socket = null;
}
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"livedata", "sessions", -1);
// Stop heartbeat immediately - we don't need it during the grace period
// since we have no socket to send pings on anyway.
self._stopHeartbeat();
Meteor.defer(function () {
// Stop callbacks can yield, so we defer this on close.
// sub._isDeactivated() detects that we set inQueue to null and
// treats it as semi-deactivated (it will ignore incoming callbacks, etc).
self._deactivateAllSubscriptions();
self.server._removeSession(self, () => {
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"livedata", "sessions", -1);
// Defer calling the close callbacks, so that the caller closing
// the session isn't waiting for all the callbacks to complete.
self._closeCallbacks.forEach(function (callback) {
callback();
self.inQueue = null;
self.collectionViews = new Map();
self._stopHeartbeat();
Meteor.defer(function () {
// stop callbacks can yield, so we defer this on close.
// sub._isDeactivated() detects that we set inQueue to null and
// treats it as semi-deactivated (it will ignore incoming callbacks, etc).
self._deactivateAllSubscriptions();
// Defer calling the close callbacks, so that the caller closing
// the session isn't waiting for all the callbacks to complete.
self._closeCallbacks.forEach(callback => {
callback();
});
});
});
// Unregister the session.
self.server._removeSession(self);
},
// Send a message (doing nothing if no socket is connected right now).
// It should be a JSON object (it will be stringified).
send: function (msg) {
const self = this;
const isIgnoredMsg = ignoredMsgsForSessionOutOfDateCheck.includes(msg.msg);
if (self.messageQueue && !isIgnoredMsg) {
self.messageQueue.push(msg);
if (self.messageQueue.length > self.options.maxMessageQueueLength) {
Meteor.clearTimeout(self._removeTimeoutHandle);
self._pendingRemoveFunction();
}
return;
}
if (self.socket) {
if (Meteor._printSentDDP)
Meteor._debug("Sent DDP", DDPCommon.stringifyDDP(msg));
if (!isIgnoredMsg) {
self.sentCount++;
}
self.socket.send(DDPCommon.stringifyDDP(msg));
}
},
// Send a connection error.
sendError: function (reason, offendingMessage) {
var self = this;
var msg = {msg: 'error', reason: reason};
const self = this;
const msg = {msg: 'error', reason: reason};
if (offendingMessage)
msg.offendingMessage = offendingMessage;
self.send(msg);
@@ -379,7 +414,7 @@ Object.assign(Session.prototype, {
// the client is still alive.
if (self.heartbeat) {
self.heartbeat.messageReceived();
};
}
if (self.version !== 'pre1' && msg_in.msg === 'ping') {
if (self._respondToPings)
@@ -391,6 +426,11 @@ Object.assign(Session.prototype, {
return;
}
if (msg_in.msg === 'disconnect') {
// Pre-empt the queue - a disconnect is imminent.
return self.protocol_handlers.disconnect.call(self, msg_in, () => {});
}
self.inQueue.push(msg_in);
if (self.workerRunning)
return;
@@ -444,6 +484,9 @@ Object.assign(Session.prototype, {
},
protocol_handlers: {
disconnect: function(msg) {
this._expectingDisconnect = true;
},
sub: async function (msg, unblock) {
var self = this;
@@ -1252,6 +1295,19 @@ Server = function (options = {}) {
// For testing, allow responding to pings to be disabled.
respondToPings: true,
defaultPublicationStrategy: publicationStrategies.SERVER_MERGE,
/**
* @summary How many messages should we queue during a non-graceful disconnect before we destroy the session, to help prevent memory leaks.
* @type {Number}
* @locus Server
*/
maxMessageQueueLength: 100,
/**
* @summary How long we should maintain a session for after a non-graceful disconnect before killing it
* sessions that reconnect within this time will be resumed with minimal performance impact.
* @type {Number}
* @locus Server
*/
disconnectGracePeriod: 15000,
...options,
};
@@ -1421,16 +1477,66 @@ Object.assign(Server.prototype, {
return;
}
// Yay, version matches! Create a new session.
// Yay, version matches! Resume existing session if possible, otherwise create a new one.
// Note: Troposphere depends on the ability to mutate
// Meteor.server.options.heartbeatTimeout! This is a hack, but it's life.
socket._meteorSession = new Session(self, version, socket, self.options);
self.sessions.set(socket._meteorSession.id, socket._meteorSession);
self.onConnectionHook.each(function (callback) {
if (socket._meteorSession)
callback(socket._meteorSession.connectionHandle);
return true;
});
const existingSession = self.sessions.get(msg.session);
// we've found a session with:
// the right ID
// a matching sent/received count
// was disconnected and hasn't been reconnected to yet.
if (existingSession && existingSession.sentCount === msg.receivedCount && existingSession._removeTimeoutHandle) {
Meteor.clearTimeout(existingSession._removeTimeoutHandle);
existingSession._removeTimeoutHandle = undefined;
existingSession._pendingRemoveFunction = undefined;
existingSession._isClosing = false; // Reset so session can be closed again later
socket._meteorSession = existingSession;
const messageQueue = existingSession.messageQueue;
existingSession.messageQueue = undefined;
existingSession.socket = socket;
// Restart heartbeat for the resumed session
if (existingSession.version !== 'pre1' && self.options.heartbeatInterval !== 0) {
socket.setWebsocketTimeout(0);
existingSession.heartbeat = new DDPCommon.Heartbeat({
heartbeatInterval: self.options.heartbeatInterval,
heartbeatTimeout: self.options.heartbeatTimeout,
onTimeout: function () {
existingSession.close();
},
sendPing: function () {
existingSession.send({msg: 'ping'});
}
});
existingSession.heartbeat.start();
}
// Send connected message so client can restart heartbeat and confirm resumption
existingSession.send({ msg: 'connected', session: existingSession.id });
if (messageQueue) {
Meteor.defer(() => {
messageQueue.forEach(msg => existingSession.send(msg));
});
}
// Note: onConnectionHook is NOT called on session resume - the connection
// is considered to be the same logical connection as before.
}
else {
// immediately remove the old session since we're out of date.
if (existingSession && existingSession._pendingRemoveFunction) {
Meteor.clearTimeout(existingSession._removeTimeoutHandle);
existingSession._pendingRemoveFunction();
}
socket._meteorSession = new Session(self, version, socket, self.options);
self.sessions.set(socket._meteorSession.id, socket._meteorSession);
self.onConnectionHook.each(function (callback) {
if (socket._meteorSession)
callback(socket._meteorSession.connectionHandle);
return true;
});
}
},
/**
* Register a publish handler function.
@@ -1520,9 +1626,31 @@ Object.assign(Server.prototype, {
}
},
_removeSession: function (session) {
_removeSession: function (session, callback = () => {}) {
var self = this;
self.sessions.delete(session.id);
const sessionRemoveFunction = () => {
// Guard against being called multiple times (e.g., from both overflow and timeout)
if (!self.sessions.has(session.id)) {
return;
}
// Clear timeout handle if it exists to prevent double execution
if (session._removeTimeoutHandle) {
Meteor.clearTimeout(session._removeTimeoutHandle);
session._removeTimeoutHandle = null;
}
session._pendingRemoveFunction = null;
self.sessions.delete(session.id);
callback();
};
if (session._expectingDisconnect) {
return sessionRemoveFunction();
}
session.messageQueue = [];
session._pendingRemoveFunction = sessionRemoveFunction;
if (session._removeTimeoutHandle) {
Meteor.clearTimeout(session._removeTimeoutHandle);
}
session._removeTimeoutHandle = Meteor.setTimeout(sessionRemoveFunction, self.options.disconnectGracePeriod);
},
/**

View File

@@ -168,9 +168,24 @@ Tinytest.addAsync('livedata server - async publish cursor', function(
connection: clientConn,
});
clientConn.subscribe('asyncPublishCursor', async () => {
const actual = await remoteCollection.find().fetch();
test.equal(actual[0].name, 'async');
onComplete();
// Wait for data to arrive - the subscription is ready but data may still be in transit
// This can happen when a previous test run was interrupted (page reload) and the
// server is still processing the old session's grace period
let attempts = 0;
const maxAttempts = 50; // 5 seconds max wait
const checkData = async () => {
const actual = await remoteCollection.find().fetch();
if (actual.length > 0) {
test.equal(actual[0].name, 'async');
onComplete();
} else if (attempts++ < maxAttempts) {
setTimeout(checkData, 100);
} else {
test.fail('Timed out waiting for data in async publish cursor test');
onComplete();
}
};
await checkData();
});
});
});

View File

@@ -1,3 +1,38 @@
// Helper to temporarily set disconnectGracePeriod for DDP resumption tests
// This ensures test isolation - other tests run with the default grace period
const DEFAULT_GRACE_PERIOD = Meteor.server.options.disconnectGracePeriod;
const TEST_GRACE_PERIOD = 5000; // Short grace period for fast tests (ms)
// Derived timing constants to avoid hardcoding throughout tests
const WITHIN_GRACE_PERIOD_MS = Math.floor(TEST_GRACE_PERIOD / 4); // Well within grace period
const AFTER_GRACE_PERIOD_MS = Math.ceil(TEST_GRACE_PERIOD * 1.5); // After grace period expires
const POLL_TIMEOUT_MS = TEST_GRACE_PERIOD * 2; // Max time to wait for async operations before failing
async function withTestGracePeriod(fn) {
const previous = Meteor.server.options.disconnectGracePeriod;
Meteor.server.options.disconnectGracePeriod = TEST_GRACE_PERIOD;
try {
await fn();
} finally {
Meteor.server.options.disconnectGracePeriod = previous ?? DEFAULT_GRACE_PERIOD;
}
}
// Helper to poll for a condition with timeout to prevent hanging tests
function pollUntil(conditionFn, timeoutMs = POLL_TIMEOUT_MS) {
return new Promise((resolve, reject) => {
const startTime = Date.now();
const interval = setInterval(() => {
if (conditionFn()) {
clearInterval(interval);
resolve();
} else if (Date.now() - startTime > timeoutMs) {
clearInterval(interval);
reject(new Error(`Timed out after ${timeoutMs}ms waiting for condition`));
}
}, 10);
});
}
Tinytest.addAsync(
"livedata server - connectionHandle.onClose()",
function (test, onComplete) {
@@ -593,4 +628,344 @@ function getTestConnections(test) {
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// ============================================================================
// DDP Session Resumption Tests
// ============================================================================
// Test that unexpected disconnects allow session resumption within grace period
Tinytest.addAsync(
"livedata server - DDP resumption: unexpected disconnect preserves session",
async function (test) {
await withTestGracePeriod(async () => {
const { clientConn, serverConn } = await getTestConnections(test);
const originalSessionId = serverConn.id;
// Verify the session exists
test.isTrue(Meteor.server.sessions.has(originalSessionId));
// Simulate unexpected disconnect by forcing the stream to close
// without sending a disconnect message
clientConn._stream._lostConnection();
// Wait a bit but less than the grace period
await sleep(WITHIN_GRACE_PERIOD_MS);
// Session should still exist during grace period
test.isTrue(
Meteor.server.sessions.has(originalSessionId),
"Session should be preserved during grace period"
);
// Wait for grace period to expire
await sleep(AFTER_GRACE_PERIOD_MS);
// Session should be removed after grace period
test.isFalse(
Meteor.server.sessions.has(originalSessionId),
"Session should be removed after grace period expires"
);
});
}
);
// Test that graceful disconnects (client sends disconnect message) remove session immediately
Tinytest.addAsync(
"livedata server - DDP resumption: graceful disconnect removes session immediately",
async function (test) {
await withTestGracePeriod(async () => {
const { clientConn, serverConn } = await getTestConnections(test);
const originalSessionId = serverConn.id;
// Verify the session exists
test.isTrue(Meteor.server.sessions.has(originalSessionId));
// Graceful disconnect - this sends the disconnect message
clientConn.disconnect();
// Wait a moment for the disconnect to process
await sleep(WITHIN_GRACE_PERIOD_MS);
// Session should be removed immediately (not waiting for grace period)
test.isFalse(
Meteor.server.sessions.has(originalSessionId),
"Session should be removed immediately after graceful disconnect"
);
});
}
);
// Test that server-initiated close removes session immediately (not resumable)
Tinytest.addAsync(
"livedata server - DDP resumption: server-initiated close removes session immediately",
async function (test) {
await withTestGracePeriod(async () => {
const { serverConn } = await getTestConnections(test);
const originalSessionId = serverConn.id;
// Verify the session exists
test.isTrue(Meteor.server.sessions.has(originalSessionId));
// Server-initiated close via connectionHandle.close()
serverConn.close();
// Wait a moment for the close to process
await sleep(WITHIN_GRACE_PERIOD_MS);
// Session should be removed immediately (server kicks should not be resumable)
test.isFalse(
Meteor.server.sessions.has(originalSessionId),
"Session should be removed immediately after server-initiated close"
);
});
}
);
// Test that onConnection hook is NOT called on session resume
Tinytest.addAsync(
"livedata server - DDP resumption: onConnection not called on resume",
async function (test) {
await withTestGracePeriod(async () => {
let onConnectionCallCount = 0;
let lastConnectionId = null;
const handle = Meteor.onConnection(function (conn) {
onConnectionCallCount++;
lastConnectionId = conn.id;
});
// Create initial connection
const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: false });
// Wait for connection with timeout
await pollUntil(() => clientConn._lastSessionId);
const originalSessionId = clientConn._lastSessionId;
test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect");
test.equal(lastConnectionId, originalSessionId);
// Get the server session and verify it exists
const serverSession = Meteor.server.sessions.get(originalSessionId);
test.isTrue(serverSession, "Server session should exist");
// Simulate unexpected disconnect
clientConn._stream._lostConnection();
// Wait a bit (less than grace period)
await sleep(WITHIN_GRACE_PERIOD_MS);
// Session should still exist
test.isTrue(
Meteor.server.sessions.has(originalSessionId),
"Session should still exist during grace period"
);
// Reconnect - this should resume the session
clientConn._stream.reconnect();
// Wait for reconnection with timeout
await pollUntil(() => clientConn.status().connected);
// Give it a moment to process
await sleep(WITHIN_GRACE_PERIOD_MS);
// IMPORTANT: Assert that session was actually resumed (same session ID)
// If this fails, the test is not actually testing resumption
test.equal(
clientConn._lastSessionId,
originalSessionId,
"Session should be resumed with same session ID"
);
// onConnection should NOT have been called again for a resumed session
test.equal(
onConnectionCallCount,
1,
"onConnection should not be called again on session resume"
);
handle.stop();
clientConn.disconnect();
});
}
);
// Test that server-initiated close prevents session resumption
Tinytest.addAsync(
"livedata server - DDP resumption: server close prevents resumption",
async function (test) {
await withTestGracePeriod(async () => {
let onConnectionCallCount = 0;
const handle = Meteor.onConnection(function (conn) {
onConnectionCallCount++;
});
// Create initial connection
const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: true });
// Wait for connection with timeout
await pollUntil(() => clientConn._lastSessionId);
const originalSessionId = clientConn._lastSessionId;
test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect");
// Get the server session
const serverSession = Meteor.server.sessions.get(originalSessionId);
test.isTrue(serverSession, "Server session should exist");
// Server-initiated close (kick the client)
serverSession.connectionHandle.close();
// Wait for client to reconnect with new session (retry is enabled)
await pollUntil(() =>
clientConn.status().connected && clientConn._lastSessionId !== originalSessionId
);
// Should have a NEW session (not resumed)
test.notEqual(
clientConn._lastSessionId,
originalSessionId,
"Should have a new session ID after server-initiated close"
);
// onConnection should have been called again (new session, not resumed)
test.equal(
onConnectionCallCount,
2,
"onConnection should be called again after server-initiated close"
);
handle.stop();
clientConn.disconnect();
});
}
);
// Test that graceful client disconnect prevents session resumption
Tinytest.addAsync(
"livedata server - DDP resumption: graceful disconnect prevents resumption",
async function (test) {
await withTestGracePeriod(async () => {
let onConnectionCallCount = 0;
const handle = Meteor.onConnection(function (conn) {
onConnectionCallCount++;
});
// Create initial connection with retry enabled
const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: true });
// Wait for connection with timeout
await pollUntil(() => clientConn._lastSessionId);
const originalSessionId = clientConn._lastSessionId;
test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect");
// Graceful disconnect (sends disconnect message)
clientConn.disconnect();
// Wait for session to be removed
await sleep(WITHIN_GRACE_PERIOD_MS);
// Session should be removed immediately
test.isFalse(
Meteor.server.sessions.has(originalSessionId),
"Session should be removed after graceful disconnect"
);
// Reconnect
clientConn.reconnect();
// Wait for reconnection with timeout
await pollUntil(() => clientConn.status().connected);
// Should have a NEW session (not resumed, because we gracefully disconnected)
test.notEqual(
clientConn._lastSessionId,
originalSessionId,
"Should have a new session ID after graceful disconnect and reconnect"
);
// onConnection should have been called again
test.equal(
onConnectionCallCount,
2,
"onConnection should be called again after graceful disconnect"
);
handle.stop();
clientConn.disconnect();
});
}
);
// Test that receivedCount mismatch causes new session (not resume)
Tinytest.addAsync(
"livedata server - DDP resumption: count mismatch creates new session",
async function (test) {
await withTestGracePeriod(async () => {
let onConnectionCallCount = 0;
const handle = Meteor.onConnection(function (conn) {
onConnectionCallCount++;
});
// Create initial connection
const clientConn = DDP.connect(Meteor.absoluteUrl(), { retry: false });
// Wait for connection with timeout
await pollUntil(() => clientConn._lastSessionId);
const originalSessionId = clientConn._lastSessionId;
test.equal(onConnectionCallCount, 1, "onConnection should be called once on initial connect");
// Get the server session
const serverSession = Meteor.server.sessions.get(originalSessionId);
test.isTrue(serverSession, "Server session should exist");
// Artificially increment sentCount to create a mismatch
// This simulates messages sent by server that client didn't receive
serverSession.sentCount += 5;
// Simulate unexpected disconnect
clientConn._stream._lostConnection();
// Wait a bit (less than grace period)
await sleep(WITHIN_GRACE_PERIOD_MS);
// Session should still exist during grace period
test.isTrue(
Meteor.server.sessions.has(originalSessionId),
"Session should still exist during grace period"
);
// Reconnect - this should NOT resume due to count mismatch
clientConn._stream.reconnect();
// Wait for reconnection with timeout
await pollUntil(() => clientConn.status().connected);
// Give it a moment to process
await sleep(WITHIN_GRACE_PERIOD_MS);
// Should have a NEW session (counts didn't match)
test.notEqual(
clientConn._lastSessionId,
originalSessionId,
"Should have a new session ID when counts mismatch"
);
// onConnection should have been called again (new session)
test.equal(
onConnectionCallCount,
2,
"onConnection should be called again when counts mismatch"
);
handle.stop();
clientConn.disconnect();
});
}
);

View File

@@ -993,16 +993,19 @@ contains the following fields:
security risk for this transport. For details and alternatives, see
the [SockJS documentation](https://github.com/sockjs/sockjs-node#authorisation).
> Currently when a client reconnects to the server (such as after
> temporarily losing its Internet connection), it will get a new
> connection each time. The `onConnection` callbacks will be called
> again, and the new connection will have a new connection `id`.
## Reconnection
> In the future, when client reconnection is fully implemented,
> reconnecting from the client will reconnect to the same connection on
> the server: the `onConnection` callback won't be called for that
> connection again, and the connection will still have the same
> connection `id`.
Meteor 3.5+ supports [DDP session resumption](https://github.com/meteor/meteor/pull/14051), allowing clients to automatically resume their previous connection after a temporary network disconnect. When a client reconnects within the grace period, the `onConnection` callback is not called again and the connection retains its original `id`.
This behavior is controlled by the following server options:
### Meteor.server.options.disconnectGracePeriod
Defines how long (in milliseconds) we should maintain a session for after a non-graceful disconnect before destroying it. Sessions that reconnect within this time will be resumed with minimal performance impact. Defaults to `15000`.
### Meteor.server.options.maxMessageQueueLength
Determines how many messages we should queue during a non-graceful disconnect before we destroy the session, to help prevent memory leaks. Defaults to `100`.
<ApiBox name="DDP.connect" hasCustomExample/>