Run nodejs stream client callbacks in a fiber.

Poll instead of using Deps.autorun in server test.

When polling the client connection, tests don't have a chance to
disconnect before the stream client automatically reconnects, so add
an option to disable retries for testing.

Callers of `Meteor.bindEnvironment` often have the `onException`
argument print the exception stack trace.  To allow for less code
duplication, let the argument be a string providing the context
(e.g. "connection closed callback"), and then on an exception print
the context and the exception stack trace.
This commit is contained in:
Andrew Wilcox
2013-11-19 15:40:58 -05:00
committed by Nick Martin
parent e7ef2a2406
commit ccaeef516f
10 changed files with 137 additions and 63 deletions

View File

@@ -6,6 +6,7 @@ Package.on_use(function (api) {
api.use('webapp', 'server');
api.use('deps', 'client');
api.use(['livedata', 'mongo-livedata'], ['client', 'server']);
api.use('deps', 'client');
api.use('reload', 'client', {weak: true});
api.export('Autoupdate');

View File

@@ -18,7 +18,8 @@ var Connection = function (url, options) {
},
// These options are only for testing.
reloadWithOutstanding: false,
supportedDDPVersions: SUPPORTED_DDP_VERSIONS
supportedDDPVersions: SUPPORTED_DDP_VERSIONS,
retry: true
}, options);
// If set, called when we reconnect, queuing method calls _before_ the
@@ -30,7 +31,9 @@ var Connection = function (url, options) {
if (typeof url === "object") {
self._stream = url;
} else {
self._stream = new LivedataTest.ClientStream(url);
self._stream = new LivedataTest.ClientStream(url, {
retry: options.retry
});
}
self._lastSessionId = null;

View File

@@ -1,3 +1,5 @@
var Fiber = Npm.require('fibers');
Tinytest.addAsync(
"livedata server - sessionHandle.onClose()",
function (test, onComplete) {
@@ -16,45 +18,43 @@ Tinytest.addAsync(
}
);
Tinytest.addAsync(
"livedata server - sessionHandle.close()",
function (test, onComplete) {
// like pollUntil but doesn't have to be called from testAsyncMulti.
var poll = function (test, onComplete, fn) {
var timeout = 10000;
var step = 200;
var start = (new Date()).valueOf();
var helper = function () {
if (fn()) {
test.ok();
onComplete();
return;
}
if (start + timeout < (new Date()).valueOf()) {
test.fail();
onComplete();
return;
}
Meteor.setTimeout(helper, step);
};
helper();
};
Tinytest.addAsync("livedata server - sessionHandle.close()", function (test, onComplete) {
var connection;
var callbackHandle = Meteor.server.onConnection(function (sessionHandle) {
callbackHandle.stop();
// XXX stream_client_nodejs.js should not be requiring a developer
// to use Meteor.bindEnvironment themselves when using Meteor's
// public API. The problem is that the computation rerunning is
// triggered by the close event firing on the stream's connection
// object, and that callback in stream_client_nodejs.js is not
// wrapped in a Meteor.bindEnvironment for us.
done = Meteor.bindEnvironment(
function () {
Meteor.defer(onComplete);
},
function (err) {
Meteor._debug("Exception thrown from Meteor.defer", err && err.stack);
}
);
var connection;
var callbackHandle = Meteor.server.onConnection(function (sessionHandle) {
callbackHandle.stop();
// Wait for connection to be closed on the client side.
Deps.autorun(function (computation) {
if (computation.firstRun)
test.isTrue(connection.status().connected);
if (! connection.status().connected) {
computation.stop();
// Avoid reconnecting from the client.
connection.disconnect();
done();
}
});
// Close the connection from the server.
sessionHandle.close();
poll(test, onComplete, function () {
return ! connection.status().connected;
});
connection = DDP.connect(Meteor.absoluteUrl());
}
);
// Close the connection from the server.
sessionHandle.close();
});
connection = DDP.connect(Meteor.absoluteUrl(), {retry: false});
});
var innerCalled = null;

View File

@@ -73,5 +73,6 @@ Package.on_test(function (api) {
api.use('http', 'client');
api.add_files(['stream_tests.js'], 'client');
api.add_files('stream_client_tests.js', 'server');
api.use('check', ['client', 'server']);
});

View File

@@ -195,10 +195,13 @@ _.extend(LivedataTest.ClientStream.prototype, {
_retryLater: function () {
var self = this;
var timeout = self._retry.retryLater(
self.currentStatus.retryCount,
_.bind(self._retryNow, self)
);
var timeout = 0;
if (self.options.retry) {
timeout = self._retry.retryLater(
self.currentStatus.retryCount,
_.bind(self._retryNow, self)
);
}
self.currentStatus.status = "waiting";
self.currentStatus.connected = false;

View File

@@ -9,8 +9,11 @@
// We don't do any heartbeating. (The logic that did this in sockjs was removed,
// because it used a built-in sockjs mechanism. We could do it with WebSocket
// ping frames or with DDP-level messages.)
LivedataTest.ClientStream = function (endpoint) {
LivedataTest.ClientStream = function (endpoint, options) {
var self = this;
self.options = _.extend({
retry: true
}, options);
// WebSocket-Node https://github.com/Worlize/WebSocket-Node
// Chosen because it can run without native components. It has a
@@ -31,9 +34,12 @@ LivedataTest.ClientStream = function (endpoint) {
self.endpoint = endpoint;
self.currentConnection = null;
self.client.on('connect', function (connection) {
return self._onConnect(connection);
});
self.client.on('connect', Meteor.bindEnvironment(
function (connection) {
return self._onConnect(connection);
},
"stream connect callback"
));
self.client.on('connectFailed', function (error) {
// XXX: Make this do something better than make the tests hang if it does not work.
@@ -89,20 +95,41 @@ _.extend(LivedataTest.ClientStream.prototype, {
self.connectionTimer = null;
}
connection.on('error', function (error) {
if (self.currentConnection !== this)
return;
var onError = Meteor.bindEnvironment(
function (_this) {
if (self.currentConnection !== _this)
return;
Meteor._debug("stream error", error.toString(),
(new Date()).toDateString());
self._lostConnection();
Meteor._debug("stream error", error.toString(),
(new Date()).toDateString());
self._lostConnection();
},
"stream error callback"
);
connection.on('error', function (error) {
// We have to pass in `this` explicitly because bindEnvironment
// doesn't propagate it for us.
onError(this);
});
connection.on('close', function () {
if (self.currentConnection !== this)
return;
var onClose = Meteor.bindEnvironment(
function (_this) {
if (self.options._testOnClose)
self.options._testOnClose();
self._lostConnection();
if (self.currentConnection !== _this)
return;
self._lostConnection();
},
"stream close callback"
);
connection.on('close', function () {
// We have to pass in `this` explicitly because bindEnvironment
// doesn't propagate it for us.
onClose(this);
});
connection.on('message', function (message) {

View File

@@ -1,8 +1,11 @@
// @param url {String} URL to Meteor app
// "http://subdomain.meteor.com/" or "/" or
// "ddp+sockjs://foo-**.meteor.com/sockjs"
LivedataTest.ClientStream = function (url) {
LivedataTest.ClientStream = function (url, options) {
var self = this;
self.options = _.extend({
retry: true
}, options);
self._initCommon();
//// Constants

View File

@@ -0,0 +1,17 @@
var Fiber = Npm.require('fibers');
Tinytest.addAsync("stream client - callbacks run in a fiber", function (test, onComplete) {
stream = new LivedataTest.ClientStream(
Meteor.absoluteUrl(),
{
_testOnClose: function () {
test.isTrue(Fiber.current);
onComplete();
}
}
);
stream.on('reset', function () {
test.isTrue(Fiber.current);
stream.disconnect();
});
});

View File

@@ -30,8 +30,15 @@ Meteor.bindEnvironment = function (func, onException, _this) {
// values
var boundValues = _.clone(currentValues);
if (!onException)
throw new Error("onException must be supplied");
if (!onException || typeof(onException) === 'string') {
var description = onException || "callback of async function";
onException = function (error) {
Meteor._debug(
"Exception in " + description + ":",
error && error.stack || error
);
};
}
return function (/* arguments */) {
var savedValues = currentValues;

View File

@@ -55,14 +55,26 @@ _.extend(Meteor.EnvironmentVariable.prototype, {
// return value of the function will be passed through, and no new
// fiber will be created.)
//
Meteor.bindEnvironment = function (func, onException, _this) {
// `onException` should be a function or a string. When it is a
// function, it is called as a callback when the bound function raises
// an exception. If it is a string, it should be a description of the
// callback, and when an exception is raised a debug message will be
// printed with the description.
Meteor.bindEnvironment = function (func, onException, _this, context) {
if (!Fiber.current)
throw new Error(noFiberMessage);
var boundValues = _.clone(Fiber.current._meteor_dynamics || []);
if (!onException)
throw new Error("onException must be supplied");
if (!onException || typeof(onException) === 'string') {
var description = onException || "callback of async function";
onException = function (error) {
Meteor._debug(
"Exception in " + description + ":",
error && error.stack || error
);
};
}
return function (/* arguments */) {
var args = _.toArray(arguments);