feat: add block range hint to BlockBodies download request (#16703)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
gejeduck
2025-06-06 09:45:48 -04:00
committed by GitHub
parent 3218b3c637
commit 2fccd08845
9 changed files with 62 additions and 18 deletions

View File

@@ -14,7 +14,7 @@ use reth_network_p2p::{
};
use reth_network_peers::PeerId;
use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
use std::{collections::HashMap, io, path::Path, sync::Arc};
use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
@@ -354,10 +354,11 @@ impl<B: FullBlock> BodiesClient for FileClient<B> {
type Body = B::Body;
type Output = BodiesFut<B::Body>;
fn get_block_bodies_with_priority(
fn get_block_bodies_with_priority_and_range_hint(
&self,
hashes: Vec<B256>,
_priority: Priority,
_range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output {
// this just searches the buffer, and fails if it can't find the block
let mut bodies = Vec::new();

View File

@@ -9,6 +9,7 @@ use reth_network_peers::PeerId;
use std::{
collections::HashMap,
fmt::Debug,
ops::RangeInclusive,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
@@ -81,10 +82,11 @@ impl BodiesClient for TestBodiesClient {
type Body = BlockBody;
type Output = BodiesFut;
fn get_block_bodies_with_priority(
fn get_block_bodies_with_priority_and_range_hint(
&self,
hashes: Vec<B256>,
_priority: Priority,
_range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output {
let should_delay = self.should_delay;
let bodies = self.bodies.clone();

View File

@@ -15,9 +15,12 @@ use reth_network_p2p::{
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
use std::{
ops::RangeInclusive,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
@@ -80,15 +83,16 @@ impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
type Output = BodiesFut<N::BlockBody>;
/// Sends a `GetBlockBodies` request to an available peer.
fn get_block_bodies_with_priority(
fn get_block_bodies_with_priority_and_range_hint(
&self,
request: Vec<B256>,
priority: Priority,
range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output {
let (response, rx) = oneshot::channel();
if self
.request_tx
.send(DownloadRequest::GetBlockBodies { request, response, priority })
.send(DownloadRequest::GetBlockBodies { request, response, priority, range_hint })
.is_ok()
{
Box::pin(FlattenedResponse::from(rx))

View File

@@ -18,6 +18,7 @@ use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use std::{
collections::{HashMap, VecDeque},
ops::RangeInclusive,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
@@ -419,6 +420,8 @@ pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
request: Vec<B256>,
response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
priority: Priority,
#[allow(dead_code)]
range_hint: Option<RangeInclusive<u64>>,
},
}
@@ -491,6 +494,7 @@ mod tests {
request: vec![],
response: tx,
priority: Priority::default(),
range_hint: None,
});
assert!(fetcher.poll(cx).is_pending());

View File

@@ -1,4 +1,5 @@
use std::{
ops::RangeInclusive,
pin::Pin,
task::{ready, Context, Poll},
};
@@ -26,8 +27,26 @@ pub trait BodiesClient: DownloadClient {
}
/// Fetches the block body for the requested block with priority
fn get_block_bodies_with_priority(&self, hashes: Vec<B256>, priority: Priority)
-> Self::Output;
fn get_block_bodies_with_priority(
&self,
hashes: Vec<B256>,
priority: Priority,
) -> Self::Output {
self.get_block_bodies_with_priority_and_range_hint(hashes, priority, None)
}
/// Fetches the block body for the requested block with priority and a range hint for the
/// requested blocks.
///
/// The range hint is not required, but can be used to optimize the routing of the request if
/// the hashes are continuous or close together and the range hint is `[earliest, latest]` for
/// the requested blocks.
fn get_block_bodies_with_priority_and_range_hint(
&self,
hashes: Vec<B256>,
priority: Priority,
range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output;
/// Fetches a single block body for the requested hash.
fn get_block_body(&self, hash: B256) -> SingleBodyRequest<Self::Output> {

View File

@@ -1,5 +1,7 @@
//! Support for different download types.
use std::ops::RangeInclusive;
use crate::{
bodies::client::BodiesClient,
download::DownloadClient,
@@ -37,14 +39,19 @@ where
type Body = A::Body;
type Output = Either<A::Output, B::Output>;
fn get_block_bodies_with_priority(
fn get_block_bodies_with_priority_and_range_hint(
&self,
hashes: Vec<B256>,
priority: Priority,
range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output {
match self {
Self::Left(a) => Either::Left(a.get_block_bodies_with_priority(hashes, priority)),
Self::Right(b) => Either::Right(b.get_block_bodies_with_priority(hashes, priority)),
Self::Left(a) => Either::Left(
a.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint),
),
Self::Right(b) => Either::Right(
b.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint),
),
}
}
}

View File

@@ -20,6 +20,7 @@ use std::{
fmt::Debug,
future::Future,
hash::Hash,
ops::RangeInclusive,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
@@ -692,10 +693,11 @@ where
/// # Returns
///
/// A future containing an empty vector of block bodies and a randomly generated `PeerId`.
fn get_block_bodies_with_priority(
fn get_block_bodies_with_priority_and_range_hint(
&self,
_hashes: Vec<B256>,
_priority: Priority,
_range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output {
// Create a future that immediately returns an empty vector of block bodies and a random
// PeerId.

View File

@@ -8,7 +8,10 @@ use alloy_primitives::B256;
use futures::FutureExt;
use reth_ethereum_primitives::BlockBody;
use reth_network_peers::PeerId;
use std::fmt::{Debug, Formatter};
use std::{
fmt::{Debug, Formatter},
ops::RangeInclusive,
};
use tokio::sync::oneshot;
/// A test client for fetching bodies
@@ -40,10 +43,11 @@ where
type Body = BlockBody;
type Output = BodiesFut;
fn get_block_bodies_with_priority(
fn get_block_bodies_with_priority_and_range_hint(
&self,
hashes: Vec<B256>,
_priority: Priority,
_range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output {
let (tx, rx) = oneshot::channel();
let _ = tx.send((self.responder)(hashes));

View File

@@ -14,7 +14,7 @@ use reth_eth_wire_types::HeadersDirection;
use reth_ethereum_primitives::{Block, BlockBody};
use reth_network_peers::{PeerId, WithPeerId};
use reth_primitives_traits::{SealedBlock, SealedHeader};
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, ops::RangeInclusive, sync::Arc};
/// A headers+bodies client that stores the headers and bodies in memory, with an artificial soft
/// bodies response limit that is set to 20 by default.
@@ -145,10 +145,11 @@ impl BodiesClient for TestFullBlockClient {
/// # Returns
///
/// A future containing the result of the block body retrieval operation.
fn get_block_bodies_with_priority(
fn get_block_bodies_with_priority_and_range_hint(
&self,
hashes: Vec<B256>,
_priority: Priority,
_range_hint: Option<RangeInclusive<u64>>,
) -> Self::Output {
// Acquire a lock on the bodies.
let bodies = self.bodies.lock();