chore: make some session types pub (#3666)

This commit is contained in:
Dan Cline
2023-07-14 16:23:00 -04:00
committed by GitHub
parent 77faa04ca6
commit c1b1eac505
3 changed files with 147 additions and 35 deletions

View File

@@ -151,6 +151,10 @@ pub use manager::{NetworkEvent, NetworkManager};
pub use message::PeerRequest;
pub use network::NetworkHandle;
pub use peers::PeersConfig;
pub use session::{PeerInfo, SessionsConfig};
pub use session::{
ActiveSessionHandle, ActiveSessionMessage, Direction, PeerInfo, PendingSessionEvent,
PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId,
SessionLimits, SessionManager, SessionsConfig,
};
pub use reth_eth_wire::{DisconnectReason, HelloBuilder, HelloMessage};

View File

@@ -14,7 +14,10 @@ use reth_primitives::PeerId;
use std::{io, net::SocketAddr, sync::Arc, time::Instant};
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
sync::{
mpsc::{self, error::SendError},
oneshot,
},
};
/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello
@@ -22,7 +25,7 @@ use tokio::{
///
/// This session needs to wait until it is authenticated.
#[derive(Debug)]
pub(crate) struct PendingSessionHandle {
pub struct PendingSessionHandle {
/// Can be used to tell the session to disconnect the connection/abort the handshake process.
pub(crate) disconnect_tx: Option<oneshot::Sender<()>>,
/// The direction of the session
@@ -33,11 +36,16 @@ pub(crate) struct PendingSessionHandle {
impl PendingSessionHandle {
/// Sends a disconnect command to the pending session.
pub(crate) fn disconnect(&mut self) {
pub fn disconnect(&mut self) {
if let Some(tx) = self.disconnect_tx.take() {
let _ = tx.send(());
}
}
/// Returns the direction of the pending session (inbound or outbound).
pub fn direction(&self) -> Direction {
self.direction
}
}
/// An established session with a remote peer.
@@ -46,7 +54,7 @@ impl PendingSessionHandle {
/// be performed: chain synchronization, block propagation and transaction exchange.
#[derive(Debug)]
#[allow(unused)]
pub(crate) struct ActiveSessionHandle {
pub struct ActiveSessionHandle {
/// The direction of the session
pub(crate) direction: Direction,
/// The assigned id for this session
@@ -71,10 +79,59 @@ pub(crate) struct ActiveSessionHandle {
impl ActiveSessionHandle {
/// Sends a disconnect command to the session.
pub(crate) fn disconnect(&self, reason: Option<DisconnectReason>) {
pub fn disconnect(&self, reason: Option<DisconnectReason>) {
// Note: we clone the sender which ensures the channel has capacity to send the message
let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect { reason });
}
/// Sends a disconnect command to the session, awaiting the command channel for available
/// capacity.
pub async fn try_disconnect(
&self,
reason: Option<DisconnectReason>,
) -> Result<(), SendError<SessionCommand>> {
self.commands_to_session.clone().send(SessionCommand::Disconnect { reason }).await
}
/// Returns the direction of the active session (inbound or outbound).
pub fn direction(&self) -> Direction {
self.direction
}
/// Returns the assigned session id for this session.
pub fn session_id(&self) -> SessionId {
self.session_id
}
/// Returns the negotiated eth version for this session.
pub fn version(&self) -> EthVersion {
self.version
}
/// Returns the identifier of the remote peer.
pub fn remote_id(&self) -> PeerId {
self.remote_id
}
/// Returns the timestamp when the session has been established.
pub fn established(&self) -> Instant {
self.established
}
/// Returns the announced capabilities of the peer.
pub fn capabilities(&self) -> Arc<Capabilities> {
self.capabilities.clone()
}
/// Returns the client's name and version.
pub fn client_version(&self) -> Arc<String> {
self.client_version.clone()
}
/// Returns the address we're connected to.
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}
}
/// Info about an active peer session.
@@ -98,46 +155,66 @@ pub struct PeerInfo {
///
/// A session starts with a `Handshake`, followed by a `Hello` message which
#[derive(Debug)]
pub(crate) enum PendingSessionEvent {
pub enum PendingSessionEvent {
/// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
Established {
/// An internal identifier for the established session
session_id: SessionId,
/// The remote node's socket address
remote_addr: SocketAddr,
/// The remote node's public key
peer_id: PeerId,
/// All capabilities the peer announced
capabilities: Arc<Capabilities>,
/// The Status message the peer sent for the `eth` handshake
status: Status,
/// The actual connection stream which can be used to send and receive `eth` protocol
/// messages
conn: EthStream<P2PStream<ECIESStream<MeteredStream<TcpStream>>>>,
/// The direction of the session, either `Inbound` or `Outgoing`
direction: Direction,
/// The remote node's user agent, usually containing the client name and version
client_id: String,
},
/// Handshake unsuccessful, session was disconnected.
Disconnected {
/// The remote node's socket address
remote_addr: SocketAddr,
/// The internal identifier for the disconnected session
session_id: SessionId,
/// The direction of the session, either `Inbound` or `Outgoing`
direction: Direction,
/// The error that caused the disconnect
error: Option<EthStreamError>,
},
/// Thrown when unable to establish a [`TcpStream`].
OutgoingConnectionError {
/// The remote node's socket address
remote_addr: SocketAddr,
/// The internal identifier for the disconnected session
session_id: SessionId,
/// The remote node's public key
peer_id: PeerId,
/// The error that caused the outgoing connection failure
error: io::Error,
},
/// Thrown when authentication via Ecies failed.
/// Thrown when authentication via ECIES failed.
EciesAuthError {
/// The remote node's socket address
remote_addr: SocketAddr,
/// The internal identifier for the disconnected session
session_id: SessionId,
/// The error that caused the ECIES session to fail
error: ECIESError,
/// The direction of the session, either `Inbound` or `Outgoing`
direction: Direction,
},
}
/// Commands that can be sent to the spawned session.
#[derive(Debug)]
pub(crate) enum SessionCommand {
pub enum SessionCommand {
/// Disconnect the connection
Disconnect {
/// Why the disconnect was initiated
@@ -150,12 +227,19 @@ pub(crate) enum SessionCommand {
/// Message variants an active session can produce and send back to the
/// [`SessionManager`](crate::session::SessionManager)
#[derive(Debug)]
pub(crate) enum ActiveSessionMessage {
pub enum ActiveSessionMessage {
/// Session was gracefully disconnected.
Disconnected { peer_id: PeerId, remote_addr: SocketAddr },
Disconnected {
/// The remote node's public key
peer_id: PeerId,
/// The remote node's socket address
remote_addr: SocketAddr,
},
/// Session was closed due an error
ClosedOnConnectionError {
/// The remote node's public key
peer_id: PeerId,
/// The remote node's socket address
remote_addr: SocketAddr,
/// The error that caused the session to close
error: EthStreamError,

View File

@@ -2,14 +2,7 @@
use crate::{
message::PeerMessage,
metrics::SesssionManagerMetrics,
session::{
active::ActiveSession,
config::SessionCounter,
handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,
},
},
session::{active::ActiveSession, config::SessionCounter},
};
pub use crate::{message::PeerRequestSender, session::handle::PeerInfo};
use fnv::FnvHashMap;
@@ -47,7 +40,11 @@ use tracing::{instrument, trace};
mod active;
mod config;
mod handle;
pub use config::SessionsConfig;
pub use config::{SessionLimits, SessionsConfig};
pub use handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,
};
/// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
@@ -56,7 +53,7 @@ pub struct SessionId(usize);
/// Manages a set of sessions.
#[must_use = "Session Manager must be polled to process session events."]
#[derive(Debug)]
pub(crate) struct SessionManager {
pub struct SessionManager {
/// Tracks the identifier for the next session.
next_id: usize,
/// Keeps track of all sessions
@@ -110,7 +107,7 @@ pub(crate) struct SessionManager {
impl SessionManager {
/// Creates a new empty [`SessionManager`].
pub(crate) fn new(
pub fn new(
secret_key: SecretKey,
config: SessionsConfig,
executor: Box<dyn TaskSpawner>,
@@ -146,7 +143,7 @@ impl SessionManager {
/// Check whether the provided [`ForkId`] is compatible based on the validation rules in
/// `EIP-2124`.
pub(crate) fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
self.fork_filter.validate(fork_id).is_ok()
}
@@ -158,12 +155,12 @@ impl SessionManager {
}
/// Returns the current status of the session.
pub(crate) fn status(&self) -> Status {
pub fn status(&self) -> Status {
self.status
}
/// Returns the session hello message.
pub(crate) fn hello_message(&self) -> HelloMessage {
pub fn hello_message(&self) -> HelloMessage {
self.hello_message.clone()
}
@@ -235,7 +232,7 @@ impl SessionManager {
}
/// Starts a new pending session from the local node to the given remote node.
pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
// The error can be dropped because no dial will be made if it would exceed the limit
if self.counter.ensure_pending_outbound().is_ok() {
let session_id = self.next_id();
@@ -272,7 +269,7 @@ impl SessionManager {
///
/// This will trigger the disconnect on the session task to gracefully terminate. The result
/// will be picked up by the receiver.
pub(crate) fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
if let Some(session) = self.active_sessions.get(&node) {
session.disconnect(reason);
}
@@ -297,21 +294,21 @@ impl SessionManager {
///
/// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
/// will be picked by the receiver.
pub(crate) fn disconnect_all(&self, reason: Option<DisconnectReason>) {
pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
for (_, session) in self.active_sessions.iter() {
session.disconnect(reason);
}
}
/// Disconnects all pending sessions.
pub(crate) fn disconnect_all_pending(&mut self) {
pub fn disconnect_all_pending(&mut self) {
for (_, session) in self.pending_sessions.iter_mut() {
session.disconnect();
}
}
/// Sends a message to the peer's session
pub(crate) fn send_message(&mut self, peer_id: &PeerId, msg: PeerMessage) {
pub fn send_message(&mut self, peer_id: &PeerId, msg: PeerMessage) {
if let Some(session) = self.active_sessions.get_mut(peer_id) {
let _ = session.commands_to_session.try_send(SessionCommand::Message(msg));
}
@@ -565,7 +562,7 @@ impl SessionManager {
}
/// Returns [`PeerInfo`] for all connected peers
pub(crate) fn get_peer_info(&self) -> Vec<PeerInfo> {
pub fn get_peer_info(&self) -> Vec<PeerInfo> {
self.active_sessions
.values()
.map(|session| PeerInfo {
@@ -581,7 +578,7 @@ impl SessionManager {
/// Returns [`PeerInfo`] for a given peer.
///
/// Returns `None` if there's no active session to the peer.
pub(crate) fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
pub fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
self.active_sessions.get(&peer_id).map(|session| PeerInfo {
remote_id: session.remote_id,
direction: session.direction,
@@ -594,35 +591,50 @@ impl SessionManager {
/// Events produced by the [`SessionManager`]
#[derive(Debug)]
pub(crate) enum SessionEvent {
pub enum SessionEvent {
/// A new session was successfully authenticated.
///
/// This session is now able to exchange data.
SessionEstablished {
/// The remote node's public key
peer_id: PeerId,
/// The remote node's socket address
remote_addr: SocketAddr,
/// The user agent of the remote node, usually containing the client name and version
client_version: Arc<String>,
/// The capabilities the remote node has announced
capabilities: Arc<Capabilities>,
/// negotiated eth version
version: EthVersion,
/// The Status message the peer sent during the `eth` handshake
status: Status,
/// The channel for sending messages to the peer with the session
messages: PeerRequestSender,
/// The direction of the session, either `Inbound` or `Outgoing`
direction: Direction,
/// The maximum time that the session waits for a response from the peer before timing out
/// the connection
timeout: Arc<AtomicU64>,
},
/// The peer was already connected with another session.
AlreadyConnected {
/// The remote node's public key
peer_id: PeerId,
/// The remote node's socket address
remote_addr: SocketAddr,
/// The direction of the session, either `Inbound` or `Outgoing`
direction: Direction,
},
/// A session received a valid message via RLPx.
ValidMessage {
/// The remote node's public key
peer_id: PeerId,
/// Message received from the peer.
message: PeerMessage,
},
/// Received a message that does not match the announced capabilities of the peer.
InvalidMessage {
/// The remote node's public key
peer_id: PeerId,
/// Announced capabilities of the remote peer.
capabilities: Arc<Capabilities>,
@@ -641,19 +653,27 @@ pub(crate) enum SessionEvent {
},
/// Closed an incoming pending session during handshaking.
IncomingPendingSessionClosed {
/// The remote node's socket address
remote_addr: SocketAddr,
/// The pending handshake session error that caused the session to close
error: Option<PendingSessionHandshakeError>,
},
/// Closed an outgoing pending session during handshaking.
OutgoingPendingSessionClosed {
/// The remote node's socket address
remote_addr: SocketAddr,
/// The remote node's public key
peer_id: PeerId,
/// The pending handshake session error that caused the session to close
error: Option<PendingSessionHandshakeError>,
},
/// Failed to establish a tcp stream
OutgoingConnectionError {
/// The remote node's socket address
remote_addr: SocketAddr,
/// The remote node's public key
peer_id: PeerId,
/// The error that caused the outgoing connection to fail
error: io::Error,
},
/// Session was closed due to an error
@@ -667,15 +687,19 @@ pub(crate) enum SessionEvent {
},
/// Active session was gracefully disconnected.
Disconnected {
/// The remote node's public key
peer_id: PeerId,
/// The remote node's socket address that we were connected to
remote_addr: SocketAddr,
},
}
/// Errors that can occur during handshaking/authenticating the underlying streams.
#[derive(Debug)]
pub(crate) enum PendingSessionHandshakeError {
pub enum PendingSessionHandshakeError {
/// The pending session failed due to an error while establishing the `eth` stream
Eth(EthStreamError),
/// The pending session failed due to an error while establishing the ECIES stream
Ecies(ECIESError),
}
@@ -700,7 +724,7 @@ pub enum Direction {
impl Direction {
/// Returns `true` if this an incoming connection.
pub(crate) fn is_incoming(&self) -> bool {
pub fn is_incoming(&self) -> bool {
matches!(self, Direction::Incoming)
}