diff --git a/History.md b/History.md index 513267f88c..15d33efebc 100644 --- a/History.md +++ b/History.md @@ -1,5 +1,9 @@ ## v.NEXT +* Use "faye-websocket" (0.7.2) npm module instead of "websocket" (1.0.8) for + server-to-server DDP. + + ## v0.7.1.2 * Fix bug in tool error handling that caused `meteor` to crash on Mac diff --git a/packages/livedata/.npm/package/npm-shrinkwrap.json b/packages/livedata/.npm/package/npm-shrinkwrap.json index f48e03a46b..92d8fd541c 100644 --- a/packages/livedata/.npm/package/npm-shrinkwrap.json +++ b/packages/livedata/.npm/package/npm-shrinkwrap.json @@ -16,8 +16,13 @@ } } }, - "websocket": { - "version": "1.0.8" + "faye-websocket": { + "version": "0.7.2", + "dependencies": { + "websocket-driver": { + "version": "0.3.2" + } + } } } } diff --git a/packages/livedata/livedata_connection.js b/packages/livedata/livedata_connection.js index 30ff1ef377..be5b79e15c 100644 --- a/packages/livedata/livedata_connection.js +++ b/packages/livedata/livedata_connection.js @@ -49,7 +49,10 @@ var Connection = function (url, options) { self._stream = new LivedataTest.ClientStream(url, { retry: options.retry, headers: options.headers, - _sockjsOptions: options._sockjsOptions + _sockjsOptions: options._sockjsOptions, + // To keep some tests quiet (because we don't have a real API for handling + // client-stream-level errors). + _dontPrintErrors: options._dontPrintErrors }); } diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index 95ddab9e6f..d579180911 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -1160,7 +1160,7 @@ _.extend(Server.prototype, { // drop all future data coming over this connection on the // floor. We don't want to confuse things. socket.removeAllListeners('data'); - setTimeout(function () { + Meteor.setTimeout(function () { socket.send(stringifyDDP({msg: 'failed', version: version})); socket.close(); }, timeout); diff --git a/packages/livedata/livedata_tests.js b/packages/livedata/livedata_tests.js index 65736d76f8..07bd42e779 100644 --- a/packages/livedata/livedata_tests.js +++ b/packages/livedata/livedata_tests.js @@ -683,9 +683,10 @@ if (Meteor.isServer) { testAsyncMulti("livedata - connect fails to unknown place", [ function (test, expect) { var self = this; - self.conn = DDP.connect("example.com"); + self.conn = DDP.connect("example.com", {_dontPrintErrors: true}); Meteor.setTimeout(expect(function () { test.isFalse(self.conn.status().connected, "Not connected"); + self.conn.close(); }), 500); } ]); diff --git a/packages/livedata/package.js b/packages/livedata/package.js index d2b407ae13..0e585b8467 100644 --- a/packages/livedata/package.js +++ b/packages/livedata/package.js @@ -3,7 +3,11 @@ Package.describe({ internal: true }); -Npm.depends({sockjs: "0.3.8", websocket: "1.0.8"}); +// We use 'faye-websocket' for connections in server-to-server DDP, mostly +// because it's the same library used as a server in sockjs, and it's easiest to +// deal with a single websocket implementation. (Plus, its maintainer is easy +// to work with on pull requests.) +Npm.depends({sockjs: "0.3.8", "faye-websocket": "0.7.2"}); Package.on_use(function (api) { api.use(['check', 'random', 'ejson', 'json', 'underscore', 'deps', diff --git a/packages/livedata/stream_client_nodejs.js b/packages/livedata/stream_client_nodejs.js index 708189deea..a3bc996d27 100644 --- a/packages/livedata/stream_client_nodejs.js +++ b/packages/livedata/stream_client_nodejs.js @@ -11,43 +11,16 @@ // ping frames or with DDP-level messages.) LivedataTest.ClientStream = function (endpoint, options) { var self = this; + options = options || {}; + self.options = _.extend({ retry: true }, options); - // WebSocket-Node https://github.com/Worlize/WebSocket-Node - // Chosen because it can run without native components. It has a - // somewhat idiosyncratic API. We may want to use 'ws' instead in the - // future. - // - // Since server-to-server DDP is still an experimental feature, we only - // require the module if we actually create a server-to-server - // connection. This is a minor efficiency improvement, but moreover: while - // 'websocket' doesn't require native components, it tries to use some - // optional native components and prints a warning if it can't load - // them. Since native components in packages don't work when transferred to - // other architectures yet, this means that require('websocket') prints a - // spammy log message when deployed to another architecture. Delaying the - // require means you only get the log message if you're actually using the - // feature. - self.client = new (Npm.require('websocket').client)(); + self.client = null; // created in _launchConnection self.endpoint = endpoint; - self.currentConnection = null; - options = options || {}; - self.headers = options.headers || {}; - - self.client.on('connect', Meteor.bindEnvironment( - function (connection) { - return self._onConnect(connection); - }, - "stream connect callback" - )); - - self.client.on('connectFailed', function (error) { - // XXX: Make this do something better than make the tests hang if it does not work. - return self._lostConnection(); - }); + self.headers = self.options.headers || {}; self._initCommon(); @@ -63,7 +36,7 @@ _.extend(LivedataTest.ClientStream.prototype, { send: function (data) { var self = this; if (self.currentStatus.connected) { - self.currentConnection.send(data); + self.client.send(data); } }, @@ -73,80 +46,37 @@ _.extend(LivedataTest.ClientStream.prototype, { self.endpoint = url; }, - _onConnect: function (connection) { + _onConnect: function (client) { var self = this; + if (client !== self.client) { + // This connection is not from the last call to _launchConnection. + // But _launchConnection calls _cleanup which closes previous connections. + // It's our belief that this stifles future 'open' events, but maybe + // we are wrong? + throw new Error("Got open from inactive client"); + } + if (self._forcedToDisconnect) { // We were asked to disconnect between trying to open the connection and // actually opening it. Let's just pretend this never happened. - connection.close(); + self.client.close(); + self.client = null; return; } if (self.currentStatus.connected) { - // We already have a connection. It must have been the case that - // we started two parallel connection attempts (because we - // wanted to 'reconnect now' on a hanging connection and we had - // no way to cancel the connection attempt.) Just ignore/close - // the latecomer. - connection.close(); - return; + // We already have a connection. It must have been the case that we + // started two parallel connection attempts (because we wanted to + // 'reconnect now' on a hanging connection and we had no way to cancel the + // connection attempt.) But this shouldn't happen (similarly to the client + // !== self.client check above). + throw new Error("Two parallel connections?"); } - if (self.connectionTimer) { - clearTimeout(self.connectionTimer); - self.connectionTimer = null; - } - - var onError = Meteor.bindEnvironment( - function (_this, error) { - if (self.currentConnection !== _this) - return; - - Meteor._debug("stream error", error.toString(), - (new Date()).toDateString()); - self._lostConnection(); - }, - "stream error callback" - ); - - connection.on('error', function (error) { - // We have to pass in `this` explicitly because bindEnvironment - // doesn't propagate it for us. - onError(this, error); - }); - - var onClose = Meteor.bindEnvironment( - function (_this) { - if (self.options._testOnClose) - self.options._testOnClose(); - - if (self.currentConnection !== _this) - return; - - self._lostConnection(); - }, - "stream close callback" - ); - - connection.on('close', function () { - // We have to pass in `this` explicitly because bindEnvironment - // doesn't propagate it for us. - onClose(this); - }); - - connection.on('message', function (message) { - if (self.currentConnection !== this) - return; // old connection still emitting messages - - if (message.type === "utf8") // ignore binary frames - _.each(self.eventCallbacks.message, function (callback) { - callback(message.utf8Data); - }); - }); + self._clearConnectionTimer(); // update status - self.currentConnection = connection; self.currentStatus.status = "connected"; self.currentStatus.connected = true; self.currentStatus.retryCount = 0; @@ -161,10 +91,10 @@ _.extend(LivedataTest.ClientStream.prototype, { var self = this; self._clearConnectionTimer(); - if (self.currentConnection) { - var conn = self.currentConnection; - self.currentConnection = null; - conn.close(); + if (self.client) { + var client = self.client; + self.client = null; + client.close(); } }, @@ -181,25 +111,63 @@ _.extend(LivedataTest.ClientStream.prototype, { var self = this; self._cleanup(); // cleanup the old socket, if there was one. - // launch a connect attempt. we have no way to track it. we either - // get an _onConnect event, or we don't. + // Since server-to-server DDP is still an experimental feature, we only + // require the module if we actually create a server-to-server + // connection. + var FayeWebSocket = Npm.require('faye-websocket'); - // XXX: set up a timeout on this. + // We would like to specify 'ddp' as the subprotocol here. The npm module we + // used to use as a client would fail the handshake if we ask for a + // subprotocol and the server doesn't send one back (and sockjs doesn't). + // Faye doesn't have that behavior; it's unclear from reading RFC 6455 if + // Faye is erroneous or not. So for now, we don't specify protocols. + var client = self.client = new FayeWebSocket.Client( + toWebsocketUrl(self.endpoint), + [/*no subprotocols*/], + {headers: self.headers} + ); - // we would like to specify 'ddp' as the protocol here, but - // unfortunately WebSocket-Node fails the handshake if we ask for - // a protocol and the server doesn't send one back (and sockjs - // doesn't). also, related: I guess we have to accept that - // 'stream' is ddp-specific - self.client.connect(toWebsocketUrl(self.endpoint), - undefined, // protocols - undefined, // origin - self.headers); - - if (self.connectionTimer) - clearTimeout(self.connectionTimer); - self.connectionTimer = setTimeout( + self._clearConnectionTimer(); + self.connectionTimer = Meteor.setTimeout( _.bind(self._lostConnection, self), self.CONNECT_TIMEOUT); + + self.client.on('open', Meteor.bindEnvironment(function () { + return self._onConnect(client); + }, "stream connect callback")); + + var clientOnIfCurrent = function (event, description, f) { + self.client.on(event, Meteor.bindEnvironment(function () { + // Ignore events from any connection we've already cleaned up. + if (client !== self.client) + return; + f.apply(this, arguments); + }, description)); + }; + + clientOnIfCurrent('error', 'stream error callback', function (error) { + if (!self.options._dontPrintErrors) + Meteor._debug("stream error", error.message); + + // XXX: Make this do something better than make the tests hang if it does + // not work. + self._lostConnection(); + }); + + + clientOnIfCurrent('close', 'stream close callback', function () { + self._lostConnection(); + }); + + + clientOnIfCurrent('message', 'stream message callback', function (message) { + // Ignore binary frames, where message.data is a Buffer + if (typeof message.data !== "string") + return; + + _.each(self.eventCallbacks.message, function (callback) { + callback(message.data); + }); + }); } }); diff --git a/packages/livedata/stream_client_tests.js b/packages/livedata/stream_client_tests.js index dbb675852d..d9402a2157 100644 --- a/packages/livedata/stream_client_tests.js +++ b/packages/livedata/stream_client_tests.js @@ -1,17 +1,24 @@ var Fiber = Npm.require('fibers'); -Tinytest.addAsync("stream client - callbacks run in a fiber", function (test, onComplete) { - stream = new LivedataTest.ClientStream( - Meteor.absoluteUrl(), - { - _testOnClose: function () { - test.isTrue(Fiber.current); - onComplete(); - } - } - ); - stream.on('reset', function () { - test.isTrue(Fiber.current); - stream.disconnect(); - }); -}); +testAsyncMulti("stream client - callbacks run in a fiber", [ + function (test, expect) { + var stream = new LivedataTest.ClientStream(Meteor.absoluteUrl()); + + var messageFired = false; + var resetFired = false; + + stream.on('message', expect(function () { + test.isTrue(Fiber.current); + if (resetFired) + stream.disconnect(); + messageFired = true; + })); + + stream.on('reset', expect(function () { + test.isTrue(Fiber.current); + if (messageFired) + stream.disconnect(); + resetFired = true; + })); + } +]); diff --git a/packages/retry/retry.js b/packages/retry/retry.js index a4407b5bdb..4a377bca4f 100644 --- a/packages/retry/retry.js +++ b/packages/retry/retry.js @@ -57,7 +57,7 @@ _.extend(Retry.prototype, { var timeout = self._timeout(count); if (self.retryTimer) clearTimeout(self.retryTimer); - self.retryTimer = setTimeout(fn, timeout); + self.retryTimer = Meteor.setTimeout(fn, timeout); return timeout; }