mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
Use faye-websocket for server-to-server DDP
This matches what the SockJS server uses; now we only need to understand
and fix bugs in the implementation of one websocket npm module.
Some notes:
- I actually trust that it's possible to close a connection before it
successfully connects, which allows me to simplify the code a
lot (since there shouldn't be multiple connections active per
ClientStream). I put in some assertions to make sure this is the
case, though. (Note that this module also has a simpler model,
where there's a single object representing the client connection,
not a "client" object that spawns "connections".)
- We now print connect errors as well as post-connect errors. (This
required adding a flag to keep tests quiet since it makes an
expected-to-fail-to-connect connection.) We need a better approach
to stream error handling, though.
- We used to have a test to make sure that a certain not-user-visible
callback is called within a Fiber; structuring the code such that
this test is still possible would lead to the code being less
consistent and harder to read, so I dropped the test.
- Fix a few bugs where we weren't using Meteor.setTimeout.
This commit is contained in:
@@ -1,5 +1,9 @@
|
||||
## v.NEXT
|
||||
|
||||
* Use "faye-websocket" (0.7.2) npm module instead of "websocket" (1.0.8) for
|
||||
server-to-server DDP.
|
||||
|
||||
|
||||
## v0.7.1.2
|
||||
|
||||
* Fix bug in tool error handling that caused `meteor` to crash on Mac
|
||||
|
||||
@@ -16,8 +16,13 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"websocket": {
|
||||
"version": "1.0.8"
|
||||
"faye-websocket": {
|
||||
"version": "0.7.2",
|
||||
"dependencies": {
|
||||
"websocket-driver": {
|
||||
"version": "0.3.2"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,7 +49,10 @@ var Connection = function (url, options) {
|
||||
self._stream = new LivedataTest.ClientStream(url, {
|
||||
retry: options.retry,
|
||||
headers: options.headers,
|
||||
_sockjsOptions: options._sockjsOptions
|
||||
_sockjsOptions: options._sockjsOptions,
|
||||
// To keep some tests quiet (because we don't have a real API for handling
|
||||
// client-stream-level errors).
|
||||
_dontPrintErrors: options._dontPrintErrors
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1160,7 +1160,7 @@ _.extend(Server.prototype, {
|
||||
// drop all future data coming over this connection on the
|
||||
// floor. We don't want to confuse things.
|
||||
socket.removeAllListeners('data');
|
||||
setTimeout(function () {
|
||||
Meteor.setTimeout(function () {
|
||||
socket.send(stringifyDDP({msg: 'failed', version: version}));
|
||||
socket.close();
|
||||
}, timeout);
|
||||
|
||||
@@ -683,9 +683,10 @@ if (Meteor.isServer) {
|
||||
testAsyncMulti("livedata - connect fails to unknown place", [
|
||||
function (test, expect) {
|
||||
var self = this;
|
||||
self.conn = DDP.connect("example.com");
|
||||
self.conn = DDP.connect("example.com", {_dontPrintErrors: true});
|
||||
Meteor.setTimeout(expect(function () {
|
||||
test.isFalse(self.conn.status().connected, "Not connected");
|
||||
self.conn.close();
|
||||
}), 500);
|
||||
}
|
||||
]);
|
||||
|
||||
@@ -3,7 +3,11 @@ Package.describe({
|
||||
internal: true
|
||||
});
|
||||
|
||||
Npm.depends({sockjs: "0.3.8", websocket: "1.0.8"});
|
||||
// We use 'faye-websocket' for connections in server-to-server DDP, mostly
|
||||
// because it's the same library used as a server in sockjs, and it's easiest to
|
||||
// deal with a single websocket implementation. (Plus, its maintainer is easy
|
||||
// to work with on pull requests.)
|
||||
Npm.depends({sockjs: "0.3.8", "faye-websocket": "0.7.2"});
|
||||
|
||||
Package.on_use(function (api) {
|
||||
api.use(['check', 'random', 'ejson', 'json', 'underscore', 'deps',
|
||||
|
||||
@@ -11,43 +11,16 @@
|
||||
// ping frames or with DDP-level messages.)
|
||||
LivedataTest.ClientStream = function (endpoint, options) {
|
||||
var self = this;
|
||||
options = options || {};
|
||||
|
||||
self.options = _.extend({
|
||||
retry: true
|
||||
}, options);
|
||||
|
||||
// WebSocket-Node https://github.com/Worlize/WebSocket-Node
|
||||
// Chosen because it can run without native components. It has a
|
||||
// somewhat idiosyncratic API. We may want to use 'ws' instead in the
|
||||
// future.
|
||||
//
|
||||
// Since server-to-server DDP is still an experimental feature, we only
|
||||
// require the module if we actually create a server-to-server
|
||||
// connection. This is a minor efficiency improvement, but moreover: while
|
||||
// 'websocket' doesn't require native components, it tries to use some
|
||||
// optional native components and prints a warning if it can't load
|
||||
// them. Since native components in packages don't work when transferred to
|
||||
// other architectures yet, this means that require('websocket') prints a
|
||||
// spammy log message when deployed to another architecture. Delaying the
|
||||
// require means you only get the log message if you're actually using the
|
||||
// feature.
|
||||
self.client = new (Npm.require('websocket').client)();
|
||||
self.client = null; // created in _launchConnection
|
||||
self.endpoint = endpoint;
|
||||
self.currentConnection = null;
|
||||
|
||||
options = options || {};
|
||||
self.headers = options.headers || {};
|
||||
|
||||
self.client.on('connect', Meteor.bindEnvironment(
|
||||
function (connection) {
|
||||
return self._onConnect(connection);
|
||||
},
|
||||
"stream connect callback"
|
||||
));
|
||||
|
||||
self.client.on('connectFailed', function (error) {
|
||||
// XXX: Make this do something better than make the tests hang if it does not work.
|
||||
return self._lostConnection();
|
||||
});
|
||||
self.headers = self.options.headers || {};
|
||||
|
||||
self._initCommon();
|
||||
|
||||
@@ -63,7 +36,7 @@ _.extend(LivedataTest.ClientStream.prototype, {
|
||||
send: function (data) {
|
||||
var self = this;
|
||||
if (self.currentStatus.connected) {
|
||||
self.currentConnection.send(data);
|
||||
self.client.send(data);
|
||||
}
|
||||
},
|
||||
|
||||
@@ -73,80 +46,37 @@ _.extend(LivedataTest.ClientStream.prototype, {
|
||||
self.endpoint = url;
|
||||
},
|
||||
|
||||
_onConnect: function (connection) {
|
||||
_onConnect: function (client) {
|
||||
var self = this;
|
||||
|
||||
if (client !== self.client) {
|
||||
// This connection is not from the last call to _launchConnection.
|
||||
// But _launchConnection calls _cleanup which closes previous connections.
|
||||
// It's our belief that this stifles future 'open' events, but maybe
|
||||
// we are wrong?
|
||||
throw new Error("Got open from inactive client");
|
||||
}
|
||||
|
||||
if (self._forcedToDisconnect) {
|
||||
// We were asked to disconnect between trying to open the connection and
|
||||
// actually opening it. Let's just pretend this never happened.
|
||||
connection.close();
|
||||
self.client.close();
|
||||
self.client = null;
|
||||
return;
|
||||
}
|
||||
|
||||
if (self.currentStatus.connected) {
|
||||
// We already have a connection. It must have been the case that
|
||||
// we started two parallel connection attempts (because we
|
||||
// wanted to 'reconnect now' on a hanging connection and we had
|
||||
// no way to cancel the connection attempt.) Just ignore/close
|
||||
// the latecomer.
|
||||
connection.close();
|
||||
return;
|
||||
// We already have a connection. It must have been the case that we
|
||||
// started two parallel connection attempts (because we wanted to
|
||||
// 'reconnect now' on a hanging connection and we had no way to cancel the
|
||||
// connection attempt.) But this shouldn't happen (similarly to the client
|
||||
// !== self.client check above).
|
||||
throw new Error("Two parallel connections?");
|
||||
}
|
||||
|
||||
if (self.connectionTimer) {
|
||||
clearTimeout(self.connectionTimer);
|
||||
self.connectionTimer = null;
|
||||
}
|
||||
|
||||
var onError = Meteor.bindEnvironment(
|
||||
function (_this, error) {
|
||||
if (self.currentConnection !== _this)
|
||||
return;
|
||||
|
||||
Meteor._debug("stream error", error.toString(),
|
||||
(new Date()).toDateString());
|
||||
self._lostConnection();
|
||||
},
|
||||
"stream error callback"
|
||||
);
|
||||
|
||||
connection.on('error', function (error) {
|
||||
// We have to pass in `this` explicitly because bindEnvironment
|
||||
// doesn't propagate it for us.
|
||||
onError(this, error);
|
||||
});
|
||||
|
||||
var onClose = Meteor.bindEnvironment(
|
||||
function (_this) {
|
||||
if (self.options._testOnClose)
|
||||
self.options._testOnClose();
|
||||
|
||||
if (self.currentConnection !== _this)
|
||||
return;
|
||||
|
||||
self._lostConnection();
|
||||
},
|
||||
"stream close callback"
|
||||
);
|
||||
|
||||
connection.on('close', function () {
|
||||
// We have to pass in `this` explicitly because bindEnvironment
|
||||
// doesn't propagate it for us.
|
||||
onClose(this);
|
||||
});
|
||||
|
||||
connection.on('message', function (message) {
|
||||
if (self.currentConnection !== this)
|
||||
return; // old connection still emitting messages
|
||||
|
||||
if (message.type === "utf8") // ignore binary frames
|
||||
_.each(self.eventCallbacks.message, function (callback) {
|
||||
callback(message.utf8Data);
|
||||
});
|
||||
});
|
||||
self._clearConnectionTimer();
|
||||
|
||||
// update status
|
||||
self.currentConnection = connection;
|
||||
self.currentStatus.status = "connected";
|
||||
self.currentStatus.connected = true;
|
||||
self.currentStatus.retryCount = 0;
|
||||
@@ -161,10 +91,10 @@ _.extend(LivedataTest.ClientStream.prototype, {
|
||||
var self = this;
|
||||
|
||||
self._clearConnectionTimer();
|
||||
if (self.currentConnection) {
|
||||
var conn = self.currentConnection;
|
||||
self.currentConnection = null;
|
||||
conn.close();
|
||||
if (self.client) {
|
||||
var client = self.client;
|
||||
self.client = null;
|
||||
client.close();
|
||||
}
|
||||
},
|
||||
|
||||
@@ -181,25 +111,63 @@ _.extend(LivedataTest.ClientStream.prototype, {
|
||||
var self = this;
|
||||
self._cleanup(); // cleanup the old socket, if there was one.
|
||||
|
||||
// launch a connect attempt. we have no way to track it. we either
|
||||
// get an _onConnect event, or we don't.
|
||||
// Since server-to-server DDP is still an experimental feature, we only
|
||||
// require the module if we actually create a server-to-server
|
||||
// connection.
|
||||
var FayeWebSocket = Npm.require('faye-websocket');
|
||||
|
||||
// XXX: set up a timeout on this.
|
||||
// We would like to specify 'ddp' as the subprotocol here. The npm module we
|
||||
// used to use as a client would fail the handshake if we ask for a
|
||||
// subprotocol and the server doesn't send one back (and sockjs doesn't).
|
||||
// Faye doesn't have that behavior; it's unclear from reading RFC 6455 if
|
||||
// Faye is erroneous or not. So for now, we don't specify protocols.
|
||||
var client = self.client = new FayeWebSocket.Client(
|
||||
toWebsocketUrl(self.endpoint),
|
||||
[/*no subprotocols*/],
|
||||
{headers: self.headers}
|
||||
);
|
||||
|
||||
// we would like to specify 'ddp' as the protocol here, but
|
||||
// unfortunately WebSocket-Node fails the handshake if we ask for
|
||||
// a protocol and the server doesn't send one back (and sockjs
|
||||
// doesn't). also, related: I guess we have to accept that
|
||||
// 'stream' is ddp-specific
|
||||
self.client.connect(toWebsocketUrl(self.endpoint),
|
||||
undefined, // protocols
|
||||
undefined, // origin
|
||||
self.headers);
|
||||
|
||||
if (self.connectionTimer)
|
||||
clearTimeout(self.connectionTimer);
|
||||
self.connectionTimer = setTimeout(
|
||||
self._clearConnectionTimer();
|
||||
self.connectionTimer = Meteor.setTimeout(
|
||||
_.bind(self._lostConnection, self),
|
||||
self.CONNECT_TIMEOUT);
|
||||
|
||||
self.client.on('open', Meteor.bindEnvironment(function () {
|
||||
return self._onConnect(client);
|
||||
}, "stream connect callback"));
|
||||
|
||||
var clientOnIfCurrent = function (event, description, f) {
|
||||
self.client.on(event, Meteor.bindEnvironment(function () {
|
||||
// Ignore events from any connection we've already cleaned up.
|
||||
if (client !== self.client)
|
||||
return;
|
||||
f.apply(this, arguments);
|
||||
}, description));
|
||||
};
|
||||
|
||||
clientOnIfCurrent('error', 'stream error callback', function (error) {
|
||||
if (!self.options._dontPrintErrors)
|
||||
Meteor._debug("stream error", error.message);
|
||||
|
||||
// XXX: Make this do something better than make the tests hang if it does
|
||||
// not work.
|
||||
self._lostConnection();
|
||||
});
|
||||
|
||||
|
||||
clientOnIfCurrent('close', 'stream close callback', function () {
|
||||
self._lostConnection();
|
||||
});
|
||||
|
||||
|
||||
clientOnIfCurrent('message', 'stream message callback', function (message) {
|
||||
// Ignore binary frames, where message.data is a Buffer
|
||||
if (typeof message.data !== "string")
|
||||
return;
|
||||
|
||||
_.each(self.eventCallbacks.message, function (callback) {
|
||||
callback(message.data);
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1,17 +1,24 @@
|
||||
var Fiber = Npm.require('fibers');
|
||||
|
||||
Tinytest.addAsync("stream client - callbacks run in a fiber", function (test, onComplete) {
|
||||
stream = new LivedataTest.ClientStream(
|
||||
Meteor.absoluteUrl(),
|
||||
{
|
||||
_testOnClose: function () {
|
||||
test.isTrue(Fiber.current);
|
||||
onComplete();
|
||||
}
|
||||
}
|
||||
);
|
||||
stream.on('reset', function () {
|
||||
test.isTrue(Fiber.current);
|
||||
stream.disconnect();
|
||||
});
|
||||
});
|
||||
testAsyncMulti("stream client - callbacks run in a fiber", [
|
||||
function (test, expect) {
|
||||
var stream = new LivedataTest.ClientStream(Meteor.absoluteUrl());
|
||||
|
||||
var messageFired = false;
|
||||
var resetFired = false;
|
||||
|
||||
stream.on('message', expect(function () {
|
||||
test.isTrue(Fiber.current);
|
||||
if (resetFired)
|
||||
stream.disconnect();
|
||||
messageFired = true;
|
||||
}));
|
||||
|
||||
stream.on('reset', expect(function () {
|
||||
test.isTrue(Fiber.current);
|
||||
if (messageFired)
|
||||
stream.disconnect();
|
||||
resetFired = true;
|
||||
}));
|
||||
}
|
||||
]);
|
||||
|
||||
@@ -57,7 +57,7 @@ _.extend(Retry.prototype, {
|
||||
var timeout = self._timeout(count);
|
||||
if (self.retryTimer)
|
||||
clearTimeout(self.retryTimer);
|
||||
self.retryTimer = setTimeout(fn, timeout);
|
||||
self.retryTimer = Meteor.setTimeout(fn, timeout);
|
||||
return timeout;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user