mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 17:48:03 -05:00
Prevent follow-up requests if a response is likely bad (#946)
This commit is contained in:
@@ -1,14 +1,44 @@
|
||||
use crate::consensus;
|
||||
use reth_primitives::{rpc::BlockNumber, WithPeerId, H256};
|
||||
use reth_primitives::{rpc::BlockNumber, BlockHashOrNumber, Header, WithPeerId, H256};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
use super::headers::client::HeadersRequest;
|
||||
|
||||
/// Result alias for result of a request.
|
||||
pub type RequestResult<T> = Result<T, RequestError>;
|
||||
|
||||
/// Result with [PeerId]
|
||||
pub type PeerRequestResult<T> = RequestResult<WithPeerId<T>>;
|
||||
|
||||
/// Trait used to validate requests
|
||||
pub trait RequestValidation {
|
||||
/// Determine whether the response matches what we requested in request
|
||||
fn is_likely_a_bad_message(&self, request: &HeadersRequest) -> bool;
|
||||
}
|
||||
|
||||
impl RequestValidation for RequestResult<Vec<Header>> {
|
||||
fn is_likely_a_bad_message(&self, request: &HeadersRequest) -> bool {
|
||||
match self {
|
||||
Ok(headers) => {
|
||||
let request_length = headers.len() as u64;
|
||||
|
||||
if request_length <= 1 && request.limit != request_length {
|
||||
return true
|
||||
}
|
||||
|
||||
match request.start {
|
||||
BlockHashOrNumber::Number(block_number) => {
|
||||
Some(block_number) != headers.get(0).map(|h| h.number)
|
||||
}
|
||||
BlockHashOrNumber::Hash(_) => false,
|
||||
}
|
||||
}
|
||||
Err(_) => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Error variants that can happen when sending requests to a session.
|
||||
#[derive(Debug, Error, Clone, Eq, PartialEq)]
|
||||
#[allow(missing_docs)]
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::{message::BlockRequest, peers::PeersHandle};
|
||||
use futures::StreamExt;
|
||||
use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders};
|
||||
use reth_interfaces::p2p::{
|
||||
error::{PeerRequestResult, RequestError, RequestResult},
|
||||
error::{PeerRequestResult, RequestError, RequestResult, RequestValidation},
|
||||
headers::client::HeadersRequest,
|
||||
priority::Priority,
|
||||
};
|
||||
@@ -226,7 +226,11 @@ impl StateFetcher {
|
||||
res: RequestResult<Vec<Header>>,
|
||||
) -> Option<BlockResponseOutcome> {
|
||||
let is_error = res.is_err();
|
||||
if let Some(resp) = self.inflight_headers_requests.remove(&peer_id) {
|
||||
let resp = self.inflight_headers_requests.remove(&peer_id);
|
||||
let is_likely_a_bad_message =
|
||||
resp.as_ref().map(|r| res.is_likely_a_bad_message(&r.request)).unwrap_or_default();
|
||||
|
||||
if let Some(resp) = resp {
|
||||
let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
|
||||
}
|
||||
|
||||
@@ -238,13 +242,16 @@ impl StateFetcher {
|
||||
))
|
||||
}
|
||||
|
||||
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
// If the peer is still ready to be accept new requests, we try to send a followup
|
||||
// request immediately.
|
||||
if peer.state.on_request_finished() {
|
||||
return self.followup_request(peer_id)
|
||||
if !is_likely_a_bad_message {
|
||||
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
// If the peer is still ready to be accept new requests, we try to send a followup
|
||||
// request immediately.
|
||||
if peer.state.on_request_finished() {
|
||||
return self.followup_request(peer_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user