DDP Heartbeats

Add "ping" and "pong" messages to DDP.  This allows us to detect at
the DDP level when we've lost the connection.

Bump the DDP version to "pre2".  Preserve backwards compatibility by
not enabling pings if the negotiated DDP version is an earlier version.

Since receiving a ping indicates that the connection is alive, one
side of the connection doesn't have to send its own pings as long as
it's receiving pings from the other side.  The ping interval defaults
to 30 seconds on the server and 35 seconds on the client, which means
that normally the pings go just one way (saving on bandwidth).

Increase the sockjs heartbeats from 25s to 45s, so they do not
normally fire.
This commit is contained in:
Andrew Wilcox
2014-02-25 18:53:42 -05:00
committed by Nick Martin
parent 7dab23c3ce
commit 05c4edf9f7
13 changed files with 349 additions and 9 deletions

View File

@@ -0,0 +1 @@
local

View File

@@ -0,0 +1,6 @@
# Meteor packages used by this project, one per line.
#
# 'meteor add' and 'meteor remove' will edit this file for you,
# but you can also edit it by hand.
standard-app-packages

View File

@@ -0,0 +1 @@
none

View File

@@ -0,0 +1,12 @@
# DDP Heartbeat Test
Run with:
meteor --once
Do not connect to the server from a browser; this is a server-only
test.
The test will run, which takes about a minute waiting for timeouts,
and then the server process will exit. A zero status code indicates a
successful test.

View File

@@ -0,0 +1,101 @@
var Fiber = Npm.require("fibers");
var Future = Npm.require("fibers/future");
// XXX Deps isn't supported on the server... but we need a way to
// capture client connection status transitions.
//
// Runs `fn` inside a `Deps.autorun` and waits on a future until `fn`
// yields a truthy value, which is then returned. A 60000 ms timer
// fails the future with Error("timeout") if it fires first.
var waitReactive = function (fn) {
  var done = new Future();

  var onTimeout = function () {
    done.throw(new Error("timeout"));
  };
  var timer = Meteor.setTimeout(onTimeout, 60000);

  Deps.autorun(function (computation) {
    var value = fn();
    if (!value)
      return;

    // Got a result: no more reruns and no more timeout.
    computation.stop();
    Meteor.clearTimeout(timer);

    var resolveLater = function () {
      // Use `defer` because yields are blocked inside of autorun.
      Meteor.defer(function () {
        done.return(value);
      });
    };
    // We need to run in a fiber for `defer`.
    Fiber(resolveLater).run();
  });

  return done.wait();
};
// Wait (through `waitReactive`) until `connection.status().status`
// is equal to `status`. Does not return a value.
var waitForClientConnectionStatus = function (connection, status) {
  var hasReachedStatus = function () {
    var current = connection.status();
    return current.status === status;
  };
  waitReactive(hasReachedStatus);
};
// Heartbeat properties to force onto the server side of each incoming
// connection. The tests reassign this before connecting; it is read
// at the moment a connection arrives.
var serverHeartbeatOverride = {};
var applyServerHeartbeatOverride = function (incoming) {
  var sessionHeartbeat = incoming._internal.heartbeat;
  _.extend(sessionHeartbeat, serverHeartbeatOverride);
};
Meteor.onConnection(applyServerHeartbeatOverride);
// Expect the client to reach "connected" and then to go back to
// "connecting" (presumably because of a heartbeat timeout), logging
// progress along the way.
var expectConnectAndReconnect = function (clientConnection) {
  // Each step: the status to wait for, then the line to log once the
  // status has been reached.
  var steps = [
    ["connected", ". client is connected, expecting ping timeout and reconnect"],
    ["connecting", ". client is reconnecting"]
  ];

  console.log(". client is connecting");
  for (var i = 0; i < steps.length; i++) {
    waitForClientConnectionStatus(clientConnection, steps[i][0]);
    console.log(steps[i][1]);
  }
};
// Make the server side silent (no pings, no pongs) and check that the
// client connects and then goes back to reconnecting.
var testClientTimeout = function () {
  console.log("Test client timeout");

  var silentServer = {};
  // don't send pings from server
  silentServer._sendPing = false;
  // don't respond to pings, which should cause the client to timeout
  silentServer._sendPong = false;
  serverHeartbeatOverride = silentServer;

  var conn = DDP.connect(Meteor.absoluteUrl());
  expectConnectAndReconnect(conn);
  conn.close();

  console.log("test successful\n");
};
// Leave the server heartbeat untouched, make the client side silent
// (no pings, no pongs) and check that the client connects and then
// goes back to reconnecting.
var testServerTimeout = function () {
  console.log("Test server timeout");
  serverHeartbeatOverride = {};

  var conn = DDP.connect(Meteor.absoluteUrl());
  var clientHeartbeat = conn._heartbeat;
  // don't send pings from client
  clientHeartbeat._sendPing = false;
  // don't respond to pings, which should cause the server to timeout
  clientHeartbeat._sendPong = false;

  expectConnectAndReconnect(conn);
  conn.close();

  console.log("test successful\n");
};
// Run both tests in order inside a fiber, then exit the process with
// status 0.
var runAllTests = function () {
  testClientTimeout();
  testServerTimeout();
  process.exit(0);
};
Fiber(runAllTests).run();

View File

@@ -0,0 +1,145 @@
// Ensure that we're running in a fiber on the server.
//
// `runInFiber(fn)` calls `fn` inside a new Fiber on the server, and
// calls `fn` directly everywhere else.
var runInFiber = (function () {
  if (!Meteor.isServer) {
    return function (fn) {
      fn();
    };
  }

  var Fiber = Npm.require('fibers');
  return function (fn) {
    Fiber(fn).run();
  };
})();
// Heartbeat options:
//   heartbeatInterval: interval to send pings, in milliseconds.
//   heartbeatTimeout: timeout to close the connection if a reply
//     isn't received, in milliseconds.
//   sendMessage: function to call to send a message on the connection.
//   closeConnection: function to call to close the connection.
Heartbeat = function (options) {
  _.extend(this, {
    // Whether "ping" and "pong" messages are supported by the DDP
    // version. Set by `start` below.
    supported: false,

    heartbeatInterval: options.heartbeatInterval,
    heartbeatTimeout: options.heartbeatTimeout,

    _sendMessage: options.sendMessage,
    _closeConnection: options.closeConnection,

    // Handles of the pending timers; null while no timer is pending.
    _heartbeatIntervalHandle: null,
    _heartbeatTimeoutHandle: null,

    // For testing, sending pings or pongs can be disabled.
    _sendPing: true,
    _sendPong: true
  });
};
_.extend(Heartbeat.prototype, {
  // Cancel the pending interval and timeout timers, if any.
  stop: function () {
    var self = this;
    self._clearHeartbeatIntervalTimer();
    self._clearHeartbeatTimeoutTimer();
  },

  // (Re)start heartbeating for a connection that negotiated
  // `ddpVersion`. Any timers from a previous start are cancelled
  // first. Pings are only enabled for DDP version "pre2".
  start: function (ddpVersion) {
    var self = this;
    self.stop();
    // Recompute `supported` on every start so that a restart with a
    // version that lacks ping/pong doesn't leave a stale `true` behind.
    self.supported = (ddpVersion === 'pre2');
    if (self.supported)
      self._startHeartbeatIntervalTimer();
  },

  // Schedule the next ping `heartbeatInterval` ms from now. Does
  // nothing if pings are disabled for testing or unsupported.
  _startHeartbeatIntervalTimer: function () {
    var self = this;
    if (!self._sendPing)
      return;
    runInFiber(function () {
      if (!self.supported)
        return;
      self._heartbeatIntervalHandle = Meteor.setTimeout(
        _.bind(self._heartbeatIntervalFired, self),
        self.heartbeatInterval
      );
    });
  },

  // Schedule the "no pong received" timeout `heartbeatTimeout` ms from
  // now. Does nothing if ping/pong is unsupported.
  _startHeartbeatTimeoutTimer: function () {
    var self = this;
    runInFiber(function () {
      if (!self.supported)
        return;
      self._heartbeatTimeoutHandle = Meteor.setTimeout(
        _.bind(self._heartbeatTimeoutFired, self),
        self.heartbeatTimeout
      );
    });
  },

  // Cancel the pending interval timer, if there is one.
  _clearHeartbeatIntervalTimer: function () {
    var self = this;
    if (self._heartbeatIntervalHandle) {
      Meteor.clearTimeout(self._heartbeatIntervalHandle);
      self._heartbeatIntervalHandle = null;
    }
  },

  // Cancel the pending timeout timer, if there is one.
  _clearHeartbeatTimeoutTimer: function () {
    var self = this;
    if (self._heartbeatTimeoutHandle) {
      Meteor.clearTimeout(self._heartbeatTimeoutHandle);
      self._heartbeatTimeoutHandle = null;
    }
  },

  // The heartbeat interval timer is fired when we should send a ping.
  _heartbeatIntervalFired: function () {
    var self = this;
    self._heartbeatIntervalHandle = null;
    if (!self._sendPing)
      return;
    self._sendMessage({msg: "ping"});
    // Wait for a pong.
    self._startHeartbeatTimeoutTimer();
  },

  // The heartbeat timeout timer is fired when we sent a ping, but we
  // timed out waiting for the pong.
  _heartbeatTimeoutFired: function () {
    var self = this;
    self._heartbeatTimeoutHandle = null;
    if (!self._sendPing)
      return;
    self._closeConnection();
  },

  // Handle an incoming "ping" or "pong" message `msg`. Messages with
  // any other `msg.msg` are ignored.
  pingpongReceived: function (msg) {
    var self = this;
    if (msg.msg === 'ping') {
      // Respond to a ping by sending a pong.
      if (self._sendPong)
        self._sendMessage({msg: "pong", id: msg.id});

      // We know the connection is alive if we receive a ping, so we
      // don't need to send a ping ourselves.  Reset the interval timer.
      if (self._heartbeatIntervalHandle) {
        self._clearHeartbeatIntervalTimer();
        self._startHeartbeatIntervalTimer();
      }
    }
    else if (msg.msg === 'pong') {
      // Receiving a pong means we won't timeout, so clear the timeout
      // timer and start the interval again.
      if (self._heartbeatTimeoutHandle) {
        self._clearHeartbeatTimeoutTimer();
        self._startHeartbeatIntervalTimer();
      }
    }
  }
});

View File

@@ -1,6 +1,6 @@
DDP = {};
SUPPORTED_DDP_VERSIONS = [ 'pre1' ];
SUPPORTED_DDP_VERSIONS = [ 'pre2', 'pre1' ];
LivedataTest.SUPPORTED_DDP_VERSIONS = SUPPORTED_DDP_VERSIONS;

View File

@@ -31,6 +31,8 @@ var Connection = function (url, options) {
onDDPVersionNegotiationFailure: function (description) {
Meteor._debug(description);
},
heartbeatInterval: 35000,
heartbeatTimeout: 15000,
// These options are only for testing.
reloadWithOutstanding: false,
supportedDDPVersions: SUPPORTED_DDP_VERSIONS,
@@ -177,6 +179,13 @@ var Connection = function (url, options) {
self._userId = null;
self._userIdDeps = (typeof Deps !== "undefined") && new Deps.Dependency;
self._heartbeat = new Heartbeat({
heartbeatInterval: options.heartbeatInterval,
heartbeatTimeout: options.heartbeatTimeout,
sendMessage: _.bind(self._send, self),
closeConnection: _.bind(self._lostConnection, self)
});
// Block auto-reload while we're waiting for method responses.
if (Meteor.isClient && Package.reload && !options.reloadWithOutstanding) {
Package.reload.Reload._onMigrate(function (retry) {
@@ -232,6 +241,8 @@ var Connection = function (url, options) {
self._livedata_result(msg);
else if (msg.msg === 'error')
self._livedata_error(msg);
else if (_.include(['ping', 'pong'], msg.msg))
self._heartbeat.pingpongReceived(msg);
else
Meteor._debug("discarding unknown livedata message type", msg);
};
@@ -834,6 +845,14 @@ _.extend(Connection.prototype, {
self._stream.send(stringifyDDP(obj));
},
// We detected via DDP-level heartbeats that we've lost the
// connection. Unlike `disconnect` or `close`, a lost connection
// will be automatically retried.
_lostConnection: function () {
var self = this;
self._stream._lostConnection();
},
status: function (/*passthrough args*/) {
var self = this;
return self._stream.status.apply(self._stream, arguments);
@@ -893,6 +912,8 @@ _.extend(Connection.prototype, {
_livedata_connected: function (msg) {
var self = this;
self._heartbeat.start(self._version);
// If this is a reconnect, we'll have to reset all stores.
if (self._lastSessionId)
self._resetStores = true;

View File

@@ -1285,6 +1285,16 @@ Tinytest.add("livedata connection - onReconnect prepends messages correctly with
]);
});
// A "ping" received from the stream must be answered with a "pong"
// that carries the same id.
Tinytest.add("livedata connection - ping", function (test) {
  var stream = new StubStream();
  var conn = newConnection(stream);
  startAndConnect(test, stream);

  var pingId = Random.id();
  var ping = {msg: 'ping', id: pingId};
  var expectedPong = {msg: 'pong', id: pingId};

  stream.receive(ping);
  testGotMessage(test, stream, expectedPong);
});
var getSelfConnectionUrl = function () {
if (Meteor.isClient) {
return Meteor._relativeToSiteRootUrl("/");

View File

@@ -216,7 +216,7 @@ _.extend(SessionCollectionView.prototype, {
/* Session */
/******************************************************************************/
var Session = function (server, version, socket) {
var Session = function (server, version, socket, options) {
var self = this;
self.id = Random.id();
@@ -268,7 +268,7 @@ var Session = function (server, version, socket) {
self.connectionHandle = {
id: self.id,
close: function () {
self.server._closeSession(self);
self.close();
},
onClose: function (fn) {
var cb = Meteor.bindEnvironment(fn, "connection onClose callback");
@@ -280,7 +280,8 @@ var Session = function (server, version, socket) {
}
},
clientAddress: self._clientAddress(),
httpHeaders: self.socket.headers
httpHeaders: self.socket.headers,
_internal: self
};
socket.send(stringifyDDP({msg: 'connected',
@@ -290,6 +291,14 @@ var Session = function (server, version, socket) {
self.startUniversalSubs();
}).run();
self.heartbeat = new Heartbeat({
  heartbeatInterval: options.heartbeatInterval,
  heartbeatTimeout: options.heartbeatTimeout,
  sendMessage: _.bind(self.send, self),
  // On a heartbeat timeout, use `close` rather than `destroy`.
  // `destroy` clears `socket._meteorSession`, so the socket's 'close'
  // handler would skip this session and it would never be
  // unregistered at the server. `close` destroys and unregisters.
  closeConnection: _.bind(self.close, self)
});
self.heartbeat.start(version);
Package.facts && Package.facts.Facts.incrementServerFact(
"livedata", "sessions", 1);
};
@@ -391,6 +400,12 @@ _.extend(Session.prototype, {
destroy: function () {
var self = this;
// Already destroyed.
if (!self.inQueue)
return;
self.heartbeat.stop();
if (self.socket) {
self.socket.close();
self.socket._meteorSession = null;
@@ -417,6 +432,19 @@ _.extend(Session.prototype, {
});
},
// Destroy this session and unregister it at the server.
close: function () {
var self = this;
// Unconditionally destroy this session, even if it's not
// registered at the server.
self.destroy();
// Unregister the session. This will also call `destroy`, but
// that's OK because `destroy` is idempotent.
self.server._closeSession(self);
},
// Send a message (doing nothing if no socket is connected right now.)
// It should be a JSON object (it will be stringified.)
send: function (msg) {
@@ -457,6 +485,15 @@ _.extend(Session.prototype, {
if (!self.inQueue) // we have been destroyed.
return;
// Respond to ping and pong messages immediately without queuing.
// If the negotiated DDP version is "pre1" which didn't support
// pings, preserve the "pre1" behavior of responding with a "bad
// request" for the unknown messages.
if (self.heartbeat.supported && _.include(['ping', 'pong'], msg_in.msg)) {
self.heartbeat.pingpongReceived(msg_in);
return;
}
self.inQueue.push(msg_in);
if (self.workerRunning)
return;
@@ -1047,9 +1084,14 @@ _.extend(Subscription.prototype, {
/* Server */
/******************************************************************************/
Server = function () {
Server = function (options) {
var self = this;
// Fill in heartbeat defaults for any option the caller did not set.
// (`{} || options` always evaluated to `{}`, discarding the caller's
// options.) Work on a copy so the caller's object is not modified.
self.options = _.defaults(_.clone(options || {}), {
  heartbeatInterval: 30000,
  heartbeatTimeout: 15000
});
// Map of callbacks to call when a new connection comes in to the
// server and completes DDP version negotiation. Use an object instead
// of an array so we can safely remove one from the list while
@@ -1120,7 +1162,7 @@ Server = function () {
socket.on('close', function () {
if (socket._meteorSession) {
Fiber(function () {
self._closeSession(socket._meteorSession);
socket._meteorSession.close();
}).run();
}
});
@@ -1142,7 +1184,7 @@ _.extend(Server.prototype, {
if (msg.version === version) {
// Creating a new session
socket._meteorSession = new Session(self, version, socket);
socket._meteorSession = new Session(self, version, socket, self.options);
self.sessions[socket._meteorSession.id] = socket._meteorSession;
self.onConnectionHook.each(function (callback) {
if (socket._meteorSession)

View File

@@ -49,6 +49,7 @@ Package.on_use(function (api) {
// _idParse, _idStringify.
api.use('minimongo', ['client', 'server']);
api.add_files('heartbeat.js', ['client', 'server']);
api.add_files('livedata_server.js', 'server');

View File

@@ -12,7 +12,7 @@ LivedataTest.ClientStream = function (url, options) {
// how long between hearing heartbeat from the server until we declare
// the connection dead. heartbeats come every 25s (stream_server.js)
// the connection dead. heartbeats come every 45s (stream_server.js)
//
// NOTE: this is a workaround until sockjs detects heartbeats on the
// client automatically.

View File

@@ -23,7 +23,7 @@ StreamServer = function () {
log: function() {},
// this is the default, but we code it explicitly because we depend
// on it in stream_client:HEARTBEAT_TIMEOUT
heartbeat_delay: 25000,
heartbeat_delay: 45000,
// The default disconnect_delay is 5 seconds, but if the server ends up CPU
// bound for that much time, SockJS might not notice that the user has
// reconnected because the timer (of disconnect_delay ms) can fire before