Use faye-websocket for server-to-server DDP

This matches what the SockJS server uses; now we only need to understand
and fix bugs in the implementation of one websocket npm module.

Some notes:

  - I actually trust that it's possible to close a connection before it
    successfully connects, which allows me to simplify the code a
    lot (since there shouldn't be multiple connections active per
    ClientStream). I put in some assertions to make sure this is the
    case, though.  (Note that this module also has a simpler model,
    where there's a single object representing the client connection,
    not a "client" object that spawns "connections".)

  - We now print connect errors as well as post-connect errors.  (This
    required adding a flag to keep tests quiet since it makes an
    expected-to-fail-to-connect connection.)  We need a better approach
    to stream error handling, though.

  - We used to have a test to make sure that a certain not-user-visible
    callback is called within a Fiber; structuring the code such that
    this test is still possible would lead to the code being less
    consistent and harder to read, so I dropped the test.

  - Fix a few bugs where we weren't using Meteor.setTimeout.
This commit is contained in:
David Glasser
2014-02-24 20:22:49 -08:00
parent 9c1dc8782c
commit d049bf7506
9 changed files with 127 additions and 135 deletions

View File

@@ -1,5 +1,9 @@
## v.NEXT
* Use "faye-websocket" (0.7.2) npm module instead of "websocket" (1.0.8) for
server-to-server DDP.
## v0.7.1.2
* Fix bug in tool error handling that caused `meteor` to crash on Mac

View File

@@ -16,8 +16,13 @@
}
}
},
"websocket": {
"version": "1.0.8"
"faye-websocket": {
"version": "0.7.2",
"dependencies": {
"websocket-driver": {
"version": "0.3.2"
}
}
}
}
}

View File

@@ -49,7 +49,10 @@ var Connection = function (url, options) {
self._stream = new LivedataTest.ClientStream(url, {
retry: options.retry,
headers: options.headers,
_sockjsOptions: options._sockjsOptions
_sockjsOptions: options._sockjsOptions,
// To keep some tests quiet (because we don't have a real API for handling
// client-stream-level errors).
_dontPrintErrors: options._dontPrintErrors
});
}

View File

@@ -1160,7 +1160,7 @@ _.extend(Server.prototype, {
// drop all future data coming over this connection on the
// floor. We don't want to confuse things.
socket.removeAllListeners('data');
setTimeout(function () {
Meteor.setTimeout(function () {
socket.send(stringifyDDP({msg: 'failed', version: version}));
socket.close();
}, timeout);

View File

@@ -683,9 +683,10 @@ if (Meteor.isServer) {
testAsyncMulti("livedata - connect fails to unknown place", [
function (test, expect) {
var self = this;
self.conn = DDP.connect("example.com");
self.conn = DDP.connect("example.com", {_dontPrintErrors: true});
Meteor.setTimeout(expect(function () {
test.isFalse(self.conn.status().connected, "Not connected");
self.conn.close();
}), 500);
}
]);

View File

@@ -3,7 +3,11 @@ Package.describe({
internal: true
});
Npm.depends({sockjs: "0.3.8", websocket: "1.0.8"});
// We use 'faye-websocket' for connections in server-to-server DDP, mostly
// because it's the same library used as a server in sockjs, and it's easiest to
// deal with a single websocket implementation. (Plus, its maintainer is easy
// to work with on pull requests.)
Npm.depends({sockjs: "0.3.8", "faye-websocket": "0.7.2"});
Package.on_use(function (api) {
api.use(['check', 'random', 'ejson', 'json', 'underscore', 'deps',

View File

@@ -11,43 +11,16 @@
// ping frames or with DDP-level messages.)
LivedataTest.ClientStream = function (endpoint, options) {
var self = this;
options = options || {};
self.options = _.extend({
retry: true
}, options);
// WebSocket-Node https://github.com/Worlize/WebSocket-Node
// Chosen because it can run without native components. It has a
// somewhat idiosyncratic API. We may want to use 'ws' instead in the
// future.
//
// Since server-to-server DDP is still an experimental feature, we only
// require the module if we actually create a server-to-server
// connection. This is a minor efficiency improvement, but moreover: while
// 'websocket' doesn't require native components, it tries to use some
// optional native components and prints a warning if it can't load
// them. Since native components in packages don't work when transferred to
// other architectures yet, this means that require('websocket') prints a
// spammy log message when deployed to another architecture. Delaying the
// require means you only get the log message if you're actually using the
// feature.
self.client = new (Npm.require('websocket').client)();
self.client = null; // created in _launchConnection
self.endpoint = endpoint;
self.currentConnection = null;
options = options || {};
self.headers = options.headers || {};
self.client.on('connect', Meteor.bindEnvironment(
function (connection) {
return self._onConnect(connection);
},
"stream connect callback"
));
self.client.on('connectFailed', function (error) {
// XXX: Make this do something better than make the tests hang if it does not work.
return self._lostConnection();
});
self.headers = self.options.headers || {};
self._initCommon();
@@ -63,7 +36,7 @@ _.extend(LivedataTest.ClientStream.prototype, {
send: function (data) {
var self = this;
if (self.currentStatus.connected) {
self.currentConnection.send(data);
self.client.send(data);
}
},
@@ -73,80 +46,37 @@ _.extend(LivedataTest.ClientStream.prototype, {
self.endpoint = url;
},
_onConnect: function (connection) {
_onConnect: function (client) {
var self = this;
if (client !== self.client) {
// This connection is not from the last call to _launchConnection.
// But _launchConnection calls _cleanup which closes previous connections.
// It's our belief that this stifles future 'open' events, but maybe
// we are wrong?
throw new Error("Got open from inactive client");
}
if (self._forcedToDisconnect) {
// We were asked to disconnect between trying to open the connection and
// actually opening it. Let's just pretend this never happened.
connection.close();
self.client.close();
self.client = null;
return;
}
if (self.currentStatus.connected) {
// We already have a connection. It must have been the case that
// we started two parallel connection attempts (because we
// wanted to 'reconnect now' on a hanging connection and we had
// no way to cancel the connection attempt.) Just ignore/close
// the latecomer.
connection.close();
return;
// We already have a connection. It must have been the case that we
// started two parallel connection attempts (because we wanted to
// 'reconnect now' on a hanging connection and we had no way to cancel the
// connection attempt.) But this shouldn't happen (similarly to the client
// !== self.client check above).
throw new Error("Two parallel connections?");
}
if (self.connectionTimer) {
clearTimeout(self.connectionTimer);
self.connectionTimer = null;
}
var onError = Meteor.bindEnvironment(
function (_this, error) {
if (self.currentConnection !== _this)
return;
Meteor._debug("stream error", error.toString(),
(new Date()).toDateString());
self._lostConnection();
},
"stream error callback"
);
connection.on('error', function (error) {
// We have to pass in `this` explicitly because bindEnvironment
// doesn't propagate it for us.
onError(this, error);
});
var onClose = Meteor.bindEnvironment(
function (_this) {
if (self.options._testOnClose)
self.options._testOnClose();
if (self.currentConnection !== _this)
return;
self._lostConnection();
},
"stream close callback"
);
connection.on('close', function () {
// We have to pass in `this` explicitly because bindEnvironment
// doesn't propagate it for us.
onClose(this);
});
connection.on('message', function (message) {
if (self.currentConnection !== this)
return; // old connection still emitting messages
if (message.type === "utf8") // ignore binary frames
_.each(self.eventCallbacks.message, function (callback) {
callback(message.utf8Data);
});
});
self._clearConnectionTimer();
// update status
self.currentConnection = connection;
self.currentStatus.status = "connected";
self.currentStatus.connected = true;
self.currentStatus.retryCount = 0;
@@ -161,10 +91,10 @@ _.extend(LivedataTest.ClientStream.prototype, {
var self = this;
self._clearConnectionTimer();
if (self.currentConnection) {
var conn = self.currentConnection;
self.currentConnection = null;
conn.close();
if (self.client) {
var client = self.client;
self.client = null;
client.close();
}
},
@@ -181,25 +111,63 @@ _.extend(LivedataTest.ClientStream.prototype, {
var self = this;
self._cleanup(); // cleanup the old socket, if there was one.
// launch a connect attempt. we have no way to track it. we either
// get an _onConnect event, or we don't.
// Since server-to-server DDP is still an experimental feature, we only
// require the module if we actually create a server-to-server
// connection.
var FayeWebSocket = Npm.require('faye-websocket');
// XXX: set up a timeout on this.
// We would like to specify 'ddp' as the subprotocol here. The npm module we
// used to use as a client would fail the handshake if we ask for a
// subprotocol and the server doesn't send one back (and sockjs doesn't).
// Faye doesn't have that behavior; it's unclear from reading RFC 6455 if
// Faye is erroneous or not. So for now, we don't specify protocols.
var client = self.client = new FayeWebSocket.Client(
toWebsocketUrl(self.endpoint),
[/*no subprotocols*/],
{headers: self.headers}
);
// we would like to specify 'ddp' as the protocol here, but
// unfortunately WebSocket-Node fails the handshake if we ask for
// a protocol and the server doesn't send one back (and sockjs
// doesn't). also, related: I guess we have to accept that
// 'stream' is ddp-specific
self.client.connect(toWebsocketUrl(self.endpoint),
undefined, // protocols
undefined, // origin
self.headers);
if (self.connectionTimer)
clearTimeout(self.connectionTimer);
self.connectionTimer = setTimeout(
self._clearConnectionTimer();
self.connectionTimer = Meteor.setTimeout(
_.bind(self._lostConnection, self),
self.CONNECT_TIMEOUT);
self.client.on('open', Meteor.bindEnvironment(function () {
return self._onConnect(client);
}, "stream connect callback"));
var clientOnIfCurrent = function (event, description, f) {
self.client.on(event, Meteor.bindEnvironment(function () {
// Ignore events from any connection we've already cleaned up.
if (client !== self.client)
return;
f.apply(this, arguments);
}, description));
};
clientOnIfCurrent('error', 'stream error callback', function (error) {
if (!self.options._dontPrintErrors)
Meteor._debug("stream error", error.message);
// XXX: Make this do something better than make the tests hang if it does
// not work.
self._lostConnection();
});
clientOnIfCurrent('close', 'stream close callback', function () {
self._lostConnection();
});
clientOnIfCurrent('message', 'stream message callback', function (message) {
// Ignore binary frames, where message.data is a Buffer
if (typeof message.data !== "string")
return;
_.each(self.eventCallbacks.message, function (callback) {
callback(message.data);
});
});
}
});

View File

@@ -1,17 +1,24 @@
var Fiber = Npm.require('fibers');
Tinytest.addAsync("stream client - callbacks run in a fiber", function (test, onComplete) {
stream = new LivedataTest.ClientStream(
Meteor.absoluteUrl(),
{
_testOnClose: function () {
test.isTrue(Fiber.current);
onComplete();
}
}
);
stream.on('reset', function () {
test.isTrue(Fiber.current);
stream.disconnect();
});
});
testAsyncMulti("stream client - callbacks run in a fiber", [
function (test, expect) {
var stream = new LivedataTest.ClientStream(Meteor.absoluteUrl());
var messageFired = false;
var resetFired = false;
stream.on('message', expect(function () {
test.isTrue(Fiber.current);
if (resetFired)
stream.disconnect();
messageFired = true;
}));
stream.on('reset', expect(function () {
test.isTrue(Fiber.current);
if (messageFired)
stream.disconnect();
resetFired = true;
}));
}
]);

View File

@@ -57,7 +57,7 @@ _.extend(Retry.prototype, {
var timeout = self._timeout(count);
if (self.retryTimer)
clearTimeout(self.retryTimer);
self.retryTimer = setTimeout(fn, timeout);
self.retryTimer = Meteor.setTimeout(fn, timeout);
return timeout;
}