feat: add fullblock client (#2613)

This commit is contained in:
Matthias Seitz
2023-05-09 16:46:17 +02:00
committed by GitHub
parent 32b9a5b05e
commit 64250b3b07
8 changed files with 394 additions and 13 deletions

1
Cargo.lock generated
View File

@@ -4951,6 +4951,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]

View File

@@ -11,19 +11,23 @@ reth-codecs = { path = "../storage/codecs" }
reth-primitives = { path = "../primitives" }
reth-rpc-types = { path = "../rpc/rpc-types" }
reth-network-api = { path = "../net/network-api" }
revm-primitives = "1.1"
async-trait = "0.1.57"
thiserror = "1.0.37"
auto_impl = "1.0"
tokio = { version = "1.21.2", features = ["sync"] }
# TODO(onbjerg): We only need this for [BlockBody]
reth-eth-wire = { path = "../net/eth-wire" }
# codecs
# eth
revm-primitives = "1.1"
parity-scale-codec = { version = "3.2.1", features = ["bytes"] }
# async
async-trait = "0.1.57"
futures = "0.3"
tokio = { version = "1.21.2", features = ["sync"] }
tokio-stream = "0.1.11"
# misc
auto_impl = "1.0"
thiserror = "1.0.37"
tracing = "0.1"
rand = "0.8.5"
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }
secp256k1 = { version = "0.27.0", default-features = false, features = [

View File

@@ -1,7 +1,10 @@
use std::pin::Pin;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use crate::p2p::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
use futures::Future;
use futures::{Future, FutureExt};
use reth_primitives::{BlockBody, H256};
/// The bodies future type
@@ -10,7 +13,7 @@ pub type BodiesFut = Pin<Box<dyn Future<Output = PeerRequestResult<Vec<BlockBody
/// A client capable of downloading block bodies.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BodiesClient: DownloadClient {
/// The bodies type
/// The output of the request future for querying block bodies.
type Output: Future<Output = PeerRequestResult<Vec<BlockBody>>> + Sync + Send + Unpin;
/// Fetches the block body for the requested block.
@@ -21,4 +24,39 @@ pub trait BodiesClient: DownloadClient {
/// Fetches the block body for the requested block with priority
fn get_block_bodies_with_priority(&self, hashes: Vec<H256>, priority: Priority)
-> Self::Output;
/// Fetches a single block body for the requested hash.
fn get_block_body(&self, hash: H256) -> SingleBodyRequest<Self::Output> {
self.get_block_body_with_priority(hash, Priority::Normal)
}
/// Fetches a single block body for the requested hash with priority
fn get_block_body_with_priority(
&self,
hash: H256,
priority: Priority,
) -> SingleBodyRequest<Self::Output> {
let fut = self.get_block_bodies_with_priority(vec![hash], priority);
SingleBodyRequest { fut }
}
}
/// A Future that resolves to a single block body.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct SingleBodyRequest<Fut> {
fut: Fut,
}
impl<Fut> Future for SingleBodyRequest<Fut>
where
Fut: Future<Output = PeerRequestResult<Vec<BlockBody>>> + Sync + Send + Unpin,
{
type Output = PeerRequestResult<Option<BlockBody>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let resp = ready!(self.get_mut().fut.poll_unpin(cx));
let resp = resp.map(|res| res.map(|bodies| bodies.into_iter().next()));
Poll::Ready(resp)
}
}

View File

@@ -0,0 +1,279 @@
use crate::p2p::{
bodies::client::{BodiesClient, SingleBodyRequest},
error::PeerRequestResult,
headers::client::{HeadersClient, SingleHeaderRequest},
};
use reth_primitives::{BlockBody, Header, SealedBlock, SealedHeader, H256};
use std::{
fmt::Debug,
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tracing::debug;
/// A Client that can fetch full blocks from the network.
#[derive(Debug, Clone)]
pub struct FullBlockClient<Client> {
client: Client,
}
impl<Client> FullBlockClient<Client> {
/// Creates a new instance of `FullBlockClient`.
pub fn new(client: Client) -> Self {
Self { client }
}
}
impl<Client> FullBlockClient<Client>
where
Client: BodiesClient + HeadersClient + Clone,
{
/// Returns a future that fetches the [SealedBlock] for the given hash.
///
/// Note: this future is cancel safe
///
/// Caution: This does no validation of body (transactions) response but guarantees that the
/// [SealedHeader] matches the requested hash.
pub fn get_full_block(&self, hash: H256) -> FetchFullBlockFuture<Client> {
let client = self.client.clone();
FetchFullBlockFuture {
hash,
request: FullBlockRequest {
header: Some(client.get_header(hash.into())),
body: Some(client.get_block_body(hash)),
},
client,
header: None,
body: None,
}
}
}
/// A future that downloads a full block from the network.
#[must_use = "futures do nothing unless polled"]
pub struct FetchFullBlockFuture<Client>
where
Client: BodiesClient + HeadersClient,
{
client: Client,
hash: H256,
request: FullBlockRequest<Client>,
header: Option<SealedHeader>,
body: Option<BlockBody>,
}
impl<Client> FetchFullBlockFuture<Client>
where
Client: BodiesClient + HeadersClient,
{
/// If the header request is already complete, this returns the block number
pub fn block_number(&self) -> Option<u64> {
self.header.as_ref().map(|h| h.number)
}
/// Returns the [SealedBlock] if the request is complete.
fn take_block(&mut self) -> Option<SealedBlock> {
if self.header.is_none() || self.body.is_none() {
return None
}
let header = self.header.take().unwrap();
let body = self.body.take().unwrap();
Some(SealedBlock::new(header, body))
}
}
impl<Client> Future for FetchFullBlockFuture<Client>
where
Client: BodiesClient + HeadersClient + Unpin + 'static,
{
type Output = SealedBlock;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match ready!(this.request.poll(cx)) {
ResponseResult::Header(res) => {
match res {
Ok(maybe_header) => {
let (peer, maybe_header) =
maybe_header.map(|h| h.map(|h| h.seal_slow())).split();
if let Some(header) = maybe_header {
if header.hash() != this.hash {
debug!(target: "downloaders", expected=?this.hash, received=?header.hash, "Received wrong header");
// received bad header
this.client.report_bad_message(peer)
} else {
this.header = Some(header);
}
}
}
Err(err) => {
debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
}
}
if this.header.is_none() {
// received bad response
this.request.header = Some(this.client.get_header(this.hash.into()));
}
}
ResponseResult::Body(res) => {
match res {
Ok(maybe_body) => {
this.body = maybe_body.into_data();
}
Err(err) => {
debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
}
}
if this.body.is_none() {
// received bad response
this.request.body = Some(this.client.get_block_body(this.hash));
}
}
}
if let Some(res) = this.take_block() {
return Poll::Ready(res)
}
}
}
}
impl<Client> Debug for FetchFullBlockFuture<Client>
where
Client: BodiesClient + HeadersClient,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FetchFullBlockFuture")
.field("hash", &self.hash)
.field("header", &self.header)
.field("body", &self.body)
.finish()
}
}
struct FullBlockRequest<Client>
where
Client: BodiesClient + HeadersClient,
{
header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
}
impl<Client> FullBlockRequest<Client>
where
Client: BodiesClient + HeadersClient,
{
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult> {
if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
self.header = None;
return Poll::Ready(ResponseResult::Header(res))
}
}
if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
self.header = None;
return Poll::Ready(ResponseResult::Body(res))
}
}
Poll::Pending
}
}
enum ResponseResult {
Header(PeerRequestResult<Option<Header>>),
Body(PeerRequestResult<Option<BlockBody>>),
}
#[cfg(test)]
mod tests {
use super::*;
use crate::p2p::{
download::DownloadClient, headers::client::HeadersRequest, priority::Priority,
};
use parking_lot::Mutex;
use reth_primitives::{BlockHashOrNumber, PeerId, WithPeerId};
use std::{collections::HashMap, sync::Arc};
#[derive(Clone, Default, Debug)]
struct TestSingleFullBlockClient {
headers: Arc<Mutex<HashMap<H256, Header>>>,
bodies: Arc<Mutex<HashMap<H256, BlockBody>>>,
}
impl TestSingleFullBlockClient {
fn insert(&self, header: SealedHeader, body: BlockBody) {
let hash = header.hash();
let header = header.unseal();
self.headers.lock().insert(hash, header);
self.bodies.lock().insert(hash, body);
}
}
impl DownloadClient for TestSingleFullBlockClient {
fn report_bad_message(&self, _peer_id: PeerId) {}
fn num_connected_peers(&self) -> usize {
1
}
}
impl HeadersClient for TestSingleFullBlockClient {
type Output = futures::future::Ready<PeerRequestResult<Vec<Header>>>;
fn get_headers_with_priority(
&self,
request: HeadersRequest,
_priority: Priority,
) -> Self::Output {
let headers = self.headers.lock();
let resp = match request.start {
BlockHashOrNumber::Hash(hash) => headers.get(&hash).cloned(),
BlockHashOrNumber::Number(num) => {
headers.values().find(|h| h.number == num).cloned()
}
}
.map(|h| vec![h])
.unwrap_or_default();
futures::future::ready(Ok(WithPeerId::new(PeerId::random(), resp)))
}
}
impl BodiesClient for TestSingleFullBlockClient {
type Output = futures::future::Ready<PeerRequestResult<Vec<BlockBody>>>;
fn get_block_bodies_with_priority(
&self,
hashes: Vec<H256>,
_priority: Priority,
) -> Self::Output {
let bodies = self.bodies.lock();
let mut all_bodies = Vec::new();
for hash in hashes {
if let Some(body) = bodies.get(&hash) {
all_bodies.push(body.clone());
}
}
futures::future::ready(Ok(WithPeerId::new(PeerId::random(), all_bodies)))
}
}
#[tokio::test]
async fn download_single_full_block() {
let client = TestSingleFullBlockClient::default();
let header = SealedHeader::default();
let body = BlockBody::default();
client.insert(header.clone(), body.clone());
let client = FullBlockClient::new(client);
let received = client.get_full_block(header.hash()).await;
assert_eq!(received, SealedBlock::new(header, body));
}
}

View File

@@ -1,8 +1,12 @@
use crate::p2p::{download::DownloadClient, error::PeerRequestResult, priority::Priority};
use futures::Future;
use futures::{Future, FutureExt};
pub use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockHashOrNumber, Head, Header, HeadersDirection};
use std::{fmt::Debug, pin::Pin};
use std::{
fmt::Debug,
pin::Pin,
task::{ready, Context, Poll},
};
/// The header request struct to be sent to connected peers, which
/// will proceed to ask them to stream the requested headers to us.
@@ -22,7 +26,7 @@ pub type HeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<Vec<Header>>
/// The block headers downloader client
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeadersClient: DownloadClient {
/// The headers type
/// The headers future type
type Output: Future<Output = PeerRequestResult<Vec<Header>>> + Sync + Send + Unpin;
/// Sends the header request to the p2p network and returns the header response received from a
@@ -38,6 +42,47 @@ pub trait HeadersClient: DownloadClient {
request: HeadersRequest,
priority: Priority,
) -> Self::Output;
/// Fetches a single header for the requested number or hash.
fn get_header(&self, start: BlockHashOrNumber) -> SingleHeaderRequest<Self::Output> {
self.get_header_with_priority(start, Priority::Normal)
}
/// Fetches a single header for the requested number or hash with priority
fn get_header_with_priority(
&self,
start: BlockHashOrNumber,
priority: Priority,
) -> SingleHeaderRequest<Self::Output> {
let req = HeadersRequest {
start,
limit: 1,
// doesn't matter for a single header
direction: HeadersDirection::Rising,
};
let fut = self.get_headers_with_priority(req, priority);
SingleHeaderRequest { fut }
}
}
/// A Future that resolves to a single block body.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct SingleHeaderRequest<Fut> {
fut: Fut,
}
impl<Fut> Future for SingleHeaderRequest<Fut>
where
Fut: Future<Output = PeerRequestResult<Vec<Header>>> + Sync + Send + Unpin,
{
type Output = PeerRequestResult<Option<Header>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let resp = ready!(self.get_mut().fut.poll_unpin(cx));
let resp = resp.map(|res| res.map(|headers| headers.into_iter().next()));
Poll::Ready(resp)
}
}
/// The status updater for updating the status of the p2p node

View File

@@ -4,6 +4,9 @@ pub mod download;
/// Traits for implementing P2P block body clients.
pub mod bodies;
/// An implementation that uses headers and bodies traits to download full blocks
pub mod full_block;
/// Traits for implementing P2P Header Clients. Also includes implementations
/// of a Linear and a Parallel downloader generic over the [`Consensus`] and
/// [`HeadersClient`].

View File

@@ -123,6 +123,12 @@ pub struct SealedBlock {
}
impl SealedBlock {
/// Create a new sealed block instance using the sealed header and block body.
pub fn new(header: SealedHeader, body: BlockBody) -> Self {
let BlockBody { transactions, ommers, withdrawals } = body;
Self { header, body: transactions, ommers, withdrawals }
}
/// Header hash.
pub fn hash(&self) -> H256 {
self.header.hash()

View File

@@ -46,4 +46,9 @@ impl<T> WithPeerId<T> {
pub fn split(self) -> (PeerId, T) {
(self.0, self.1)
}
/// Maps the inner value to a new value using the given function.
pub fn map<U, F: FnOnce(T) -> U>(self, op: F) -> WithPeerId<U> {
WithPeerId(self.0, op(self.1))
}
}