extract stream handlers

This commit is contained in:
Leonardo Venturini
2024-11-06 14:07:36 -04:00
parent 2779e65630
commit 6c1fd9c313
3 changed files with 227 additions and 161 deletions

View File

@@ -0,0 +1,202 @@
import { DDPCommon } from 'meteor/ddp-common';
import { Meteor } from 'meteor/meteor';
export class ConnectionStreamHandlers {
constructor(connection) {
this._connection = connection;
}
/**
* Handles incoming raw messages from the DDP stream
* @param {String} raw_msg The raw message received from the stream
*/
async onMessage(raw_msg) {
let msg;
try {
msg = DDPCommon.parseDDP(raw_msg);
} catch (e) {
Meteor._debug('Exception while parsing DDP', e);
return;
}
// Any message counts as receiving a pong, as it demonstrates that
// the server is still alive.
if (this._connection._heartbeat) {
this._connection._heartbeat.messageReceived();
}
if (msg === null || !msg.msg) {
if(!msg || !msg.testMessageOnConnect) {
if (Object.keys(msg).length === 1 && msg.server_id) return;
Meteor._debug('discarding invalid livedata message', msg);
}
return;
}
// Important: This was missing from previous version
// We need to set the current version before routing the message
if (msg.msg === 'connected') {
this._connection._version = this._connection._versionSuggestion;
}
await this._routeMessage(msg);
}
/**
* Routes messages to their appropriate handlers based on message type
* @private
* @param {Object} msg The parsed DDP message
*/
async _routeMessage(msg) {
switch (msg.msg) {
case 'connected':
await this._connection._livedata_connected(msg);
this._connection.options.onConnected();
break;
case 'failed':
await this._handleFailedMessage(msg);
break;
case 'ping':
if (this._connection.options.respondToPings) {
this._connection._send({ msg: 'pong', id: msg.id });
}
break;
case 'pong':
// noop, as we assume everything's a pong
break;
case 'added':
case 'changed':
case 'removed':
case 'ready':
case 'updated':
await this._connection._livedata_data(msg);
break;
case 'nosub':
await this._connection._livedata_nosub(msg);
break;
case 'result':
await this._connection._livedata_result(msg);
break;
case 'error':
this._connection._livedata_error(msg);
break;
default:
Meteor._debug('discarding unknown livedata message type', msg);
}
}
/**
* Handles failed connection messages
* @private
* @param {Object} msg The failed message object
*/
_handleFailedMessage(msg) {
if (this._connection._supportedDDPVersions.indexOf(msg.version) >= 0) {
this._connection._versionSuggestion = msg.version;
this._connection._stream.reconnect({ _force: true });
} else {
const description =
'DDP version negotiation failed; server requested version ' +
msg.version;
this._connection._stream.disconnect({ _permanent: true, _error: description });
this._connection.options.onDDPVersionNegotiationFailure(description);
}
}
/**
* Handles connection reset events
*/
onReset() {
// Reset is called even on the first connection, so this is
// the only place we send this message.
const msg = this._buildConnectMessage();
this._connection._send(msg);
// Mark non-retry calls as failed and handle outstanding methods
this._handleOutstandingMethodsOnReset();
// Now, to minimize setup latency, go ahead and blast out all of
// our pending methods ands subscriptions before we've even taken
// the necessary RTT to know if we successfully reconnected.
this._connection._callOnReconnectAndSendAppropriateOutstandingMethods();
this._resendSubscriptions();
}
/**
* Builds the initial connect message
* @private
* @returns {Object} The connect message object
*/
_buildConnectMessage() {
const msg = { msg: 'connect' };
if (this._connection._lastSessionId) {
msg.session = this._connection._lastSessionId;
}
msg.version = this._connection._versionSuggestion || this._connection._supportedDDPVersions[0];
this._connection._versionSuggestion = msg.version;
msg.support = this._connection._supportedDDPVersions;
return msg;
}
/**
* Handles outstanding methods during a reset
* @private
*/
_handleOutstandingMethodsOnReset() {
const blocks = this._connection._outstandingMethodBlocks;
if (blocks.length === 0) return;
const currentMethodBlock = blocks[0].methods;
blocks[0].methods = currentMethodBlock.filter(
methodInvoker => {
// Methods with 'noRetry' option set are not allowed to re-send after
// recovering dropped connection.
if (methodInvoker.sentMessage && methodInvoker.noRetry) {
methodInvoker.receiveResult(
new Meteor.Error(
'invocation-failed',
'Method invocation might have failed due to dropped connection. ' +
'Failing because `noRetry` option was passed to Meteor.apply.'
)
);
}
// Only keep a method if it wasn't sent or it's allowed to retry.
return !(methodInvoker.sentMessage && methodInvoker.noRetry);
}
);
// Clear empty blocks
if (blocks.length > 0 && blocks[0].methods.length === 0) {
blocks.shift();
}
// Reset all method invokers as unsent
Object.values(this._connection._methodInvokers).forEach(invoker => {
invoker.sentMessage = false;
});
}
/**
* Resends all active subscriptions
* @private
*/
_resendSubscriptions() {
Object.entries(this._connection._subscriptions).forEach(([id, sub]) => {
this._connection._sendQueued({
msg: 'sub',
id: id,
name: sub.name,
params: sub.params
});
});
}
}

View File

@@ -13,12 +13,8 @@ import {
isEmpty,
last,
} from "meteor/ddp-common/utils";
class MongoIDMap extends IdMap {
constructor() {
super(MongoID.idStringify, MongoID.idParse);
}
}
import { ConnectionStreamHandlers } from './connection_stream_handlers';
import { MongoIDMap } from './mongo_id_map';
// @param url {String|Object} URL to Meteor app,
// or an object as a test hook (see code)
@@ -243,33 +239,38 @@ export class Connection {
});
}
this._streamHandlers = new ConnectionStreamHandlers(this);
const onDisconnect = () => {
if (self._heartbeat) {
self._heartbeat.stop();
self._heartbeat = null;
if (this._heartbeat) {
this._heartbeat.stop();
this._heartbeat = null;
}
};
if (Meteor.isServer) {
self._stream.on(
this._stream.on(
'message',
Meteor.bindEnvironment(
this.onMessage.bind(this),
msg => this._streamHandlers.onMessage(msg),
'handling DDP message'
)
);
self._stream.on(
this._stream.on(
'reset',
Meteor.bindEnvironment(this.onReset.bind(this), 'handling DDP reset')
Meteor.bindEnvironment(
() => this._streamHandlers.onReset(),
'handling DDP reset'
)
);
self._stream.on(
this._stream.on(
'disconnect',
Meteor.bindEnvironment(onDisconnect, 'handling DDP disconnect')
);
} else {
self._stream.on('message', this.onMessage.bind(this));
self._stream.on('reset', this.onReset.bind(this));
self._stream.on('disconnect', onDisconnect);
this._stream.on('message', msg => this._streamHandlers.onMessage(msg));
this._stream.on('reset', () => this._streamHandlers.onReset());
this._stream.on('disconnect', onDisconnect);
}
}
@@ -1854,148 +1855,4 @@ export class Connection {
self._retryMigrate = null;
}
}
async onMessage(raw_msg) {
let msg;
try {
msg = DDPCommon.parseDDP(raw_msg);
} catch (e) {
Meteor._debug('Exception while parsing DDP', e);
return;
}
// Any message counts as receiving a pong, as it demonstrates that
// the server is still alive.
if (this._heartbeat) {
this._heartbeat.messageReceived();
}
if (msg === null || !msg.msg) {
if(!msg || !msg.testMessageOnConnect) {
if (Object.keys(msg).length === 1 && msg.server_id) return;
Meteor._debug('discarding invalid livedata message', msg);
}
return;
}
if (msg.msg === 'connected') {
this._version = this._versionSuggestion;
await this._livedata_connected(msg);
this.options.onConnected();
} else if (msg.msg === 'failed') {
if (this._supportedDDPVersions.indexOf(msg.version) >= 0) {
this._versionSuggestion = msg.version;
this._stream.reconnect({ _force: true });
} else {
const description =
'DDP version negotiation failed; server requested version ' +
msg.version;
this._stream.disconnect({ _permanent: true, _error: description });
this.options.onDDPVersionNegotiationFailure(description);
}
} else if (msg.msg === 'ping' && this.options.respondToPings) {
this._send({ msg: 'pong', id: msg.id });
} else if (msg.msg === 'pong') {
// noop, as we assume everything's a pong
} else if (
['added', 'changed', 'removed', 'ready', 'updated'].includes(msg.msg)
) {
await this._livedata_data(msg);
} else if (msg.msg === 'nosub') {
await this._livedata_nosub(msg);
} else if (msg.msg === 'result') {
await this._livedata_result(msg);
} else if (msg.msg === 'error') {
this._livedata_error(msg);
} else {
Meteor._debug('discarding unknown livedata message type', msg);
}
}
onReset() {
// Send a connect message at the beginning of the stream.
// NOTE: reset is called even on the first connection, so this is
// the only place we send this message.
const msg = { msg: 'connect' };
if (this._lastSessionId) msg.session = this._lastSessionId;
msg.version = this._versionSuggestion || this._supportedDDPVersions[0];
this._versionSuggestion = msg.version;
msg.support = this._supportedDDPVersions;
this._send(msg);
// Mark non-retry calls as failed. This has to be done early as getting these methods out of the
// current block is pretty important to making sure that quiescence is properly calculated, as
// well as possibly moving on to another useful block.
// Only bother testing if there is an outstandingMethodBlock (there might not be, especially if
// we are connecting for the first time.
if (this._outstandingMethodBlocks.length > 0) {
// If there is an outstanding method block, we only care about the first one as that is the
// one that could have already sent messages with no response, that are not allowed to retry.
const currentMethodBlock = this._outstandingMethodBlocks[0].methods;
this._outstandingMethodBlocks[0].methods = currentMethodBlock.filter(
methodInvoker => {
// Methods with 'noRetry' option set are not allowed to re-send after
// recovering dropped connection.
if (methodInvoker.sentMessage && methodInvoker.noRetry) {
// Make sure that the method is told that it failed.
methodInvoker.receiveResult(
new Meteor.Error(
'invocation-failed',
'Method invocation might have failed due to dropped connection. ' +
'Failing because `noRetry` option was passed to Meteor.apply.'
)
);
}
// Only keep a method if it wasn't sent or it's allowed to retry.
// This may leave the block empty, but we don't move on to the next
// block until the callback has been delivered, in _outstandingMethodFinished.
return !(methodInvoker.sentMessage && methodInvoker.noRetry);
}
);
}
// Now, to minimize setup latency, go ahead and blast out all of
// our pending methods ands subscriptions before we've even taken
// the necessary RTT to know if we successfully reconnected. (1)
// They're supposed to be idempotent, and where they are not,
// they can block retry in apply; (2) even if we did reconnect,
// we're not sure what messages might have gotten lost
// (in either direction) since we were disconnected (TCP being
// sloppy about that.)
// If the current block of methods all got their results (but didn't all get
// their data visible), discard the empty block now.
if (
this._outstandingMethodBlocks.length > 0 &&
this._outstandingMethodBlocks[0].methods.length === 0
) {
this._outstandingMethodBlocks.shift();
}
// Mark all messages as unsent, they have not yet been sent on this
// connection.
keys(this._methodInvokers).forEach(id => {
this._methodInvokers[id].sentMessage = false;
});
// If an `onReconnect` handler is set, call it first. Go through
// some hoops to ensure that methods that are called from within
// `onReconnect` get executed _before_ ones that were originally
// outstanding (since `onReconnect` is used to re-establish auth
// certificates)
this._callOnReconnectAndSendAppropriateOutstandingMethods();
// add new subscriptions at the end. this way they take effect after
// the handlers and we don't see flicker.
Object.entries(this._subscriptions).forEach(([id, sub]) => {
this._sendQueued({
msg: 'sub',
id: id,
name: sub.name,
params: sub.params
});
});
}
}

View File

@@ -0,0 +1,7 @@
import { MongoID } from 'meteor/mongo-id';
export class MongoIDMap extends IdMap {
constructor() {
super(MongoID.idStringify, MongoID.idParse);
}
}