Files
meteor/tools/service-connection.js
David Glasser 4079c2276f ServiceConnection directly loads ddp
Caching works fine here.
2014-11-07 16:45:58 -08:00

167 lines
5.9 KiB
JavaScript

var Future = require("fibers/future");
var _ = require("underscore");
var isopackets = require("./isopackets.js");
// Wrapper to manage a connection to a DDP service. The main differences
// between it and a raw DDP connection are:
//
//  - the constructor blocks until a successful connection is made;
//  - you can't call methods or subscribe asynchronously (ie, there's always a
//    wait);
//  - if the connection disconnects (with or without error) while we're waiting
//    on a method call or subscription, the apply/subscribeAndWait call throws
//    the given error.
//
// This functionality should eventually end up in the DDP client in one form
// or another.
//
// ServiceConnections never reconnect once they have successfully negotiated
// the DDP protocol: other than perhaps some initial attempts with the wrong
// protocol selected, they use just one underlying TCP connection, and fail
// fast.
//
// Arguments:
//  - endpointUrl: the url to connect to
//  - options:
//    - headers: an object containing extra headers to use when opening the
//      DDP connection
//    - _dontPrintErrors: boolean
//    ...and anything else you'd normally pass as options to DDP.connect.
//    Note that connectTimeoutMs, heartbeatInterval, retry and onConnected are
//    always set by this constructor and override whatever the caller passed.
//
// Instance state maintained here:
//  - connection: the underlying DDP connection
//  - connected: true between a successful connect and the next disconnect
//  - currentFuture: the one Future (if any) that somebody is blocked on
//
// The constructor blocks on a fibers Future, so it presumably has to be run
// inside a Fiber -- confirm against callers.
var ServiceConnection = function (endpointUrl, options) {
  var self = this;
  var Package = isopackets.load('ddp');

  // Create the future we are going to wait on *before* opening the
  // connection, so that the onConnected callback below can never observe an
  // uninitialized connectFuture / currentFuture, no matter how early it fires.
  var connectFuture = self.currentFuture = new Future;

  // ServiceConnection never should retry connections: just one TCP connection
  // is enough, and any errors on it should be detected promptly.
  var connectOptions = _.extend({}, options, {
    // We found that this was likely to time out with the DDP default of 10s,
    // especially if the CPU is churning on bundling (eg, for the stats
    // connection which we start in parallel with bundling).
    connectTimeoutMs: 30000,
    // Disable client->server heartbeats for service connections. Users with
    // slow internet connections were seeing heartbeat timeouts because the
    // heartbeats were buried behind large responses (eg
    // https://github.com/meteor/meteor/issues/2777).
    heartbeatInterval: 0,
    retry: false,
    onConnected: function () {
      self.connected = true;
      // The only thing anybody can be waiting for at this point is the
      // initial connection itself.
      if (!self.currentFuture)
        throw Error("nobody waiting for connection?");
      if (self.currentFuture !== connectFuture)
        throw Error("waiting for something that isn't connection?");
      self.currentFuture = null;
      connectFuture.return();
    }
  });

  self.connection = Package.ddp.DDP.connect(endpointUrl, connectOptions);

  self.connection._stream.on('disconnect', function (error) {
    self.connected = false;

    if (error && error.errorType === "DDP.ForcedReconnectError") {
      // OK, we requested this, probably due to version negotiation failure.
      //
      // This ought to have happened before we successfully connect, unless
      // somebody adds other calls to forced reconnect to Meteor...
      if (connectFuture.isResolved())
        throw Error("disconnect before connect?");
      // Otherwise, ignore this error. We're going to reconnect!
      return;
    }

    // Are we waiting to connect or for the result of a method apply or a
    // subscribeAndWait? If so, disconnecting is a problem: hand the error to
    // whoever is blocked.
    if (self.currentFuture) {
      var fut = self.currentFuture;
      self.currentFuture = null;
      fut.throw(error ||
                new Package.ddp.DDP.ConnectionError(
                  "DDP disconnected while connection in progress"));
    } else if (error) {
      // We got some sort of error with nobody listening for it; handle it.
      // XXX probably have a better way to handle it than this
      throw error;
    }
  });

  // Wait until we have some sort of initial connection or error (including
  // the 30-second connect timeout configured above). A failure is thrown out
  // of the constructor by wait().
  connectFuture.wait();
};
// Blocking operations on a ServiceConnection. At most one of them may be in
// progress at a time; the Future the caller is blocked on is tracked in
// self.currentFuture.
_.extend(ServiceConnection.prototype, {
  // call(name, arg1, arg2, ...): invoke a method and block for its result.
  // Shorthand for apply(name, [arg1, arg2, ...]).
  call: function (/* arguments */) {
    var self = this;
    var args = _.toArray(arguments);
    var name = args.shift();
    return self.apply(name, args);
  },

  // Forwards all of its arguments to the underlying connection's apply(),
  // appending a completion callback, and blocks until that callback fires.
  // Returns the method's result, or throws the error passed to the callback.
  // Throws immediately if something else is already being waited on.
  apply: function (/* arguments */) {
    var self = this;

    if (self.currentFuture)
      throw Error("Can't wait on two things at once!");
    self.currentFuture = new Future;

    var args = _.toArray(arguments);
    args.push(function (err, result) {
      if (!self.currentFuture) {
        // We're not still waiting? That means we had a disconnect event. But
        // then how did we actually get this result?
        throw Error("nobody listening for result?");
      }
      var fut = self.currentFuture;
      self.currentFuture = null;
      fut.resolver()(err, result);  // throw or return
    });

    self.connection.apply.apply(self.connection, args);

    return self.currentFuture.wait();
  },

  // Forwards all of its arguments to the underlying connection's subscribe(),
  // appending onReady/onError callbacks, and blocks until the subscription is
  // ready. Returns the handle that subscribe() returned. Throws the
  // subscription error if one arrives while waiting, and throws immediately
  // if something else is already being waited on.
  //
  // XXX derived from _subscribeAndWait in ddp_connection.js
  // -- but with a different signature..
  subscribeAndWait: function (/* arguments */) {
    var self = this;

    if (self.currentFuture)
      throw Error("Can't wait on two things at once!");
    var subFuture = self.currentFuture = new Future;

    var args = _.toArray(arguments);
    args.push({
      onReady: function () {
        if (!self.currentFuture) {
          // We're not still waiting? That means we had a disconnect event. But
          // then how did we actually get this result?
          throw Error("nobody listening for subscribe result?");
        }
        var fut = self.currentFuture;
        self.currentFuture = null;
        fut.return();
      },
      onError: function (e) {
        if (self.currentFuture === subFuture) {
          // Error while waiting for this sub to become ready? Hand it to the
          // waiter, and stop here: the waiter now owns the error, so it must
          // not also be thrown out of this callback.
          self.currentFuture = null;
          subFuture.throw(e);
          return;
        }
        // ... ok, this is a late error on the sub.
        // XXX handle it somehow better
        throw e;
      }
    });

    var sub = self.connection.subscribe.apply(self.connection, args);
    subFuture.wait();
    return sub;
  },

  // Closes the underlying connection, if there still is one, and drops the
  // reference to it. Safe to call more than once.
  close: function () {
    var self = this;
    if (self.connection) {
      self.connection.close();
      self.connection = null;
    }
  }
});
module.exports = ServiceConnection;