From 1b5bc5e1f7884cd32ee4e380af8e86744f9f6bfe Mon Sep 17 00:00:00 2001 From: Sanket Shanbhag Date: Fri, 6 Jan 2023 16:51:42 +0530 Subject: [PATCH] Rotate peers when fetching (#743) --- Cargo.lock | 1 + crates/net/network/Cargo.toml | 1 + crates/net/network/src/fetch/mod.rs | 35 +++++++++++++++++++++++++---- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6805fd3864..367592de9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3815,6 +3815,7 @@ dependencies = [ "fnv", "futures", "hex", + "linked-hash-map", "linked_hash_set", "metrics", "parking_lot 0.12.1", diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 845003bdb8..008f8cf67d 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -52,6 +52,7 @@ parking_lot = "0.12" async-trait = "0.1" bytes = "1.2" linked_hash_set = "0.1" +linked-hash-map = "0.5.6" rand = "0.8" secp256k1 = { version = "0.24", features = [ "global-context", diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 0256cfdb17..0eaf860b5f 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -2,6 +2,7 @@ use crate::{message::BlockRequest, peers::PeersHandle}; use futures::StreamExt; +use linked_hash_map::LinkedHashMap; use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders}; use reth_interfaces::p2p::{ error::{PeerRequestResult, RequestError, RequestResult}, @@ -33,7 +34,7 @@ pub struct StateFetcher { inflight_bodies_requests: HashMap, PeerRequestResult>>>, /// The list of _available_ peers for requests. - peers: HashMap, + peers: LinkedHashMap, /// The handle to the peers manager peers_handle: PeersHandle, /// Requests queued for processing @@ -103,8 +104,15 @@ impl StateFetcher { } /// Returns the _next_ idle peer that's ready to accept a request. - fn next_peer(&self) -> Option<&PeerId> { - self.peers.iter().find_map(|(peer_id, peer)| peer.state.is_idle().then_some(peer_id)) + /// Once a peer has been yielded, it will be moved to the end of the map + fn next_peer(&mut self) -> Option { + let peer = + self.peers.iter().find_map(|(peer_id, peer)| peer.state.is_idle().then_some(*peer_id)); + if let Some(peer_id) = peer { + // Move to end of the map + self.peers.get_refresh(&peer_id); + } + peer } /// Returns the next action to return @@ -115,7 +123,7 @@ impl StateFetcher { } let peer_id = if let Some(peer_id) = self.next_peer() { - *peer_id + peer_id } else { return PollAction::NoPeersAvailable }; @@ -360,6 +368,7 @@ pub(crate) enum BlockResponseOutcome { #[cfg(test)] mod tests { use crate::{peers::PeersManager, PeersConfig}; + use reth_primitives::{H256, H512}; use super::*; use std::future::poll_fn; @@ -381,4 +390,22 @@ mod tests { }) .await; } + #[tokio::test] + async fn test_peer_rotation() { + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = StateFetcher::new(manager.handle()); + // Add a few random peers + let peer1 = H512::random(); + let peer2 = H512::random(); + let peer3 = H512::random(); + fetcher.new_active_peer(peer1, H256::random(), 1); + fetcher.new_active_peer(peer2, H256::random(), 2); + fetcher.new_active_peer(peer3, H256::random(), 3); + let next_peer = fetcher.next_peer(); + // Must get peer1 as our first peer + assert_eq!(next_peer, Some(peer1)); + // peer1 must now move to the end of the map + assert_eq!(&peer1, fetcher.peers.back().unwrap().0); + assert_eq!(fetcher.next_peer(), Some(peer2)); + } }