feat: introduce supported range to Peer info (#16687)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
gejeduck
2025-06-06 05:02:48 -04:00
committed by GitHub
parent 95c68ae584
commit c1b7eb78de
6 changed files with 134 additions and 27 deletions

View File

@@ -4,7 +4,7 @@ mod client;
pub use client::FetchClient;
use crate::message::BlockRequest;
use crate::{message::BlockRequest, session::BlockRangeInfo};
use alloy_primitives::B256;
use futures::StreamExt;
use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives};
@@ -80,6 +80,7 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
best_hash: B256,
best_number: u64,
timeout: Arc<AtomicU64>,
range_info: Option<BlockRangeInfo>,
) {
self.peers.insert(
peer_id,
@@ -89,6 +90,7 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
best_number,
timeout,
last_response_likely_bad: false,
range_info,
},
);
}
@@ -347,6 +349,9 @@ struct Peer {
/// downloaded), but we still want to avoid requesting from the same peer again if it has the
/// lowest timeout.
last_response_likely_bad: bool,
/// Tracks the range info for the peer.
#[allow(dead_code)]
range_info: Option<BlockRangeInfo>,
}
impl Peer {
@@ -502,8 +507,8 @@ mod tests {
// Add a few random peers
let peer1 = B512::random();
let peer2 = B512::random();
fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1)));
fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1)));
fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1)), None);
fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1)), None);
let first_peer = fetcher.next_best_peer().unwrap();
assert!(first_peer == peer1 || first_peer == peer2);
@@ -530,9 +535,9 @@ mod tests {
let peer2_timeout = Arc::new(AtomicU64::new(300));
fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(30)));
fetcher.new_active_peer(peer2, B256::random(), 2, Arc::clone(&peer2_timeout));
fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50)));
fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(30)), None);
fetcher.new_active_peer(peer2, B256::random(), 2, Arc::clone(&peer2_timeout), None);
fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50)), None);
// Must always get peer1 (lowest timeout)
assert_eq!(fetcher.next_best_peer(), Some(peer1));
@@ -601,6 +606,7 @@ mod tests {
Default::default(),
Default::default(),
Default::default(),
None,
);
let (req, header) = request_pair();

View File

@@ -16,7 +16,7 @@ use crate::{
session::{
conn::EthRlpxConnection,
handle::{ActiveSessionMessage, SessionCommand},
SessionId,
BlockRangeInfo, SessionId,
},
};
use alloy_primitives::Sealable;
@@ -114,6 +114,8 @@ pub(crate) struct ActiveSession<N: NetworkPrimitives> {
/// Used to reserve a slot to guarantee that the termination message is delivered
pub(crate) terminate_message:
Option<(PollSender<ActiveSessionMessage<N>>, ActiveSessionMessage<N>)>,
/// The eth69 range info for the remote peer.
pub(crate) range_info: Option<BlockRangeInfo>,
}
impl<N: NetworkPrimitives> ActiveSession<N> {
@@ -262,7 +264,11 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
on_response!(resp, GetReceipts)
}
EthMessage::BlockRangeUpdate(msg) => {
self.try_emit_broadcast(PeerMessage::BlockRangeUpdated(msg)).into()
if let Some(range_info) = self.range_info.as_ref() {
range_info.update(msg.earliest, msg.latest, msg.latest_hash);
}
OnIncomingMessageOutcome::Ok
}
EthMessage::Other(bytes) => self.try_emit_broadcast(PeerMessage::Other(bytes)).into(),
}
@@ -987,6 +993,7 @@ mod tests {
)),
protocol_breach_request_timeout: PROTOCOL_BREACH_REQUEST_TIMEOUT,
terminate_message: None,
range_info: None,
}
}
ev => {

View File

@@ -4,24 +4,8 @@ mod active;
mod conn;
mod counter;
mod handle;
use active::QueuedOutgoingMessages;
pub use conn::EthRlpxConnection;
pub use handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,
};
pub use reth_network_api::{Direction, PeerInfo};
use std::{
collections::HashMap,
future::Future,
net::SocketAddr,
sync::{atomic::AtomicU64, Arc},
task::{Context, Poll},
time::{Duration, Instant},
};
mod types;
pub use types::BlockRangeInfo;
use crate::{
message::PeerMessage,
@@ -29,6 +13,7 @@ use crate::{
protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
session::active::ActiveSession,
};
use active::QueuedOutgoingMessages;
use counter::SessionCounter;
use futures::{future::Either, io, FutureExt, StreamExt};
use reth_ecies::{stream::ECIESStream, ECIESError};
@@ -46,6 +31,14 @@ use reth_network_types::SessionsConfig;
use reth_tasks::TaskSpawner;
use rustc_hash::FxHashMap;
use secp256k1::SecretKey;
use std::{
collections::HashMap,
future::Future,
net::SocketAddr,
sync::{atomic::AtomicU64, Arc},
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
@@ -55,6 +48,13 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::PollSender;
use tracing::{debug, instrument, trace};
pub use conn::EthRlpxConnection;
pub use handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,
};
pub use reth_network_api::{Direction, PeerInfo};
/// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
pub struct SessionId(usize);
@@ -543,6 +543,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
internal_request_timeout: Arc::clone(&timeout),
protocol_breach_request_timeout: self.protocol_breach_request_timeout,
terminate_message: None,
range_info: None,
};
self.spawn(session);
@@ -579,6 +580,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
messages,
direction,
timeout,
range_info: None,
})
}
PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
@@ -701,6 +703,8 @@ pub enum SessionEvent<N: NetworkPrimitives> {
/// The maximum time that the session waits for a response from the peer before timing out
/// the connection
timeout: Arc<AtomicU64>,
/// The range info for the peer.
range_info: Option<BlockRangeInfo>,
},
/// The peer was already connected with another session.
AlreadyConnected {

View File

@@ -0,0 +1,79 @@
//! Shared types for network sessions.
use alloy_primitives::B256;
use parking_lot::RwLock;
use std::{
ops::RangeInclusive,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
/// Information about the range of blocks available from a peer.
///
/// This represents the announced `eth69`
/// [`BlockRangeUpdate`](reth_eth_wire_types::BlockRangeUpdate) of a peer.
#[derive(Debug, Clone)]
pub struct BlockRangeInfo {
/// The inner range information.
inner: Arc<BlockRangeInfoInner>,
}
impl BlockRangeInfo {
/// Creates a new range information.
pub fn new(earliest: u64, latest: u64, latest_hash: B256) -> Self {
Self {
inner: Arc::new(BlockRangeInfoInner {
earliest: AtomicU64::new(earliest),
latest: AtomicU64::new(latest),
latest_hash: RwLock::new(latest_hash),
}),
}
}
/// Returns true if the block number is within the range of blocks available from the peer.
pub fn contains(&self, block_number: u64) -> bool {
self.range().contains(&block_number)
}
/// Returns the range of blocks available from the peer.
pub fn range(&self) -> RangeInclusive<u64> {
let earliest = self.earliest();
let latest = self.latest();
RangeInclusive::new(earliest, latest)
}
/// Returns the earliest block number available from the peer.
pub fn earliest(&self) -> u64 {
self.inner.earliest.load(Ordering::Relaxed)
}
/// Returns the latest block number available from the peer.
pub fn latest(&self) -> u64 {
self.inner.latest.load(Ordering::Relaxed)
}
/// Returns the latest block hash available from the peer.
pub fn latest_hash(&self) -> B256 {
*self.inner.latest_hash.read()
}
/// Updates the range information.
pub fn update(&self, earliest: u64, latest: u64, latest_hash: B256) {
self.inner.earliest.store(earliest, Ordering::Relaxed);
self.inner.latest.store(latest, Ordering::Relaxed);
*self.inner.latest_hash.write() = latest_hash;
}
}
/// Inner structure containing the range information with atomic and thread-safe fields.
#[derive(Debug)]
pub(crate) struct BlockRangeInfoInner {
/// The earliest block which is available.
earliest: AtomicU64,
/// The latest block which is available.
latest: AtomicU64,
/// Latest available block's hash.
latest_hash: RwLock<B256>,
}

View File

@@ -6,6 +6,7 @@ use crate::{
fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
peers::{PeerAction, PeersManager},
session::BlockRangeInfo,
FetchClient,
};
use alloy_consensus::BlockHeader;
@@ -149,13 +150,20 @@ impl<N: NetworkPrimitives> NetworkState<N> {
status: Arc<UnifiedStatus>,
request_tx: PeerRequestSender<PeerRequest<N>>,
timeout: Arc<AtomicU64>,
range_info: Option<BlockRangeInfo>,
) {
debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");
// find the corresponding block number
let block_number =
self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
self.state_fetcher.new_active_peer(peer, status.blockhash, block_number, timeout);
self.state_fetcher.new_active_peer(
peer,
status.blockhash,
block_number,
timeout,
range_info,
);
self.active_peers.insert(
peer,
@@ -613,6 +621,7 @@ mod tests {
Arc::default(),
peer_tx,
Arc::new(AtomicU64::new(1)),
None,
);
assert!(state.active_peers.contains_key(&peer_id));

View File

@@ -122,6 +122,7 @@ impl<N: NetworkPrimitives> Swarm<N> {
messages,
direction,
timeout,
range_info,
} => {
self.state.on_session_activated(
peer_id,
@@ -129,6 +130,7 @@ impl<N: NetworkPrimitives> Swarm<N> {
status.clone(),
messages.clone(),
timeout,
range_info,
);
Some(SwarmEvent::SessionEstablished {
peer_id,