diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 6e2d644336..891b401ae6 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -2,7 +2,6 @@ use crate::{message::BlockRequest, peers::PeersHandle}; use futures::StreamExt; -use linked_hash_map::LinkedHashMap; use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders}; use reth_interfaces::p2p::{ error::{PeerRequestResult, RequestError, RequestResult}, @@ -11,6 +10,10 @@ use reth_interfaces::p2p::{ use reth_primitives::{Header, PeerId, H256}; use std::{ collections::{HashMap, VecDeque}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, task::{Context, Poll}, }; use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; @@ -34,7 +37,7 @@ pub struct StateFetcher { inflight_bodies_requests: HashMap, PeerRequestResult>>>, /// The list of _available_ peers for requests. - peers: LinkedHashMap, + peers: HashMap, /// The handle to the peers manager peers_handle: PeersHandle, /// Requests queued for processing @@ -62,8 +65,15 @@ impl StateFetcher { } /// Invoked when connected to a new peer. - pub(crate) fn new_active_peer(&mut self, peer_id: PeerId, best_hash: H256, best_number: u64) { - self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number }); + pub(crate) fn new_active_peer( + &mut self, + peer_id: PeerId, + best_hash: H256, + best_number: u64, + timeout: Arc, + ) { + self.peers + .insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number, timeout }); } /// Removes the peer from the peer list, after which it is no longer available for future @@ -103,16 +113,15 @@ impl StateFetcher { } } - /// Returns the _next_ idle peer that's ready to accept a request. + /// Returns the _next_ idle peer that's ready to accept a request, + /// prioritizing those with the lowest timeout/latency. /// Once a peer has been yielded, it will be moved to the end of the map fn next_peer(&mut self) -> Option { - let peer = - self.peers.iter().find_map(|(peer_id, peer)| peer.state.is_idle().then_some(*peer_id)); - if let Some(peer_id) = peer { - // Move to end of the map - self.peers.get_refresh(&peer_id); - } - peer + self.peers + .iter() + .filter(|(_, peer)| peer.state.is_idle()) + .min_by_key(|(_, peer)| peer.timeout()) + .map(|(id, _)| *id) } /// Returns the next action to return @@ -271,6 +280,14 @@ struct Peer { best_hash: H256, /// Tracks the best number of the peer. best_number: u64, + /// Tracks the current timeout value we use for the peer. + timeout: Arc, +} + +impl Peer { + fn timeout(&self) -> u64 { + self.timeout.load(Ordering::Relaxed) + } } /// Tracks the state of an individual peer @@ -390,22 +407,52 @@ mod tests { }) .await; } + #[tokio::test] async fn test_peer_rotation() { + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = StateFetcher::new(manager.handle()); + // Add a few random peers + let peer1 = H512::random(); + let peer2 = H512::random(); + fetcher.new_active_peer(peer1, H256::random(), 1, Arc::new(AtomicU64::new(1))); + fetcher.new_active_peer(peer2, H256::random(), 2, Arc::new(AtomicU64::new(1))); + + let first_peer = fetcher.next_peer().unwrap(); + assert!(first_peer == peer1 || first_peer == peer2); + // Pending disconnect for first_peer + fetcher.on_pending_disconnect(&first_peer); + // first_peer now isn't idle, so we should get other peer + let second_peer = fetcher.next_peer().unwrap(); + assert!(first_peer == peer1 || first_peer == peer2); + assert_ne!(first_peer, second_peer); + // without idle peers, returns None + fetcher.on_pending_disconnect(&second_peer); + assert_eq!(fetcher.next_peer(), None); + } + + #[tokio::test] + async fn test_peer_prioritization() { let manager = PeersManager::new(PeersConfig::default()); let mut fetcher = StateFetcher::new(manager.handle()); // Add a few random peers let peer1 = H512::random(); let peer2 = H512::random(); let peer3 = H512::random(); - fetcher.new_active_peer(peer1, H256::random(), 1); - fetcher.new_active_peer(peer2, H256::random(), 2); - fetcher.new_active_peer(peer3, H256::random(), 3); - let next_peer = fetcher.next_peer(); - // Must get peer1 as our first peer - assert_eq!(next_peer, Some(peer1)); - // peer1 must now move to the end of the map - assert_eq!(&peer1, fetcher.peers.back().unwrap().0); + + let peer2_timeout = Arc::new(AtomicU64::new(300)); + + fetcher.new_active_peer(peer1, H256::random(), 1, Arc::new(AtomicU64::new(30))); + fetcher.new_active_peer(peer2, H256::random(), 2, Arc::clone(&peer2_timeout)); + fetcher.new_active_peer(peer3, H256::random(), 3, Arc::new(AtomicU64::new(50))); + + // Must always get peer1 (lowest timeout) + assert_eq!(fetcher.next_peer(), Some(peer1)); + assert_eq!(fetcher.next_peer(), Some(peer1)); + // peer2's timeout changes below peer1's + peer2_timeout.store(10, Ordering::Relaxed); + // Then we get peer 2 always (now lowest) + assert_eq!(fetcher.next_peer(), Some(peer2)); assert_eq!(fetcher.next_peer(), Some(peer2)); } } diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index f548e826f5..99c6cb0f8c 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -8,6 +8,7 @@ use crate::{ SessionId, }, }; +use core::sync::atomic::Ordering; use fnv::FnvHashMap; use futures::{stream::Fuse, SinkExt, StreamExt}; use reth_ecies::stream::ECIESStream; @@ -25,7 +26,7 @@ use std::{ future::Future, net::SocketAddr, pin::Pin, - sync::Arc, + sync::{atomic::AtomicU64, Arc}, task::{ready, Context, Poll}, time::{Duration, Instant}, }; @@ -71,7 +72,7 @@ pub(crate) struct ActiveSession { /// Buffered messages that should be handled and sent to the peer. pub(crate) queued_outgoing: VecDeque, /// The maximum time we wait for a response from a peer. - pub(crate) request_timeout: Duration, + pub(crate) request_timeout: Arc, /// Interval when to check for timed out requests. pub(crate) timeout_interval: Interval, } @@ -242,7 +243,7 @@ impl ActiveSession { /// Returns the deadline timestamp at which the request times out fn request_deadline(&self) -> Instant { - Instant::now() + self.request_timeout + Instant::now() + Duration::from_millis(self.request_timeout.load(Ordering::Relaxed)) } /// Handle a Response to the peer @@ -356,8 +357,10 @@ impl ActiveSession { fn update_request_timeout(&mut self, sent: Instant, received: Instant) { let elapsed = received.saturating_duration_since(sent); - self.request_timeout = calculate_new_timeout(self.request_timeout, elapsed); - self.timeout_interval = tokio::time::interval(self.request_timeout); + let current = Duration::from_millis(self.request_timeout.load(Ordering::Relaxed)); + let request_timeout = calculate_new_timeout(current, elapsed); + self.request_timeout.store(request_timeout.as_millis() as u64, Ordering::Relaxed); + self.timeout_interval = tokio::time::interval(request_timeout); } } @@ -682,7 +685,9 @@ mod tests { queued_outgoing: Default::default(), received_requests: Default::default(), timeout_interval: tokio::time::interval(INITIAL_REQUEST_TIMEOUT), - request_timeout: INITIAL_REQUEST_TIMEOUT, + request_timeout: Arc::new(AtomicU64::new( + INITIAL_REQUEST_TIMEOUT.as_millis() as u64, + )), } } _ => { diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index c51ecbd895..8d7a68fdaf 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -27,7 +27,7 @@ use std::{ collections::HashMap, future::Future, net::SocketAddr, - sync::Arc, + sync::{atomic::AtomicU64, Arc}, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -365,6 +365,8 @@ impl SessionManager { let messages = PeerRequestSender::new(peer_id, to_session_tx); + let timeout = Arc::new(AtomicU64::new(self.request_timeout.as_millis() as u64)); + let session = ActiveSession { next_id: 0, remote_peer_id: peer_id, @@ -379,7 +381,7 @@ impl SessionManager { queued_outgoing: Default::default(), received_requests: Default::default(), timeout_interval: tokio::time::interval(self.request_timeout), - request_timeout: self.request_timeout, + request_timeout: Arc::clone(&timeout), }; self.spawn(session); @@ -405,6 +407,7 @@ impl SessionManager { status, messages, direction, + timeout, }) } PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => { @@ -519,6 +522,7 @@ pub(crate) enum SessionEvent { status: Status, messages: PeerRequestSender, direction: Direction, + timeout: Arc, }, AlreadyConnected { peer_id: PeerId, diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 741da96291..a79e9e635b 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -20,7 +20,7 @@ use std::{ collections::{HashMap, VecDeque}, net::{IpAddr, SocketAddr}, num::NonZeroUsize, - sync::Arc, + sync::{atomic::AtomicU64, Arc}, task::{Context, Poll}, }; use tokio::sync::oneshot; @@ -117,13 +117,14 @@ where capabilities: Arc, status: Status, request_tx: PeerRequestSender, + timeout: Arc, ) { debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible"); // find the corresponding block number let block_number = self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default(); - self.state_fetcher.new_active_peer(peer, status.blockhash, block_number); + self.state_fetcher.new_active_peer(peer, status.blockhash, block_number, timeout); self.active_peers.insert( peer, @@ -500,7 +501,10 @@ mod tests { use reth_interfaces::p2p::{bodies::client::BodiesClient, error::RequestError}; use reth_primitives::{Header, PeerId, H256}; use reth_provider::test_utils::NoopProvider; - use std::{future::poll_fn, sync::Arc}; + use std::{ + future::poll_fn, + sync::{atomic::AtomicU64, Arc}, + }; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; @@ -534,7 +538,13 @@ mod tests { let (tx, session_rx) = mpsc::channel(1); let peer_tx = PeerRequestSender::new(peer_id, tx); - state.on_session_activated(peer_id, capabilities(), Status::default(), peer_tx); + state.on_session_activated( + peer_id, + capabilities(), + Status::default(), + peer_tx, + Arc::new(AtomicU64::new(1)), + ); assert!(state.active_peers.contains_key(&peer_id)); diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 0f46384f1e..dc562b88d9 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -121,12 +121,14 @@ where status, messages, direction, + timeout, } => { self.state.on_session_activated( peer_id, capabilities.clone(), status, messages.clone(), + timeout, ); Some(SwarmEvent::SessionEstablished { peer_id,