From fd840e1c66fe371c54d6779ae482907440d232a2 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Sat, 26 Nov 2022 10:04:42 +0100 Subject: [PATCH] refactor: bodies client API (#257) * refactor: bodies client API * chore: fix errors --- crates/interfaces/src/p2p/bodies/client.rs | 7 +- .../interfaces/src/p2p/bodies/downloader.rs | 4 +- crates/interfaces/src/p2p/bodies/error.rs | 51 ------ crates/interfaces/src/p2p/bodies/mod.rs | 3 - crates/interfaces/src/p2p/error.rs | 9 + crates/interfaces/src/test_utils/bodies.rs | 20 +-- .../net/bodies-downloaders/src/concurrent.rs | 156 ++++-------------- crates/net/bodies-downloaders/src/lib.rs | 3 + .../net/bodies-downloaders/src/test_utils.rs | 67 ++++++++ crates/net/network/src/fetch/client.rs | 25 +-- crates/stages/src/stages/bodies.rs | 31 ++-- 11 files changed, 150 insertions(+), 226 deletions(-) delete mode 100644 crates/interfaces/src/p2p/bodies/error.rs create mode 100644 crates/net/bodies-downloaders/src/test_utils.rs diff --git a/crates/interfaces/src/p2p/bodies/client.rs b/crates/interfaces/src/p2p/bodies/client.rs index 4e546fea95..c8953ba2c4 100644 --- a/crates/interfaces/src/p2p/bodies/client.rs +++ b/crates/interfaces/src/p2p/bodies/client.rs @@ -1,8 +1,7 @@ +use crate::p2p::error::RequestResult; +use async_trait::async_trait; use reth_eth_wire::BlockBody; use reth_primitives::H256; - -use crate::p2p::bodies::error::BodiesClientError; -use async_trait::async_trait; use std::fmt::Debug; /// A client capable of downloading block bodies. @@ -10,5 +9,5 @@ use std::fmt::Debug; #[auto_impl::auto_impl(&, Arc, Box)] pub trait BodiesClient: Send + Sync + Debug { /// Fetches the block body for the requested block. - async fn get_block_body(&self, hash: H256) -> Result; + async fn get_block_body(&self, hash: Vec) -> RequestResult>; } diff --git a/crates/interfaces/src/p2p/bodies/downloader.rs b/crates/interfaces/src/p2p/bodies/downloader.rs index a1011eb1f0..81716abb72 100644 --- a/crates/interfaces/src/p2p/bodies/downloader.rs +++ b/crates/interfaces/src/p2p/bodies/downloader.rs @@ -1,5 +1,5 @@ use super::client::BodiesClient; -use crate::p2p::bodies::error::DownloadError; +use crate::p2p::error::RequestResult; use reth_eth_wire::BlockBody; use reth_primitives::{BlockNumber, H256}; use std::pin::Pin; @@ -38,4 +38,4 @@ pub trait BodyDownloader: Sync + Send { /// A stream of block bodies. pub type BodiesStream<'a> = - Pin> + Send + 'a>>; + Pin> + Send + 'a>>; diff --git a/crates/interfaces/src/p2p/bodies/error.rs b/crates/interfaces/src/p2p/bodies/error.rs deleted file mode 100644 index b6b11682ff..0000000000 --- a/crates/interfaces/src/p2p/bodies/error.rs +++ /dev/null @@ -1,51 +0,0 @@ -use crate::p2p::error::RequestError; -use reth_primitives::H256; -use thiserror::Error; - -/// Body client errors. -#[derive(Error, Debug, Clone)] -pub enum BodiesClientError { - /// Timed out while waiting for a response. - #[error("Timed out while getting bodies for block {header_hash}.")] - Timeout { - /// The header hash of the block that timed out. - header_hash: H256, - }, - /// The client encountered an internal error. - #[error(transparent)] - Internal(#[from] RequestError), -} - -/// Body downloader errors. -#[derive(Error, Debug, Clone)] -pub enum DownloadError { - /// Timed out while waiting for a response. - #[error("Timed out while getting bodies for block {header_hash}.")] - Timeout { - /// The header hash of the block that timed out. - header_hash: H256, - }, - /// The [BodiesClient] used by the downloader experienced an error. - #[error("The downloader client encountered an error.")] - Client { - /// The underlying client error. - #[source] - source: BodiesClientError, - }, -} - -impl From for DownloadError { - fn from(error: BodiesClientError) -> Self { - match error { - BodiesClientError::Timeout { header_hash } => DownloadError::Timeout { header_hash }, - _ => DownloadError::Client { source: error }, - } - } -} - -impl DownloadError { - /// Indicates whether this error is retryable or fatal. - pub fn is_retryable(&self) -> bool { - matches!(self, DownloadError::Timeout { .. }) - } -} diff --git a/crates/interfaces/src/p2p/bodies/mod.rs b/crates/interfaces/src/p2p/bodies/mod.rs index bc0c5df092..18dc91f553 100644 --- a/crates/interfaces/src/p2p/bodies/mod.rs +++ b/crates/interfaces/src/p2p/bodies/mod.rs @@ -3,6 +3,3 @@ pub mod client; /// Block body downloaders. pub mod downloader; - -/// Error types. -pub mod error; diff --git a/crates/interfaces/src/p2p/error.rs b/crates/interfaces/src/p2p/error.rs index 86087d29d2..2496f19f10 100644 --- a/crates/interfaces/src/p2p/error.rs +++ b/crates/interfaces/src/p2p/error.rs @@ -21,6 +21,15 @@ pub enum RequestError { BadResponse, } +// === impl RequestError === + +impl RequestError { + /// Indicates whether this error is retryable or fatal. + pub fn is_retryable(&self) -> bool { + matches!(self, RequestError::Timeout | RequestError::ConnectionDropped) + } +} + impl From> for RequestError { fn from(_: mpsc::error::SendError) -> Self { RequestError::ChannelClosed diff --git a/crates/interfaces/src/test_utils/bodies.rs b/crates/interfaces/src/test_utils/bodies.rs index 802b216739..6c03942c01 100644 --- a/crates/interfaces/src/test_utils/bodies.rs +++ b/crates/interfaces/src/test_utils/bodies.rs @@ -1,33 +1,27 @@ -use crate::p2p::bodies::{client::BodiesClient, error::BodiesClientError}; +use crate::p2p::{bodies::client::BodiesClient, error::RequestResult}; use async_trait::async_trait; use reth_eth_wire::BlockBody; use reth_primitives::H256; use std::fmt::{Debug, Formatter}; /// A test client for fetching bodies -pub struct TestBodiesClient -where - F: Fn(H256) -> Result, -{ +pub struct TestBodiesClient { /// The function that is called on each body request. pub responder: F, } -impl Debug for TestBodiesClient -where - F: Fn(H256) -> Result, -{ +impl Debug for TestBodiesClient { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TestBodiesClient").finish() + f.debug_struct("TestBodiesClient").finish_non_exhaustive() } } #[async_trait] impl BodiesClient for TestBodiesClient where - F: Fn(H256) -> Result + Send + Sync, + F: Fn(Vec) -> RequestResult> + Send + Sync, { - async fn get_block_body(&self, hash: H256) -> Result { - (self.responder)(hash) + async fn get_block_body(&self, hashes: Vec) -> RequestResult> { + (self.responder)(hashes) } } diff --git a/crates/net/bodies-downloaders/src/concurrent.rs b/crates/net/bodies-downloaders/src/concurrent.rs index 1a1f686f25..d1c257f903 100644 --- a/crates/net/bodies-downloaders/src/concurrent.rs +++ b/crates/net/bodies-downloaders/src/concurrent.rs @@ -1,10 +1,12 @@ use backon::{ExponentialBackoff, Retryable}; use futures_util::{stream, StreamExt}; use reth_eth_wire::BlockBody; -use reth_interfaces::p2p::bodies::{ - client::BodiesClient, - downloader::{BodiesStream, BodyDownloader}, - error::{BodiesClientError, DownloadError}, +use reth_interfaces::p2p::{ + bodies::{ + client::BodiesClient, + downloader::{BodiesStream, BodyDownloader}, + }, + error::RequestResult, }; use reth_primitives::{BlockNumber, H256}; use std::sync::Arc; @@ -74,16 +76,9 @@ impl ConcurrentDownloader { &self, block_number: BlockNumber, header_hash: H256, - ) -> Result<(BlockNumber, H256, BlockBody), DownloadError> { - match self.client.get_block_body(header_hash).await { - Ok(body) => Ok((block_number, header_hash, body)), - Err(err) => Err(match err { - BodiesClientError::Timeout { header_hash } => { - DownloadError::Timeout { header_hash } - } - err => DownloadError::Client { source: err }, - }), - } + ) -> RequestResult<(BlockNumber, H256, BlockBody)> { + let mut body = self.client.get_block_body(vec![header_hash]).await?; + Ok((block_number, header_hash, body.remove(0))) } } @@ -91,16 +86,13 @@ impl ConcurrentDownloader { #[cfg(test)] mod tests { use super::*; - use crate::concurrent::{ - tests::test_utils::{generate_bodies, TestClient}, - ConcurrentDownloader, + use crate::{ + concurrent::ConcurrentDownloader, + test_utils::{generate_bodies, TestClient}, }; use assert_matches::assert_matches; use futures_util::stream::{StreamExt, TryStreamExt}; - use reth_interfaces::p2p::{ - bodies::{downloader::BodyDownloader, error::BodiesClientError}, - error::RequestError, - }; + use reth_interfaces::p2p::{bodies::downloader::BodyDownloader, error::RequestError}; use reth_primitives::H256; use std::{ sync::{ @@ -117,15 +109,15 @@ mod tests { // Generate some random blocks let (hashes, mut bodies) = generate_bodies(0..20); - let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|hash: H256| { + let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|hash: Vec| { let mut bodies = bodies.clone(); async move { // Simulate that the request for this (random) block takes 0-100ms - tokio::time::sleep(Duration::from_millis(hash.to_low_u64_be() % 100)).await; + tokio::time::sleep(Duration::from_millis(hash[0].to_low_u64_be() % 100)).await; - Ok(bodies - .remove(&hash) - .expect("Downloader asked for a block it should not ask for")) + Ok(vec![bodies + .remove(&hash[0]) + .expect("Downloader asked for a block it should not ask for")]) } }))); @@ -151,15 +143,14 @@ mod tests { /// Checks that non-retryable errors bubble up #[tokio::test] async fn client_failure() { - let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|_: H256| async { - Err(BodiesClientError::Internal(RequestError::ChannelClosed)) - }))); + let downloader = + ConcurrentDownloader::new(Arc::new(TestClient::new(|_: Vec| async { + Err(RequestError::ChannelClosed) + }))); assert_matches!( downloader.bodies_stream(&[(0, H256::zero())]).next().await, - Some(Err(DownloadError::Client { - source: BodiesClientError::Internal(RequestError::ChannelClosed) - })) + Some(Err(RequestError::ChannelClosed)) ); } @@ -168,14 +159,15 @@ mod tests { async fn retries_timeouts() { let retries_left = Arc::new(AtomicUsize::new(3)); let downloader = - ConcurrentDownloader::new(Arc::new(TestClient::new(|header_hash: H256| { + ConcurrentDownloader::new(Arc::new(TestClient::new(|mut header_hash: Vec| { let retries_left = retries_left.clone(); + let _header_hash = header_hash.remove(0); async move { if retries_left.load(Ordering::SeqCst) > 0 { retries_left.fetch_sub(1, Ordering::SeqCst); - Err(BodiesClientError::Timeout { header_hash }) + Err(RequestError::Timeout) } else { - Ok(BlockBody { transactions: vec![], ommers: vec![] }) + Ok(vec![BlockBody { transactions: vec![], ommers: vec![] }]) } } }))); @@ -199,14 +191,15 @@ mod tests { async fn too_many_retries() { let retries_left = Arc::new(AtomicUsize::new(3)); let downloader = - ConcurrentDownloader::new(Arc::new(TestClient::new(|header_hash: H256| { + ConcurrentDownloader::new(Arc::new(TestClient::new(|mut header_hash: Vec| { + let _header_hash = header_hash.remove(0); let retries_left = retries_left.clone(); async move { if retries_left.load(Ordering::SeqCst) > 0 { retries_left.fetch_sub(1, Ordering::SeqCst); - Err(BodiesClientError::Timeout { header_hash }) + Err(RequestError::Timeout) } else { - Ok(BlockBody { transactions: vec![], ommers: vec![] }) + Ok(vec![BlockBody { transactions: vec![], ommers: vec![] }]) } } }))) @@ -214,93 +207,8 @@ mod tests { assert_matches!( downloader.bodies_stream(&[(0, H256::zero())]).next().await, - Some(Err(DownloadError::Timeout { header_hash })) => { - assert_eq!(header_hash, H256::zero()) - } + Some(Err(RequestError::Timeout)) ); assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 2); } - - mod test_utils { - use async_trait::async_trait; - use reth_eth_wire::BlockBody; - use reth_interfaces::{ - p2p::bodies::{client::BodiesClient, error::BodiesClientError}, - test_utils::generators::random_block_range, - }; - use reth_primitives::{BlockNumber, H256}; - use std::{ - collections::HashMap, - fmt::{Debug, Formatter}, - future::Future, - sync::Arc, - }; - use tokio::sync::Mutex; - - /// Generate a set of bodies and their corresponding block hashes - pub(crate) fn generate_bodies( - rng: std::ops::Range, - ) -> (Vec<(BlockNumber, H256)>, HashMap) { - let blocks = random_block_range(rng, H256::zero()); - - let hashes: Vec<(BlockNumber, H256)> = - blocks.iter().map(|block| (block.number, block.hash())).collect(); - let bodies: HashMap = blocks - .into_iter() - .map(|block| { - ( - block.hash(), - BlockBody { - transactions: block.body, - ommers: block - .ommers - .into_iter() - .map(|header| header.unseal()) - .collect(), - }, - ) - }) - .collect(); - - (hashes, bodies) - } - - /// A [BodiesClient] for testing. - pub(crate) struct TestClient(pub(crate) Arc>) - where - F: FnMut(H256) -> Fut + Send + Sync, - Fut: Future> + Send; - - impl Debug for TestClient - where - F: FnMut(H256) -> Fut + Send + Sync, - Fut: Future> + Send, - { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TestClient").finish() - } - } - - impl TestClient - where - F: FnMut(H256) -> Fut + Send + Sync, - Fut: Future> + Send, - { - pub(crate) fn new(f: F) -> Self { - Self(Arc::new(Mutex::new(f))) - } - } - - #[async_trait] - impl BodiesClient for TestClient - where - F: FnMut(H256) -> Fut + Send + Sync, - Fut: Future> + Send, - { - async fn get_block_body(&self, hash: H256) -> Result { - let f = &mut *self.0.lock().await; - (f)(hash).await - } - } - } } diff --git a/crates/net/bodies-downloaders/src/lib.rs b/crates/net/bodies-downloaders/src/lib.rs index e9f0fb6c39..92767f046f 100644 --- a/crates/net/bodies-downloaders/src/lib.rs +++ b/crates/net/bodies-downloaders/src/lib.rs @@ -9,3 +9,6 @@ /// A naive concurrent downloader. pub mod concurrent; + +#[cfg(test)] +mod test_utils; diff --git a/crates/net/bodies-downloaders/src/test_utils.rs b/crates/net/bodies-downloaders/src/test_utils.rs new file mode 100644 index 0000000000..6a36b4c207 --- /dev/null +++ b/crates/net/bodies-downloaders/src/test_utils.rs @@ -0,0 +1,67 @@ +//! Test helper impls + +use async_trait::async_trait; +use reth_eth_wire::BlockBody; +use reth_interfaces::{ + p2p::{bodies::client::BodiesClient, error::RequestResult}, + test_utils::generators::random_block_range, +}; +use reth_primitives::{BlockNumber, H256}; +use std::{ + collections::HashMap, + fmt::{Debug, Formatter}, + future::Future, + sync::Arc, +}; +use tokio::sync::Mutex; + +/// Generate a set of bodies and their corresponding block hashes +pub(crate) fn generate_bodies( + rng: std::ops::Range, +) -> (Vec<(BlockNumber, H256)>, HashMap) { + let blocks = random_block_range(rng, H256::zero()); + + let hashes: Vec<(BlockNumber, H256)> = + blocks.iter().map(|block| (block.number, block.hash())).collect(); + let bodies: HashMap = blocks + .into_iter() + .map(|block| { + ( + block.hash(), + BlockBody { + transactions: block.body, + ommers: block.ommers.into_iter().map(|header| header.unseal()).collect(), + }, + ) + }) + .collect(); + + (hashes, bodies) +} + +/// A [BodiesClient] for testing. +pub(crate) struct TestClient(pub(crate) Arc>); + +impl Debug for TestClient { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TestClient").finish_non_exhaustive() + } +} + +impl TestClient { + pub(crate) fn new(f: F) -> Self { + Self(Arc::new(Mutex::new(f))) + } +} + +#[async_trait] +impl BodiesClient for TestClient +where + F: FnMut(Vec) -> Fut + Send + Sync, + Fut: Future>> + Send, +{ + async fn get_block_body(&self, hash: Vec) -> RequestResult> { + let f = &mut *self.0.lock().await; + (f)(hash).await + } +} diff --git a/crates/net/network/src/fetch/client.rs b/crates/net/network/src/fetch/client.rs index 48876113e6..8f38fb6226 100644 --- a/crates/net/network/src/fetch/client.rs +++ b/crates/net/network/src/fetch/client.rs @@ -1,13 +1,13 @@ //! A client implementation that can interact with the network and download data. use crate::fetch::{DownloadRequest, StatusUpdate}; - -use reth_eth_wire::BlockHeaders; +use reth_eth_wire::{BlockBody, BlockHeaders}; use reth_interfaces::p2p::{ + bodies::client::BodiesClient, error::RequestResult, headers::client::{HeadersClient, HeadersRequest}, }; -use reth_primitives::{Header, H256, U256}; +use reth_primitives::{H256, U256}; use tokio::sync::{mpsc::UnboundedSender, oneshot}; /// Front-end API for fetching data from the network. @@ -19,24 +19,25 @@ pub struct FetchClient { pub(crate) status_tx: UnboundedSender, } -impl FetchClient { - /// Sends a `GetBlockHeaders` request to an available peer. - pub async fn get_block_headers(&self, request: HeadersRequest) -> RequestResult> { - let (response, rx) = oneshot::channel(); - self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response })?; - rx.await? - } -} - #[async_trait::async_trait] impl HeadersClient for FetchClient { fn update_status(&self, height: u64, hash: H256, total_difficulty: U256) { let _ = self.status_tx.send(StatusUpdate { height, hash, total_difficulty }); } + /// Sends a `GetBlockHeaders` request to an available peer. async fn get_headers(&self, request: HeadersRequest) -> RequestResult { let (response, rx) = oneshot::channel(); self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response })?; rx.await?.map(BlockHeaders::from) } } + +#[async_trait::async_trait] +impl BodiesClient for FetchClient { + async fn get_block_body(&self, request: Vec) -> RequestResult> { + let (response, rx) = oneshot::channel(); + self.request_tx.send(DownloadRequest::GetBlockBodies { request, response })?; + rx.await? + } +} diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 2dd98cc5e2..e9a4a85415 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -234,7 +234,7 @@ mod tests { PREV_STAGE_ID, }; use assert_matches::assert_matches; - use reth_interfaces::{consensus, p2p::bodies::error::DownloadError}; + use reth_interfaces::{consensus, p2p::error::RequestError}; use std::collections::HashMap; use test_utils::*; @@ -437,10 +437,7 @@ mod tests { // overwrite responses let header = blocks.last().unwrap(); - runner.set_responses(HashMap::from([( - header.hash(), - Err(DownloadError::Timeout { header_hash: header.hash() }), - )])); + runner.set_responses(HashMap::from([(header.hash(), Err(RequestError::Timeout))])); // Run the stage let rx = runner.execute(input); @@ -463,10 +460,12 @@ mod tests { use reth_eth_wire::BlockBody; use reth_interfaces::{ db::{models::StoredBlockBody, tables, DbCursorRO, DbTx, DbTxMut}, - p2p::bodies::{ - client::BodiesClient, - downloader::{BodiesStream, BodyDownloader}, - error::{BodiesClientError, DownloadError}, + p2p::{ + bodies::{ + client::BodiesClient, + downloader::{BodiesStream, BodyDownloader}, + }, + error::RequestResult, }, test_utils::{generators::random_block_range, TestConsensus}, }; @@ -477,9 +476,7 @@ mod tests { pub(crate) const GENESIS_HASH: H256 = H256::zero(); /// A helper to create a collection of resulted-wrapped block bodies keyed by their hash. - pub(crate) fn body_by_hash( - block: &BlockLocked, - ) -> (H256, Result) { + pub(crate) fn body_by_hash(block: &BlockLocked) -> (H256, RequestResult) { ( block.hash(), Ok(BlockBody { @@ -492,7 +489,7 @@ mod tests { /// A helper struct for running the [BodyStage]. pub(crate) struct BodyTestRunner { pub(crate) consensus: Arc, - responses: HashMap>, + responses: HashMap>, db: TestStageDB, batch_size: u64, } @@ -515,7 +512,7 @@ mod tests { pub(crate) fn set_responses( &mut self, - responses: HashMap>, + responses: HashMap>, ) { self.responses = responses; } @@ -648,7 +645,7 @@ mod tests { #[async_trait::async_trait] impl BodiesClient for NoopClient { - async fn get_block_body(&self, _: H256) -> Result { + async fn get_block_body(&self, _: Vec) -> RequestResult> { panic!("Noop client should not be called") } } @@ -657,11 +654,11 @@ mod tests { /// A [BodyDownloader] that is backed by an internal [HashMap] for testing. #[derive(Debug, Default, Clone)] pub(crate) struct TestBodyDownloader { - responses: HashMap>, + responses: HashMap>, } impl TestBodyDownloader { - pub(crate) fn new(responses: HashMap>) -> Self { + pub(crate) fn new(responses: HashMap>) -> Self { Self { responses } } }