From 54744b3e6b06cefa6f35ac2f7a7de9a2082215e8 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 6 Feb 2023 09:53:00 +0100 Subject: [PATCH] fix(net): always reset peer state on response (#1179) --- crates/net/network/src/fetch/mod.rs | 80 ++++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 13 deletions(-) diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 017208dc3b..b1db5a85c7 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -219,41 +219,43 @@ impl StateFetcher { Some(BlockResponseOutcome::Request(peer_id, req)) } - /// Called on a `GetBlockHeaders` response from a peer + /// Called on a `GetBlockHeaders` response from a peer. + /// + /// This delegates the response and returns a [BlockResponseOutcome] to either queue in a direct + /// followup request or get the peer reported if the response was a + /// [EthResponseValidator::reputation_change_err] pub(crate) fn on_block_headers_response( &mut self, peer_id: PeerId, res: RequestResult>, ) -> Option { let is_error = res.is_err(); - let reputation_change = res.reputation_change_err(); + let maybe_reputation_change = res.reputation_change_err(); let resp = self.inflight_headers_requests.remove(&peer_id); + let is_likely_bad_response = resp .as_ref() .map(|r| res.is_likely_bad_headers_response(&r.request)) .unwrap_or_default(); if let Some(resp) = resp { + // delegate the response let _ = resp.response.send(res.map(|h| (peer_id, h).into())); } - if is_error { - // if the response was erroneous we want to report the peer. - return reputation_change.map(|reputation_change| { - BlockResponseOutcome::BadResponse(peer_id, reputation_change) - }) - } - if let Some(peer) = self.peers.get_mut(&peer_id) { - // If the peer is still ready to be accept new requests, we try to send a followup + // If the peer is still ready to accept new requests, we try to send a followup // request immediately. - if peer.state.on_request_finished() && !is_likely_bad_response { + if peer.state.on_request_finished() && !is_error && !is_likely_bad_response { return self.followup_request(peer_id) } } - None + // if the response was an `Err` worth reporting the peer for then we return a `BadResponse` + // outcome + maybe_reputation_change + .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change)) } /// Called on a `GetBlockBodies` response from a peer @@ -413,7 +415,7 @@ pub(crate) enum BlockResponseOutcome { mod tests { use super::*; use crate::{peers::PeersManager, PeersConfig}; - use reth_primitives::{H256, H512}; + use reth_primitives::{SealedHeader, H256, H512}; use std::future::poll_fn; #[tokio::test(flavor = "multi_thread")] @@ -513,4 +515,56 @@ mod tests { None ); } + + #[tokio::test] + async fn test_header_response_outcome() { + let manager = PeersManager::new(PeersConfig::default()); + let mut fetcher = StateFetcher::new(manager.handle(), Default::default()); + let peer_id = H512::random(); + + let request_pair = || { + let (tx, _rx) = oneshot::channel(); + let req = Request { + request: HeadersRequest { + start: 0u64.into(), + limit: 1, + direction: Default::default(), + }, + response: tx, + }; + let mut header = SealedHeader::default().unseal(); + header.number = 0u64; + (req, header) + }; + + fetcher.new_active_peer( + peer_id, + Default::default(), + Default::default(), + Default::default(), + ); + + let (req, header) = request_pair(); + fetcher.inflight_headers_requests.insert(peer_id, req); + + let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header])); + assert!(outcome.is_none()); + assert!(fetcher.peers[&peer_id].state.is_idle()); + + let outcome = + fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap(); + + assert!(EthResponseValidator::reputation_change_err(&Err(RequestError::Timeout)).is_some()); + + match outcome { + BlockResponseOutcome::BadResponse(peer, _) => { + assert_eq!(peer, peer_id) + } + BlockResponseOutcome::Request(_, _) => { + unreachable!() + } + }; + + assert!(fetcher.peers[&peer_id].state.is_idle()); + } }