From c1b7eb78dec0986acd2cb1f33aaf0438bbfa0eab Mon Sep 17 00:00:00 2001 From: gejeduck <47668701+gejeduck@users.noreply.github.com> Date: Fri, 6 Jun 2025 05:02:48 -0400 Subject: [PATCH] feat: introduce supported range to Peer info (#16687) Co-authored-by: Matthias Seitz --- crates/net/network/src/fetch/mod.rs | 18 ++++-- crates/net/network/src/session/active.rs | 11 +++- crates/net/network/src/session/mod.rs | 40 ++++++------ crates/net/network/src/session/types.rs | 79 ++++++++++++++++++++++++ crates/net/network/src/state.rs | 11 +++- crates/net/network/src/swarm.rs | 2 + 6 files changed, 134 insertions(+), 27 deletions(-) create mode 100644 crates/net/network/src/session/types.rs diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index c64d83757b..ae271041a8 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -4,7 +4,7 @@ mod client; pub use client::FetchClient; -use crate::message::BlockRequest; +use crate::{message::BlockRequest, session::BlockRangeInfo}; use alloy_primitives::B256; use futures::StreamExt; use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives}; @@ -80,6 +80,7 @@ impl StateFetcher { best_hash: B256, best_number: u64, timeout: Arc, + range_info: Option, ) { self.peers.insert( peer_id, @@ -89,6 +90,7 @@ impl StateFetcher { best_number, timeout, last_response_likely_bad: false, + range_info, }, ); } @@ -347,6 +349,9 @@ struct Peer { /// downloaded), but we still want to avoid requesting from the same peer again if it has the /// lowest timeout. last_response_likely_bad: bool, + /// Tracks the range info for the peer. + #[allow(dead_code)] + range_info: Option, } impl Peer { @@ -502,8 +507,8 @@ mod tests { // Add a few random peers let peer1 = B512::random(); let peer2 = B512::random(); - fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1))); - fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1))); + fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1)), None); + fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1)), None); let first_peer = fetcher.next_best_peer().unwrap(); assert!(first_peer == peer1 || first_peer == peer2); @@ -530,9 +535,9 @@ mod tests { let peer2_timeout = Arc::new(AtomicU64::new(300)); - fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(30))); - fetcher.new_active_peer(peer2, B256::random(), 2, Arc::clone(&peer2_timeout)); - fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50))); + fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(30)), None); + fetcher.new_active_peer(peer2, B256::random(), 2, Arc::clone(&peer2_timeout), None); + fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50)), None); // Must always get peer1 (lowest timeout) assert_eq!(fetcher.next_best_peer(), Some(peer1)); @@ -601,6 +606,7 @@ mod tests { Default::default(), Default::default(), Default::default(), + None, ); let (req, header) = request_pair(); diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 0e3b5243f9..19f57f0f24 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -16,7 +16,7 @@ use crate::{ session::{ conn::EthRlpxConnection, handle::{ActiveSessionMessage, SessionCommand}, - SessionId, + BlockRangeInfo, SessionId, }, }; use alloy_primitives::Sealable; @@ -114,6 +114,8 @@ pub(crate) struct ActiveSession { /// Used to reserve a slot to guarantee that the termination message is delivered pub(crate) terminate_message: Option<(PollSender>, ActiveSessionMessage)>, + /// The eth69 range info for the remote peer. + pub(crate) range_info: Option, } impl ActiveSession { @@ -262,7 +264,11 @@ impl ActiveSession { on_response!(resp, GetReceipts) } EthMessage::BlockRangeUpdate(msg) => { - self.try_emit_broadcast(PeerMessage::BlockRangeUpdated(msg)).into() + if let Some(range_info) = self.range_info.as_ref() { + range_info.update(msg.earliest, msg.latest, msg.latest_hash); + } + + OnIncomingMessageOutcome::Ok } EthMessage::Other(bytes) => self.try_emit_broadcast(PeerMessage::Other(bytes)).into(), } @@ -987,6 +993,7 @@ mod tests { )), protocol_breach_request_timeout: PROTOCOL_BREACH_REQUEST_TIMEOUT, terminate_message: None, + range_info: None, } } ev => { diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 1cc62a6358..1b73b87f8f 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -4,24 +4,8 @@ mod active; mod conn; mod counter; mod handle; - -use active::QueuedOutgoingMessages; -pub use conn::EthRlpxConnection; -pub use handle::{ - ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, - SessionCommand, -}; - -pub use reth_network_api::{Direction, PeerInfo}; - -use std::{ - collections::HashMap, - future::Future, - net::SocketAddr, - sync::{atomic::AtomicU64, Arc}, - task::{Context, Poll}, - time::{Duration, Instant}, -}; +mod types; +pub use types::BlockRangeInfo; use crate::{ message::PeerMessage, @@ -29,6 +13,7 @@ use crate::{ protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols}, session::active::ActiveSession, }; +use active::QueuedOutgoingMessages; use counter::SessionCounter; use futures::{future::Either, io, FutureExt, StreamExt}; use reth_ecies::{stream::ECIESStream, ECIESError}; @@ -46,6 +31,14 @@ use reth_network_types::SessionsConfig; use reth_tasks::TaskSpawner; use rustc_hash::FxHashMap; use secp256k1::SecretKey; +use std::{ + collections::HashMap, + future::Future, + net::SocketAddr, + sync::{atomic::AtomicU64, Arc}, + task::{Context, Poll}, + time::{Duration, Instant}, +}; use tokio::{ io::{AsyncRead, AsyncWrite}, net::TcpStream, @@ -55,6 +48,13 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::PollSender; use tracing::{debug, instrument, trace}; +pub use conn::EthRlpxConnection; +pub use handle::{ + ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, + SessionCommand, +}; +pub use reth_network_api::{Direction, PeerInfo}; + /// Internal identifier for active sessions. #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)] pub struct SessionId(usize); @@ -543,6 +543,7 @@ impl SessionManager { internal_request_timeout: Arc::clone(&timeout), protocol_breach_request_timeout: self.protocol_breach_request_timeout, terminate_message: None, + range_info: None, }; self.spawn(session); @@ -579,6 +580,7 @@ impl SessionManager { messages, direction, timeout, + range_info: None, }) } PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => { @@ -701,6 +703,8 @@ pub enum SessionEvent { /// The maximum time that the session waits for a response from the peer before timing out /// the connection timeout: Arc, + /// The range info for the peer. + range_info: Option, }, /// The peer was already connected with another session. AlreadyConnected { diff --git a/crates/net/network/src/session/types.rs b/crates/net/network/src/session/types.rs new file mode 100644 index 0000000000..c8cd98c3cb --- /dev/null +++ b/crates/net/network/src/session/types.rs @@ -0,0 +1,79 @@ +//! Shared types for network sessions. + +use alloy_primitives::B256; +use parking_lot::RwLock; +use std::{ + ops::RangeInclusive, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; + +/// Information about the range of blocks available from a peer. +/// +/// This represents the announced `eth69` +/// [`BlockRangeUpdate`](reth_eth_wire_types::BlockRangeUpdate) of a peer. +#[derive(Debug, Clone)] +pub struct BlockRangeInfo { + /// The inner range information. + inner: Arc, +} + +impl BlockRangeInfo { + /// Creates a new range information. + pub fn new(earliest: u64, latest: u64, latest_hash: B256) -> Self { + Self { + inner: Arc::new(BlockRangeInfoInner { + earliest: AtomicU64::new(earliest), + latest: AtomicU64::new(latest), + latest_hash: RwLock::new(latest_hash), + }), + } + } + + /// Returns true if the block number is within the range of blocks available from the peer. + pub fn contains(&self, block_number: u64) -> bool { + self.range().contains(&block_number) + } + + /// Returns the range of blocks available from the peer. + pub fn range(&self) -> RangeInclusive { + let earliest = self.earliest(); + let latest = self.latest(); + RangeInclusive::new(earliest, latest) + } + + /// Returns the earliest block number available from the peer. + pub fn earliest(&self) -> u64 { + self.inner.earliest.load(Ordering::Relaxed) + } + + /// Returns the latest block number available from the peer. + pub fn latest(&self) -> u64 { + self.inner.latest.load(Ordering::Relaxed) + } + + /// Returns the latest block hash available from the peer. + pub fn latest_hash(&self) -> B256 { + *self.inner.latest_hash.read() + } + + /// Updates the range information. + pub fn update(&self, earliest: u64, latest: u64, latest_hash: B256) { + self.inner.earliest.store(earliest, Ordering::Relaxed); + self.inner.latest.store(latest, Ordering::Relaxed); + *self.inner.latest_hash.write() = latest_hash; + } +} + +/// Inner structure containing the range information with atomic and thread-safe fields. +#[derive(Debug)] +pub(crate) struct BlockRangeInfoInner { + /// The earliest block which is available. + earliest: AtomicU64, + /// The latest block which is available. + latest: AtomicU64, + /// Latest available block's hash. + latest_hash: RwLock, +} diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 933c096b0f..be01312bff 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -6,6 +6,7 @@ use crate::{ fetch::{BlockResponseOutcome, FetchAction, StateFetcher}, message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult}, peers::{PeerAction, PeersManager}, + session::BlockRangeInfo, FetchClient, }; use alloy_consensus::BlockHeader; @@ -149,13 +150,20 @@ impl NetworkState { status: Arc, request_tx: PeerRequestSender>, timeout: Arc, + range_info: Option, ) { debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible"); // find the corresponding block number let block_number = self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default(); - self.state_fetcher.new_active_peer(peer, status.blockhash, block_number, timeout); + self.state_fetcher.new_active_peer( + peer, + status.blockhash, + block_number, + timeout, + range_info, + ); self.active_peers.insert( peer, @@ -613,6 +621,7 @@ mod tests { Arc::default(), peer_tx, Arc::new(AtomicU64::new(1)), + None, ); assert!(state.active_peers.contains_key(&peer_id)); diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 7566c285d8..fbb7b0bf94 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -122,6 +122,7 @@ impl Swarm { messages, direction, timeout, + range_info, } => { self.state.on_session_activated( peer_id, @@ -129,6 +130,7 @@ impl Swarm { status.clone(), messages.clone(), timeout, + range_info, ); Some(SwarmEvent::SessionEstablished { peer_id,