From 6c1fd9c3138ec3fbc6959cd4e335897cb025f5b5 Mon Sep 17 00:00:00 2001 From: Leonardo Venturini Date: Wed, 6 Nov 2024 14:07:36 -0400 Subject: [PATCH] extract stream handlers --- .../common/connection_stream_handlers.js | 202 ++++++++++++++++++ .../ddp-client/common/livedata_connection.js | 179 ++-------------- packages/ddp-client/common/mongo_id_map.js | 7 + 3 files changed, 227 insertions(+), 161 deletions(-) create mode 100644 packages/ddp-client/common/connection_stream_handlers.js create mode 100644 packages/ddp-client/common/mongo_id_map.js diff --git a/packages/ddp-client/common/connection_stream_handlers.js b/packages/ddp-client/common/connection_stream_handlers.js new file mode 100644 index 0000000000..bfef109ff0 --- /dev/null +++ b/packages/ddp-client/common/connection_stream_handlers.js @@ -0,0 +1,202 @@ +import { DDPCommon } from 'meteor/ddp-common'; +import { Meteor } from 'meteor/meteor'; + +export class ConnectionStreamHandlers { + constructor(connection) { + this._connection = connection; + } + + /** + * Handles incoming raw messages from the DDP stream + * @param {String} raw_msg The raw message received from the stream + */ + async onMessage(raw_msg) { + let msg; + try { + msg = DDPCommon.parseDDP(raw_msg); + } catch (e) { + Meteor._debug('Exception while parsing DDP', e); + return; + } + + // Any message counts as receiving a pong, as it demonstrates that + // the server is still alive. + if (this._connection._heartbeat) { + this._connection._heartbeat.messageReceived(); + } + + if (msg === null || !msg.msg) { + if(!msg || !msg.testMessageOnConnect) { + if (Object.keys(msg).length === 1 && msg.server_id) return; + Meteor._debug('discarding invalid livedata message', msg); + } + return; + } + + // Important: This was missing from previous version + // We need to set the current version before routing the message + if (msg.msg === 'connected') { + this._connection._version = this._connection._versionSuggestion; + } + + await this._routeMessage(msg); + } + + /** + * Routes messages to their appropriate handlers based on message type + * @private + * @param {Object} msg The parsed DDP message + */ + async _routeMessage(msg) { + switch (msg.msg) { + case 'connected': + await this._connection._livedata_connected(msg); + this._connection.options.onConnected(); + break; + + case 'failed': + await this._handleFailedMessage(msg); + break; + + case 'ping': + if (this._connection.options.respondToPings) { + this._connection._send({ msg: 'pong', id: msg.id }); + } + break; + + case 'pong': + // noop, as we assume everything's a pong + break; + + case 'added': + case 'changed': + case 'removed': + case 'ready': + case 'updated': + await this._connection._livedata_data(msg); + break; + + case 'nosub': + await this._connection._livedata_nosub(msg); + break; + + case 'result': + await this._connection._livedata_result(msg); + break; + + case 'error': + this._connection._livedata_error(msg); + break; + + default: + Meteor._debug('discarding unknown livedata message type', msg); + } + } + + /** + * Handles failed connection messages + * @private + * @param {Object} msg The failed message object + */ + _handleFailedMessage(msg) { + if (this._connection._supportedDDPVersions.indexOf(msg.version) >= 0) { + this._connection._versionSuggestion = msg.version; + this._connection._stream.reconnect({ _force: true }); + } else { + const description = + 'DDP version negotiation failed; server requested version ' + + msg.version; + this._connection._stream.disconnect({ _permanent: true, _error: description }); + this._connection.options.onDDPVersionNegotiationFailure(description); + } + } + + /** + * Handles connection reset events + */ + onReset() { + // Reset is called even on the first connection, so this is + // the only place we send this message. + const msg = this._buildConnectMessage(); + this._connection._send(msg); + + // Mark non-retry calls as failed and handle outstanding methods + this._handleOutstandingMethodsOnReset(); + + // Now, to minimize setup latency, go ahead and blast out all of + // our pending methods ands subscriptions before we've even taken + // the necessary RTT to know if we successfully reconnected. + this._connection._callOnReconnectAndSendAppropriateOutstandingMethods(); + this._resendSubscriptions(); + } + + /** + * Builds the initial connect message + * @private + * @returns {Object} The connect message object + */ + _buildConnectMessage() { + const msg = { msg: 'connect' }; + if (this._connection._lastSessionId) { + msg.session = this._connection._lastSessionId; + } + msg.version = this._connection._versionSuggestion || this._connection._supportedDDPVersions[0]; + this._connection._versionSuggestion = msg.version; + msg.support = this._connection._supportedDDPVersions; + return msg; + } + + /** + * Handles outstanding methods during a reset + * @private + */ + _handleOutstandingMethodsOnReset() { + const blocks = this._connection._outstandingMethodBlocks; + if (blocks.length === 0) return; + + const currentMethodBlock = blocks[0].methods; + blocks[0].methods = currentMethodBlock.filter( + methodInvoker => { + // Methods with 'noRetry' option set are not allowed to re-send after + // recovering dropped connection. + if (methodInvoker.sentMessage && methodInvoker.noRetry) { + methodInvoker.receiveResult( + new Meteor.Error( + 'invocation-failed', + 'Method invocation might have failed due to dropped connection. ' + + 'Failing because `noRetry` option was passed to Meteor.apply.' + ) + ); + } + + // Only keep a method if it wasn't sent or it's allowed to retry. + return !(methodInvoker.sentMessage && methodInvoker.noRetry); + } + ); + + // Clear empty blocks + if (blocks.length > 0 && blocks[0].methods.length === 0) { + blocks.shift(); + } + + // Reset all method invokers as unsent + Object.values(this._connection._methodInvokers).forEach(invoker => { + invoker.sentMessage = false; + }); + } + + /** + * Resends all active subscriptions + * @private + */ + _resendSubscriptions() { + Object.entries(this._connection._subscriptions).forEach(([id, sub]) => { + this._connection._sendQueued({ + msg: 'sub', + id: id, + name: sub.name, + params: sub.params + }); + }); + } +} \ No newline at end of file diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 08e080b7e4..d7c67c2038 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -13,12 +13,8 @@ import { isEmpty, last, } from "meteor/ddp-common/utils"; - -class MongoIDMap extends IdMap { - constructor() { - super(MongoID.idStringify, MongoID.idParse); - } -} +import { ConnectionStreamHandlers } from './connection_stream_handlers'; +import { MongoIDMap } from './mongo_id_map'; // @param url {String|Object} URL to Meteor app, // or an object as a test hook (see code) @@ -243,33 +239,38 @@ export class Connection { }); } + this._streamHandlers = new ConnectionStreamHandlers(this); + const onDisconnect = () => { - if (self._heartbeat) { - self._heartbeat.stop(); - self._heartbeat = null; + if (this._heartbeat) { + this._heartbeat.stop(); + this._heartbeat = null; } }; if (Meteor.isServer) { - self._stream.on( + this._stream.on( 'message', Meteor.bindEnvironment( - this.onMessage.bind(this), + msg => this._streamHandlers.onMessage(msg), 'handling DDP message' ) ); - self._stream.on( + this._stream.on( 'reset', - Meteor.bindEnvironment(this.onReset.bind(this), 'handling DDP reset') + Meteor.bindEnvironment( + () => this._streamHandlers.onReset(), + 'handling DDP reset' + ) ); - self._stream.on( + this._stream.on( 'disconnect', Meteor.bindEnvironment(onDisconnect, 'handling DDP disconnect') ); } else { - self._stream.on('message', this.onMessage.bind(this)); - self._stream.on('reset', this.onReset.bind(this)); - self._stream.on('disconnect', onDisconnect); + this._stream.on('message', msg => this._streamHandlers.onMessage(msg)); + this._stream.on('reset', () => this._streamHandlers.onReset()); + this._stream.on('disconnect', onDisconnect); } } @@ -1854,148 +1855,4 @@ export class Connection { self._retryMigrate = null; } } - - async onMessage(raw_msg) { - let msg; - try { - msg = DDPCommon.parseDDP(raw_msg); - } catch (e) { - Meteor._debug('Exception while parsing DDP', e); - return; - } - - // Any message counts as receiving a pong, as it demonstrates that - // the server is still alive. - if (this._heartbeat) { - this._heartbeat.messageReceived(); - } - - if (msg === null || !msg.msg) { - if(!msg || !msg.testMessageOnConnect) { - if (Object.keys(msg).length === 1 && msg.server_id) return; - Meteor._debug('discarding invalid livedata message', msg); - } - return; - } - - if (msg.msg === 'connected') { - this._version = this._versionSuggestion; - await this._livedata_connected(msg); - this.options.onConnected(); - } else if (msg.msg === 'failed') { - if (this._supportedDDPVersions.indexOf(msg.version) >= 0) { - this._versionSuggestion = msg.version; - this._stream.reconnect({ _force: true }); - } else { - const description = - 'DDP version negotiation failed; server requested version ' + - msg.version; - this._stream.disconnect({ _permanent: true, _error: description }); - this.options.onDDPVersionNegotiationFailure(description); - } - } else if (msg.msg === 'ping' && this.options.respondToPings) { - this._send({ msg: 'pong', id: msg.id }); - } else if (msg.msg === 'pong') { - // noop, as we assume everything's a pong - } else if ( - ['added', 'changed', 'removed', 'ready', 'updated'].includes(msg.msg) - ) { - await this._livedata_data(msg); - } else if (msg.msg === 'nosub') { - await this._livedata_nosub(msg); - } else if (msg.msg === 'result') { - await this._livedata_result(msg); - } else if (msg.msg === 'error') { - this._livedata_error(msg); - } else { - Meteor._debug('discarding unknown livedata message type', msg); - } - } - - onReset() { - // Send a connect message at the beginning of the stream. - // NOTE: reset is called even on the first connection, so this is - // the only place we send this message. - const msg = { msg: 'connect' }; - if (this._lastSessionId) msg.session = this._lastSessionId; - msg.version = this._versionSuggestion || this._supportedDDPVersions[0]; - this._versionSuggestion = msg.version; - msg.support = this._supportedDDPVersions; - this._send(msg); - - // Mark non-retry calls as failed. This has to be done early as getting these methods out of the - // current block is pretty important to making sure that quiescence is properly calculated, as - // well as possibly moving on to another useful block. - - // Only bother testing if there is an outstandingMethodBlock (there might not be, especially if - // we are connecting for the first time. - if (this._outstandingMethodBlocks.length > 0) { - // If there is an outstanding method block, we only care about the first one as that is the - // one that could have already sent messages with no response, that are not allowed to retry. - const currentMethodBlock = this._outstandingMethodBlocks[0].methods; - this._outstandingMethodBlocks[0].methods = currentMethodBlock.filter( - methodInvoker => { - // Methods with 'noRetry' option set are not allowed to re-send after - // recovering dropped connection. - if (methodInvoker.sentMessage && methodInvoker.noRetry) { - // Make sure that the method is told that it failed. - methodInvoker.receiveResult( - new Meteor.Error( - 'invocation-failed', - 'Method invocation might have failed due to dropped connection. ' + - 'Failing because `noRetry` option was passed to Meteor.apply.' - ) - ); - } - - // Only keep a method if it wasn't sent or it's allowed to retry. - // This may leave the block empty, but we don't move on to the next - // block until the callback has been delivered, in _outstandingMethodFinished. - return !(methodInvoker.sentMessage && methodInvoker.noRetry); - } - ); - } - - // Now, to minimize setup latency, go ahead and blast out all of - // our pending methods ands subscriptions before we've even taken - // the necessary RTT to know if we successfully reconnected. (1) - // They're supposed to be idempotent, and where they are not, - // they can block retry in apply; (2) even if we did reconnect, - // we're not sure what messages might have gotten lost - // (in either direction) since we were disconnected (TCP being - // sloppy about that.) - - // If the current block of methods all got their results (but didn't all get - // their data visible), discard the empty block now. - if ( - this._outstandingMethodBlocks.length > 0 && - this._outstandingMethodBlocks[0].methods.length === 0 - ) { - this._outstandingMethodBlocks.shift(); - } - - // Mark all messages as unsent, they have not yet been sent on this - // connection. - keys(this._methodInvokers).forEach(id => { - this._methodInvokers[id].sentMessage = false; - }); - - // If an `onReconnect` handler is set, call it first. Go through - // some hoops to ensure that methods that are called from within - // `onReconnect` get executed _before_ ones that were originally - // outstanding (since `onReconnect` is used to re-establish auth - // certificates) - this._callOnReconnectAndSendAppropriateOutstandingMethods(); - - // add new subscriptions at the end. this way they take effect after - // the handlers and we don't see flicker. - Object.entries(this._subscriptions).forEach(([id, sub]) => { - this._sendQueued({ - msg: 'sub', - id: id, - name: sub.name, - params: sub.params - }); - }); - } } diff --git a/packages/ddp-client/common/mongo_id_map.js b/packages/ddp-client/common/mongo_id_map.js new file mode 100644 index 0000000000..179aa214f1 --- /dev/null +++ b/packages/ddp-client/common/mongo_id_map.js @@ -0,0 +1,7 @@ +import { MongoID } from 'meteor/mongo-id'; + +export class MongoIDMap extends IdMap { + constructor() { + super(MongoID.idStringify, MongoID.idParse); + } +} \ No newline at end of file