mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
fix(net): FetchFullBlockRangeFuture can get stuck forever after partial body fetch + error (#21411)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de> Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
committed by
GitHub
parent
0b5f79e8c9
commit
4baf2baec4
@@ -571,8 +571,8 @@ where
|
||||
debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
|
||||
}
|
||||
}
|
||||
if this.bodies.is_empty() {
|
||||
// received bad response, re-request headers
|
||||
if this.request.bodies.is_none() && !this.is_bodies_complete() {
|
||||
// no pending bodies request (e.g., request error), retry remaining bodies
|
||||
// TODO: convert this into two futures, one which is a headers range
|
||||
// future, and one which is a bodies range future.
|
||||
//
|
||||
@@ -751,8 +751,12 @@ mod tests {
|
||||
use reth_ethereum_primitives::BlockBody;
|
||||
|
||||
use super::*;
|
||||
use crate::test_utils::TestFullBlockClient;
|
||||
use std::ops::Range;
|
||||
use crate::{error::RequestError, test_utils::TestFullBlockClient};
|
||||
use std::{
|
||||
ops::Range,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
use tokio::time::{timeout, Duration};
|
||||
|
||||
#[tokio::test]
|
||||
async fn download_single_full_block() {
|
||||
@@ -800,6 +804,65 @@ mod tests {
|
||||
(sealed_header, body)
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct FailingBodiesClient {
|
||||
inner: TestFullBlockClient,
|
||||
fail_on: usize,
|
||||
body_requests: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl FailingBodiesClient {
|
||||
fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
|
||||
Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
|
||||
}
|
||||
}
|
||||
|
||||
impl DownloadClient for FailingBodiesClient {
|
||||
fn report_bad_message(&self, peer_id: PeerId) {
|
||||
self.inner.report_bad_message(peer_id);
|
||||
}
|
||||
|
||||
fn num_connected_peers(&self) -> usize {
|
||||
self.inner.num_connected_peers()
|
||||
}
|
||||
}
|
||||
|
||||
impl HeadersClient for FailingBodiesClient {
|
||||
type Header = <TestFullBlockClient as HeadersClient>::Header;
|
||||
type Output = <TestFullBlockClient as HeadersClient>::Output;
|
||||
|
||||
fn get_headers_with_priority(
|
||||
&self,
|
||||
request: HeadersRequest,
|
||||
priority: Priority,
|
||||
) -> Self::Output {
|
||||
self.inner.get_headers_with_priority(request, priority)
|
||||
}
|
||||
}
|
||||
|
||||
impl BodiesClient for FailingBodiesClient {
|
||||
type Body = <TestFullBlockClient as BodiesClient>::Body;
|
||||
type Output = <TestFullBlockClient as BodiesClient>::Output;
|
||||
|
||||
fn get_block_bodies_with_priority_and_range_hint(
|
||||
&self,
|
||||
hashes: Vec<B256>,
|
||||
priority: Priority,
|
||||
range_hint: Option<RangeInclusive<u64>>,
|
||||
) -> Self::Output {
|
||||
let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
|
||||
if attempt == self.fail_on {
|
||||
return futures::future::ready(Err(RequestError::Timeout))
|
||||
}
|
||||
|
||||
self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockClient for FailingBodiesClient {
|
||||
type Block = reth_ethereum_primitives::Block;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn download_full_block_range() {
|
||||
let client = TestFullBlockClient::default();
|
||||
@@ -837,6 +900,25 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn download_full_block_range_retries_after_body_error() {
|
||||
let mut client = TestFullBlockClient::default();
|
||||
client.set_soft_limit(2);
|
||||
let (header, _) = insert_headers_into_client(&client, 0..3);
|
||||
|
||||
let client = FailingBodiesClient::new(client, 1);
|
||||
let body_requests = Arc::clone(&client.body_requests);
|
||||
let client = FullBlockClient::test_client(client);
|
||||
|
||||
let received =
|
||||
timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
|
||||
.await
|
||||
.expect("body request retry should complete");
|
||||
|
||||
assert_eq!(received.len(), 3);
|
||||
assert_eq!(body_requests.load(Ordering::SeqCst), 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn download_full_block_range_with_invalid_header() {
|
||||
let client = TestFullBlockClient::default();
|
||||
|
||||
Reference in New Issue
Block a user