diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index ce66379ac8..08bc3ebce0 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -45,7 +45,8 @@ class MongoIDMap extends IdMap { export class Connection { constructor(url, options) { var self = this; - options = { + + this.options = options = { onConnected() {}, onDDPVersionNegotiationFailure(description) { Meteor._debug(description); @@ -247,58 +248,6 @@ export class Connection { }); } - var onMessage = function(raw_msg) { - try { - var msg = DDPCommon.parseDDP(raw_msg); - } catch (e) { - Meteor._debug('Exception while parsing DDP', e); - return; - } - - // Any message counts as receiving a pong, as it demonstrates that - // the server is still alive. - if (self._heartbeat) { - self._heartbeat.messageReceived(); - } - - if (msg === null || !msg.msg) { - // XXX COMPAT WITH 0.6.6. ignore the old welcome message for back - // compat. Remove this 'if' once the server stops sending welcome - // messages (stream_server.js). - if (!(msg && msg.server_id)) - Meteor._debug('discarding invalid livedata message', msg); - return; - } - - if (msg.msg === 'connected') { - self._version = self._versionSuggestion; - self._livedata_connected(msg); - options.onConnected(); - } else if (msg.msg === 'failed') { - if (_.contains(self._supportedDDPVersions, msg.version)) { - self._versionSuggestion = msg.version; - self._stream.reconnect({ _force: true }); - } else { - var description = - 'DDP version negotiation failed; server requested version ' + - msg.version; - self._stream.disconnect({ _permanent: true, _error: description }); - options.onDDPVersionNegotiationFailure(description); - } - } else if (msg.msg === 'ping' && options.respondToPings) { - self._send({ msg: 'pong', id: msg.id }); - } else if (msg.msg === 'pong') { - // noop, as we assume everything's a pong - } else if ( - _.include(['added', 'changed', 'removed', 'ready', 'updated'], msg.msg) - ) - self._livedata_data(msg); - else if (msg.msg === 'nosub') self._livedata_nosub(msg); - else if (msg.msg === 'result') self._livedata_result(msg); - else if (msg.msg === 'error') self._livedata_error(msg); - else Meteor._debug('discarding unknown livedata message type', msg); - }; - var onReset = function() { // Send a connect message at the beginning of the stream. // NOTE: reset is called even on the first connection, so this is @@ -396,7 +345,7 @@ export class Connection { if (Meteor.isServer) { self._stream.on( 'message', - Meteor.bindEnvironment(onMessage, 'handling DDP message') + Meteor.bindEnvironment(this.onMessage.bind(this), 'handling DDP message') ); self._stream.on( 'reset', @@ -407,7 +356,7 @@ export class Connection { Meteor.bindEnvironment(onDisconnect, 'handling DDP disconnect') ); } else { - self._stream.on('message', onMessage); + self._stream.on('message', this.onMessage.bind(this)); self._stream.on('reset', onReset); self._stream.on('disconnect', onDisconnect); } @@ -1697,4 +1646,56 @@ export class Connection { self._retryMigrate = null; } } + + onMessage(raw_msg) { + try { + var msg = DDPCommon.parseDDP(raw_msg); + } catch (e) { + Meteor._debug('Exception while parsing DDP', e); + return; + } + + // Any message counts as receiving a pong, as it demonstrates that + // the server is still alive. + if (this._heartbeat) { + this._heartbeat.messageReceived(); + } + + if (msg === null || !msg.msg) { + // XXX COMPAT WITH 0.6.6. ignore the old welcome message for back + // compat. Remove this 'if' once the server stops sending welcome + // messages (stream_server.js). + if (!(msg && msg.server_id)) + Meteor._debug('discarding invalid livedata message', msg); + return; + } + + if (msg.msg === 'connected') { + this._version = this._versionSuggestion; + this._livedata_connected(msg); + this.options.onConnected(); + } else if (msg.msg === 'failed') { + if (_.contains(this._supportedDDPVersions, msg.version)) { + this._versionSuggestion = msg.version; + this._stream.reconnect({ _force: true }); + } else { + var description = + 'DDP version negotiation failed; server requested version ' + + msg.version; + this._stream.disconnect({ _permanent: true, _error: description }); + options.onDDPVersionNegotiationFailure(description); + } + } else if (msg.msg === 'ping' && this.options.respondToPings) { + this._send({ msg: 'pong', id: msg.id }); + } else if (msg.msg === 'pong') { + // noop, as we assume everything's a pong + } else if ( + _.include(['added', 'changed', 'removed', 'ready', 'updated'], msg.msg) + ) + this._livedata_data(msg); + else if (msg.msg === 'nosub') this._livedata_nosub(msg); + else if (msg.msg === 'result') this._livedata_result(msg); + else if (msg.msg === 'error') this._livedata_error(msg); + else Meteor._debug('discarding unknown livedata message type', msg); + } }