refactor: bodies client API (#257)

* refactor: bodies client API

* chore: fix errors
This commit is contained in:
Matthias Seitz
2022-11-26 10:04:42 +01:00
committed by GitHub
parent 5d5b83d575
commit fd840e1c66
11 changed files with 150 additions and 226 deletions

View File

@@ -1,8 +1,7 @@
use crate::p2p::error::RequestResult;
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
use crate::p2p::bodies::error::BodiesClientError;
use async_trait::async_trait;
use std::fmt::Debug;
/// A client capable of downloading block bodies.
@@ -10,5 +9,5 @@ use std::fmt::Debug;
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BodiesClient: Send + Sync + Debug {
/// Fetches the block body for the requested block.
async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError>;
async fn get_block_body(&self, hash: Vec<H256>) -> RequestResult<Vec<BlockBody>>;
}

View File

@@ -1,5 +1,5 @@
use super::client::BodiesClient;
use crate::p2p::bodies::error::DownloadError;
use crate::p2p::error::RequestResult;
use reth_eth_wire::BlockBody;
use reth_primitives::{BlockNumber, H256};
use std::pin::Pin;
@@ -38,4 +38,4 @@ pub trait BodyDownloader: Sync + Send {
/// A stream of block bodies.
pub type BodiesStream<'a> =
Pin<Box<dyn Stream<Item = Result<(BlockNumber, H256, BlockBody), DownloadError>> + Send + 'a>>;
Pin<Box<dyn Stream<Item = RequestResult<(BlockNumber, H256, BlockBody)>> + Send + 'a>>;

View File

@@ -1,51 +0,0 @@
use crate::p2p::error::RequestError;
use reth_primitives::H256;
use thiserror::Error;
/// Body client errors.
#[derive(Error, Debug, Clone)]
pub enum BodiesClientError {
/// Timed out while waiting for a response.
#[error("Timed out while getting bodies for block {header_hash}.")]
Timeout {
/// The header hash of the block that timed out.
header_hash: H256,
},
/// The client encountered an internal error.
#[error(transparent)]
Internal(#[from] RequestError),
}
/// Body downloader errors.
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Timed out while waiting for a response.
#[error("Timed out while getting bodies for block {header_hash}.")]
Timeout {
/// The header hash of the block that timed out.
header_hash: H256,
},
/// The [BodiesClient] used by the downloader experienced an error.
#[error("The downloader client encountered an error.")]
Client {
/// The underlying client error.
#[source]
source: BodiesClientError,
},
}
impl From<BodiesClientError> for DownloadError {
fn from(error: BodiesClientError) -> Self {
match error {
BodiesClientError::Timeout { header_hash } => DownloadError::Timeout { header_hash },
_ => DownloadError::Client { source: error },
}
}
}
impl DownloadError {
/// Indicates whether this error is retryable or fatal.
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::Timeout { .. })
}
}

View File

@@ -3,6 +3,3 @@ pub mod client;
/// Block body downloaders.
pub mod downloader;
/// Error types.
pub mod error;

View File

@@ -21,6 +21,15 @@ pub enum RequestError {
BadResponse,
}
// === impl RequestError ===
impl RequestError {
/// Indicates whether this error is retryable or fatal.
pub fn is_retryable(&self) -> bool {
matches!(self, RequestError::Timeout | RequestError::ConnectionDropped)
}
}
impl<T> From<mpsc::error::SendError<T>> for RequestError {
fn from(_: mpsc::error::SendError<T>) -> Self {
RequestError::ChannelClosed

View File

@@ -1,33 +1,27 @@
use crate::p2p::bodies::{client::BodiesClient, error::BodiesClientError};
use crate::p2p::{bodies::client::BodiesClient, error::RequestResult};
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
use std::fmt::{Debug, Formatter};
/// A test client for fetching bodies
pub struct TestBodiesClient<F>
where
F: Fn(H256) -> Result<BlockBody, BodiesClientError>,
{
pub struct TestBodiesClient<F> {
/// The function that is called on each body request.
pub responder: F,
}
impl<F> Debug for TestBodiesClient<F>
where
F: Fn(H256) -> Result<BlockBody, BodiesClientError>,
{
impl<F> Debug for TestBodiesClient<F> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestBodiesClient").finish()
f.debug_struct("TestBodiesClient").finish_non_exhaustive()
}
}
#[async_trait]
impl<F> BodiesClient for TestBodiesClient<F>
where
F: Fn(H256) -> Result<BlockBody, BodiesClientError> + Send + Sync,
F: Fn(Vec<H256>) -> RequestResult<Vec<BlockBody>> + Send + Sync,
{
async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError> {
(self.responder)(hash)
async fn get_block_body(&self, hashes: Vec<H256>) -> RequestResult<Vec<BlockBody>> {
(self.responder)(hashes)
}
}

View File

@@ -1,10 +1,12 @@
use backon::{ExponentialBackoff, Retryable};
use futures_util::{stream, StreamExt};
use reth_eth_wire::BlockBody;
use reth_interfaces::p2p::bodies::{
client::BodiesClient,
downloader::{BodiesStream, BodyDownloader},
error::{BodiesClientError, DownloadError},
use reth_interfaces::p2p::{
bodies::{
client::BodiesClient,
downloader::{BodiesStream, BodyDownloader},
},
error::RequestResult,
};
use reth_primitives::{BlockNumber, H256};
use std::sync::Arc;
@@ -74,16 +76,9 @@ impl<C: BodiesClient> ConcurrentDownloader<C> {
&self,
block_number: BlockNumber,
header_hash: H256,
) -> Result<(BlockNumber, H256, BlockBody), DownloadError> {
match self.client.get_block_body(header_hash).await {
Ok(body) => Ok((block_number, header_hash, body)),
Err(err) => Err(match err {
BodiesClientError::Timeout { header_hash } => {
DownloadError::Timeout { header_hash }
}
err => DownloadError::Client { source: err },
}),
}
) -> RequestResult<(BlockNumber, H256, BlockBody)> {
let mut body = self.client.get_block_body(vec![header_hash]).await?;
Ok((block_number, header_hash, body.remove(0)))
}
}
@@ -91,16 +86,13 @@ impl<C: BodiesClient> ConcurrentDownloader<C> {
#[cfg(test)]
mod tests {
use super::*;
use crate::concurrent::{
tests::test_utils::{generate_bodies, TestClient},
ConcurrentDownloader,
use crate::{
concurrent::ConcurrentDownloader,
test_utils::{generate_bodies, TestClient},
};
use assert_matches::assert_matches;
use futures_util::stream::{StreamExt, TryStreamExt};
use reth_interfaces::p2p::{
bodies::{downloader::BodyDownloader, error::BodiesClientError},
error::RequestError,
};
use reth_interfaces::p2p::{bodies::downloader::BodyDownloader, error::RequestError};
use reth_primitives::H256;
use std::{
sync::{
@@ -117,15 +109,15 @@ mod tests {
// Generate some random blocks
let (hashes, mut bodies) = generate_bodies(0..20);
let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|hash: H256| {
let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|hash: Vec<H256>| {
let mut bodies = bodies.clone();
async move {
// Simulate that the request for this (random) block takes 0-100ms
tokio::time::sleep(Duration::from_millis(hash.to_low_u64_be() % 100)).await;
tokio::time::sleep(Duration::from_millis(hash[0].to_low_u64_be() % 100)).await;
Ok(bodies
.remove(&hash)
.expect("Downloader asked for a block it should not ask for"))
Ok(vec![bodies
.remove(&hash[0])
.expect("Downloader asked for a block it should not ask for")])
}
})));
@@ -151,15 +143,14 @@ mod tests {
/// Checks that non-retryable errors bubble up
#[tokio::test]
async fn client_failure() {
let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|_: H256| async {
Err(BodiesClientError::Internal(RequestError::ChannelClosed))
})));
let downloader =
ConcurrentDownloader::new(Arc::new(TestClient::new(|_: Vec<H256>| async {
Err(RequestError::ChannelClosed)
})));
assert_matches!(
downloader.bodies_stream(&[(0, H256::zero())]).next().await,
Some(Err(DownloadError::Client {
source: BodiesClientError::Internal(RequestError::ChannelClosed)
}))
Some(Err(RequestError::ChannelClosed))
);
}
@@ -168,14 +159,15 @@ mod tests {
async fn retries_timeouts() {
let retries_left = Arc::new(AtomicUsize::new(3));
let downloader =
ConcurrentDownloader::new(Arc::new(TestClient::new(|header_hash: H256| {
ConcurrentDownloader::new(Arc::new(TestClient::new(|mut header_hash: Vec<H256>| {
let retries_left = retries_left.clone();
let _header_hash = header_hash.remove(0);
async move {
if retries_left.load(Ordering::SeqCst) > 0 {
retries_left.fetch_sub(1, Ordering::SeqCst);
Err(BodiesClientError::Timeout { header_hash })
Err(RequestError::Timeout)
} else {
Ok(BlockBody { transactions: vec![], ommers: vec![] })
Ok(vec![BlockBody { transactions: vec![], ommers: vec![] }])
}
}
})));
@@ -199,14 +191,15 @@ mod tests {
async fn too_many_retries() {
let retries_left = Arc::new(AtomicUsize::new(3));
let downloader =
ConcurrentDownloader::new(Arc::new(TestClient::new(|header_hash: H256| {
ConcurrentDownloader::new(Arc::new(TestClient::new(|mut header_hash: Vec<H256>| {
let _header_hash = header_hash.remove(0);
let retries_left = retries_left.clone();
async move {
if retries_left.load(Ordering::SeqCst) > 0 {
retries_left.fetch_sub(1, Ordering::SeqCst);
Err(BodiesClientError::Timeout { header_hash })
Err(RequestError::Timeout)
} else {
Ok(BlockBody { transactions: vec![], ommers: vec![] })
Ok(vec![BlockBody { transactions: vec![], ommers: vec![] }])
}
}
})))
@@ -214,93 +207,8 @@ mod tests {
assert_matches!(
downloader.bodies_stream(&[(0, H256::zero())]).next().await,
Some(Err(DownloadError::Timeout { header_hash })) => {
assert_eq!(header_hash, H256::zero())
}
Some(Err(RequestError::Timeout))
);
assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 2);
}
mod test_utils {
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::bodies::{client::BodiesClient, error::BodiesClientError},
test_utils::generators::random_block_range,
};
use reth_primitives::{BlockNumber, H256};
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
future::Future,
sync::Arc,
};
use tokio::sync::Mutex;
/// Generate a set of bodies and their corresponding block hashes
pub(crate) fn generate_bodies(
rng: std::ops::Range<u64>,
) -> (Vec<(BlockNumber, H256)>, HashMap<H256, BlockBody>) {
let blocks = random_block_range(rng, H256::zero());
let hashes: Vec<(BlockNumber, H256)> =
blocks.iter().map(|block| (block.number, block.hash())).collect();
let bodies: HashMap<H256, BlockBody> = blocks
.into_iter()
.map(|block| {
(
block.hash(),
BlockBody {
transactions: block.body,
ommers: block
.ommers
.into_iter()
.map(|header| header.unseal())
.collect(),
},
)
})
.collect();
(hashes, bodies)
}
/// A [BodiesClient] for testing.
pub(crate) struct TestClient<F, Fut>(pub(crate) Arc<Mutex<F>>)
where
F: FnMut(H256) -> Fut + Send + Sync,
Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send;
impl<F, Fut> Debug for TestClient<F, Fut>
where
F: FnMut(H256) -> Fut + Send + Sync,
Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestClient").finish()
}
}
impl<F, Fut> TestClient<F, Fut>
where
F: FnMut(H256) -> Fut + Send + Sync,
Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send,
{
pub(crate) fn new(f: F) -> Self {
Self(Arc::new(Mutex::new(f)))
}
}
#[async_trait]
impl<F, Fut> BodiesClient for TestClient<F, Fut>
where
F: FnMut(H256) -> Fut + Send + Sync,
Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send,
{
async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError> {
let f = &mut *self.0.lock().await;
(f)(hash).await
}
}
}
}

View File

@@ -9,3 +9,6 @@
/// A naive concurrent downloader.
pub mod concurrent;
#[cfg(test)]
mod test_utils;

View File

@@ -0,0 +1,67 @@
//! Test helper impls
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::{bodies::client::BodiesClient, error::RequestResult},
test_utils::generators::random_block_range,
};
use reth_primitives::{BlockNumber, H256};
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
future::Future,
sync::Arc,
};
use tokio::sync::Mutex;
/// Generate a set of bodies and their corresponding block hashes
pub(crate) fn generate_bodies(
rng: std::ops::Range<u64>,
) -> (Vec<(BlockNumber, H256)>, HashMap<H256, BlockBody>) {
let blocks = random_block_range(rng, H256::zero());
let hashes: Vec<(BlockNumber, H256)> =
blocks.iter().map(|block| (block.number, block.hash())).collect();
let bodies: HashMap<H256, BlockBody> = blocks
.into_iter()
.map(|block| {
(
block.hash(),
BlockBody {
transactions: block.body,
ommers: block.ommers.into_iter().map(|header| header.unseal()).collect(),
},
)
})
.collect();
(hashes, bodies)
}
/// A [BodiesClient] for testing.
pub(crate) struct TestClient<F>(pub(crate) Arc<Mutex<F>>);
impl<F> Debug for TestClient<F> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestClient").finish_non_exhaustive()
}
}
impl<F> TestClient<F> {
pub(crate) fn new(f: F) -> Self {
Self(Arc::new(Mutex::new(f)))
}
}
#[async_trait]
impl<F, Fut> BodiesClient for TestClient<F>
where
F: FnMut(Vec<H256>) -> Fut + Send + Sync,
Fut: Future<Output = RequestResult<Vec<BlockBody>>> + Send,
{
async fn get_block_body(&self, hash: Vec<H256>) -> RequestResult<Vec<BlockBody>> {
let f = &mut *self.0.lock().await;
(f)(hash).await
}
}

View File

@@ -1,13 +1,13 @@
//! A client implementation that can interact with the network and download data.
use crate::fetch::{DownloadRequest, StatusUpdate};
use reth_eth_wire::BlockHeaders;
use reth_eth_wire::{BlockBody, BlockHeaders};
use reth_interfaces::p2p::{
bodies::client::BodiesClient,
error::RequestResult,
headers::client::{HeadersClient, HeadersRequest},
};
use reth_primitives::{Header, H256, U256};
use reth_primitives::{H256, U256};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// Front-end API for fetching data from the network.
@@ -19,24 +19,25 @@ pub struct FetchClient {
pub(crate) status_tx: UnboundedSender<StatusUpdate>,
}
impl FetchClient {
/// Sends a `GetBlockHeaders` request to an available peer.
pub async fn get_block_headers(&self, request: HeadersRequest) -> RequestResult<Vec<Header>> {
let (response, rx) = oneshot::channel();
self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response })?;
rx.await?
}
}
#[async_trait::async_trait]
impl HeadersClient for FetchClient {
fn update_status(&self, height: u64, hash: H256, total_difficulty: U256) {
let _ = self.status_tx.send(StatusUpdate { height, hash, total_difficulty });
}
/// Sends a `GetBlockHeaders` request to an available peer.
async fn get_headers(&self, request: HeadersRequest) -> RequestResult<BlockHeaders> {
let (response, rx) = oneshot::channel();
self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response })?;
rx.await?.map(BlockHeaders::from)
}
}
#[async_trait::async_trait]
impl BodiesClient for FetchClient {
async fn get_block_body(&self, request: Vec<H256>) -> RequestResult<Vec<BlockBody>> {
let (response, rx) = oneshot::channel();
self.request_tx.send(DownloadRequest::GetBlockBodies { request, response })?;
rx.await?
}
}

View File

@@ -234,7 +234,7 @@ mod tests {
PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_interfaces::{consensus, p2p::bodies::error::DownloadError};
use reth_interfaces::{consensus, p2p::error::RequestError};
use std::collections::HashMap;
use test_utils::*;
@@ -437,10 +437,7 @@ mod tests {
// overwrite responses
let header = blocks.last().unwrap();
runner.set_responses(HashMap::from([(
header.hash(),
Err(DownloadError::Timeout { header_hash: header.hash() }),
)]));
runner.set_responses(HashMap::from([(header.hash(), Err(RequestError::Timeout))]));
// Run the stage
let rx = runner.execute(input);
@@ -463,10 +460,12 @@ mod tests {
use reth_eth_wire::BlockBody;
use reth_interfaces::{
db::{models::StoredBlockBody, tables, DbCursorRO, DbTx, DbTxMut},
p2p::bodies::{
client::BodiesClient,
downloader::{BodiesStream, BodyDownloader},
error::{BodiesClientError, DownloadError},
p2p::{
bodies::{
client::BodiesClient,
downloader::{BodiesStream, BodyDownloader},
},
error::RequestResult,
},
test_utils::{generators::random_block_range, TestConsensus},
};
@@ -477,9 +476,7 @@ mod tests {
pub(crate) const GENESIS_HASH: H256 = H256::zero();
/// A helper to create a collection of resulted-wrapped block bodies keyed by their hash.
pub(crate) fn body_by_hash(
block: &BlockLocked,
) -> (H256, Result<BlockBody, DownloadError>) {
pub(crate) fn body_by_hash(block: &BlockLocked) -> (H256, RequestResult<BlockBody>) {
(
block.hash(),
Ok(BlockBody {
@@ -492,7 +489,7 @@ mod tests {
/// A helper struct for running the [BodyStage].
pub(crate) struct BodyTestRunner {
pub(crate) consensus: Arc<TestConsensus>,
responses: HashMap<H256, Result<BlockBody, DownloadError>>,
responses: HashMap<H256, RequestResult<BlockBody>>,
db: TestStageDB,
batch_size: u64,
}
@@ -515,7 +512,7 @@ mod tests {
pub(crate) fn set_responses(
&mut self,
responses: HashMap<H256, Result<BlockBody, DownloadError>>,
responses: HashMap<H256, RequestResult<BlockBody>>,
) {
self.responses = responses;
}
@@ -648,7 +645,7 @@ mod tests {
#[async_trait::async_trait]
impl BodiesClient for NoopClient {
async fn get_block_body(&self, _: H256) -> Result<BlockBody, BodiesClientError> {
async fn get_block_body(&self, _: Vec<H256>) -> RequestResult<Vec<BlockBody>> {
panic!("Noop client should not be called")
}
}
@@ -657,11 +654,11 @@ mod tests {
/// A [BodyDownloader] that is backed by an internal [HashMap] for testing.
#[derive(Debug, Default, Clone)]
pub(crate) struct TestBodyDownloader {
responses: HashMap<H256, Result<BlockBody, DownloadError>>,
responses: HashMap<H256, RequestResult<BlockBody>>,
}
impl TestBodyDownloader {
pub(crate) fn new(responses: HashMap<H256, Result<BlockBody, DownloadError>>) -> Self {
pub(crate) fn new(responses: HashMap<H256, RequestResult<BlockBody>>) -> Self {
Self { responses }
}
}