Mirror of https://github.com/paradigmxyz/reth.git
feat(net): prioritize requesting peers with low latency (#835)
Co-authored-by: lambdaclass-user <github@lambdaclass.com>
@@ -2,7 +2,6 @@

 use crate::{message::BlockRequest, peers::PeersHandle};
 use futures::StreamExt;
-use linked_hash_map::LinkedHashMap;
 use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders};
 use reth_interfaces::p2p::{
     error::{PeerRequestResult, RequestError, RequestResult},
@@ -11,6 +10,10 @@ use reth_interfaces::p2p::{
 use reth_primitives::{Header, PeerId, H256};
 use std::{
     collections::{HashMap, VecDeque},
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
     task::{Context, Poll},
 };
 use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
@@ -34,7 +37,7 @@ pub struct StateFetcher {
     inflight_bodies_requests:
         HashMap<PeerId, Request<Vec<H256>, PeerRequestResult<Vec<BlockBody>>>>,
     /// The list of _available_ peers for requests.
-    peers: LinkedHashMap<PeerId, Peer>,
+    peers: HashMap<PeerId, Peer>,
     /// The handle to the peers manager
     peers_handle: PeersHandle,
     /// Requests queued for processing
@@ -62,8 +65,15 @@ impl StateFetcher {
     }

     /// Invoked when connected to a new peer.
-    pub(crate) fn new_active_peer(&mut self, peer_id: PeerId, best_hash: H256, best_number: u64) {
-        self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number });
+    pub(crate) fn new_active_peer(
+        &mut self,
+        peer_id: PeerId,
+        best_hash: H256,
+        best_number: u64,
+        timeout: Arc<AtomicU64>,
+    ) {
+        self.peers
+            .insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number, timeout });
     }

     /// Removes the peer from the peer list, after which it is no longer available for future
@@ -103,16 +113,15 @@ impl StateFetcher {
         }
     }

-    /// Returns the _next_ idle peer that's ready to accept a request.
-    /// Once a peer has been yielded, it will be moved to the end of the map
+    /// Returns the _next_ idle peer that's ready to accept a request,
+    /// prioritizing those with the lowest timeout/latency.
     fn next_peer(&mut self) -> Option<PeerId> {
-        let peer =
-            self.peers.iter().find_map(|(peer_id, peer)| peer.state.is_idle().then_some(*peer_id));
-        if let Some(peer_id) = peer {
-            // Move to end of the map
-            self.peers.get_refresh(&peer_id);
-        }
-        peer
+        self.peers
+            .iter()
+            .filter(|(_, peer)| peer.state.is_idle())
+            .min_by_key(|(_, peer)| peer.timeout())
+            .map(|(id, _)| *id)
     }

     /// Returns the next action to return
@@ -271,6 +280,14 @@ struct Peer {
     best_hash: H256,
     /// Tracks the best number of the peer.
     best_number: u64,
+    /// Tracks the current timeout value we use for the peer.
+    timeout: Arc<AtomicU64>,
 }
+
+impl Peer {
+    fn timeout(&self) -> u64 {
+        self.timeout.load(Ordering::Relaxed)
+    }
+}

 /// Tracks the state of an individual peer
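The selection logic above is the heart of the change: instead of rotating through a LinkedHashMap, the fetcher scans its idle peers and picks the one whose shared timeout value is currently lowest. Below is a minimal, self-contained sketch of that idea; the Peer, PeerState, and PeerId definitions are simplified stand-ins for illustration, not the real reth types.

use std::collections::HashMap;
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

type PeerId = u64; // stand-in for reth's H512-based PeerId

#[derive(PartialEq)]
enum PeerState {
    Idle,
    Busy,
}

struct Peer {
    state: PeerState,
    // Shared with the session that talks to this peer; the session keeps
    // raising or lowering this value as it measures response times.
    timeout: Arc<AtomicU64>,
}

impl Peer {
    fn timeout(&self) -> u64 {
        self.timeout.load(Ordering::Relaxed)
    }
}

/// Pick the idle peer with the lowest current timeout, if any.
fn next_peer(peers: &HashMap<PeerId, Peer>) -> Option<PeerId> {
    peers
        .iter()
        .filter(|(_, peer)| peer.state == PeerState::Idle)
        .min_by_key(|(_, peer)| peer.timeout())
        .map(|(id, _)| *id)
}

fn main() {
    let mut peers = HashMap::new();
    peers.insert(1, Peer { state: PeerState::Idle, timeout: Arc::new(AtomicU64::new(30)) });
    peers.insert(2, Peer { state: PeerState::Busy, timeout: Arc::new(AtomicU64::new(10)) });
    peers.insert(3, Peer { state: PeerState::Idle, timeout: Arc::new(AtomicU64::new(50)) });

    // Peer 2 has the lowest timeout but is busy, so peer 1 wins.
    assert_eq!(next_peer(&peers), Some(1));
}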
@@ -390,22 +407,52 @@ mod tests {
         })
         .await;
     }

     #[tokio::test]
     async fn test_peer_rotation() {
         let manager = PeersManager::new(PeersConfig::default());
         let mut fetcher = StateFetcher::new(manager.handle());
         // Add a few random peers
         let peer1 = H512::random();
         let peer2 = H512::random();
+        fetcher.new_active_peer(peer1, H256::random(), 1, Arc::new(AtomicU64::new(1)));
+        fetcher.new_active_peer(peer2, H256::random(), 2, Arc::new(AtomicU64::new(1)));
+
+        let first_peer = fetcher.next_peer().unwrap();
+        assert!(first_peer == peer1 || first_peer == peer2);
+        // Pending disconnect for first_peer
+        fetcher.on_pending_disconnect(&first_peer);
+        // first_peer now isn't idle, so we should get other peer
+        let second_peer = fetcher.next_peer().unwrap();
+        assert!(first_peer == peer1 || first_peer == peer2);
+        assert_ne!(first_peer, second_peer);
+        // without idle peers, returns None
+        fetcher.on_pending_disconnect(&second_peer);
+        assert_eq!(fetcher.next_peer(), None);
+    }
+
+    #[tokio::test]
+    async fn test_peer_prioritization() {
+        let manager = PeersManager::new(PeersConfig::default());
+        let mut fetcher = StateFetcher::new(manager.handle());
+        // Add a few random peers
+        let peer1 = H512::random();
+        let peer2 = H512::random();
         let peer3 = H512::random();
-        fetcher.new_active_peer(peer1, H256::random(), 1);
-        fetcher.new_active_peer(peer2, H256::random(), 2);
-        fetcher.new_active_peer(peer3, H256::random(), 3);
-        let next_peer = fetcher.next_peer();
-        // Must get peer1 as our first peer
-        assert_eq!(next_peer, Some(peer1));
-        // peer1 must now move to the end of the map
-        assert_eq!(&peer1, fetcher.peers.back().unwrap().0);
+
+        let peer2_timeout = Arc::new(AtomicU64::new(300));
+
+        fetcher.new_active_peer(peer1, H256::random(), 1, Arc::new(AtomicU64::new(30)));
+        fetcher.new_active_peer(peer2, H256::random(), 2, Arc::clone(&peer2_timeout));
+        fetcher.new_active_peer(peer3, H256::random(), 3, Arc::new(AtomicU64::new(50)));
+
+        // Must always get peer1 (lowest timeout)
+        assert_eq!(fetcher.next_peer(), Some(peer1));
+        assert_eq!(fetcher.next_peer(), Some(peer1));
+        // peer2's timeout changes below peer1's
+        peer2_timeout.store(10, Ordering::Relaxed);
+        // Then we get peer 2 always (now lowest)
+        assert_eq!(fetcher.next_peer(), Some(peer2));
+        assert_eq!(fetcher.next_peer(), Some(peer2));
     }
 }
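The prioritization test works because cloning an Arc<AtomicU64> clones the handle, not the counter: the store through peer2_timeout is immediately visible to the Peer entry the fetcher holds. A tiny sketch of that sharing, nothing reth-specific:

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

fn main() {
    let timeout = Arc::new(AtomicU64::new(300));
    let handle_for_fetcher = Arc::clone(&timeout);

    // Lowering the value through one handle...
    timeout.store(10, Ordering::Relaxed);

    // ...is observed through the other, because both point at the same atomic.
    assert_eq!(handle_for_fetcher.load(Ordering::Relaxed), 10);
}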
@@ -8,6 +8,7 @@ use crate::{
         SessionId,
     },
 };
+use core::sync::atomic::Ordering;
 use fnv::FnvHashMap;
 use futures::{stream::Fuse, SinkExt, StreamExt};
 use reth_ecies::stream::ECIESStream;
@@ -25,7 +26,7 @@ use std::{
     future::Future,
     net::SocketAddr,
     pin::Pin,
-    sync::Arc,
+    sync::{atomic::AtomicU64, Arc},
     task::{ready, Context, Poll},
     time::{Duration, Instant},
 };
@@ -71,7 +72,7 @@ pub(crate) struct ActiveSession {
     /// Buffered messages that should be handled and sent to the peer.
     pub(crate) queued_outgoing: VecDeque<OutgoingMessage>,
     /// The maximum time we wait for a response from a peer.
-    pub(crate) request_timeout: Duration,
+    pub(crate) request_timeout: Arc<AtomicU64>,
    /// Interval when to check for timed out requests.
    pub(crate) timeout_interval: Interval,
 }
@@ -242,7 +243,7 @@ impl ActiveSession {

     /// Returns the deadline timestamp at which the request times out
     fn request_deadline(&self) -> Instant {
-        Instant::now() + self.request_timeout
+        Instant::now() + Duration::from_millis(self.request_timeout.load(Ordering::Relaxed))
     }

     /// Handle a Response to the peer
@@ -356,8 +357,10 @@ impl ActiveSession {
     fn update_request_timeout(&mut self, sent: Instant, received: Instant) {
         let elapsed = received.saturating_duration_since(sent);

-        self.request_timeout = calculate_new_timeout(self.request_timeout, elapsed);
-        self.timeout_interval = tokio::time::interval(self.request_timeout);
+        let current = Duration::from_millis(self.request_timeout.load(Ordering::Relaxed));
+        let request_timeout = calculate_new_timeout(current, elapsed);
+        self.request_timeout.store(request_timeout.as_millis() as u64, Ordering::Relaxed);
+        self.timeout_interval = tokio::time::interval(request_timeout);
     }
 }
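update_request_timeout is what feeds the shared value: every measured round trip is folded into a new timeout, which is then published through the atomic so the fetcher's prioritization sees it. The body of calculate_new_timeout is not part of this diff, so the sketch below substitutes a simple illustrative smoothing rule (an assumption, not the function reth actually uses) purely to show the shape of the update.

use std::time::Duration;

/// Illustrative only: blend the previous timeout with the latest measurement.
/// This is NOT reth's formula; the real calculate_new_timeout lives elsewhere
/// in the crate and is unchanged by this diff.
fn calculate_new_timeout(current: Duration, elapsed: Duration) -> Duration {
    // Allow roughly 3x the observed round trip, then average with the previous
    // value so a single slow response does not swing the timeout too hard.
    let candidate = elapsed * 3;
    (current + candidate) / 2
}

fn main() {
    let current = Duration::from_millis(200);
    let elapsed = Duration::from_millis(40); // a fast response was observed

    let next = calculate_new_timeout(current, elapsed);
    // (200ms + 120ms) / 2 = 160ms: the timeout drifts down toward fast peers,
    // which in turn raises this peer's priority in the fetcher.
    assert_eq!(next, Duration::from_millis(160));
}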
@@ -682,7 +685,9 @@ mod tests {
                     queued_outgoing: Default::default(),
                     received_requests: Default::default(),
                     timeout_interval: tokio::time::interval(INITIAL_REQUEST_TIMEOUT),
-                    request_timeout: INITIAL_REQUEST_TIMEOUT,
+                    request_timeout: Arc::new(AtomicU64::new(
+                        INITIAL_REQUEST_TIMEOUT.as_millis() as u64,
+                    )),
                 }
             }
             _ => {
@@ -27,7 +27,7 @@ use std::{
     collections::HashMap,
     future::Future,
     net::SocketAddr,
-    sync::Arc,
+    sync::{atomic::AtomicU64, Arc},
     task::{Context, Poll},
     time::{Duration, Instant},
 };
@@ -365,6 +365,8 @@ impl SessionManager {

         let messages = PeerRequestSender::new(peer_id, to_session_tx);

+        let timeout = Arc::new(AtomicU64::new(self.request_timeout.as_millis() as u64));
+
         let session = ActiveSession {
             next_id: 0,
             remote_peer_id: peer_id,
@@ -379,7 +381,7 @@ impl SessionManager {
             queued_outgoing: Default::default(),
             received_requests: Default::default(),
             timeout_interval: tokio::time::interval(self.request_timeout),
-            request_timeout: self.request_timeout,
+            request_timeout: Arc::clone(&timeout),
         };

         self.spawn(session);
@@ -405,6 +407,7 @@ impl SessionManager {
                    status,
                    messages,
                    direction,
+                   timeout,
                })
            }
            PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
@@ -519,6 +522,7 @@ pub(crate) enum SessionEvent {
        status: Status,
        messages: PeerRequestSender,
        direction: Direction,
+       timeout: Arc<AtomicU64>,
    },
    AlreadyConnected {
        peer_id: PeerId,
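The session manager is the single place where the shared value is created: one Arc<AtomicU64> per session, seeded from the configured request timeout, with one clone handed to the ActiveSession (the writer) and the original forwarded in the session-established event, from where the network state passes it to the fetcher (the reader). Because a Duration cannot be stored in an atomic directly, the value is kept as integer milliseconds. A small sketch of that pattern, using plain threads instead of the real session and fetcher tasks (the variable names are illustrative, not reth APIs):

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;
use std::time::Duration;

fn main() {
    // Created once per session, seeded from the configured default.
    let initial = Duration::from_secs(20);
    let timeout = Arc::new(AtomicU64::new(initial.as_millis() as u64));

    // The "session" side keeps a clone and publishes new measurements,
    // no lock required.
    let session_handle = Arc::clone(&timeout);
    let session = thread::spawn(move || {
        // Pretend a round trip was measured and a 450ms timeout derived.
        session_handle.store(450, Ordering::Relaxed);
    });
    session.join().unwrap();

    // The "fetcher" side only ever reads, converting back to a Duration.
    let current = Duration::from_millis(timeout.load(Ordering::Relaxed));
    assert_eq!(current, Duration::from_millis(450));
}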
@@ -20,7 +20,7 @@ use std::{
     collections::{HashMap, VecDeque},
     net::{IpAddr, SocketAddr},
     num::NonZeroUsize,
-    sync::Arc,
+    sync::{atomic::AtomicU64, Arc},
     task::{Context, Poll},
 };
 use tokio::sync::oneshot;
@@ -117,13 +117,14 @@ where
        capabilities: Arc<Capabilities>,
        status: Status,
        request_tx: PeerRequestSender,
+       timeout: Arc<AtomicU64>,
    ) {
        debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");

        // find the corresponding block number
        let block_number =
            self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
-       self.state_fetcher.new_active_peer(peer, status.blockhash, block_number);
+       self.state_fetcher.new_active_peer(peer, status.blockhash, block_number, timeout);

        self.active_peers.insert(
            peer,
@@ -500,7 +501,10 @@ mod tests {
     use reth_interfaces::p2p::{bodies::client::BodiesClient, error::RequestError};
     use reth_primitives::{Header, PeerId, H256};
     use reth_provider::test_utils::NoopProvider;
-    use std::{future::poll_fn, sync::Arc};
+    use std::{
+        future::poll_fn,
+        sync::{atomic::AtomicU64, Arc},
+    };
     use tokio::sync::mpsc;
     use tokio_stream::{wrappers::ReceiverStream, StreamExt};

@@ -534,7 +538,13 @@ mod tests {
        let (tx, session_rx) = mpsc::channel(1);
        let peer_tx = PeerRequestSender::new(peer_id, tx);

-       state.on_session_activated(peer_id, capabilities(), Status::default(), peer_tx);
+       state.on_session_activated(
+           peer_id,
+           capabilities(),
+           Status::default(),
+           peer_tx,
+           Arc::new(AtomicU64::new(1)),
+       );

        assert!(state.active_peers.contains_key(&peer_id));
@@ -121,12 +121,14 @@ where
                status,
                messages,
                direction,
+               timeout,
            } => {
                self.state.on_session_activated(
                    peer_id,
                    capabilities.clone(),
                    status,
                    messages.clone(),
+                   timeout,
                );
                Some(SwarmEvent::SessionEstablished {
                    peer_id,