mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-23 22:28:13 -05:00
fix: derank peers that responded with bad data (#7854)
Co-authored-by: Oliver Nordbjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
@@ -77,8 +77,16 @@ impl StateFetcher {
|
||||
best_number: u64,
|
||||
timeout: Arc<AtomicU64>,
|
||||
) {
|
||||
self.peers
|
||||
.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number, timeout });
|
||||
self.peers.insert(
|
||||
peer_id,
|
||||
Peer {
|
||||
state: PeerState::Idle,
|
||||
best_hash,
|
||||
best_number,
|
||||
timeout,
|
||||
last_response_likely_bad: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/// Removes the peer from the peer list, after which it is no longer available for future
|
||||
@@ -119,14 +127,29 @@ impl StateFetcher {
|
||||
}
|
||||
|
||||
/// Returns the _next_ idle peer that's ready to accept a request,
|
||||
/// prioritizing those with the lowest timeout/latency.
|
||||
/// Once a peer has been yielded, it will be moved to the end of the map
|
||||
fn next_peer(&mut self) -> Option<PeerId> {
|
||||
self.peers
|
||||
.iter()
|
||||
.filter(|(_, peer)| peer.state.is_idle())
|
||||
.min_by_key(|(_, peer)| peer.timeout())
|
||||
.map(|(id, _)| *id)
|
||||
/// prioritizing those with the lowest timeout/latency and those that recently responded with
|
||||
/// adequate data.
|
||||
fn next_best_peer(&mut self) -> Option<PeerId> {
|
||||
let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());
|
||||
|
||||
let mut best_peer = idle.next()?;
|
||||
|
||||
for maybe_better in idle {
|
||||
// replace best peer if our current best peer sent us a bad response last time
|
||||
if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
|
||||
best_peer = maybe_better;
|
||||
continue
|
||||
}
|
||||
|
||||
// replace best peer if this peer has better rtt
|
||||
if maybe_better.1.timeout() < best_peer.1.timeout() &&
|
||||
!maybe_better.1.last_response_likely_bad
|
||||
{
|
||||
best_peer = maybe_better;
|
||||
}
|
||||
}
|
||||
|
||||
Some(*best_peer.0)
|
||||
}
|
||||
|
||||
/// Returns the next action to return
|
||||
@@ -136,7 +159,7 @@ impl StateFetcher {
|
||||
return PollAction::NoRequests
|
||||
}
|
||||
|
||||
let Some(peer_id) = self.next_peer() else { return PollAction::NoPeersAvailable };
|
||||
let Some(peer_id) = self.next_best_peer() else { return PollAction::NoPeersAvailable };
|
||||
|
||||
let request = self.queued_requests.pop_front().expect("not empty");
|
||||
let request = self.prepare_block_request(peer_id, request);
|
||||
@@ -249,6 +272,9 @@ impl StateFetcher {
|
||||
}
|
||||
|
||||
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
// update the peer's response state
|
||||
peer.last_response_likely_bad = is_likely_bad_response;
|
||||
|
||||
// If the peer is still ready to accept new requests, we try to send a followup
|
||||
// request immediately.
|
||||
if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
|
||||
@@ -268,11 +294,16 @@ impl StateFetcher {
|
||||
peer_id: PeerId,
|
||||
res: RequestResult<Vec<BlockBody>>,
|
||||
) -> Option<BlockResponseOutcome> {
|
||||
let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
|
||||
|
||||
if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
|
||||
let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
|
||||
}
|
||||
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
if peer.state.on_request_finished() {
|
||||
// update the peer's response state
|
||||
peer.last_response_likely_bad = is_likely_bad_response;
|
||||
|
||||
if peer.state.on_request_finished() && !is_likely_bad_response {
|
||||
return self.followup_request(peer_id)
|
||||
}
|
||||
}
|
||||
@@ -307,6 +338,13 @@ struct Peer {
|
||||
best_number: u64,
|
||||
/// Tracks the current timeout value we use for the peer.
|
||||
timeout: Arc<AtomicU64>,
|
||||
/// Tracks whether the peer has recently responded with a likely bad response.
|
||||
///
|
||||
/// This is used to de-rank the peer if there are other peers available.
|
||||
/// This exists because empty responses may not be penalized (e.g. when blocks near the tip are
|
||||
/// downloaded), but we still want to avoid requesting from the same peer again if it has the
|
||||
/// lowest timeout.
|
||||
last_response_likely_bad: bool,
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
@@ -462,17 +500,17 @@ mod tests {
|
||||
fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1)));
|
||||
fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1)));
|
||||
|
||||
let first_peer = fetcher.next_peer().unwrap();
|
||||
let first_peer = fetcher.next_best_peer().unwrap();
|
||||
assert!(first_peer == peer1 || first_peer == peer2);
|
||||
// Pending disconnect for first_peer
|
||||
fetcher.on_pending_disconnect(&first_peer);
|
||||
// first_peer now isn't idle, so we should get other peer
|
||||
let second_peer = fetcher.next_peer().unwrap();
|
||||
let second_peer = fetcher.next_best_peer().unwrap();
|
||||
assert!(first_peer == peer1 || first_peer == peer2);
|
||||
assert_ne!(first_peer, second_peer);
|
||||
// without idle peers, returns None
|
||||
fetcher.on_pending_disconnect(&second_peer);
|
||||
assert_eq!(fetcher.next_peer(), None);
|
||||
assert_eq!(fetcher.next_best_peer(), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -491,13 +529,13 @@ mod tests {
|
||||
fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50)));
|
||||
|
||||
// Must always get peer1 (lowest timeout)
|
||||
assert_eq!(fetcher.next_peer(), Some(peer1));
|
||||
assert_eq!(fetcher.next_peer(), Some(peer1));
|
||||
assert_eq!(fetcher.next_best_peer(), Some(peer1));
|
||||
assert_eq!(fetcher.next_best_peer(), Some(peer1));
|
||||
// peer2's timeout changes below peer1's
|
||||
peer2_timeout.store(10, Ordering::Relaxed);
|
||||
// Then we get peer 2 always (now lowest)
|
||||
assert_eq!(fetcher.next_peer(), Some(peer2));
|
||||
assert_eq!(fetcher.next_peer(), Some(peer2));
|
||||
assert_eq!(fetcher.next_best_peer(), Some(peer2));
|
||||
assert_eq!(fetcher.next_best_peer(), Some(peer2));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
Reference in New Issue
Block a user