diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 766ac66849..25733cc110 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -151,6 +151,10 @@ pub use manager::{NetworkEvent, NetworkManager}; pub use message::PeerRequest; pub use network::NetworkHandle; pub use peers::PeersConfig; -pub use session::{PeerInfo, SessionsConfig}; +pub use session::{ + ActiveSessionHandle, ActiveSessionMessage, Direction, PeerInfo, PendingSessionEvent, + PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId, + SessionLimits, SessionManager, SessionsConfig, +}; pub use reth_eth_wire::{DisconnectReason, HelloBuilder, HelloMessage}; diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index 8d233039de..1b4f893b6e 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -14,7 +14,10 @@ use reth_primitives::PeerId; use std::{io, net::SocketAddr, sync::Arc, time::Instant}; use tokio::{ net::TcpStream, - sync::{mpsc, oneshot}, + sync::{ + mpsc::{self, error::SendError}, + oneshot, + }, }; /// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello @@ -22,7 +25,7 @@ use tokio::{ /// /// This session needs to wait until it is authenticated. #[derive(Debug)] -pub(crate) struct PendingSessionHandle { +pub struct PendingSessionHandle { /// Can be used to tell the session to disconnect the connection/abort the handshake process. pub(crate) disconnect_tx: Option>, /// The direction of the session @@ -33,11 +36,16 @@ pub(crate) struct PendingSessionHandle { impl PendingSessionHandle { /// Sends a disconnect command to the pending session. - pub(crate) fn disconnect(&mut self) { + pub fn disconnect(&mut self) { if let Some(tx) = self.disconnect_tx.take() { let _ = tx.send(()); } } + + /// Returns the direction of the pending session (inbound or outbound). + pub fn direction(&self) -> Direction { + self.direction + } } /// An established session with a remote peer. @@ -46,7 +54,7 @@ impl PendingSessionHandle { /// be performed: chain synchronization, block propagation and transaction exchange. #[derive(Debug)] #[allow(unused)] -pub(crate) struct ActiveSessionHandle { +pub struct ActiveSessionHandle { /// The direction of the session pub(crate) direction: Direction, /// The assigned id for this session @@ -71,10 +79,59 @@ pub(crate) struct ActiveSessionHandle { impl ActiveSessionHandle { /// Sends a disconnect command to the session. - pub(crate) fn disconnect(&self, reason: Option) { + pub fn disconnect(&self, reason: Option) { // Note: we clone the sender which ensures the channel has capacity to send the message let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect { reason }); } + + /// Sends a disconnect command to the session, awaiting the command channel for available + /// capacity. + pub async fn try_disconnect( + &self, + reason: Option, + ) -> Result<(), SendError> { + self.commands_to_session.clone().send(SessionCommand::Disconnect { reason }).await + } + + /// Returns the direction of the active session (inbound or outbound). + pub fn direction(&self) -> Direction { + self.direction + } + + /// Returns the assigned session id for this session. + pub fn session_id(&self) -> SessionId { + self.session_id + } + + /// Returns the negotiated eth version for this session. + pub fn version(&self) -> EthVersion { + self.version + } + + /// Returns the identifier of the remote peer. + pub fn remote_id(&self) -> PeerId { + self.remote_id + } + + /// Returns the timestamp when the session has been established. + pub fn established(&self) -> Instant { + self.established + } + + /// Returns the announced capabilities of the peer. + pub fn capabilities(&self) -> Arc { + self.capabilities.clone() + } + + /// Returns the client's name and version. + pub fn client_version(&self) -> Arc { + self.client_version.clone() + } + + /// Returns the address we're connected to. + pub fn remote_addr(&self) -> SocketAddr { + self.remote_addr + } } /// Info about an active peer session. @@ -98,46 +155,66 @@ pub struct PeerInfo { /// /// A session starts with a `Handshake`, followed by a `Hello` message which #[derive(Debug)] -pub(crate) enum PendingSessionEvent { +pub enum PendingSessionEvent { /// Represents a successful `Hello` and `Status` exchange: Established { + /// An internal identifier for the established session session_id: SessionId, + /// The remote node's socket address remote_addr: SocketAddr, /// The remote node's public key peer_id: PeerId, + /// All capabilities the peer announced capabilities: Arc, + /// The Status message the peer sent for the `eth` handshake status: Status, + /// The actual connection stream which can be used to send and receive `eth` protocol + /// messages conn: EthStream>>>, + /// The direction of the session, either `Inbound` or `Outgoing` direction: Direction, + /// The remote node's user agent, usually containing the client name and version client_id: String, }, /// Handshake unsuccessful, session was disconnected. Disconnected { + /// The remote node's socket address remote_addr: SocketAddr, + /// The internal identifier for the disconnected session session_id: SessionId, + /// The direction of the session, either `Inbound` or `Outgoing` direction: Direction, + /// The error that caused the disconnect error: Option, }, /// Thrown when unable to establish a [`TcpStream`]. OutgoingConnectionError { + /// The remote node's socket address remote_addr: SocketAddr, + /// The internal identifier for the disconnected session session_id: SessionId, + /// The remote node's public key peer_id: PeerId, + /// The error that caused the outgoing connection failure error: io::Error, }, - /// Thrown when authentication via Ecies failed. + /// Thrown when authentication via ECIES failed. EciesAuthError { + /// The remote node's socket address remote_addr: SocketAddr, + /// The internal identifier for the disconnected session session_id: SessionId, + /// The error that caused the ECIES session to fail error: ECIESError, + /// The direction of the session, either `Inbound` or `Outgoing` direction: Direction, }, } /// Commands that can be sent to the spawned session. #[derive(Debug)] -pub(crate) enum SessionCommand { +pub enum SessionCommand { /// Disconnect the connection Disconnect { /// Why the disconnect was initiated @@ -150,12 +227,19 @@ pub(crate) enum SessionCommand { /// Message variants an active session can produce and send back to the /// [`SessionManager`](crate::session::SessionManager) #[derive(Debug)] -pub(crate) enum ActiveSessionMessage { +pub enum ActiveSessionMessage { /// Session was gracefully disconnected. - Disconnected { peer_id: PeerId, remote_addr: SocketAddr }, + Disconnected { + /// The remote node's public key + peer_id: PeerId, + /// The remote node's socket address + remote_addr: SocketAddr, + }, /// Session was closed due an error ClosedOnConnectionError { + /// The remote node's public key peer_id: PeerId, + /// The remote node's socket address remote_addr: SocketAddr, /// The error that caused the session to close error: EthStreamError, diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index b0d628d48c..d31d2c635b 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -2,14 +2,7 @@ use crate::{ message::PeerMessage, metrics::SesssionManagerMetrics, - session::{ - active::ActiveSession, - config::SessionCounter, - handle::{ - ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, - SessionCommand, - }, - }, + session::{active::ActiveSession, config::SessionCounter}, }; pub use crate::{message::PeerRequestSender, session::handle::PeerInfo}; use fnv::FnvHashMap; @@ -47,7 +40,11 @@ use tracing::{instrument, trace}; mod active; mod config; mod handle; -pub use config::SessionsConfig; +pub use config::{SessionLimits, SessionsConfig}; +pub use handle::{ + ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, + SessionCommand, +}; /// Internal identifier for active sessions. #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)] @@ -56,7 +53,7 @@ pub struct SessionId(usize); /// Manages a set of sessions. #[must_use = "Session Manager must be polled to process session events."] #[derive(Debug)] -pub(crate) struct SessionManager { +pub struct SessionManager { /// Tracks the identifier for the next session. next_id: usize, /// Keeps track of all sessions @@ -110,7 +107,7 @@ pub(crate) struct SessionManager { impl SessionManager { /// Creates a new empty [`SessionManager`]. - pub(crate) fn new( + pub fn new( secret_key: SecretKey, config: SessionsConfig, executor: Box, @@ -146,7 +143,7 @@ impl SessionManager { /// Check whether the provided [`ForkId`] is compatible based on the validation rules in /// `EIP-2124`. - pub(crate) fn is_valid_fork_id(&self, fork_id: ForkId) -> bool { + pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool { self.fork_filter.validate(fork_id).is_ok() } @@ -158,12 +155,12 @@ impl SessionManager { } /// Returns the current status of the session. - pub(crate) fn status(&self) -> Status { + pub fn status(&self) -> Status { self.status } /// Returns the session hello message. - pub(crate) fn hello_message(&self) -> HelloMessage { + pub fn hello_message(&self) -> HelloMessage { self.hello_message.clone() } @@ -235,7 +232,7 @@ impl SessionManager { } /// Starts a new pending session from the local node to the given remote node. - pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) { + pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) { // The error can be dropped because no dial will be made if it would exceed the limit if self.counter.ensure_pending_outbound().is_ok() { let session_id = self.next_id(); @@ -272,7 +269,7 @@ impl SessionManager { /// /// This will trigger the disconnect on the session task to gracefully terminate. The result /// will be picked up by the receiver. - pub(crate) fn disconnect(&self, node: PeerId, reason: Option) { + pub fn disconnect(&self, node: PeerId, reason: Option) { if let Some(session) = self.active_sessions.get(&node) { session.disconnect(reason); } @@ -297,21 +294,21 @@ impl SessionManager { /// /// It will trigger the disconnect on all the session tasks to gracefully terminate. The result /// will be picked by the receiver. - pub(crate) fn disconnect_all(&self, reason: Option) { + pub fn disconnect_all(&self, reason: Option) { for (_, session) in self.active_sessions.iter() { session.disconnect(reason); } } /// Disconnects all pending sessions. - pub(crate) fn disconnect_all_pending(&mut self) { + pub fn disconnect_all_pending(&mut self) { for (_, session) in self.pending_sessions.iter_mut() { session.disconnect(); } } /// Sends a message to the peer's session - pub(crate) fn send_message(&mut self, peer_id: &PeerId, msg: PeerMessage) { + pub fn send_message(&mut self, peer_id: &PeerId, msg: PeerMessage) { if let Some(session) = self.active_sessions.get_mut(peer_id) { let _ = session.commands_to_session.try_send(SessionCommand::Message(msg)); } @@ -565,7 +562,7 @@ impl SessionManager { } /// Returns [`PeerInfo`] for all connected peers - pub(crate) fn get_peer_info(&self) -> Vec { + pub fn get_peer_info(&self) -> Vec { self.active_sessions .values() .map(|session| PeerInfo { @@ -581,7 +578,7 @@ impl SessionManager { /// Returns [`PeerInfo`] for a given peer. /// /// Returns `None` if there's no active session to the peer. - pub(crate) fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option { + pub fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option { self.active_sessions.get(&peer_id).map(|session| PeerInfo { remote_id: session.remote_id, direction: session.direction, @@ -594,35 +591,50 @@ impl SessionManager { /// Events produced by the [`SessionManager`] #[derive(Debug)] -pub(crate) enum SessionEvent { +pub enum SessionEvent { /// A new session was successfully authenticated. /// /// This session is now able to exchange data. SessionEstablished { + /// The remote node's public key peer_id: PeerId, + /// The remote node's socket address remote_addr: SocketAddr, + /// The user agent of the remote node, usually containing the client name and version client_version: Arc, + /// The capabilities the remote node has announced capabilities: Arc, /// negotiated eth version version: EthVersion, + /// The Status message the peer sent during the `eth` handshake status: Status, + /// The channel for sending messages to the peer with the session messages: PeerRequestSender, + /// The direction of the session, either `Inbound` or `Outgoing` direction: Direction, + /// The maximum time that the session waits for a response from the peer before timing out + /// the connection timeout: Arc, }, + /// The peer was already connected with another session. AlreadyConnected { + /// The remote node's public key peer_id: PeerId, + /// The remote node's socket address remote_addr: SocketAddr, + /// The direction of the session, either `Inbound` or `Outgoing` direction: Direction, }, /// A session received a valid message via RLPx. ValidMessage { + /// The remote node's public key peer_id: PeerId, /// Message received from the peer. message: PeerMessage, }, /// Received a message that does not match the announced capabilities of the peer. InvalidMessage { + /// The remote node's public key peer_id: PeerId, /// Announced capabilities of the remote peer. capabilities: Arc, @@ -641,19 +653,27 @@ pub(crate) enum SessionEvent { }, /// Closed an incoming pending session during handshaking. IncomingPendingSessionClosed { + /// The remote node's socket address remote_addr: SocketAddr, + /// The pending handshake session error that caused the session to close error: Option, }, /// Closed an outgoing pending session during handshaking. OutgoingPendingSessionClosed { + /// The remote node's socket address remote_addr: SocketAddr, + /// The remote node's public key peer_id: PeerId, + /// The pending handshake session error that caused the session to close error: Option, }, /// Failed to establish a tcp stream OutgoingConnectionError { + /// The remote node's socket address remote_addr: SocketAddr, + /// The remote node's public key peer_id: PeerId, + /// The error that caused the outgoing connection to fail error: io::Error, }, /// Session was closed due to an error @@ -667,15 +687,19 @@ pub(crate) enum SessionEvent { }, /// Active session was gracefully disconnected. Disconnected { + /// The remote node's public key peer_id: PeerId, + /// The remote node's socket address that we were connected to remote_addr: SocketAddr, }, } /// Errors that can occur during handshaking/authenticating the underlying streams. #[derive(Debug)] -pub(crate) enum PendingSessionHandshakeError { +pub enum PendingSessionHandshakeError { + /// The pending session failed due to an error while establishing the `eth` stream Eth(EthStreamError), + /// The pending session failed due to an error while establishing the ECIES stream Ecies(ECIESError), } @@ -700,7 +724,7 @@ pub enum Direction { impl Direction { /// Returns `true` if this an incoming connection. - pub(crate) fn is_incoming(&self) -> bool { + pub fn is_incoming(&self) -> bool { matches!(self, Direction::Incoming) }