diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 19c605fb9a..1f85f242da 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -77,8 +77,16 @@ impl StateFetcher { best_number: u64, timeout: Arc, ) { - self.peers - .insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number, timeout }); + self.peers.insert( + peer_id, + Peer { + state: PeerState::Idle, + best_hash, + best_number, + timeout, + last_response_likely_bad: false, + }, + ); } /// Removes the peer from the peer list, after which it is no longer available for future @@ -119,14 +127,29 @@ impl StateFetcher { } /// Returns the _next_ idle peer that's ready to accept a request, - /// prioritizing those with the lowest timeout/latency. - /// Once a peer has been yielded, it will be moved to the end of the map - fn next_peer(&mut self) -> Option { - self.peers - .iter() - .filter(|(_, peer)| peer.state.is_idle()) - .min_by_key(|(_, peer)| peer.timeout()) - .map(|(id, _)| *id) + /// prioritizing those with the lowest timeout/latency and those that recently responded with + /// adequate data. + fn next_best_peer(&mut self) -> Option { + let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle()); + + let mut best_peer = idle.next()?; + + for maybe_better in idle { + // replace best peer if our current best peer sent us a bad response last time + if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad { + best_peer = maybe_better; + continue + } + + // replace best peer if this peer has better rtt + if maybe_better.1.timeout() < best_peer.1.timeout() && + !maybe_better.1.last_response_likely_bad + { + best_peer = maybe_better; + } + } + + Some(*best_peer.0) } /// Returns the next action to return @@ -136,7 +159,7 @@ impl StateFetcher { return PollAction::NoRequests } - let Some(peer_id) = self.next_peer() else { return PollAction::NoPeersAvailable }; + let Some(peer_id) = self.next_best_peer() else { return PollAction::NoPeersAvailable }; let request = self.queued_requests.pop_front().expect("not empty"); let request = self.prepare_block_request(peer_id, request); @@ -249,6 +272,9 @@ impl StateFetcher { } if let Some(peer) = self.peers.get_mut(&peer_id) { + // update the peer's response state + peer.last_response_likely_bad = is_likely_bad_response; + // If the peer is still ready to accept new requests, we try to send a followup // request immediately. if peer.state.on_request_finished() && !is_error && !is_likely_bad_response { @@ -268,11 +294,16 @@ impl StateFetcher { peer_id: PeerId, res: RequestResult>, ) -> Option { + let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty()); + if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) { let _ = resp.response.send(res.map(|b| (peer_id, b).into())); } if let Some(peer) = self.peers.get_mut(&peer_id) { - if peer.state.on_request_finished() { + // update the peer's response state + peer.last_response_likely_bad = is_likely_bad_response; + + if peer.state.on_request_finished() && !is_likely_bad_response { return self.followup_request(peer_id) } } @@ -307,6 +338,13 @@ struct Peer { best_number: u64, /// Tracks the current timeout value we use for the peer. timeout: Arc, + /// Tracks whether the peer has recently responded with a likely bad response. + /// + /// This is used to de-rank the peer if there are other peers available. + /// This exists because empty responses may not be penalized (e.g. when blocks near the tip are + /// downloaded), but we still want to avoid requesting from the same peer again if it has the + /// lowest timeout. + last_response_likely_bad: bool, } impl Peer { @@ -462,17 +500,17 @@ mod tests { fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1))); fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1))); - let first_peer = fetcher.next_peer().unwrap(); + let first_peer = fetcher.next_best_peer().unwrap(); assert!(first_peer == peer1 || first_peer == peer2); // Pending disconnect for first_peer fetcher.on_pending_disconnect(&first_peer); // first_peer now isn't idle, so we should get other peer - let second_peer = fetcher.next_peer().unwrap(); + let second_peer = fetcher.next_best_peer().unwrap(); assert!(first_peer == peer1 || first_peer == peer2); assert_ne!(first_peer, second_peer); // without idle peers, returns None fetcher.on_pending_disconnect(&second_peer); - assert_eq!(fetcher.next_peer(), None); + assert_eq!(fetcher.next_best_peer(), None); } #[tokio::test] @@ -491,13 +529,13 @@ mod tests { fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50))); // Must always get peer1 (lowest timeout) - assert_eq!(fetcher.next_peer(), Some(peer1)); - assert_eq!(fetcher.next_peer(), Some(peer1)); + assert_eq!(fetcher.next_best_peer(), Some(peer1)); + assert_eq!(fetcher.next_best_peer(), Some(peer1)); // peer2's timeout changes below peer1's peer2_timeout.store(10, Ordering::Relaxed); // Then we get peer 2 always (now lowest) - assert_eq!(fetcher.next_peer(), Some(peer2)); - assert_eq!(fetcher.next_peer(), Some(peer2)); + assert_eq!(fetcher.next_best_peer(), Some(peer2)); + assert_eq!(fetcher.next_best_peer(), Some(peer2)); } #[tokio::test]