From 583426bb95cfa65d5f5e1ee29e084f34620e5813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= Date: Sun, 22 Jan 2023 05:13:20 +0100 Subject: [PATCH] Prevent follow-up requests if a response is likely bad (#946) --- crates/interfaces/src/p2p/error.rs | 32 ++++++++++++++++++++++++++++- crates/net/network/src/fetch/mod.rs | 21 ++++++++++++------- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/crates/interfaces/src/p2p/error.rs b/crates/interfaces/src/p2p/error.rs index 0ac917707f..a949c04622 100644 --- a/crates/interfaces/src/p2p/error.rs +++ b/crates/interfaces/src/p2p/error.rs @@ -1,14 +1,44 @@ use crate::consensus; -use reth_primitives::{rpc::BlockNumber, WithPeerId, H256}; +use reth_primitives::{rpc::BlockNumber, BlockHashOrNumber, Header, WithPeerId, H256}; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; +use super::headers::client::HeadersRequest; + /// Result alias for result of a request. pub type RequestResult = Result; /// Result with [PeerId] pub type PeerRequestResult = RequestResult>; +/// Trait used to validate requests +pub trait RequestValidation { + /// Determine whether the response matches what we requested in request + fn is_likely_a_bad_message(&self, request: &HeadersRequest) -> bool; +} + +impl RequestValidation for RequestResult> { + fn is_likely_a_bad_message(&self, request: &HeadersRequest) -> bool { + match self { + Ok(headers) => { + let request_length = headers.len() as u64; + + if request_length <= 1 && request.limit != request_length { + return true + } + + match request.start { + BlockHashOrNumber::Number(block_number) => { + Some(block_number) != headers.get(0).map(|h| h.number) + } + BlockHashOrNumber::Hash(_) => false, + } + } + Err(_) => true, + } + } +} + /// Error variants that can happen when sending requests to a session. #[derive(Debug, Error, Clone, Eq, PartialEq)] #[allow(missing_docs)] diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index cdcf9f6ad7..0d2dfc18ff 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -4,7 +4,7 @@ use crate::{message::BlockRequest, peers::PeersHandle}; use futures::StreamExt; use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders}; use reth_interfaces::p2p::{ - error::{PeerRequestResult, RequestError, RequestResult}, + error::{PeerRequestResult, RequestError, RequestResult, RequestValidation}, headers::client::HeadersRequest, priority::Priority, }; @@ -226,7 +226,11 @@ impl StateFetcher { res: RequestResult>, ) -> Option { let is_error = res.is_err(); - if let Some(resp) = self.inflight_headers_requests.remove(&peer_id) { + let resp = self.inflight_headers_requests.remove(&peer_id); + let is_likely_a_bad_message = + resp.as_ref().map(|r| res.is_likely_a_bad_message(&r.request)).unwrap_or_default(); + + if let Some(resp) = resp { let _ = resp.response.send(res.map(|h| (peer_id, h).into())); } @@ -238,13 +242,16 @@ impl StateFetcher { )) } - if let Some(peer) = self.peers.get_mut(&peer_id) { - // If the peer is still ready to be accept new requests, we try to send a followup - // request immediately. - if peer.state.on_request_finished() { - return self.followup_request(peer_id) + if !is_likely_a_bad_message { + if let Some(peer) = self.peers.get_mut(&peer_id) { + // If the peer is still ready to be accept new requests, we try to send a followup + // request immediately. + if peer.state.on_request_finished() { + return self.followup_request(peer_id) + } } } + None }