From 05c4edf9f77dffb8a89202af620dcad67c77aec7 Mon Sep 17 00:00:00 2001 From: Andrew Wilcox Date: Tue, 25 Feb 2014 18:53:42 -0500 Subject: [PATCH] DDP Heartbeats Add "ping" and "pong" messages to DDP. This allows us to detect at the DDP level when we've lost the connection. Bump the DDP version to "pre2". Preserve backwards compatibility by not enabling pings if the negotiated DDP version is an earlier version. Since receiving a ping indicates that the connection is alive, one side of the connection doesn't have to send its own pings as long as it's receiving pings from the other side. The ping interval defaults to 30 seconds on the server and 35 seconds on the client, which means that normally the pings go just one way (saving on bandwidth). Increase the sockjs heartbeats from 25s to 45s, so they do not normally fire. --- .../other/ddp-heartbeat/.meteor/.gitignore | 1 + examples/other/ddp-heartbeat/.meteor/packages | 6 + examples/other/ddp-heartbeat/.meteor/release | 1 + examples/other/ddp-heartbeat/README.md | 12 ++ .../ddp-heartbeat/server/heartbeat_test.js | 101 ++++++++++++ packages/livedata/heartbeat.js | 145 ++++++++++++++++++ packages/livedata/livedata_common.js | 2 +- packages/livedata/livedata_connection.js | 21 +++ .../livedata/livedata_connection_tests.js | 10 ++ packages/livedata/livedata_server.js | 54 ++++++- packages/livedata/package.js | 1 + packages/livedata/stream_client_sockjs.js | 2 +- packages/livedata/stream_server.js | 2 +- 13 files changed, 349 insertions(+), 9 deletions(-) create mode 100644 examples/other/ddp-heartbeat/.meteor/.gitignore create mode 100644 examples/other/ddp-heartbeat/.meteor/packages create mode 100644 examples/other/ddp-heartbeat/.meteor/release create mode 100644 examples/other/ddp-heartbeat/README.md create mode 100644 examples/other/ddp-heartbeat/server/heartbeat_test.js create mode 100644 packages/livedata/heartbeat.js diff --git a/examples/other/ddp-heartbeat/.meteor/.gitignore 
b/examples/other/ddp-heartbeat/.meteor/.gitignore new file mode 100644 index 0000000000..4083037423 --- /dev/null +++ b/examples/other/ddp-heartbeat/.meteor/.gitignore @@ -0,0 +1 @@ +local diff --git a/examples/other/ddp-heartbeat/.meteor/packages b/examples/other/ddp-heartbeat/.meteor/packages new file mode 100644 index 0000000000..5d992a7f9a --- /dev/null +++ b/examples/other/ddp-heartbeat/.meteor/packages @@ -0,0 +1,6 @@ +# Meteor packages used by this project, one per line. +# +# 'meteor add' and 'meteor remove' will edit this file for you, +# but you can also edit it by hand. + +standard-app-packages diff --git a/examples/other/ddp-heartbeat/.meteor/release b/examples/other/ddp-heartbeat/.meteor/release new file mode 100644 index 0000000000..621e94f0ec --- /dev/null +++ b/examples/other/ddp-heartbeat/.meteor/release @@ -0,0 +1 @@ +none diff --git a/examples/other/ddp-heartbeat/README.md b/examples/other/ddp-heartbeat/README.md new file mode 100644 index 0000000000..da21a5b82b --- /dev/null +++ b/examples/other/ddp-heartbeat/README.md @@ -0,0 +1,12 @@ +# DDP Heartbeat Test + +Run with: + + meteor --once + +Do not connect to the server from a browser, this is a server-only +test. + +The test will run, which takes about a minute waiting for timeouts, +and then the server process will exit. A zero status code indicates a +successful test. diff --git a/examples/other/ddp-heartbeat/server/heartbeat_test.js b/examples/other/ddp-heartbeat/server/heartbeat_test.js new file mode 100644 index 0000000000..c16c95aeaf --- /dev/null +++ b/examples/other/ddp-heartbeat/server/heartbeat_test.js @@ -0,0 +1,101 @@ +var Fiber = Npm.require("fibers"); +var Future = Npm.require("fibers/future"); + +// XXX Deps isn't supported on the server... but we need a way to +// capture client connection status transitions. 
+ +var waitReactive = function (fn) { + var future = new Future(); + var timeoutHandle = Meteor.setTimeout( + function () { + future.throw(new Error("timeout")); + }, + 60000 + ); + Deps.autorun(function (c) { + var ret = fn(); + if (ret) { + c.stop(); + Meteor.clearTimeout(timeoutHandle); + + // We need to run in a fiber for `defer`. + Fiber(function () { + // Use `defer` because yields are blocked inside of autorun. + Meteor.defer(function () { + future.return(ret); + }) + }).run(); + } + }); + return future.wait(); +}; + +var waitForClientConnectionStatus = function (connection, status) { + waitReactive(function () { + return connection.status().status === status; + }); +}; + +// Override the server heartbeat for an incoming connection. + +var serverHeartbeatOverride = {}; + +Meteor.onConnection(function (serverConnection) { + _.extend(serverConnection._internal.heartbeat, serverHeartbeatOverride); +}); + + +// Expect to connect, and then to reconnect (presumably because of a +// timeout). + +var expectConnectAndReconnect = function (clientConnection) { + console.log(". client is connecting"); + waitForClientConnectionStatus(clientConnection, "connected"); + + console.log(". client is connected, expecting ping timeout and reconnect"); + waitForClientConnectionStatus(clientConnection, "connecting"); + + console.log(". 
client is reconnecting"); +}; + + +var testClientTimeout = function () { + console.log("Test client timeout"); + + serverHeartbeatOverride = { + _sendPing: false, // don't send pings from server + _sendPong: false // don't respond to pings, which should cause the client to timeout + }; + + var clientConnection = DDP.connect(Meteor.absoluteUrl()); + + expectConnectAndReconnect(clientConnection); + + clientConnection.close(); + console.log("test successful\n"); +}; + + +var testServerTimeout = function () { + console.log("Test server timeout"); + + serverHeartbeatOverride = {}; + + var clientConnection = DDP.connect(Meteor.absoluteUrl()); + + _.extend(clientConnection._heartbeat, { + _sendPing: false, // don't send pings from client + _sendPong: false // don't respond to pings, which should cause the server to timeout + }); + + expectConnectAndReconnect(clientConnection); + + clientConnection.close(); + console.log("test successful\n"); +}; + +Fiber(function () { + testClientTimeout(); + testServerTimeout(); + process.exit(0); +}).run(); diff --git a/packages/livedata/heartbeat.js b/packages/livedata/heartbeat.js new file mode 100644 index 0000000000..90ddeac929 --- /dev/null +++ b/packages/livedata/heartbeat.js @@ -0,0 +1,145 @@ +// Ensure that we're running in a fiber on the server. + +var runInFiber; +if (Meteor.isServer) { + var Fiber = Npm.require('fibers'); + runInFiber = function (fn) { + Fiber(fn).run(); + }; +} else { + runInFiber = function (fn) { + fn(); + }; +} + +// Heartbeat options: +// heartbeatInterval: interval to send pings, in milliseconds. +// heartbeatTimeout: timeout to close the connection if a reply isn't received, in milliseconds. +// sendMessage: function to call to send a message on the connection. +// closeConnection: function to call to close the connection. + +Heartbeat = function (options) { + var self = this; + + // Whether "ping" and "pong" messages are supported by the DDP + // version. Set by `start` below. 
+ self.supported = false; + + self.heartbeatInterval = options.heartbeatInterval; + self.heartbeatTimeout = options.heartbeatTimeout; + self._sendMessage = options.sendMessage; + self._closeConnection = options.closeConnection; + + self._heartbeatIntervalHandle = null; + self._heartbeatTimeoutHandle = null; + + // For testing, sending pings or pongs can be disabled. + self._sendPing = true; + self._sendPong = true; +}; + +_.extend(Heartbeat.prototype, { + stop: function () { + var self = this; + self._clearHeartbeatIntervalTimer(); + self._clearHeartbeatTimeoutTimer(); + }, + + start: function (ddpVersion) { + var self = this; + self.stop(); + if (ddpVersion === 'pre2') { + self.supported = true; + self._startHeartbeatIntervalTimer(); + } + }, + + _startHeartbeatIntervalTimer: function () { + var self = this; + if (!self._sendPing) + return; + runInFiber(function () { + if (!self.supported) + return; + self._heartbeatIntervalHandle = Meteor.setTimeout( + _.bind(self._heartbeatIntervalFired, self), + self.heartbeatInterval + ); + }); + }, + + _startHeartbeatTimeoutTimer: function () { + var self = this; + runInFiber(function () { + if (!self.supported) + return; + self._heartbeatTimeoutHandle = Meteor.setTimeout( + _.bind(self._heartbeatTimeoutFired, self), + self.heartbeatTimeout + ); + }); + }, + + _clearHeartbeatIntervalTimer: function () { + var self = this; + if (self._heartbeatIntervalHandle) { + Meteor.clearTimeout(self._heartbeatIntervalHandle); + self._heartbeatIntervalHandle = null; + } + }, + + _clearHeartbeatTimeoutTimer: function () { + var self = this; + if (self._heartbeatTimeoutHandle) { + Meteor.clearTimeout(self._heartbeatTimeoutHandle); + self._heartbeatTimeoutHandle = null; + } + }, + + // The heartbeat interval timer is fired when we should send a ping. + _heartbeatIntervalFired: function () { + var self = this; + self._heartbeatIntervalHandle = null; + if (!self._sendPing) + return; + self._sendMessage({msg: "ping"}); + // Wait for a pong. 
+ self._startHeartbeatTimeoutTimer(); + }, + + // The heartbeat timeout timer is fired when we sent a ping, but we + // timed out waiting for the pong. + _heartbeatTimeoutFired: function () { + var self = this; + self._heartbeatTimeoutHandle = null; + if (!self._sendPing) + return; + self._closeConnection(); + }, + + pingpongReceived: function (msg) { + var self = this; + + if (msg.msg === 'ping') { + // Respond to a ping by sending a pong. + if (self._sendPong) + self._sendMessage({msg: "pong", id: msg.id}); + + // We know the connection is alive if we receive a ping, so we + // don't need to send a ping ourselves. Reset the interval timer. + if (self._heartbeatIntervalHandle) { + self._clearHeartbeatIntervalTimer(); + self._startHeartbeatIntervalTimer(); + } + } + + else if (msg.msg === 'pong') { + // Receiving a pong means we won't timeout, so clear the timeout + // timer and start the interval again. + if (self._heartbeatTimeoutHandle) { + self._clearHeartbeatTimeoutTimer(); + self._startHeartbeatIntervalTimer(); + } + } + } +}); diff --git a/packages/livedata/livedata_common.js b/packages/livedata/livedata_common.js index f9a873c6f7..9c59d0e510 100644 --- a/packages/livedata/livedata_common.js +++ b/packages/livedata/livedata_common.js @@ -1,6 +1,6 @@ DDP = {}; -SUPPORTED_DDP_VERSIONS = [ 'pre1' ]; +SUPPORTED_DDP_VERSIONS = [ 'pre2', 'pre1' ]; LivedataTest.SUPPORTED_DDP_VERSIONS = SUPPORTED_DDP_VERSIONS; diff --git a/packages/livedata/livedata_connection.js b/packages/livedata/livedata_connection.js index be5b79e15c..e572c190e3 100644 --- a/packages/livedata/livedata_connection.js +++ b/packages/livedata/livedata_connection.js @@ -31,6 +31,8 @@ var Connection = function (url, options) { onDDPVersionNegotiationFailure: function (description) { Meteor._debug(description); }, + heartbeatInterval: 35000, + heartbeatTimeout: 15000, // These options are only for testing. 
reloadWithOutstanding: false, supportedDDPVersions: SUPPORTED_DDP_VERSIONS, @@ -177,6 +179,13 @@ var Connection = function (url, options) { self._userId = null; self._userIdDeps = (typeof Deps !== "undefined") && new Deps.Dependency; + self._heartbeat = new Heartbeat({ + heartbeatInterval: options.heartbeatInterval, + heartbeatTimeout: options.heartbeatTimeout, + sendMessage: _.bind(self._send, self), + closeConnection: _.bind(self._lostConnection, self) + }); + // Block auto-reload while we're waiting for method responses. if (Meteor.isClient && Package.reload && !options.reloadWithOutstanding) { Package.reload.Reload._onMigrate(function (retry) { @@ -232,6 +241,8 @@ var Connection = function (url, options) { self._livedata_result(msg); else if (msg.msg === 'error') self._livedata_error(msg); + else if (_.include(['ping', 'pong'], msg.msg)) + self._heartbeat.pingpongReceived(msg); else Meteor._debug("discarding unknown livedata message type", msg); }; @@ -834,6 +845,14 @@ _.extend(Connection.prototype, { self._stream.send(stringifyDDP(obj)); }, + // We detected via DDP-level heartbeats that we've lost the + // connection. Unlike `disconnect` or `close`, a lost connection + // will be automatically retried. + _lostConnection: function () { + var self = this; + self._stream._lostConnection(); + }, + status: function (/*passthrough args*/) { var self = this; return self._stream.status.apply(self._stream, arguments); @@ -893,6 +912,8 @@ _.extend(Connection.prototype, { _livedata_connected: function (msg) { var self = this; + self._heartbeat.start(self._version); + // If this is a reconnect, we'll have to reset all stores. 
if (self._lastSessionId) self._resetStores = true; diff --git a/packages/livedata/livedata_connection_tests.js b/packages/livedata/livedata_connection_tests.js index b37e3d97a7..f17a98b863 100644 --- a/packages/livedata/livedata_connection_tests.js +++ b/packages/livedata/livedata_connection_tests.js @@ -1285,6 +1285,16 @@ Tinytest.add("livedata connection - onReconnect prepends messages correctly with ]); }); +Tinytest.add("livedata connection - ping", function (test) { + var stream = new StubStream(); + var conn = newConnection(stream); + startAndConnect(test, stream); + + var id = Random.id(); + stream.receive({msg: 'ping', id: id}); + testGotMessage(test, stream, {msg: 'pong', id: id}); +}); + var getSelfConnectionUrl = function () { if (Meteor.isClient) { return Meteor._relativeToSiteRootUrl("/"); diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index cbdd29ae63..a05f5be551 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -216,7 +216,7 @@ _.extend(SessionCollectionView.prototype, { /* Session */ /******************************************************************************/ -var Session = function (server, version, socket) { +var Session = function (server, version, socket, options) { var self = this; self.id = Random.id(); @@ -268,7 +268,7 @@ var Session = function (server, version, socket) { self.connectionHandle = { id: self.id, close: function () { - self.server._closeSession(self); + self.close(); }, onClose: function (fn) { var cb = Meteor.bindEnvironment(fn, "connection onClose callback"); @@ -280,7 +280,8 @@ var Session = function (server, version, socket) { } }, clientAddress: self._clientAddress(), - httpHeaders: self.socket.headers + httpHeaders: self.socket.headers, + _internal: self }; socket.send(stringifyDDP({msg: 'connected', @@ -290,6 +291,14 @@ var Session = function (server, version, socket) { self.startUniversalSubs(); }).run(); + self.heartbeat = new 
Heartbeat({ + heartbeatInterval: options.heartbeatInterval, + heartbeatTimeout: options.heartbeatTimeout, + sendMessage: _.bind(self.send, self), + closeConnection: _.bind(self.destroy, self) + }); + self.heartbeat.start(version); + Package.facts && Package.facts.Facts.incrementServerFact( "livedata", "sessions", 1); }; @@ -391,6 +400,12 @@ _.extend(Session.prototype, { destroy: function () { var self = this; + // Already destroyed. + if (!self.inQueue) + return; + + self.heartbeat.stop(); + if (self.socket) { self.socket.close(); self.socket._meteorSession = null; @@ -417,6 +432,19 @@ _.extend(Session.prototype, { }); }, + // Destroy this session and unregister it at the server. + close: function () { + var self = this; + + // Unconditionally destroy this session, even if it's not + // registered at the server. + self.destroy(); + + // Unregister the session. This will also call `destroy`, but + // that's OK because `destroy` is idempotent. + self.server._closeSession(self); + }, + // Send a message (doing nothing if no socket is connected right now.) // It should be a JSON object (it will be stringified.) send: function (msg) { @@ -457,6 +485,15 @@ _.extend(Session.prototype, { if (!self.inQueue) // we have been destroyed. return; + // Respond to ping and pong messages immediately without queuing. + // If the negotiated DDP version is "pre1" which didn't support + // pings, preserve the "pre1" behavior of responding with a "bad + // request" for the unknown messages. 
+ if (self.heartbeat.supported && _.include(['ping', 'pong'], msg_in.msg)) { + self.heartbeat.pingpongReceived(msg_in); + return; + } + self.inQueue.push(msg_in); if (self.workerRunning) return; @@ -1047,9 +1084,14 @@ _.extend(Subscription.prototype, { /* Server */ /******************************************************************************/ -Server = function () { +Server = function (options) { var self = this; + self.options = _.defaults(options || {}, { + heartbeatInterval: 30000, + heartbeatTimeout: 15000 + }); + // Map of callbacks to call when a new connection comes in to the // server and completes DDP version negotiation. Use an object instead // of an array so we can safely remove one from the list while @@ -1120,7 +1162,7 @@ Server = function () { socket.on('close', function () { if (socket._meteorSession) { Fiber(function () { - self._closeSession(socket._meteorSession); + socket._meteorSession.close(); }).run(); } }); @@ -1142,7 +1184,7 @@ _.extend(Server.prototype, { if (msg.version === version) { // Creating a new session - socket._meteorSession = new Session(self, version, socket); + socket._meteorSession = new Session(self, version, socket, self.options); self.sessions[socket._meteorSession.id] = socket._meteorSession; self.onConnectionHook.each(function (callback) { if (socket._meteorSession) diff --git a/packages/livedata/package.js b/packages/livedata/package.js index b3c5f33cb3..3fe9b835e4 100644 --- a/packages/livedata/package.js +++ b/packages/livedata/package.js @@ -49,6 +49,7 @@ Package.on_use(function (api) { // _idParse, _idStringify. 
api.use('minimongo', ['client', 'server']); + api.add_files('heartbeat.js', ['client', 'server']); api.add_files('livedata_server.js', 'server'); diff --git a/packages/livedata/stream_client_sockjs.js b/packages/livedata/stream_client_sockjs.js index 3f42a76371..5cdbbcf69c 100644 --- a/packages/livedata/stream_client_sockjs.js +++ b/packages/livedata/stream_client_sockjs.js @@ -12,7 +12,7 @@ LivedataTest.ClientStream = function (url, options) { // how long between hearing heartbeat from the server until we declare - // the connection dead. heartbeats come every 25s (stream_server.js) + // the connection dead. heartbeats come every 45s (stream_server.js) // // NOTE: this is a workaround until sockjs detects heartbeats on the // client automatically. diff --git a/packages/livedata/stream_server.js b/packages/livedata/stream_server.js index f344085380..83dffadb59 100644 --- a/packages/livedata/stream_server.js +++ b/packages/livedata/stream_server.js @@ -23,7 +23,7 @@ StreamServer = function () { log: function() {}, // this is the default, but we code it explicitly because we depend // on it in stream_client:HEARTBEAT_TIMEOUT - heartbeat_delay: 25000, + heartbeat_delay: 45000, // The default disconnect_delay is 5 seconds, but if the server ends up CPU // bound for that much time, SockJS might not notice that the user has // reconnected because the timer (of disconnect_delay ms) can fire before