Stateful overridable handshake (#14567)

Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Luca Provini
2025-02-28 14:17:41 +01:00
committed by GitHub
parent 6758612495
commit bffdda4312
19 changed files with 735 additions and 188 deletions

View File

@@ -1,6 +1,6 @@
//! Disconnect
use std::future::Future;
use std::{future::Future, pin::Pin};
use futures::{Sink, SinkExt};
use reth_ecies::stream::ECIESStream;
@@ -8,6 +8,8 @@ use reth_eth_wire_types::DisconnectReason;
use tokio::io::AsyncWrite;
use tokio_util::codec::{Encoder, Framed};
type DisconnectResult<E> = Result<(), E>;
/// This trait is meant to allow higher level protocols like `eth` to disconnect from a peer, using
/// lower-level disconnect functions (such as those that exist in the `p2p` protocol) if the
/// underlying stream supports it.
@@ -18,7 +20,7 @@ pub trait CanDisconnect<T>: Sink<T> + Unpin {
fn disconnect(
&mut self,
reason: DisconnectReason,
) -> impl Future<Output = Result<(), <Self as Sink<T>>::Error>> + Send;
) -> Pin<Box<dyn Future<Output = DisconnectResult<Self::Error>> + Send + '_>>;
}
// basic impls for things like Framed<TcpStream, etc>
@@ -27,11 +29,11 @@ where
T: AsyncWrite + Unpin + Send,
U: Encoder<I> + Send,
{
async fn disconnect(
fn disconnect(
&mut self,
_reason: DisconnectReason,
) -> Result<(), <Self as Sink<I>>::Error> {
self.close().await
) -> Pin<Box<dyn Future<Output = Result<(), <Self as Sink<I>>::Error>> + Send + '_>> {
Box::pin(async move { self.close().await })
}
}
@@ -39,8 +41,11 @@ impl<S> CanDisconnect<bytes::Bytes> for ECIESStream<S>
where
S: AsyncWrite + Unpin + Send,
{
async fn disconnect(&mut self, _reason: DisconnectReason) -> Result<(), std::io::Error> {
self.close().await
fn disconnect(
&mut self,
_reason: DisconnectReason,
) -> Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send + '_>> {
Box::pin(async move { self.close().await })
}
}

View File

@@ -1,6 +1,7 @@
use crate::{
capability::RawCapabilityMessage,
errors::{EthHandshakeError, EthStreamError},
handshake::EthereumEthHandshake,
message::{EthBroadcastMessage, ProtocolBroadcastMessage},
p2pstream::HANDSHAKE_TIMEOUT,
CanDisconnect, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, ProtocolMessage,
@@ -8,12 +9,12 @@ use crate::{
};
use alloy_primitives::bytes::{Bytes, BytesMut};
use alloy_rlp::Encodable;
use futures::{ready, Sink, SinkExt, StreamExt};
use futures::{ready, Sink, SinkExt};
use pin_project::pin_project;
use reth_eth_wire_types::NetworkPrimitives;
use reth_ethereum_forks::ForkFilter;
use reth_primitives_traits::GotExpected;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
@@ -52,7 +53,7 @@ impl<S> UnauthedEthStream<S> {
impl<S, E> UnauthedEthStream<S>
where
S: Stream<Item = Result<BytesMut, E>> + CanDisconnect<Bytes> + Unpin,
S: Stream<Item = Result<BytesMut, E>> + CanDisconnect<Bytes> + Send + Unpin,
EthStreamError: From<E> + From<<S as Sink<Bytes>>::Error>,
{
/// Consumes the [`UnauthedEthStream`] and returns an [`EthStream`] after the `Status`
@@ -88,106 +89,13 @@ where
%status,
"sending eth status to peer"
);
EthereumEthHandshake(&mut self.inner).eth_handshake(status, fork_filter).await?;
// we need to encode and decode here on our own because we don't have an `EthStream` yet
// The max length for a status with TTD is: <msg id = 1 byte> + <rlp(status) = 88 byte>
self.inner
.send(
alloy_rlp::encode(ProtocolMessage::<N>::from(EthMessage::<N>::Status(status)))
.into(),
)
.await?;
// now we can create the `EthStream` because the peer has successfully completed
// the handshake
let stream = EthStream::new(status.version, self.inner);
let their_msg_res = self.inner.next().await;
let their_msg = match their_msg_res {
Some(msg) => msg,
None => {
self.inner.disconnect(DisconnectReason::DisconnectRequested).await?;
return Err(EthStreamError::EthHandshakeError(EthHandshakeError::NoResponse))
}
}?;
if their_msg.len() > MAX_STATUS_SIZE {
self.inner.disconnect(DisconnectReason::ProtocolBreach).await?;
return Err(EthStreamError::MessageTooBig(their_msg.len()))
}
let version = status.version;
let msg = match ProtocolMessage::<N>::decode_message(version, &mut their_msg.as_ref()) {
Ok(m) => m,
Err(err) => {
debug!("decode error in eth handshake: msg={their_msg:x}");
self.inner.disconnect(DisconnectReason::DisconnectRequested).await?;
return Err(EthStreamError::InvalidMessage(err))
}
};
// The following checks should match the checks in go-ethereum:
// https://github.com/ethereum/go-ethereum/blob/9244d5cd61f3ea5a7645fdf2a1a96d53421e412f/eth/protocols/eth/handshake.go#L87-L89
match msg.message {
EthMessage::Status(resp) => {
trace!(
status=%resp,
"validating incoming eth status from peer"
);
if status.genesis != resp.genesis {
self.inner.disconnect(DisconnectReason::ProtocolBreach).await?;
return Err(EthHandshakeError::MismatchedGenesis(
GotExpected { expected: status.genesis, got: resp.genesis }.into(),
)
.into())
}
if status.version != resp.version {
self.inner.disconnect(DisconnectReason::ProtocolBreach).await?;
return Err(EthHandshakeError::MismatchedProtocolVersion(GotExpected {
got: resp.version,
expected: status.version,
})
.into())
}
if status.chain != resp.chain {
self.inner.disconnect(DisconnectReason::ProtocolBreach).await?;
return Err(EthHandshakeError::MismatchedChain(GotExpected {
got: resp.chain,
expected: status.chain,
})
.into())
}
// TD at mainnet block #7753254 is 76 bits. If it becomes 100 million times
// larger, it will still fit within 160 bits
if status.total_difficulty.bit_len() > 160 {
self.inner.disconnect(DisconnectReason::ProtocolBreach).await?;
return Err(EthHandshakeError::TotalDifficultyBitLenTooLarge {
got: status.total_difficulty.bit_len(),
maximum: 160,
}
.into())
}
if let Err(err) =
fork_filter.validate(resp.forkid).map_err(EthHandshakeError::InvalidFork)
{
self.inner.disconnect(DisconnectReason::ProtocolBreach).await?;
return Err(err.into())
}
// now we can create the `EthStream` because the peer has successfully completed
// the handshake
let stream = EthStream::new(version, self.inner);
Ok((stream, resp))
}
_ => {
self.inner.disconnect(DisconnectReason::ProtocolBreach).await?;
Err(EthStreamError::EthHandshakeError(
EthHandshakeError::NonStatusMessageInHandshake,
))
}
}
Ok((stream, status))
}
}
@@ -362,8 +270,11 @@ where
EthStreamError: From<<S as Sink<Bytes>>::Error>,
N: NetworkPrimitives,
{
async fn disconnect(&mut self, reason: DisconnectReason) -> Result<(), EthStreamError> {
self.inner.disconnect(reason).await.map_err(Into::into)
fn disconnect(
&mut self,
reason: DisconnectReason,
) -> Pin<Box<dyn Future<Output = Result<(), EthStreamError>> + Send + '_>> {
Box::pin(async move { self.inner.disconnect(reason).await.map_err(Into::into) })
}
}

View File

@@ -0,0 +1,215 @@
use crate::{
errors::{EthHandshakeError, EthStreamError, P2PStreamError},
ethstream::MAX_STATUS_SIZE,
CanDisconnect,
};
use bytes::{Bytes, BytesMut};
use derive_more::with_trait::Debug;
use futures::{Sink, SinkExt, Stream};
use reth_eth_wire_types::{
DisconnectReason, EthMessage, EthNetworkPrimitives, ProtocolMessage, Status,
};
use reth_ethereum_forks::ForkFilter;
use reth_primitives_traits::GotExpected;
use std::{future::Future, pin::Pin, time::Duration};
use tokio::time::timeout;
use tokio_stream::StreamExt;
use tracing::{debug, trace};
/// A trait that knows how to perform the P2P handshake.
pub trait EthRlpxHandshake: Debug + Send + Sync + 'static {
/// Perform the P2P handshake for the `eth` protocol.
fn handshake<'a>(
&'a self,
unauth: &'a mut dyn UnauthEth,
status: Status,
fork_filter: ForkFilter,
timeout_limit: Duration,
) -> Pin<Box<dyn Future<Output = Result<Status, EthStreamError>> + 'a + Send>>;
}
/// An unauthenticated stream that can send and receive messages.
pub trait UnauthEth:
Stream<Item = Result<BytesMut, P2PStreamError>>
+ Sink<Bytes, Error = P2PStreamError>
+ CanDisconnect<Bytes>
+ Unpin
+ Send
{
}
impl<T> UnauthEth for T where
T: Stream<Item = Result<BytesMut, P2PStreamError>>
+ Sink<Bytes, Error = P2PStreamError>
+ CanDisconnect<Bytes>
+ Unpin
+ Send
{
}
/// The Ethereum P2P handshake.
///
/// This performs the regular ethereum `eth` rlpx handshake.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct EthHandshake;
impl EthRlpxHandshake for EthHandshake {
fn handshake<'a>(
&'a self,
unauth: &'a mut dyn UnauthEth,
status: Status,
fork_filter: ForkFilter,
timeout_limit: Duration,
) -> Pin<Box<dyn Future<Output = Result<Status, EthStreamError>> + 'a + Send>> {
Box::pin(async move {
timeout(timeout_limit, EthereumEthHandshake(unauth).eth_handshake(status, fork_filter))
.await
.map_err(|_| EthStreamError::StreamTimeout)?
})
}
}
/// A type that performs the ethereum specific `eth` protocol handshake.
#[derive(Debug)]
pub struct EthereumEthHandshake<'a, S: ?Sized>(pub &'a mut S);
impl<S: ?Sized, E> EthereumEthHandshake<'_, S>
where
S: Stream<Item = Result<BytesMut, E>> + CanDisconnect<Bytes> + Send + Unpin,
EthStreamError: From<E> + From<<S as Sink<Bytes>>::Error>,
{
/// Performs the `eth` rlpx protocol handshake using the given input stream.
pub async fn eth_handshake(
self,
status: Status,
fork_filter: ForkFilter,
) -> Result<Status, EthStreamError> {
let unauth = self.0;
// Send our status message
let status_msg =
alloy_rlp::encode(ProtocolMessage::<EthNetworkPrimitives>::from(EthMessage::<
EthNetworkPrimitives,
>::Status(
status
)))
.into();
unauth.send(status_msg).await.map_err(EthStreamError::from)?;
// Receive peer's response
let their_msg_res = unauth.next().await;
let their_msg = match their_msg_res {
Some(Ok(msg)) => msg,
Some(Err(e)) => return Err(EthStreamError::from(e)),
None => {
unauth
.disconnect(DisconnectReason::DisconnectRequested)
.await
.map_err(EthStreamError::from)?;
return Err(EthStreamError::EthHandshakeError(EthHandshakeError::NoResponse));
}
};
if their_msg.len() > MAX_STATUS_SIZE {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthStreamError::MessageTooBig(their_msg.len()));
}
let version = status.version;
let msg = match ProtocolMessage::<EthNetworkPrimitives>::decode_message(
version,
&mut their_msg.as_ref(),
) {
Ok(m) => m,
Err(err) => {
debug!("decode error in eth handshake: msg={their_msg:x}");
unauth
.disconnect(DisconnectReason::DisconnectRequested)
.await
.map_err(EthStreamError::from)?;
return Err(EthStreamError::InvalidMessage(err));
}
};
// Validate peer response
match msg.message {
EthMessage::Status(their_status) => {
trace!("Validating incoming ETH status from peer");
if status.genesis != their_status.genesis {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::MismatchedGenesis(
GotExpected { expected: status.genesis, got: their_status.genesis }.into(),
)
.into());
}
if status.version != their_status.version {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::MismatchedProtocolVersion(GotExpected {
got: their_status.version,
expected: status.version,
})
.into());
}
if status.chain != their_status.chain {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::MismatchedChain(GotExpected {
got: their_status.chain,
expected: status.chain,
})
.into());
}
// Ensure total difficulty is reasonable
if status.total_difficulty.bit_len() > 160 {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::TotalDifficultyBitLenTooLarge {
got: status.total_difficulty.bit_len(),
maximum: 160,
}
.into());
}
// Fork validation
if let Err(err) = fork_filter
.validate(their_status.forkid)
.map_err(EthHandshakeError::InvalidFork)
{
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(err.into());
}
Ok(their_status)
}
_ => {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
Err(EthStreamError::EthHandshakeError(
EthHandshakeError::NonStatusMessageInHandshake,
))
}
}
}
}

View File

@@ -23,6 +23,9 @@ mod p2pstream;
mod pinger;
pub mod protocol;
/// Handshake logic
pub mod handshake;
#[cfg(test)]
pub mod test_utils;
@@ -36,7 +39,7 @@ pub use crate::{
ethstream::{EthStream, UnauthedEthStream, MAX_MESSAGE_SIZE},
hello::{HelloMessage, HelloMessageBuilder, HelloMessageWithProtocols},
p2pstream::{
DisconnectP2P, P2PMessage, P2PMessageID, P2PStream, UnauthedP2PStream,
DisconnectP2P, P2PMessage, P2PMessageID, P2PStream, UnauthedP2PStream, HANDSHAKE_TIMEOUT,
MAX_RESERVED_MESSAGE_ID,
},
Capability, ProtocolVersion,

View File

@@ -367,12 +367,12 @@ impl Sink<Bytes> for ProtocolProxy {
}
impl CanDisconnect<Bytes> for ProtocolProxy {
async fn disconnect(
fn disconnect(
&mut self,
_reason: DisconnectReason,
) -> Result<(), <Self as Sink<Bytes>>::Error> {
) -> Pin<Box<dyn Future<Output = Result<(), <Self as Sink<Bytes>>::Error>> + Send + '_>> {
// TODO handle disconnects
Ok(())
Box::pin(async move { Ok(()) })
}
}

View File

@@ -17,6 +17,7 @@ use reth_metrics::metrics::counter;
use reth_primitives_traits::GotExpected;
use std::{
collections::VecDeque,
future::Future,
io,
pin::Pin,
task::{ready, Context, Poll},
@@ -41,7 +42,7 @@ const MAX_P2P_MESSAGE_ID: u8 = P2PMessageID::Pong as u8;
/// [`HANDSHAKE_TIMEOUT`] determines the amount of time to wait before determining that a `p2p`
/// handshake has timed out.
pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
/// [`PING_TIMEOUT`] determines the amount of time to wait before determining that a `p2p` ping has
/// timed out.
@@ -194,8 +195,11 @@ impl<S> CanDisconnect<Bytes> for P2PStream<S>
where
S: Sink<Bytes, Error = io::Error> + Unpin + Send + Sync,
{
async fn disconnect(&mut self, reason: DisconnectReason) -> Result<(), P2PStreamError> {
self.disconnect(reason).await
fn disconnect(
&mut self,
reason: DisconnectReason,
) -> Pin<Box<dyn Future<Output = Result<(), P2PStreamError>> + Send + '_>> {
Box::pin(async move { self.disconnect(reason).await })
}
}

View File

@@ -11,6 +11,7 @@ use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NatResolver, DEFAULT_DISCOV
use reth_discv5::NetworkStackId;
use reth_dns_discovery::DnsDiscoveryConfig;
use reth_eth_wire::{
handshake::{EthHandshake, EthRlpxHandshake},
EthNetworkPrimitives, HelloMessage, HelloMessageWithProtocols, NetworkPrimitives, Status,
};
use reth_ethereum_forks::{ForkFilter, Head};
@@ -83,6 +84,11 @@ pub struct NetworkConfig<C, N: NetworkPrimitives = EthNetworkPrimitives> {
pub transactions_manager_config: TransactionsManagerConfig,
/// The NAT resolver for external IP
pub nat: Option<NatResolver>,
/// The Ethereum P2P handshake, see also:
/// <https://github.com/ethereum/devp2p/blob/master/rlpx.md#initial-handshake>.
/// This can be overridden to support custom handshake logic via the
/// [`NetworkConfigBuilder`].
pub handshake: Arc<dyn EthRlpxHandshake>,
}
// === impl NetworkConfig ===
@@ -207,6 +213,9 @@ pub struct NetworkConfigBuilder<N: NetworkPrimitives = EthNetworkPrimitives> {
transactions_manager_config: TransactionsManagerConfig,
/// The NAT resolver for external IP
nat: Option<NatResolver>,
/// The Ethereum P2P handshake, see also:
/// <https://github.com/ethereum/devp2p/blob/master/rlpx.md#initial-handshake>.
handshake: Arc<dyn EthRlpxHandshake>,
}
impl NetworkConfigBuilder<EthNetworkPrimitives> {
@@ -246,6 +255,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
block_import: None,
transactions_manager_config: Default::default(),
nat: None,
handshake: Arc::new(EthHandshake::default()),
}
}
@@ -533,6 +543,12 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
self
}
/// Overrides the default Eth `RLPx` handshake.
pub fn eth_rlpx_handshake(mut self, handshake: Arc<dyn EthRlpxHandshake>) -> Self {
self.handshake = handshake;
self
}
/// Consumes the type and creates the actual [`NetworkConfig`]
/// for the given client type that can interact with the chain.
///
@@ -564,6 +580,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
block_import,
transactions_manager_config,
nat,
handshake,
} = self;
discovery_v5_builder = discovery_v5_builder.map(|mut builder| {
@@ -631,6 +648,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
tx_gossip_disabled,
transactions_manager_config,
nat,
handshake,
}
}
}

View File

@@ -248,6 +248,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
tx_gossip_disabled,
transactions_manager_config: _,
nat,
handshake,
} = config;
let peers_manager = PeersManager::new(peers_config);
@@ -299,6 +300,7 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
hello_message,
fork_filter,
extra_protocols,
handshake,
);
let state = NetworkState::new(

View File

@@ -845,8 +845,9 @@ mod tests {
use reth_chainspec::MAINNET;
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
EthNetworkPrimitives, EthStream, GetBlockBodies, HelloMessageWithProtocols, P2PStream,
Status, StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
handshake::EthHandshake, EthNetworkPrimitives, EthStream, GetBlockBodies,
HelloMessageWithProtocols, P2PStream, Status, StatusBuilder, UnauthedEthStream,
UnauthedP2PStream,
};
use reth_network_peers::pk2id;
use reth_network_types::session::config::PROTOCOL_BREACH_REQUEST_TIMEOUT;
@@ -919,6 +920,7 @@ mod tests {
let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(1);
tokio::task::spawn(start_pending_incoming_session(
Arc::new(EthHandshake::default()),
disconnect_rx,
session_id,
stream,

View File

@@ -33,9 +33,9 @@ use counter::SessionCounter;
use futures::{future::Either, io, FutureExt, StreamExt};
use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::{
errors::EthStreamError, multiplex::RlpxProtocolMultiplexer, Capabilities, DisconnectReason,
EthVersion, HelloMessageWithProtocols, NetworkPrimitives, Status, UnauthedEthStream,
UnauthedP2PStream,
errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
Capabilities, DisconnectReason, EthStream, EthVersion, HelloMessageWithProtocols,
NetworkPrimitives, Status, UnauthedP2PStream, HANDSHAKE_TIMEOUT,
};
use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
use reth_metrics::common::mpsc::MeteredPollSender;
@@ -113,6 +113,8 @@ pub struct SessionManager<N: NetworkPrimitives> {
disconnections_counter: DisconnectionsCounter,
/// Metrics for the session manager.
metrics: SessionManagerMetrics,
/// The [`EthRlpxHandshake`] is used to perform the initial handshake with the peer.
handshake: Arc<dyn EthRlpxHandshake>,
}
// === impl SessionManager ===
@@ -128,6 +130,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
hello_message: HelloMessageWithProtocols,
fork_filter: ForkFilter,
extra_protocols: RlpxSubProtocols,
handshake: Arc<dyn EthRlpxHandshake>,
) -> Self {
let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
@@ -154,6 +157,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
extra_protocols,
disconnections_counter: Default::default(),
metrics: Default::default(),
handshake,
}
}
@@ -256,6 +260,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
Direction::Incoming,
pending_events.clone(),
start_pending_incoming_session(
self.handshake.clone(),
disconnect_rx,
session_id,
stream,
@@ -297,6 +302,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
Direction::Outgoing(remote_peer_id),
pending_events.clone(),
start_pending_outbound_session(
self.handshake.clone(),
disconnect_rx,
pending_events,
session_id,
@@ -811,6 +817,7 @@ pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
/// This will wait for the _incoming_ handshake request and answer it.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
handshake: Arc<dyn EthRlpxHandshake>,
disconnect_rx: oneshot::Receiver<()>,
session_id: SessionId,
stream: TcpStream,
@@ -823,6 +830,7 @@ pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
extra_handlers: RlpxSubProtocolHandlers,
) {
authenticate(
handshake,
disconnect_rx,
events,
stream,
@@ -842,6 +850,7 @@ pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
#[instrument(skip_all, fields(%remote_addr, peer_id), target = "net")]
#[allow(clippy::too_many_arguments)]
async fn start_pending_outbound_session<N: NetworkPrimitives>(
handshake: Arc<dyn EthRlpxHandshake>,
disconnect_rx: oneshot::Receiver<()>,
events: mpsc::Sender<PendingSessionEvent<N>>,
session_id: SessionId,
@@ -873,6 +882,7 @@ async fn start_pending_outbound_session<N: NetworkPrimitives>(
}
};
authenticate(
handshake,
disconnect_rx,
events,
stream,
@@ -891,6 +901,7 @@ async fn start_pending_outbound_session<N: NetworkPrimitives>(
/// Authenticates a session
#[allow(clippy::too_many_arguments)]
async fn authenticate<N: NetworkPrimitives>(
handshake: Arc<dyn EthRlpxHandshake>,
disconnect_rx: oneshot::Receiver<()>,
events: mpsc::Sender<PendingSessionEvent<N>>,
stream: TcpStream,
@@ -922,6 +933,7 @@ async fn authenticate<N: NetworkPrimitives>(
let unauthed = UnauthedP2PStream::new(stream);
let auth = authenticate_stream(
handshake,
unauthed,
session_id,
remote_addr,
@@ -974,6 +986,7 @@ async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
/// also negotiate the additional protocols.
#[allow(clippy::too_many_arguments)]
async fn authenticate_stream<N: NetworkPrimitives>(
handshake: Arc<dyn EthRlpxHandshake>,
stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
session_id: SessionId,
remote_addr: SocketAddr,
@@ -987,8 +1000,8 @@ async fn authenticate_stream<N: NetworkPrimitives>(
// Add extra protocols to the hello message
extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
// conduct the p2p handshake and return the authenticated stream
let (p2p_stream, their_hello) = match stream.handshake(hello).await {
// conduct the p2p rlpx handshake and return the rlpx authenticated stream
let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
Ok(stream_res) => stream_res,
Err(err) => {
return PendingSessionEvent::Disconnected {
@@ -1018,9 +1031,16 @@ async fn authenticate_stream<N: NetworkPrimitives>(
//
// Before trying status handshake, set up the version to negotiated shared version
status.set_eth_version(eth_version);
let eth_unauthed = UnauthedEthStream::new(p2p_stream);
let (eth_stream, their_status) = match eth_unauthed.handshake(status, fork_filter).await {
Ok(stream_res) => stream_res,
// perform the eth protocol handshake
match handshake
.handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
.await
{
Ok(their_status) => {
let eth_stream = EthStream::new(status.version, p2p_stream);
(eth_stream.into(), their_status)
}
Err(err) => {
return PendingSessionEvent::Disconnected {
remote_addr,
@@ -1029,8 +1049,7 @@ async fn authenticate_stream<N: NetworkPrimitives>(
error: Some(PendingSessionHandshakeError::Eth(err)),
}
}
};
(eth_stream.into(), their_status)
}
} else {
// Multiplex the stream with the extra protocols
let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);