From a98c6d030a732739d9beecef474e4c90d495a2f2 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Tue, 7 Oct 2014 15:43:35 -0700 Subject: [PATCH] server DDP: Use low-level websocket-driver module We want to support running DDP through a corporate proxy, but the higher-level faye-websocket can't support that and won't be changed to allow that: https://github.com/faye/faye-websocket-node/pull/30 Fair enough. Let's just switch to the lower-level module, since we don't care about getting a browser-compatible websocket API. This is a first step towards fixing #2515. --- packages/ddp/.npm/package/npm-shrinkwrap.json | 14 ++-- packages/ddp/package.js | 17 ++--- packages/ddp/stream_client_nodejs.js | 67 ++++++++++++++----- 3 files changed, 63 insertions(+), 35 deletions(-) diff --git a/packages/ddp/.npm/package/npm-shrinkwrap.json b/packages/ddp/.npm/package/npm-shrinkwrap.json index 3c44970ac2..650acac22d 100644 --- a/packages/ddp/.npm/package/npm-shrinkwrap.json +++ b/packages/ddp/.npm/package/npm-shrinkwrap.json @@ -1,20 +1,18 @@ { "dependencies": { - "faye-websocket": { - "version": "https://github.com/meteor/faye-websocket-node/tarball/ccc180998b1396093c24d0df7ebc1d199c276552", - "dependencies": { - "websocket-driver": { - "version": "0.3.4" - } - } - }, "sockjs": { "version": "0.3.9", "dependencies": { "node-uuid": { "version": "1.3.3" + }, + "faye-websocket": { + "version": "0.7.2" } } + }, + "websocket-driver": { + "version": "0.3.6" } } } diff --git a/packages/ddp/package.js b/packages/ddp/package.js index 720544a08d..89a098b44c 100644 --- a/packages/ddp/package.js +++ b/packages/ddp/package.js @@ -3,19 +3,16 @@ Package.describe({ version: '1.0.10-rc.0' }); -// We use 'faye-websocket' for connections in server-to-server DDP, mostly -// because it's the same library used as a server in sockjs, and it's easiest to -// deal with a single websocket implementation. (Plus, its maintainer is easy -// to work with on pull requests.) +// We use Faye's 'websocket-driver' for connections in server-to-server DDP, +// mostly because it's the same library used as a server in sockjs, and it's +// easiest to deal with a single websocket implementation. (Plus, its +// maintainer is easy to work with on pull requests.) // -// (By listing faye-websocket first, it's more likely that npm deduplication -// will prevent a second copy of faye-websocket from being installed inside +// (By listing websocket-driver first, it's more likely that npm deduplication +// will prevent a second copy of websocket-driver from being installed inside // sockjs.) Npm.depends({ - // A fork fixing https://github.com/faye/websocket-driver-node/pull/8 (ie - // "open from inactive client" errors). Note that sockjs won't use this fork, - // but the bug only affects the websocket client, not the server. - "faye-websocket": "https://github.com/meteor/faye-websocket-node/tarball/ccc180998b1396093c24d0df7ebc1d199c276552", + "websocket-driver": "0.3.6", sockjs: "0.3.9" }); diff --git a/packages/ddp/stream_client_nodejs.js b/packages/ddp/stream_client_nodejs.js index 6ee13a1a23..09df55c540 100644 --- a/packages/ddp/stream_client_nodejs.js +++ b/packages/ddp/stream_client_nodejs.js @@ -36,7 +36,7 @@ _.extend(LivedataTest.ClientStream.prototype, { send: function (data) { var self = this; if (self.currentStatus.connected) { - self.client.send(data); + self.client.messages.write(data); } }, @@ -118,18 +118,33 @@ _.extend(LivedataTest.ClientStream.prototype, { // Since server-to-server DDP is still an experimental feature, we only // require the module if we actually create a server-to-server // connection. - var FayeWebSocket = Npm.require('faye-websocket'); + var websocketDriver = Npm.require('websocket-driver'); // We would like to specify 'ddp' as the subprotocol here. The npm module we // used to use as a client would fail the handshake if we ask for a // subprotocol and the server doesn't send one back (and sockjs doesn't). // Faye doesn't have that behavior; it's unclear from reading RFC 6455 if // Faye is erroneous or not. So for now, we don't specify protocols. - var client = self.client = new FayeWebSocket.Client( - toWebsocketUrl(self.endpoint), - [/*no subprotocols*/], - {headers: self.headers} - ); + var wsUrl = toWebsocketUrl(self.endpoint); + var client = self.client = websocketDriver.client(wsUrl); + + var parsedUrl = Npm.require('url').parse(wsUrl); + var stream; + var onConnect = function () { + client.start(); + }; + if (parsedUrl.protocol === 'wss:') { + stream = Npm.require('tls').connect( + parsedUrl.port || 443, parsedUrl.hostname, onConnect); + } else { + stream = Npm.require('net').createConnection( + parsedUrl.port || 80, parsedUrl.hostname); + stream.on('connect', onConnect); + } + + _.each(self.headers, function (header, name) { + client.setHeader(name, header); + }); self._clearConnectionTimer(); self.connectionTimer = Meteor.setTimeout( @@ -152,29 +167,47 @@ _.extend(LivedataTest.ClientStream.prototype, { }, description)); }; - clientOnIfCurrent('error', 'stream error callback', function (error) { + var finalize = Meteor.bindEnvironment(function () { + stream.end(); + if (client === self.client) { + self._lostConnection(); + } + }, "finalizing stream"); + + stream.on('end', finalize); + stream.on('close', finalize); + client.on('close', finalize); + + var onError = function (message) { if (!self.options._dontPrintErrors) - Meteor._debug("stream error", error.message); + Meteor._debug("driver error", message); // Faye's 'error' object is not a JS error (and among other things, // doesn't stringify well). Convert it to one. - self._lostConnection(new DDP.ConnectionError(error.message)); - }); - - - clientOnIfCurrent('close', 'stream close callback', function () { - self._lostConnection(); + self._lostConnection(new DDP.ConnectionError(message)); + }; + + clientOnIfCurrent('error', 'driver error callback', function (error) { + onError(error.message); }); + stream.on('error', Meteor.bindEnvironment(function (error) { + if (client === self.client) { + onError('Network error: ' + wsUrl + ': ' + error.message); + } + stream.end(); + })); clientOnIfCurrent('message', 'stream message callback', function (message) { - // Ignore binary frames, where message.data is a Buffer + // Ignore binary frames, where data is a Buffer if (typeof message.data !== "string") return; - _.each(self.eventCallbacks.message, function (callback) { callback(message.data); }); }); + + stream.pipe(self.client.io); + self.client.io.pipe(stream); } });