mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-12 16:05:08 -05:00
319 lines
11 KiB
Rust
319 lines
11 KiB
Rust
//! Blocks/Headers management for the p2p network.
|
|
|
|
use crate::{
|
|
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
|
|
metrics::EthRequestHandlerMetrics,
|
|
};
|
|
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
|
|
use alloy_eips::BlockHashOrNumber;
|
|
use alloy_rlp::Encodable;
|
|
use futures::StreamExt;
|
|
use reth_eth_wire::{
|
|
BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
|
|
GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
|
|
};
|
|
use reth_network_api::test_utils::PeersHandle;
|
|
use reth_network_p2p::error::RequestResult;
|
|
use reth_network_peers::PeerId;
|
|
use reth_primitives_traits::Block;
|
|
use reth_storage_api::{BlockReader, HeaderProvider};
|
|
use std::{
|
|
future::Future,
|
|
pin::Pin,
|
|
task::{Context, Poll},
|
|
time::Duration,
|
|
};
|
|
use tokio::sync::{mpsc::Receiver, oneshot};
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
|
|
// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>

/// Maximum number of receipts to serve.
///
/// Used to limit lookups.
const MAX_RECEIPTS_SERVE: usize = 1024;

/// Maximum number of block headers to serve.
///
/// Used to limit lookups.
const MAX_HEADERS_SERVE: usize = 1024;

/// Maximum number of block bodies to serve.
///
/// Used to limit lookups. With 24KB block sizes nowadays, the practical limit will always be
/// `SOFT_RESPONSE_LIMIT`.
const MAX_BODIES_SERVE: usize = 1024;

/// Maximum size of replies to data retrievals: 2MB
const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
|
|
|
|
/// Manages eth related requests on top of the p2p network.
|
|
///
|
|
/// This can be spawned to another task and is supposed to be run as background service.
|
|
#[derive(Debug)]
|
|
#[must_use = "Manager does nothing unless polled."]
|
|
pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
|
|
/// The client type that can interact with the chain.
|
|
client: C,
|
|
/// Used for reporting peers.
|
|
// TODO use to report spammers
|
|
#[allow(dead_code)]
|
|
peers: PeersHandle,
|
|
/// Incoming request from the [`NetworkManager`](crate::NetworkManager).
|
|
incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
|
|
/// Metrics for the eth request handler.
|
|
metrics: EthRequestHandlerMetrics,
|
|
}
|
|
|
|
// === impl EthRequestHandler ===
|
|
impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
|
|
/// Create a new instance
|
|
pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
|
|
Self {
|
|
client,
|
|
peers,
|
|
incoming_requests: ReceiverStream::new(incoming),
|
|
metrics: Default::default(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<C, N> EthRequestHandler<C, N>
|
|
where
|
|
N: NetworkPrimitives,
|
|
C: BlockReader,
|
|
{
|
|
/// Returns the list of requested headers
|
|
fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
|
|
let GetBlockHeaders { start_block, limit, skip, direction } = request;
|
|
|
|
let mut headers = Vec::new();
|
|
|
|
let mut block: BlockHashOrNumber = match start_block {
|
|
BlockHashOrNumber::Hash(start) => start.into(),
|
|
BlockHashOrNumber::Number(num) => {
|
|
let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
|
|
return headers
|
|
};
|
|
hash.into()
|
|
}
|
|
};
|
|
|
|
let skip = skip as u64;
|
|
let mut total_bytes = 0;
|
|
|
|
for _ in 0..limit {
|
|
if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
|
|
match direction {
|
|
HeadersDirection::Rising => {
|
|
if let Some(next) = (header.number() + 1).checked_add(skip) {
|
|
block = next.into()
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
HeadersDirection::Falling => {
|
|
if skip > 0 {
|
|
// prevent under flows for block.number == 0 and `block.number - skip <
|
|
// 0`
|
|
if let Some(next) =
|
|
header.number().checked_sub(1).and_then(|num| num.checked_sub(skip))
|
|
{
|
|
block = next.into()
|
|
} else {
|
|
break
|
|
}
|
|
} else {
|
|
block = header.parent_hash().into()
|
|
}
|
|
}
|
|
}
|
|
|
|
total_bytes += header.length();
|
|
headers.push(header);
|
|
|
|
if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
|
|
break
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
headers
|
|
}
|
|
|
|
fn on_headers_request(
|
|
&self,
|
|
_peer_id: PeerId,
|
|
request: GetBlockHeaders,
|
|
response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
|
|
) {
|
|
self.metrics.eth_headers_requests_received_total.increment(1);
|
|
let headers = self.get_headers_response(request);
|
|
let _ = response.send(Ok(BlockHeaders(headers)));
|
|
}
|
|
|
|
fn on_bodies_request(
|
|
&self,
|
|
_peer_id: PeerId,
|
|
request: GetBlockBodies,
|
|
response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
|
|
) {
|
|
self.metrics.eth_bodies_requests_received_total.increment(1);
|
|
let mut bodies = Vec::new();
|
|
|
|
let mut total_bytes = 0;
|
|
|
|
for hash in request.0 {
|
|
if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
|
|
let body = block.into_body();
|
|
total_bytes += body.length();
|
|
bodies.push(body);
|
|
|
|
if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
|
|
break
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
let _ = response.send(Ok(BlockBodies(bodies)));
|
|
}
|
|
|
|
fn on_receipts_request(
|
|
&self,
|
|
_peer_id: PeerId,
|
|
request: GetReceipts,
|
|
response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
|
|
) {
|
|
self.metrics.eth_receipts_requests_received_total.increment(1);
|
|
|
|
let mut receipts = Vec::new();
|
|
|
|
let mut total_bytes = 0;
|
|
|
|
for hash in request.0 {
|
|
if let Some(receipts_by_block) =
|
|
self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
|
|
{
|
|
let receipt =
|
|
receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>();
|
|
|
|
total_bytes += receipt.length();
|
|
receipts.push(receipt);
|
|
|
|
if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
|
|
break
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
let _ = response.send(Ok(Receipts(receipts)));
|
|
}
|
|
}
|
|
|
|
/// An endless future.
|
|
///
|
|
/// This should be spawned or used as part of `tokio::select!`.
|
|
impl<C, N> Future for EthRequestHandler<C, N>
|
|
where
|
|
N: NetworkPrimitives,
|
|
C: BlockReader<Block = N::Block, Receipt = N::Receipt>
|
|
+ HeaderProvider<Header = N::BlockHeader>
|
|
+ Unpin,
|
|
{
|
|
type Output = ();
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
let this = self.get_mut();
|
|
|
|
let mut acc = Duration::ZERO;
|
|
let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
|
|
acc,
|
|
"net::eth",
|
|
"Incoming eth requests stream",
|
|
DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
|
|
this.incoming_requests.poll_next_unpin(cx),
|
|
|incoming| {
|
|
match incoming {
|
|
IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
|
|
this.on_headers_request(peer_id, request, response)
|
|
}
|
|
IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
|
|
this.on_bodies_request(peer_id, request, response)
|
|
}
|
|
IncomingEthRequest::GetNodeData { .. } => {
|
|
this.metrics.eth_node_data_requests_received_total.increment(1);
|
|
}
|
|
IncomingEthRequest::GetReceipts { peer_id, request, response } => {
|
|
this.on_receipts_request(peer_id, request, response)
|
|
}
|
|
}
|
|
},
|
|
);
|
|
|
|
this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
|
|
|
|
// stream is fully drained and import futures pending
|
|
if maybe_more_incoming_requests {
|
|
// make sure we're woken up again
|
|
cx.waker().wake_by_ref();
|
|
}
|
|
|
|
Poll::Pending
|
|
}
|
|
}
|
|
|
|
/// All `eth` request related to blocks delegated by the network.
|
|
#[derive(Debug)]
|
|
pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
|
|
/// Request Block headers from the peer.
|
|
///
|
|
/// The response should be sent through the channel.
|
|
GetBlockHeaders {
|
|
/// The ID of the peer to request block headers from.
|
|
peer_id: PeerId,
|
|
/// The specific block headers requested.
|
|
request: GetBlockHeaders,
|
|
/// The channel sender for the response containing block headers.
|
|
response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
|
|
},
|
|
/// Request Block bodies from the peer.
|
|
///
|
|
/// The response should be sent through the channel.
|
|
GetBlockBodies {
|
|
/// The ID of the peer to request block bodies from.
|
|
peer_id: PeerId,
|
|
/// The specific block bodies requested.
|
|
request: GetBlockBodies,
|
|
/// The channel sender for the response containing block bodies.
|
|
response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
|
|
},
|
|
/// Request Node Data from the peer.
|
|
///
|
|
/// The response should be sent through the channel.
|
|
GetNodeData {
|
|
/// The ID of the peer to request node data from.
|
|
peer_id: PeerId,
|
|
/// The specific node data requested.
|
|
request: GetNodeData,
|
|
/// The channel sender for the response containing node data.
|
|
response: oneshot::Sender<RequestResult<NodeData>>,
|
|
},
|
|
/// Request Receipts from the peer.
|
|
///
|
|
/// The response should be sent through the channel.
|
|
GetReceipts {
|
|
/// The ID of the peer to request receipts from.
|
|
peer_id: PeerId,
|
|
/// The specific receipts requested.
|
|
request: GetReceipts,
|
|
/// The channel sender for the response containing receipts.
|
|
response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
|
|
},
|
|
}
|