mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 01:28:21 -05:00
Rotate peers when fetching (#743)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3815,6 +3815,7 @@ dependencies = [
|
||||
"fnv",
|
||||
"futures",
|
||||
"hex",
|
||||
"linked-hash-map",
|
||||
"linked_hash_set",
|
||||
"metrics",
|
||||
"parking_lot 0.12.1",
|
||||
|
||||
@@ -52,6 +52,7 @@ parking_lot = "0.12"
|
||||
async-trait = "0.1"
|
||||
bytes = "1.2"
|
||||
linked_hash_set = "0.1"
|
||||
linked-hash-map = "0.5.6"
|
||||
rand = "0.8"
|
||||
secp256k1 = { version = "0.24", features = [
|
||||
"global-context",
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
use crate::{message::BlockRequest, peers::PeersHandle};
|
||||
use futures::StreamExt;
|
||||
use linked_hash_map::LinkedHashMap;
|
||||
use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders};
|
||||
use reth_interfaces::p2p::{
|
||||
error::{PeerRequestResult, RequestError, RequestResult},
|
||||
@@ -33,7 +34,7 @@ pub struct StateFetcher {
|
||||
inflight_bodies_requests:
|
||||
HashMap<PeerId, Request<Vec<H256>, PeerRequestResult<Vec<BlockBody>>>>,
|
||||
/// The list of _available_ peers for requests.
|
||||
peers: HashMap<PeerId, Peer>,
|
||||
peers: LinkedHashMap<PeerId, Peer>,
|
||||
/// The handle to the peers manager
|
||||
peers_handle: PeersHandle,
|
||||
/// Requests queued for processing
|
||||
@@ -103,8 +104,15 @@ impl StateFetcher {
|
||||
}
|
||||
|
||||
/// Returns the _next_ idle peer that's ready to accept a request.
|
||||
fn next_peer(&self) -> Option<&PeerId> {
|
||||
self.peers.iter().find_map(|(peer_id, peer)| peer.state.is_idle().then_some(peer_id))
|
||||
/// Once a peer has been yielded, it will be moved to the end of the map
|
||||
fn next_peer(&mut self) -> Option<PeerId> {
|
||||
let peer =
|
||||
self.peers.iter().find_map(|(peer_id, peer)| peer.state.is_idle().then_some(*peer_id));
|
||||
if let Some(peer_id) = peer {
|
||||
// Move to end of the map
|
||||
self.peers.get_refresh(&peer_id);
|
||||
}
|
||||
peer
|
||||
}
|
||||
|
||||
/// Returns the next action to return
|
||||
@@ -115,7 +123,7 @@ impl StateFetcher {
|
||||
}
|
||||
|
||||
let peer_id = if let Some(peer_id) = self.next_peer() {
|
||||
*peer_id
|
||||
peer_id
|
||||
} else {
|
||||
return PollAction::NoPeersAvailable
|
||||
};
|
||||
@@ -360,6 +368,7 @@ pub(crate) enum BlockResponseOutcome {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{peers::PeersManager, PeersConfig};
|
||||
use reth_primitives::{H256, H512};
|
||||
|
||||
use super::*;
|
||||
use std::future::poll_fn;
|
||||
@@ -381,4 +390,22 @@ mod tests {
|
||||
})
|
||||
.await;
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn test_peer_rotation() {
|
||||
let manager = PeersManager::new(PeersConfig::default());
|
||||
let mut fetcher = StateFetcher::new(manager.handle());
|
||||
// Add a few random peers
|
||||
let peer1 = H512::random();
|
||||
let peer2 = H512::random();
|
||||
let peer3 = H512::random();
|
||||
fetcher.new_active_peer(peer1, H256::random(), 1);
|
||||
fetcher.new_active_peer(peer2, H256::random(), 2);
|
||||
fetcher.new_active_peer(peer3, H256::random(), 3);
|
||||
let next_peer = fetcher.next_peer();
|
||||
// Must get peer1 as our first peer
|
||||
assert_eq!(next_peer, Some(peer1));
|
||||
// peer1 must now move to the end of the map
|
||||
assert_eq!(&peer1, fetcher.peers.back().unwrap().0);
|
||||
assert_eq!(fetcher.next_peer(), Some(peer2));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user