mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 09:08:05 -05:00
feat: support additional protocols abstraction (#5360)
This commit is contained in:
@@ -19,6 +19,7 @@ use reth_tasks::{TaskSpawner, TokioTaskExecutor};
|
||||
use secp256k1::SECP256K1;
|
||||
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
|
||||
// re-export for convenience
|
||||
use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocols};
|
||||
pub use secp256k1::SecretKey;
|
||||
|
||||
/// Convenience function to create a new random [`SecretKey`]
|
||||
@@ -69,6 +70,8 @@ pub struct NetworkConfig<C> {
|
||||
pub status: Status,
|
||||
/// Sets the hello message for the p2p handshake in RLPx
|
||||
pub hello_message: HelloMessageWithProtocols,
|
||||
/// Additional protocols to announce and handle in RLPx
|
||||
pub extra_protocols: RlpxSubProtocols,
|
||||
/// Whether to disable transaction gossip
|
||||
pub tx_gossip_disabled: bool,
|
||||
/// Optimism Network Config
|
||||
@@ -159,6 +162,9 @@ pub struct NetworkConfigBuilder {
|
||||
executor: Option<Box<dyn TaskSpawner>>,
|
||||
/// Sets the hello message for the p2p handshake in RLPx
|
||||
hello_message: Option<HelloMessageWithProtocols>,
|
||||
/// The executor to use for spawning tasks.
|
||||
#[serde(skip)]
|
||||
extra_protocols: RlpxSubProtocols,
|
||||
/// Head used to start set for the fork filter and status.
|
||||
head: Option<Head>,
|
||||
/// Whether tx gossip is disabled
|
||||
@@ -195,6 +201,7 @@ impl NetworkConfigBuilder {
|
||||
network_mode: Default::default(),
|
||||
executor: None,
|
||||
hello_message: None,
|
||||
extra_protocols: Default::default(),
|
||||
head: None,
|
||||
tx_gossip_disabled: false,
|
||||
#[cfg(feature = "optimism")]
|
||||
@@ -377,6 +384,12 @@ impl NetworkConfigBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds a new additional protocol to the RLPx sub-protocol list.
|
||||
pub fn add_rlpx_sub_protocol(mut self, protocol: impl IntoRlpxSubProtocol) -> Self {
|
||||
self.extra_protocols.push(protocol);
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets whether tx gossip is disabled.
|
||||
pub fn disable_tx_gossip(mut self, disable_tx_gossip: bool) -> Self {
|
||||
self.tx_gossip_disabled = disable_tx_gossip;
|
||||
@@ -411,6 +424,7 @@ impl NetworkConfigBuilder {
|
||||
network_mode,
|
||||
executor,
|
||||
hello_message,
|
||||
extra_protocols,
|
||||
head,
|
||||
tx_gossip_disabled,
|
||||
#[cfg(feature = "optimism")]
|
||||
@@ -464,6 +478,7 @@ impl NetworkConfigBuilder {
|
||||
executor: executor.unwrap_or_else(|| Box::<TokioTaskExecutor>::default()),
|
||||
status,
|
||||
hello_message,
|
||||
extra_protocols,
|
||||
fork_filter,
|
||||
tx_gossip_disabled,
|
||||
#[cfg(feature = "optimism")]
|
||||
|
||||
@@ -129,6 +129,7 @@ mod message;
|
||||
mod metrics;
|
||||
mod network;
|
||||
pub mod peers;
|
||||
pub mod protocol;
|
||||
mod session;
|
||||
mod state;
|
||||
mod swarm;
|
||||
|
||||
@@ -182,6 +182,7 @@ where
|
||||
status,
|
||||
fork_filter,
|
||||
dns_discovery_config,
|
||||
extra_protocols,
|
||||
tx_gossip_disabled,
|
||||
#[cfg(feature = "optimism")]
|
||||
optimism_network_config: crate::config::OptimismNetworkConfig { sequencer_endpoint },
|
||||
@@ -218,6 +219,7 @@ where
|
||||
status,
|
||||
hello_message,
|
||||
fork_filter,
|
||||
extra_protocols,
|
||||
bandwidth_meter.clone(),
|
||||
);
|
||||
|
||||
|
||||
252
crates/net/network/src/protocol.rs
Normal file
252
crates/net/network/src/protocol.rs
Normal file
@@ -0,0 +1,252 @@
|
||||
//! Support for handling additional RLPx-based application-level protocols.
|
||||
//!
|
||||
//! See also <https://github.com/ethereum/devp2p/blob/master/README.md>
|
||||
|
||||
use futures::{Stream, StreamExt};
|
||||
use reth_eth_wire::{capability::SharedCapability, protocol::Protocol};
|
||||
use reth_network_api::Direction;
|
||||
use reth_primitives::BytesMut;
|
||||
use reth_rpc_types::PeerId;
|
||||
use std::{
|
||||
fmt,
|
||||
future::Future,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
/// A trait that allows to offer additional RLPx-based application-level protocols when establishing
|
||||
/// a peer-to-peer connection.
|
||||
pub trait ProtocolHandler: fmt::Debug + Send + Sync + 'static {
|
||||
/// The type responsible for negotiating the protocol with the remote.
|
||||
type ConnectionHandler: ConnectionHandler;
|
||||
|
||||
/// Invoked when a new incoming connection from the remote is requested
|
||||
///
|
||||
/// If protocols for this outgoing should be announced to the remote, return a connection
|
||||
/// handler.
|
||||
fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Self::ConnectionHandler>;
|
||||
|
||||
/// Invoked when a new outgoing connection to the remote is requested.
|
||||
///
|
||||
/// If protocols for this outgoing should be announced to the remote, return a connection
|
||||
/// handler.
|
||||
fn on_outgoing(
|
||||
&self,
|
||||
socket_addr: SocketAddr,
|
||||
peer_id: PeerId,
|
||||
) -> Option<Self::ConnectionHandler>;
|
||||
}
|
||||
|
||||
/// A trait that allows to authenticate a protocol after the RLPx connection was established.
|
||||
pub trait ConnectionHandler: Send + Sync + 'static {
|
||||
/// The future that handles the connection
|
||||
type Connection: Future<Output = ()> + Send + 'static;
|
||||
|
||||
/// Returns the protocols to announce when the RLPx connection will be established.
|
||||
///
|
||||
/// This will be negotiated with the remote peer.
|
||||
fn protocol(&self) -> Protocol;
|
||||
|
||||
/// Invoked when the RLPx connection has been established by the peer does not share the
|
||||
/// protocol.
|
||||
fn on_unsupported_by_peer(
|
||||
self,
|
||||
supported: &SharedCapability,
|
||||
direction: Direction,
|
||||
peer_id: PeerId,
|
||||
) -> OnNotSupported;
|
||||
|
||||
/// Invoked when the RLPx connection was established.
|
||||
///
|
||||
/// The returned future should resolve when the connection should disconnect.
|
||||
fn into_connection(
|
||||
self,
|
||||
direction: Direction,
|
||||
peer_id: PeerId,
|
||||
conn: ProtocolConnection,
|
||||
) -> Self::Connection;
|
||||
}
|
||||
|
||||
/// What to do when a protocol is not supported by the remote.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub enum OnNotSupported {
|
||||
/// Proceed with the connection and ignore the protocol.
|
||||
#[default]
|
||||
KeepAlive,
|
||||
/// Disconnect the connection.
|
||||
Disconnect,
|
||||
}
|
||||
|
||||
/// A connection channel to send and receive messages for a specific protocols.
|
||||
///
|
||||
/// This is a [Stream] that returns raw bytes of the received messages for this protocol.
|
||||
#[derive(Debug)]
|
||||
pub struct ProtocolConnection {
|
||||
from_wire: UnboundedReceiverStream<BytesMut>,
|
||||
to_wire: UnboundedSender<ProtocolMessage>,
|
||||
}
|
||||
|
||||
impl ProtocolConnection {
|
||||
/// Sends a message to the remote.
|
||||
///
|
||||
/// Returns an error if the connection has been disconnected.
|
||||
pub fn send(&self, msg: BytesMut) {
|
||||
self.to_wire.send(ProtocolMessage::Message(msg)).ok();
|
||||
}
|
||||
|
||||
/// Disconnects the connection.
|
||||
pub fn disconnect(&self) {
|
||||
let _ = self.to_wire.send(ProtocolMessage::Disconnect);
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for ProtocolConnection {
|
||||
type Item = BytesMut;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.from_wire.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Messages that can be sent from a protocol connection
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ProtocolMessage {
|
||||
/// New message to send to the remote.
|
||||
Message(BytesMut),
|
||||
/// Disconnect the connection.
|
||||
Disconnect,
|
||||
}
|
||||
|
||||
/// Errors that can occur when handling a protocol.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProtocolError {
|
||||
/// custom error message
|
||||
#[error("{0}")]
|
||||
Message(String),
|
||||
/// Ayn other error
|
||||
#[error(transparent)]
|
||||
Other(Box<dyn std::error::Error + Send + Sync + 'static>),
|
||||
}
|
||||
|
||||
impl ProtocolError {
|
||||
/// Creates a new error with the given message.
|
||||
pub fn msg(msg: impl fmt::Display) -> Self {
|
||||
ProtocolError::Message(msg.to_string())
|
||||
}
|
||||
|
||||
/// Wraps the given error in a `ProtocolError`.
|
||||
pub fn new(err: impl std::error::Error + Send + Sync + 'static) -> Self {
|
||||
ProtocolError::Other(Box::new(err))
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper type for a RLPx sub-protocol.
|
||||
#[derive(Debug)]
|
||||
pub struct RlpxSubProtocol(Box<dyn DynProtocolHandler>);
|
||||
|
||||
/// A helper trait to convert a [ProtocolHandler] into a dynamic type
|
||||
pub trait IntoRlpxSubProtocol {
|
||||
/// Converts the type into a [RlpxSubProtocol].
|
||||
fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol;
|
||||
}
|
||||
|
||||
impl<T> IntoRlpxSubProtocol for T
|
||||
where
|
||||
T: ProtocolHandler + Send + Sync + 'static,
|
||||
{
|
||||
fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol {
|
||||
RlpxSubProtocol(Box::new(self))
|
||||
}
|
||||
}
|
||||
|
||||
/// Additional RLPx-based sub-protocols.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct RlpxSubProtocols {
|
||||
/// All extra protocols
|
||||
protocols: Vec<RlpxSubProtocol>,
|
||||
}
|
||||
|
||||
impl RlpxSubProtocols {
|
||||
/// Adds a new protocol.
|
||||
pub fn push(&mut self, protocol: impl IntoRlpxSubProtocol) {
|
||||
self.protocols.push(protocol.into_rlpx_sub_protocol());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait DynProtocolHandler: fmt::Debug + Send + Sync + 'static {
|
||||
fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Box<dyn DynConnectionHandler>>;
|
||||
|
||||
fn on_outgoing(
|
||||
&self,
|
||||
socket_addr: SocketAddr,
|
||||
peer_id: PeerId,
|
||||
) -> Option<Box<dyn DynConnectionHandler>>;
|
||||
}
|
||||
|
||||
impl<T> DynProtocolHandler for T
|
||||
where
|
||||
T: ProtocolHandler,
|
||||
{
|
||||
fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Box<dyn DynConnectionHandler>> {
|
||||
T::on_incoming(self, socket_addr)
|
||||
.map(|handler| Box::new(handler) as Box<dyn DynConnectionHandler>)
|
||||
}
|
||||
|
||||
fn on_outgoing(
|
||||
&self,
|
||||
socket_addr: SocketAddr,
|
||||
peer_id: PeerId,
|
||||
) -> Option<Box<dyn DynConnectionHandler>> {
|
||||
T::on_outgoing(self, socket_addr, peer_id)
|
||||
.map(|handler| Box::new(handler) as Box<dyn DynConnectionHandler>)
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper trait for internal ease of use.
|
||||
pub(crate) trait DynConnectionHandler: Send + Sync + 'static {
|
||||
fn protocol(&self) -> Protocol;
|
||||
|
||||
fn on_unsupported_by_peer(
|
||||
self,
|
||||
supported: &SharedCapability,
|
||||
direction: Direction,
|
||||
peer_id: PeerId,
|
||||
) -> OnNotSupported;
|
||||
|
||||
fn into_connection(
|
||||
self,
|
||||
direction: Direction,
|
||||
peer_id: PeerId,
|
||||
conn: ProtocolConnection,
|
||||
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
|
||||
}
|
||||
|
||||
impl<T> DynConnectionHandler for T
|
||||
where
|
||||
T: ConnectionHandler,
|
||||
{
|
||||
fn protocol(&self) -> Protocol {
|
||||
T::protocol(self)
|
||||
}
|
||||
|
||||
fn on_unsupported_by_peer(
|
||||
self,
|
||||
supported: &SharedCapability,
|
||||
direction: Direction,
|
||||
peer_id: PeerId,
|
||||
) -> OnNotSupported {
|
||||
T::on_unsupported_by_peer(self, supported, direction, peer_id)
|
||||
}
|
||||
|
||||
fn into_connection(
|
||||
self,
|
||||
direction: Direction,
|
||||
peer_id: PeerId,
|
||||
conn: ProtocolConnection,
|
||||
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
|
||||
Box::pin(T::into_connection(self, direction, peer_id, conn))
|
||||
}
|
||||
}
|
||||
@@ -48,6 +48,7 @@ pub use handle::{
|
||||
SessionCommand,
|
||||
};
|
||||
|
||||
use crate::protocol::RlpxSubProtocols;
|
||||
pub use reth_network_api::{Direction, PeerInfo};
|
||||
|
||||
/// Internal identifier for active sessions.
|
||||
@@ -101,6 +102,9 @@ pub struct SessionManager {
|
||||
active_session_tx: MeteredPollSender<ActiveSessionMessage>,
|
||||
/// Receiver half that listens for [`ActiveSessionMessage`] produced by pending sessions.
|
||||
active_session_rx: ReceiverStream<ActiveSessionMessage>,
|
||||
/// Additional RLPx sub-protocols to be used by the session manager.
|
||||
#[allow(unused)]
|
||||
extra_protocols: RlpxSubProtocols,
|
||||
/// Used to measure inbound & outbound bandwidth across all managed streams
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
/// Metrics for the session manager.
|
||||
@@ -111,6 +115,7 @@ pub struct SessionManager {
|
||||
|
||||
impl SessionManager {
|
||||
/// Creates a new empty [`SessionManager`].
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
secret_key: SecretKey,
|
||||
config: SessionsConfig,
|
||||
@@ -118,6 +123,7 @@ impl SessionManager {
|
||||
status: Status,
|
||||
hello_message: HelloMessageWithProtocols,
|
||||
fork_filter: ForkFilter,
|
||||
extra_protocols: RlpxSubProtocols,
|
||||
bandwidth_meter: BandwidthMeter,
|
||||
) -> Self {
|
||||
let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
|
||||
@@ -142,6 +148,7 @@ impl SessionManager {
|
||||
active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
|
||||
active_session_rx: ReceiverStream::new(active_session_rx),
|
||||
bandwidth_meter,
|
||||
extra_protocols,
|
||||
metrics: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user