move message dispatch fully into LivedataSession

This commit is contained in:
Geoff Schmidt
2012-03-01 00:59:57 -08:00
parent 5bfc4c472b
commit c8b5052add

View File

@@ -154,6 +154,74 @@ Meteor._LivedataSession = function (socket, server) {
};
_.extend(Meteor._LivedataSession.prototype, {
processMessage: function (msg) {
var self = this;
if (msg.msg in self.protocol_handlers)
self.protocol_handlers[msg.msg].call(self, msg);
else
self.server._sendError(socket, 'Bad request', msg);
},
protocol_handlers: {
sub: function (msg) {
var self = this;
// reject malformed messages
if (typeof (msg.id) !== "string" ||
typeof (msg.name) !== "string" ||
(('params' in msg) && typeof (msg.params) !== "object")) {
self.server._sendError(self.socket, "Malformed subscription", msg);
return;
}
if (!self.server.publish_handlers[msg.name]) {
self.socket.send(JSON.stringify({
msg: 'nosub', id: msg.id,
error: {error: 404, reason: "Subscription not found"}}));
return;
}
Fiber(function () {
if (msg.id in self.named_subs)
// subs are idempotent, or rather, they are ignored if a sub
// with that id already exists. this is important during
// reconnect.
return;
var handler = self.server.publish_handlers[msg.name];
self.startSubscription(handler, msg.id, msg.params);
}).run();
},
// XXX Fiber() doesn't interlock. if a client subs then unsubs, the
// subscription should end up as off.
unsub: function (msg) {
var self = this;
Fiber(function () {
self.stopSubscription(msg.id);
}).run();
self.socket.send(JSON.stringify({msg: 'nosub', id: msg.id}));
},
method: function (msg) {
var self = this;
// reject malformed messages
// XXX should also reject messages with unknown attributes?
if (typeof (msg.id) !== "string" ||
typeof (msg.method) !== "string" ||
(('params' in msg) && !(msg.params instanceof Array))) {
self.server._sendError(self.socket, "Malformed method invocation", msg);
return;
}
self.enqueueMethod(msg);
}
},
startSubscription: function (handler, sub_id, params) {
var self = this;
@@ -435,15 +503,7 @@ Meteor._LivedataServer = function () {
self._sendError(socket, 'Must connect first', msg);
return;
}
if (msg.msg === 'sub')
self._livedata_sub(socket, msg);
else if (msg.msg === 'unsub')
self._livedata_unsub(socket, msg);
else if (msg.msg === 'method')
self._livedata_method(socket, msg);
else
self._sendError(socket, 'Bad request', msg);
socket.meteor_session.processMessage(msg);
} catch (e) {
// XXX print stack nicely
Meteor._debug("Internal exception while processing message", msg,
@@ -458,7 +518,6 @@ Meteor._LivedataServer = function () {
});
};
_.extend(Meteor._LivedataServer.prototype, {
_sendError: function (socket, reason, offending_message) {
var self = this;
@@ -468,7 +527,6 @@ _.extend(Meteor._LivedataServer.prototype, {
socket.send(msg);
},
// XXX 'connect' message should have a protocol version
_livedata_connect: function (socket, msg) {
var self = this;
@@ -492,61 +550,6 @@ _.extend(Meteor._LivedataServer.prototype, {
// XXX what to do here on reconnect? oh, probably just fake a sub message.
},
_livedata_sub: function (socket, msg) {
var self = this;
// reject malformed messages
if (typeof (msg.id) !== "string" ||
typeof (msg.name) !== "string" ||
(('params' in msg) && typeof (msg.params) !== "object")) {
self._sendError(socket, "Malformed subscription", msg);
return;
}
if (!self.publish_handlers[msg.name]) {
socket.send(JSON.stringify({
msg: 'nosub', id: msg.id, error: {error: 404,
reason: "Subscription not found"}}));
return;
}
Fiber(function () {
if (msg.id in socket.meteor_session.named_subs)
// XXX client screwed up
socket.meteor_session.stopSubscription(msg.id);
var handler = self.publish_handlers[msg.name];
socket.meteor_session.startSubscription(handler, msg.id, msg.params);
}).run();
},
// XXX Fiber() doesn't interlock. if a client subs then unsubs, the
// subscription should end up as off.
_livedata_unsub: function (socket, msg) {
var self = this;
Fiber(function () {
socket.meteor_session.stopSubscription(msg.id);
}).run();
socket.send(JSON.stringify({msg: 'nosub', id: msg.id}));
},
_livedata_method: function (socket, msg) {
var self = this;
// reject malformed messages
// XXX should also reject messages with unknown attributes?
if (typeof (msg.id) !== "string" ||
typeof (msg.method) !== "string" ||
(('params' in msg) && !(msg.params instanceof Array))) {
self._sendError(socket, "Malformed method invocation", msg);
return;
}
socket.meteor_session.enqueueMethod(msg);
},
/**
* Register a publish handler function.
*