From 820f67e74172ac2eaf95bb0792c549882e44704f Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 10 Nov 2023 19:41:06 +0100 Subject: [PATCH] feat: support additional protocols abstraction (#5360) --- crates/net/network/src/config.rs | 15 ++ crates/net/network/src/lib.rs | 1 + crates/net/network/src/manager.rs | 2 + crates/net/network/src/protocol.rs | 252 ++++++++++++++++++++++++++ crates/net/network/src/session/mod.rs | 7 + 5 files changed, 277 insertions(+) create mode 100644 crates/net/network/src/protocol.rs diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index 6af2d74a9d..f97f4004f9 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -19,6 +19,7 @@ use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use secp256k1::SECP256K1; use std::{collections::HashSet, net::SocketAddr, sync::Arc}; // re-export for convenience +use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocols}; pub use secp256k1::SecretKey; /// Convenience function to create a new random [`SecretKey`] @@ -69,6 +70,8 @@ pub struct NetworkConfig { pub status: Status, /// Sets the hello message for the p2p handshake in RLPx pub hello_message: HelloMessageWithProtocols, + /// Additional protocols to announce and handle in RLPx + pub extra_protocols: RlpxSubProtocols, /// Whether to disable transaction gossip pub tx_gossip_disabled: bool, /// Optimism Network Config @@ -159,6 +162,9 @@ pub struct NetworkConfigBuilder { executor: Option>, /// Sets the hello message for the p2p handshake in RLPx hello_message: Option, + /// The executor to use for spawning tasks. + #[serde(skip)] + extra_protocols: RlpxSubProtocols, /// Head used to start set for the fork filter and status. head: Option, /// Whether tx gossip is disabled @@ -195,6 +201,7 @@ impl NetworkConfigBuilder { network_mode: Default::default(), executor: None, hello_message: None, + extra_protocols: Default::default(), head: None, tx_gossip_disabled: false, #[cfg(feature = "optimism")] @@ -377,6 +384,12 @@ impl NetworkConfigBuilder { } } + /// Adds a new additional protocol to the RLPx sub-protocol list. + pub fn add_rlpx_sub_protocol(mut self, protocol: impl IntoRlpxSubProtocol) -> Self { + self.extra_protocols.push(protocol); + self + } + /// Sets whether tx gossip is disabled. pub fn disable_tx_gossip(mut self, disable_tx_gossip: bool) -> Self { self.tx_gossip_disabled = disable_tx_gossip; @@ -411,6 +424,7 @@ impl NetworkConfigBuilder { network_mode, executor, hello_message, + extra_protocols, head, tx_gossip_disabled, #[cfg(feature = "optimism")] @@ -464,6 +478,7 @@ impl NetworkConfigBuilder { executor: executor.unwrap_or_else(|| Box::::default()), status, hello_message, + extra_protocols, fork_filter, tx_gossip_disabled, #[cfg(feature = "optimism")] diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index e91a66c077..3975918e99 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -129,6 +129,7 @@ mod message; mod metrics; mod network; pub mod peers; +pub mod protocol; mod session; mod state; mod swarm; diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 86f7ef5c57..7df8addb21 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -182,6 +182,7 @@ where status, fork_filter, dns_discovery_config, + extra_protocols, tx_gossip_disabled, #[cfg(feature = "optimism")] optimism_network_config: crate::config::OptimismNetworkConfig { sequencer_endpoint }, @@ -218,6 +219,7 @@ where status, hello_message, fork_filter, + extra_protocols, bandwidth_meter.clone(), ); diff --git a/crates/net/network/src/protocol.rs b/crates/net/network/src/protocol.rs new file mode 100644 index 0000000000..0587dd30a5 --- /dev/null +++ b/crates/net/network/src/protocol.rs @@ -0,0 +1,252 @@ +//! Support for handling additional RLPx-based application-level protocols. +//! +//! See also + +use futures::{Stream, StreamExt}; +use reth_eth_wire::{capability::SharedCapability, protocol::Protocol}; +use reth_network_api::Direction; +use reth_primitives::BytesMut; +use reth_rpc_types::PeerId; +use std::{ + fmt, + future::Future, + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::sync::mpsc::UnboundedSender; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// A trait that allows to offer additional RLPx-based application-level protocols when establishing +/// a peer-to-peer connection. +pub trait ProtocolHandler: fmt::Debug + Send + Sync + 'static { + /// The type responsible for negotiating the protocol with the remote. + type ConnectionHandler: ConnectionHandler; + + /// Invoked when a new incoming connection from the remote is requested + /// + /// If protocols for this outgoing should be announced to the remote, return a connection + /// handler. + fn on_incoming(&self, socket_addr: SocketAddr) -> Option; + + /// Invoked when a new outgoing connection to the remote is requested. + /// + /// If protocols for this outgoing should be announced to the remote, return a connection + /// handler. + fn on_outgoing( + &self, + socket_addr: SocketAddr, + peer_id: PeerId, + ) -> Option; +} + +/// A trait that allows to authenticate a protocol after the RLPx connection was established. +pub trait ConnectionHandler: Send + Sync + 'static { + /// The future that handles the connection + type Connection: Future + Send + 'static; + + /// Returns the protocols to announce when the RLPx connection will be established. + /// + /// This will be negotiated with the remote peer. + fn protocol(&self) -> Protocol; + + /// Invoked when the RLPx connection has been established by the peer does not share the + /// protocol. + fn on_unsupported_by_peer( + self, + supported: &SharedCapability, + direction: Direction, + peer_id: PeerId, + ) -> OnNotSupported; + + /// Invoked when the RLPx connection was established. + /// + /// The returned future should resolve when the connection should disconnect. + fn into_connection( + self, + direction: Direction, + peer_id: PeerId, + conn: ProtocolConnection, + ) -> Self::Connection; +} + +/// What to do when a protocol is not supported by the remote. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum OnNotSupported { + /// Proceed with the connection and ignore the protocol. + #[default] + KeepAlive, + /// Disconnect the connection. + Disconnect, +} + +/// A connection channel to send and receive messages for a specific protocols. +/// +/// This is a [Stream] that returns raw bytes of the received messages for this protocol. +#[derive(Debug)] +pub struct ProtocolConnection { + from_wire: UnboundedReceiverStream, + to_wire: UnboundedSender, +} + +impl ProtocolConnection { + /// Sends a message to the remote. + /// + /// Returns an error if the connection has been disconnected. + pub fn send(&self, msg: BytesMut) { + self.to_wire.send(ProtocolMessage::Message(msg)).ok(); + } + + /// Disconnects the connection. + pub fn disconnect(&self) { + let _ = self.to_wire.send(ProtocolMessage::Disconnect); + } +} + +impl Stream for ProtocolConnection { + type Item = BytesMut; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.from_wire.poll_next_unpin(cx) + } +} + +/// Messages that can be sent from a protocol connection +#[derive(Debug)] +pub(crate) enum ProtocolMessage { + /// New message to send to the remote. + Message(BytesMut), + /// Disconnect the connection. + Disconnect, +} + +/// Errors that can occur when handling a protocol. +#[derive(Debug, thiserror::Error)] +pub enum ProtocolError { + /// custom error message + #[error("{0}")] + Message(String), + /// Ayn other error + #[error(transparent)] + Other(Box), +} + +impl ProtocolError { + /// Creates a new error with the given message. + pub fn msg(msg: impl fmt::Display) -> Self { + ProtocolError::Message(msg.to_string()) + } + + /// Wraps the given error in a `ProtocolError`. + pub fn new(err: impl std::error::Error + Send + Sync + 'static) -> Self { + ProtocolError::Other(Box::new(err)) + } +} + +/// A wrapper type for a RLPx sub-protocol. +#[derive(Debug)] +pub struct RlpxSubProtocol(Box); + +/// A helper trait to convert a [ProtocolHandler] into a dynamic type +pub trait IntoRlpxSubProtocol { + /// Converts the type into a [RlpxSubProtocol]. + fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol; +} + +impl IntoRlpxSubProtocol for T +where + T: ProtocolHandler + Send + Sync + 'static, +{ + fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol { + RlpxSubProtocol(Box::new(self)) + } +} + +/// Additional RLPx-based sub-protocols. +#[derive(Debug, Default)] +pub struct RlpxSubProtocols { + /// All extra protocols + protocols: Vec, +} + +impl RlpxSubProtocols { + /// Adds a new protocol. + pub fn push(&mut self, protocol: impl IntoRlpxSubProtocol) { + self.protocols.push(protocol.into_rlpx_sub_protocol()); + } +} + +pub(crate) trait DynProtocolHandler: fmt::Debug + Send + Sync + 'static { + fn on_incoming(&self, socket_addr: SocketAddr) -> Option>; + + fn on_outgoing( + &self, + socket_addr: SocketAddr, + peer_id: PeerId, + ) -> Option>; +} + +impl DynProtocolHandler for T +where + T: ProtocolHandler, +{ + fn on_incoming(&self, socket_addr: SocketAddr) -> Option> { + T::on_incoming(self, socket_addr) + .map(|handler| Box::new(handler) as Box) + } + + fn on_outgoing( + &self, + socket_addr: SocketAddr, + peer_id: PeerId, + ) -> Option> { + T::on_outgoing(self, socket_addr, peer_id) + .map(|handler| Box::new(handler) as Box) + } +} + +/// Wrapper trait for internal ease of use. +pub(crate) trait DynConnectionHandler: Send + Sync + 'static { + fn protocol(&self) -> Protocol; + + fn on_unsupported_by_peer( + self, + supported: &SharedCapability, + direction: Direction, + peer_id: PeerId, + ) -> OnNotSupported; + + fn into_connection( + self, + direction: Direction, + peer_id: PeerId, + conn: ProtocolConnection, + ) -> Pin + Send + 'static>>; +} + +impl DynConnectionHandler for T +where + T: ConnectionHandler, +{ + fn protocol(&self) -> Protocol { + T::protocol(self) + } + + fn on_unsupported_by_peer( + self, + supported: &SharedCapability, + direction: Direction, + peer_id: PeerId, + ) -> OnNotSupported { + T::on_unsupported_by_peer(self, supported, direction, peer_id) + } + + fn into_connection( + self, + direction: Direction, + peer_id: PeerId, + conn: ProtocolConnection, + ) -> Pin + Send + 'static>> { + Box::pin(T::into_connection(self, direction, peer_id, conn)) + } +} diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 7931cb413a..0e6fdcd7fb 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -48,6 +48,7 @@ pub use handle::{ SessionCommand, }; +use crate::protocol::RlpxSubProtocols; pub use reth_network_api::{Direction, PeerInfo}; /// Internal identifier for active sessions. @@ -101,6 +102,9 @@ pub struct SessionManager { active_session_tx: MeteredPollSender, /// Receiver half that listens for [`ActiveSessionMessage`] produced by pending sessions. active_session_rx: ReceiverStream, + /// Additional RLPx sub-protocols to be used by the session manager. + #[allow(unused)] + extra_protocols: RlpxSubProtocols, /// Used to measure inbound & outbound bandwidth across all managed streams bandwidth_meter: BandwidthMeter, /// Metrics for the session manager. @@ -111,6 +115,7 @@ pub struct SessionManager { impl SessionManager { /// Creates a new empty [`SessionManager`]. + #[allow(clippy::too_many_arguments)] pub fn new( secret_key: SecretKey, config: SessionsConfig, @@ -118,6 +123,7 @@ impl SessionManager { status: Status, hello_message: HelloMessageWithProtocols, fork_filter: ForkFilter, + extra_protocols: RlpxSubProtocols, bandwidth_meter: BandwidthMeter, ) -> Self { let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer); @@ -142,6 +148,7 @@ impl SessionManager { active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"), active_session_rx: ReceiverStream::new(active_session_rx), bandwidth_meter, + extra_protocols, metrics: Default::default(), } }