diff --git a/examples/other/ddp-heartbeat/.meteor/.gitignore b/examples/other/ddp-heartbeat/.meteor/.gitignore new file mode 100644 index 0000000000..4083037423 --- /dev/null +++ b/examples/other/ddp-heartbeat/.meteor/.gitignore @@ -0,0 +1 @@ +local diff --git a/examples/other/ddp-heartbeat/.meteor/packages b/examples/other/ddp-heartbeat/.meteor/packages new file mode 100644 index 0000000000..5d992a7f9a --- /dev/null +++ b/examples/other/ddp-heartbeat/.meteor/packages @@ -0,0 +1,6 @@ +# Meteor packages used by this project, one per line. +# +# 'meteor add' and 'meteor remove' will edit this file for you, +# but you can also edit it by hand. + +standard-app-packages diff --git a/examples/other/ddp-heartbeat/.meteor/release b/examples/other/ddp-heartbeat/.meteor/release new file mode 100644 index 0000000000..621e94f0ec --- /dev/null +++ b/examples/other/ddp-heartbeat/.meteor/release @@ -0,0 +1 @@ +none diff --git a/examples/other/ddp-heartbeat/README.md b/examples/other/ddp-heartbeat/README.md new file mode 100644 index 0000000000..da21a5b82b --- /dev/null +++ b/examples/other/ddp-heartbeat/README.md @@ -0,0 +1,12 @@ +# DDP Heartbeat Test + +Run with: + + meteor --once + +Do not connect to the server from a browser, this is a server-only +test. + +The test will run, which takes about a minute waiting for timeouts, +and then the server process will exit. A zero status code indicates a +successful test. diff --git a/examples/other/ddp-heartbeat/server/heartbeat_test.js b/examples/other/ddp-heartbeat/server/heartbeat_test.js new file mode 100644 index 0000000000..c16c95aeaf --- /dev/null +++ b/examples/other/ddp-heartbeat/server/heartbeat_test.js @@ -0,0 +1,101 @@ +var Fiber = Npm.require("fibers"); +var Future = Npm.require("fibers/future"); + +// XXX Deps isn't supported on the server... but we need a way to +// capture client connection status transitions. + +var waitReactive = function (fn) { + var future = new Future(); + var timeoutHandle = Meteor.setTimeout( + function () { + future.throw(new Error("timeout")); + }, + 60000 + ); + Deps.autorun(function (c) { + var ret = fn(); + if (ret) { + c.stop(); + Meteor.clearTimeout(timeoutHandle); + + // We need to run in a fiber for `defer`. + Fiber(function () { + // Use `defer` because yields are blocked inside of autorun. + Meteor.defer(function () { + future.return(ret); + }) + }).run(); + } + }); + return future.wait(); +}; + +var waitForClientConnectionStatus = function (connection, status) { + waitReactive(function () { + return connection.status().status === status; + }); +}; + +// Override the server heartbeat for an incoming connection. + +var serverHeartbeatOverride = {}; + +Meteor.onConnection(function (serverConnection) { + _.extend(serverConnection._internal.heartbeat, serverHeartbeatOverride); +}); + + +// Expect to connect, and then to reconnect (presumably because of a +// timeout). + +var expectConnectAndReconnect = function (clientConnection) { + console.log(". client is connecting"); + waitForClientConnectionStatus(clientConnection, "connected"); + + console.log(". client is connected, expecting ping timeout and reconnect"); + waitForClientConnectionStatus(clientConnection, "connecting"); + + console.log(". client is reconnecting"); +}; + + +var testClientTimeout = function () { + console.log("Test client timeout"); + + serverHeartbeatOverride = { + _sendPing: false, // don't send pings from server + _sendPong: false // don't respond to pings, which should cause the client to timeout + }; + + var clientConnection = DDP.connect(Meteor.absoluteUrl()); + + expectConnectAndReconnect(clientConnection); + + clientConnection.close(); + console.log("test successful\n"); +}; + + +var testServerTimeout = function () { + console.log("Test server timeout"); + + serverHeartbeatOverride = {}; + + var clientConnection = DDP.connect(Meteor.absoluteUrl()); + + _.extend(clientConnection._heartbeat, { + _sendPing: false, // don't send pings from client + _sendPong: false // don't respond to pings, which should cause the server to timeout + }); + + expectConnectAndReconnect(clientConnection); + + clientConnection.close(); + console.log("test successful\n"); +}; + +Fiber(function () { + testClientTimeout(); + testServerTimeout(); + process.exit(0); +}).run(); diff --git a/packages/livedata/heartbeat.js b/packages/livedata/heartbeat.js new file mode 100644 index 0000000000..90ddeac929 --- /dev/null +++ b/packages/livedata/heartbeat.js @@ -0,0 +1,145 @@ +// Ensure that we're running in a fiber on the server. + +var runInFiber; +if (Meteor.isServer) { + var Fiber = Npm.require('fibers'); + runInFiber = function (fn) { + Fiber(fn).run(); + }; +} else { + runInFiber = function (fn) { + fn(); + }; +} + +// Heartbeat options: +// heartbeatInterval: interval to send pings, in milliseconds. +// heartbeatTimeout: timeout to close the connection if a reply isn't received, in milliseconds. +// sendMessage: function to call to send a message on the connection. +// closeConnection: function to call to close the connection. + +Heartbeat = function (options) { + var self = this; + + // Whether "ping" and "pong" messages are supported by the DDP + // version. Set by `start` below. + self.supported = false; + + self.heartbeatInterval = options.heartbeatInterval; + self.heartbeatTimeout = options.heartbeatTimeout; + self._sendMessage = options.sendMessage; + self._closeConnection = options.closeConnection; + + self._heartbeatIntervalHandle = null; + self._heartbeatTimeoutHandle = null; + + // For testing, sending pings or pongs can be disabled. + self._sendPing = true; + self._sendPong = true; +}; + +_.extend(Heartbeat.prototype, { + stop: function () { + var self = this; + self._clearHeartbeatIntervalTimer(); + self._clearHeartbeatTimeoutTimer(); + }, + + start: function (ddpVersion) { + var self = this; + self.stop(); + if (ddpVersion === 'pre2') { + self.supported = true; + self._startHeartbeatIntervalTimer(); + } + }, + + _startHeartbeatIntervalTimer: function () { + var self = this; + if (!self._sendPing) + return; + runInFiber(function () { + if (!self.supported) + return; + self._heartbeatIntervalHandle = Meteor.setTimeout( + _.bind(self._heartbeatIntervalFired, self), + self.heartbeatInterval + ); + }); + }, + + _startHeartbeatTimeoutTimer: function () { + var self = this; + runInFiber(function () { + if (!self.supported) + return; + self._heartbeatTimeoutHandle = Meteor.setTimeout( + _.bind(self._heartbeatTimeoutFired, self), + self.heartbeatTimeout + ); + }); + }, + + _clearHeartbeatIntervalTimer: function () { + var self = this; + if (self._heartbeatIntervalHandle) { + Meteor.clearTimeout(self._heartbeatIntervalHandle); + self._heartbeatIntervalHandle = null; + } + }, + + _clearHeartbeatTimeoutTimer: function () { + var self = this; + if (self._heartbeatTimeoutHandle) { + Meteor.clearTimeout(self._heartbeatTimeoutHandle); + self._heartbeatTimeoutHandle = null; + } + }, + + // The heartbeat interval timer is fired when we should send a ping. + _heartbeatIntervalFired: function () { + var self = this; + self._heartbeatIntervalHandle = null; + if (!self._sendPing) + return; + self._sendMessage({msg: "ping"}); + // Wait for a pong. + self._startHeartbeatTimeoutTimer(); + }, + + // The heartbeat timeout timer is fired when we sent a ping, but we + // timed out waiting for the pong. + _heartbeatTimeoutFired: function () { + var self = this; + self._heartbeatTimeoutHandle = null; + if (!self._sendPing) + return; + self._closeConnection(); + }, + + pingpongReceived: function (msg) { + var self = this; + + if (msg.msg === 'ping') { + // Respond to a ping by sending a pong. + if (self._sendPong) + self._sendMessage({msg: "pong", id: msg.id}); + + // We know the connection is alive if we receive a ping, so we + // don't need to send a ping ourselves. Reset the interval timer. + if (self._heartbeatIntervalHandle) { + self._clearHeartbeatIntervalTimer(); + self._startHeartbeatIntervalTimer(); + } + } + + else if (msg.msg === 'pong') { + // Receiving a pong means we won't timeout, so clear the timeout + // timer and start the interval again. + if (self._heartbeatTimeoutHandle) { + self._clearHeartbeatTimeoutTimer(); + self._startHeartbeatIntervalTimer(); + } + } + } +}); diff --git a/packages/livedata/livedata_common.js b/packages/livedata/livedata_common.js index f9a873c6f7..9c59d0e510 100644 --- a/packages/livedata/livedata_common.js +++ b/packages/livedata/livedata_common.js @@ -1,6 +1,6 @@ DDP = {}; -SUPPORTED_DDP_VERSIONS = [ 'pre1' ]; +SUPPORTED_DDP_VERSIONS = [ 'pre2', 'pre1' ]; LivedataTest.SUPPORTED_DDP_VERSIONS = SUPPORTED_DDP_VERSIONS; diff --git a/packages/livedata/livedata_connection.js b/packages/livedata/livedata_connection.js index be5b79e15c..e572c190e3 100644 --- a/packages/livedata/livedata_connection.js +++ b/packages/livedata/livedata_connection.js @@ -31,6 +31,8 @@ var Connection = function (url, options) { onDDPVersionNegotiationFailure: function (description) { Meteor._debug(description); }, + heartbeatInterval: 35000, + heartbeatTimeout: 15000, // These options are only for testing. reloadWithOutstanding: false, supportedDDPVersions: SUPPORTED_DDP_VERSIONS, @@ -177,6 +179,13 @@ var Connection = function (url, options) { self._userId = null; self._userIdDeps = (typeof Deps !== "undefined") && new Deps.Dependency; + self._heartbeat = new Heartbeat({ + heartbeatInterval: options.heartbeatInterval, + heartbeatTimeout: options.heartbeatTimeout, + sendMessage: _.bind(self._send, self), + closeConnection: _.bind(self._lostConnection, self) + }); + // Block auto-reload while we're waiting for method responses. if (Meteor.isClient && Package.reload && !options.reloadWithOutstanding) { Package.reload.Reload._onMigrate(function (retry) { @@ -232,6 +241,8 @@ var Connection = function (url, options) { self._livedata_result(msg); else if (msg.msg === 'error') self._livedata_error(msg); + else if (_.include(['ping', 'pong'], msg.msg)) + self._heartbeat.pingpongReceived(msg); else Meteor._debug("discarding unknown livedata message type", msg); }; @@ -834,6 +845,14 @@ _.extend(Connection.prototype, { self._stream.send(stringifyDDP(obj)); }, + // We detected via DDP-level heartbeats that we've lost the + // connection. Unlike `disconnect` or `close`, a lost connection + // will be automatically retried. + _lostConnection: function () { + var self = this; + self._stream._lostConnection(); + }, + status: function (/*passthrough args*/) { var self = this; return self._stream.status.apply(self._stream, arguments); @@ -893,6 +912,8 @@ _.extend(Connection.prototype, { _livedata_connected: function (msg) { var self = this; + self._heartbeat.start(self._version); + // If this is a reconnect, we'll have to reset all stores. if (self._lastSessionId) self._resetStores = true; diff --git a/packages/livedata/livedata_connection_tests.js b/packages/livedata/livedata_connection_tests.js index b37e3d97a7..f17a98b863 100644 --- a/packages/livedata/livedata_connection_tests.js +++ b/packages/livedata/livedata_connection_tests.js @@ -1285,6 +1285,16 @@ Tinytest.add("livedata connection - onReconnect prepends messages correctly with ]); }); +Tinytest.add("livedata connection - ping", function (test) { + var stream = new StubStream(); + var conn = newConnection(stream); + startAndConnect(test, stream); + + var id = Random.id(); + stream.receive({msg: 'ping', id: id}); + testGotMessage(test, stream, {msg: 'pong', id: id}); +}); + var getSelfConnectionUrl = function () { if (Meteor.isClient) { return Meteor._relativeToSiteRootUrl("/"); diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index cbdd29ae63..a05f5be551 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -216,7 +216,7 @@ _.extend(SessionCollectionView.prototype, { /* Session */ /******************************************************************************/ -var Session = function (server, version, socket) { +var Session = function (server, version, socket, options) { var self = this; self.id = Random.id(); @@ -268,7 +268,7 @@ var Session = function (server, version, socket) { self.connectionHandle = { id: self.id, close: function () { - self.server._closeSession(self); + self.close(); }, onClose: function (fn) { var cb = Meteor.bindEnvironment(fn, "connection onClose callback"); @@ -280,7 +280,8 @@ var Session = function (server, version, socket) { } }, clientAddress: self._clientAddress(), - httpHeaders: self.socket.headers + httpHeaders: self.socket.headers, + _internal: self }; socket.send(stringifyDDP({msg: 'connected', @@ -290,6 +291,14 @@ var Session = function (server, version, socket) { self.startUniversalSubs(); }).run(); + self.heartbeat = new Heartbeat({ + heartbeatInterval: options.heartbeatInterval, + heartbeatTimeout: options.heartbeatTimeout, + sendMessage: _.bind(self.send, self), + closeConnection: _.bind(self.destroy, self) + }); + self.heartbeat.start(version); + Package.facts && Package.facts.Facts.incrementServerFact( "livedata", "sessions", 1); }; @@ -391,6 +400,12 @@ _.extend(Session.prototype, { destroy: function () { var self = this; + // Already destroyed. + if (!self.inQueue) + return; + + self.heartbeat.stop(); + if (self.socket) { self.socket.close(); self.socket._meteorSession = null; @@ -417,6 +432,19 @@ _.extend(Session.prototype, { }); }, + // Destroy this session and unregister it at the server. + close: function () { + var self = this; + + // Unconditionally destroy this session, even if it's not + // registered at the server. + self.destroy(); + + // Unregister the session. This will also call `destroy`, but + // that's OK because `destroy` is idempotent. + self.server._closeSession(self); + }, + // Send a message (doing nothing if no socket is connected right now.) // It should be a JSON object (it will be stringified.) send: function (msg) { @@ -457,6 +485,15 @@ _.extend(Session.prototype, { if (!self.inQueue) // we have been destroyed. return; + // Respond to ping and pong messages immediately without queuing. + // If the negotiated DDP version is "pre1" which didn't support + // pings, preserve the "pre1" behavior of responding with a "bad + // request" for the unknown messages. + if (self.heartbeat.supported && _.include(['ping', 'pong'], msg_in.msg)) { + self.heartbeat.pingpongReceived(msg_in); + return; + } + self.inQueue.push(msg_in); if (self.workerRunning) return; @@ -1047,9 +1084,14 @@ _.extend(Subscription.prototype, { /* Server */ /******************************************************************************/ -Server = function () { +Server = function (options) { var self = this; + self.options = _.defaults({} || options, { + heartbeatInterval: 30000, + heartbeatTimeout: 15000 + }); + // Map of callbacks to call when a new connection comes in to the // server and completes DDP version negotiation. Use an object instead // of an array so we can safely remove one from the list while @@ -1120,7 +1162,7 @@ Server = function () { socket.on('close', function () { if (socket._meteorSession) { Fiber(function () { - self._closeSession(socket._meteorSession); + socket._meteorSession.close(); }).run(); } }); @@ -1142,7 +1184,7 @@ _.extend(Server.prototype, { if (msg.version === version) { // Creating a new session - socket._meteorSession = new Session(self, version, socket); + socket._meteorSession = new Session(self, version, socket, self.options); self.sessions[socket._meteorSession.id] = socket._meteorSession; self.onConnectionHook.each(function (callback) { if (socket._meteorSession) diff --git a/packages/livedata/package.js b/packages/livedata/package.js index b3c5f33cb3..3fe9b835e4 100644 --- a/packages/livedata/package.js +++ b/packages/livedata/package.js @@ -49,6 +49,7 @@ Package.on_use(function (api) { // _idParse, _idStringify. api.use('minimongo', ['client', 'server']); + api.add_files('heartbeat.js', ['client', 'server']); api.add_files('livedata_server.js', 'server'); diff --git a/packages/livedata/stream_client_sockjs.js b/packages/livedata/stream_client_sockjs.js index 3f42a76371..5cdbbcf69c 100644 --- a/packages/livedata/stream_client_sockjs.js +++ b/packages/livedata/stream_client_sockjs.js @@ -12,7 +12,7 @@ LivedataTest.ClientStream = function (url, options) { // how long between hearing heartbeat from the server until we declare - // the connection dead. heartbeats come every 25s (stream_server.js) + // the connection dead. heartbeats come every 45s (stream_server.js) // // NOTE: this is a workaround until sockjs detects heartbeats on the // client automatically. diff --git a/packages/livedata/stream_server.js b/packages/livedata/stream_server.js index f344085380..83dffadb59 100644 --- a/packages/livedata/stream_server.js +++ b/packages/livedata/stream_server.js @@ -23,7 +23,7 @@ StreamServer = function () { log: function() {}, // this is the default, but we code it explicitly because we depend // on it in stream_client:HEARTBEAT_TIMEOUT - heartbeat_delay: 25000, + heartbeat_delay: 45000, // The default disconnect_delay is 5 seconds, but if the server ends up CPU // bound for that much time, SockJS might not notice that the user has // reconnected because the timer (of disconnect_delay ms) can fire before