From c8b5052adddeb303dc5d5f3250e4ffbcd7f551a5 Mon Sep 17 00:00:00 2001 From: Geoff Schmidt Date: Thu, 1 Mar 2012 00:59:57 -0800 Subject: [PATCH] move message dispatch fully into LivedataSession --- packages/livedata/livedata_server.js | 135 ++++++++++++++------------- 1 file changed, 69 insertions(+), 66 deletions(-) diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index 37085425a1..98fa5911b5 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -154,6 +154,74 @@ Meteor._LivedataSession = function (socket, server) { }; _.extend(Meteor._LivedataSession.prototype, { + processMessage: function (msg) { + var self = this; + + if (msg.msg in self.protocol_handlers) + self.protocol_handlers[msg.msg].call(self, msg); + else + self.server._sendError(socket, 'Bad request', msg); + }, + + protocol_handlers: { + sub: function (msg) { + var self = this; + + // reject malformed messages + if (typeof (msg.id) !== "string" || + typeof (msg.name) !== "string" || + (('params' in msg) && typeof (msg.params) !== "object")) { + self.server._sendError(self.socket, "Malformed subscription", msg); + return; + } + + if (!self.server.publish_handlers[msg.name]) { + self.socket.send(JSON.stringify({ + msg: 'nosub', id: msg.id, + error: {error: 404, reason: "Subscription not found"}})); + return; + } + + Fiber(function () { + if (msg.id in self.named_subs) + // subs are idempotent, or rather, they are ignored if a sub + // with that id already exists. this is important during + // reconnect. + return; + + var handler = self.server.publish_handlers[msg.name]; + self.startSubscription(handler, msg.id, msg.params); + }).run(); + }, + + // XXX Fiber() doesn't interlock. if a client subs then unsubs, the + // subscription should end up as off. + unsub: function (msg) { + var self = this; + + Fiber(function () { + self.stopSubscription(msg.id); + }).run(); + + self.socket.send(JSON.stringify({msg: 'nosub', id: msg.id})); + }, + + method: function (msg) { + var self = this; + + // reject malformed messages + // XXX should also reject messages with unknown attributes? + if (typeof (msg.id) !== "string" || + typeof (msg.method) !== "string" || + (('params' in msg) && !(msg.params instanceof Array))) { + self.server._sendError(self.socket, "Malformed method invocation", msg); + return; + } + + self.enqueueMethod(msg); + } + }, + startSubscription: function (handler, sub_id, params) { var self = this; @@ -435,15 +503,7 @@ Meteor._LivedataServer = function () { self._sendError(socket, 'Must connect first', msg); return; } - - if (msg.msg === 'sub') - self._livedata_sub(socket, msg); - else if (msg.msg === 'unsub') - self._livedata_unsub(socket, msg); - else if (msg.msg === 'method') - self._livedata_method(socket, msg); - else - self._sendError(socket, 'Bad request', msg); + socket.meteor_session.processMessage(msg); } catch (e) { // XXX print stack nicely Meteor._debug("Internal exception while processing message", msg, @@ -458,7 +518,6 @@ Meteor._LivedataServer = function () { }); }; - _.extend(Meteor._LivedataServer.prototype, { _sendError: function (socket, reason, offending_message) { var self = this; @@ -468,7 +527,6 @@ _.extend(Meteor._LivedataServer.prototype, { socket.send(msg); }, - // XXX 'connect' message should have a protocol version _livedata_connect: function (socket, msg) { var self = this; @@ -492,61 +550,6 @@ _.extend(Meteor._LivedataServer.prototype, { // XXX what to do here on reconnect? oh, probably just fake a sub message. }, - _livedata_sub: function (socket, msg) { - var self = this; - - // reject malformed messages - if (typeof (msg.id) !== "string" || - typeof (msg.name) !== "string" || - (('params' in msg) && typeof (msg.params) !== "object")) { - self._sendError(socket, "Malformed subscription", msg); - return; - } - - if (!self.publish_handlers[msg.name]) { - socket.send(JSON.stringify({ - msg: 'nosub', id: msg.id, error: {error: 404, - reason: "Subscription not found"}})); - return; - } - - Fiber(function () { - if (msg.id in socket.meteor_session.named_subs) - // XXX client screwed up - socket.meteor_session.stopSubscription(msg.id); - - var handler = self.publish_handlers[msg.name]; - socket.meteor_session.startSubscription(handler, msg.id, msg.params); - }).run(); - }, - - // XXX Fiber() doesn't interlock. if a client subs then unsubs, the - // subscription should end up as off. - _livedata_unsub: function (socket, msg) { - var self = this; - - Fiber(function () { - socket.meteor_session.stopSubscription(msg.id); - }).run(); - - socket.send(JSON.stringify({msg: 'nosub', id: msg.id})); - }, - - _livedata_method: function (socket, msg) { - var self = this; - - // reject malformed messages - // XXX should also reject messages with unknown attributes? - if (typeof (msg.id) !== "string" || - typeof (msg.method) !== "string" || - (('params' in msg) && !(msg.params instanceof Array))) { - self._sendError(socket, "Malformed method invocation", msg); - return; - } - - socket.meteor_session.enqueueMethod(msg); - }, - /** * Register a publish handler function. *