Merge branch 'main' of github.com:foundry-rs/reth into rkrasiuk/stage-test-suite

This commit is contained in:
Roman Krasiuk
2022-11-16 09:15:09 +02:00
93 changed files with 3717 additions and 789 deletions

View File

@@ -1,20 +1,30 @@
use async_trait::async_trait;
use reth_primitives::{BlockHash, BlockNumber, SealedHeader, H256};
use reth_primitives::{BlockHash, BlockLocked, BlockNumber, SealedHeader, H256};
use tokio::sync::watch::Receiver;
/// Re-export forkchoice state
pub use reth_rpc_types::engine::ForkchoiceState;
/// Consensus is a protocol that chooses canonical chain.
/// We are checking validity of block header here.
#[async_trait]
#[auto_impl::auto_impl(&, Arc)]
pub trait Consensus: Send + Sync {
/// Get a receiver for the fork choice state
fn fork_choice_state(&self) -> Receiver<ForkchoiceState>;
/// Validate if header is correct and follows consensus specification
/// Validate if header is correct and follows consensus specification.
///
/// **This should not be called for the genesis block**.
fn validate_header(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), Error>;
/// Validate a block disregarding world state, i.e. things that can be checked before sender
/// recovery and execution.
///
/// See the Yellow Paper sections 4.3.2 "Holistic Validity", 4.3.4 "Block Header Validity", and
/// 11.1 "Ommer Validation".
///
/// **This should not be called for the genesis block**.
fn pre_validate_block(&self, block: &BlockLocked) -> Result<(), Error>;
}
/// Consensus Errors

View File

@@ -1,4 +1,7 @@
use crate::db::{models::accounts::AccountBeforeTx, Compress, Decompress, Error};
use crate::db::{
models::{accounts::AccountBeforeTx, StoredBlockBody},
Compress, Decompress, Error,
};
use parity_scale_codec::decode_from_bytes;
use reth_primitives::*;
@@ -53,7 +56,16 @@ impl ScaleValue for Vec<u8> {}
impl sealed::Sealed for Vec<u8> {}
impl_scale!(U256, H256, H160);
impl_scale!(Header, Account, Log, Receipt, TxType, StorageEntry, TransactionSigned);
impl_scale!(
Header,
Account,
Log,
Receipt,
TxType,
StorageEntry,
TransactionSigned,
StoredBlockBody
);
impl_scale!(AccountBeforeTx);
impl_scale_value!(u8, u32, u16, u64);

View File

@@ -74,9 +74,9 @@ pub trait Database: for<'a> DatabaseGAT<'a> {
/// Sealed trait which cannot be implemented by 3rd parties, exposed only for implementers
pub trait DbTxGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync {
/// Cursor GAT
type Cursor<T: Table>: DbCursorRO<'a, T>;
type Cursor<T: Table>: DbCursorRO<'a, T> + Send + Sync;
/// DupCursor GAT
type DupCursor<T: DupSort>: DbDupCursorRO<'a, T> + DbCursorRO<'a, T>;
type DupCursor<T: DupSort>: DbDupCursorRO<'a, T> + DbCursorRO<'a, T> + Send + Sync;
}
/// Implements the GAT method from:
@@ -85,12 +85,14 @@ pub trait DbTxGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync
/// Sealed trait which cannot be implemented by 3rd parties, exposed only for implementers
pub trait DbTxMutGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync {
/// Cursor GAT
type CursorMut<T: Table>: DbCursorRW<'a, T> + DbCursorRO<'a, T>;
type CursorMut<T: Table>: DbCursorRW<'a, T> + DbCursorRO<'a, T> + Send + Sync;
/// DupCursor GAT
type DupCursorMut<T: DupSort>: DbDupCursorRW<'a, T>
+ DbCursorRW<'a, T>
+ DbDupCursorRO<'a, T>
+ DbCursorRO<'a, T>;
+ DbCursorRO<'a, T>
+ Send
+ Sync;
}
/// Read only transaction
@@ -190,7 +192,9 @@ pub trait DbCursorRW<'tx, T: Table> {
/// exists in a table, and insert a new row if the specified value doesn't already exist
fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>;
/// Append value to next cursor item
/// Append value to next cursor item.
///
/// This is efficient for pre-sorted data. If the data is not pre-sorted, use [`insert`].
fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>;
/// Delete current value that cursor points to
@@ -201,7 +205,10 @@ pub trait DbCursorRW<'tx, T: Table> {
pub trait DbDupCursorRW<'tx, T: DupSort> {
/// Append value to next cursor item
fn delete_current_duplicates(&mut self) -> Result<(), Error>;
/// Append duplicate value
/// Append duplicate value.
///
/// This is efficient for pre-sorted data. If the data is not pre-sorted, use [`insert`].
fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>;
}

View File

@@ -8,14 +8,30 @@ use crate::{
impl_fixed_arbitrary,
};
use bytes::Bytes;
use reth_primitives::{BlockHash, BlockNumber, H256};
use reth_codecs::main_codec;
use reth_primitives::{BlockHash, BlockNumber, Header, TxNumber, H256};
use serde::{Deserialize, Serialize};
/// Total chain number of transactions. Key for [`CumulativeTxCount`].
pub type NumTransactions = u64;
/// Number of transactions in the block. Value for [`BlockBodies`].
pub type NumTxesInBlock = u16;
/// The storage representation of a block body.
///
/// A block body is stored as a pointer to the first transaction in the block (`base_tx_id`), a
/// count of how many transactions are in the block, and the headers of the block's uncles.
///
/// The [TxNumber]s for all the transactions in the block are `base_tx_id..(base_tx_id +
/// tx_amount)`.
#[derive(Debug)]
#[main_codec]
pub struct StoredBlockBody {
/// The ID of the first transaction in the block.
pub base_tx_id: TxNumber,
/// The number of transactions in the block.
pub tx_amount: u64,
/// The block headers of this block's uncles.
pub ommers: Vec<Header>,
}
/// Hash of the block header. Value for [`CanonicalHeaders`]
pub type HeaderHash = H256;

View File

@@ -3,7 +3,7 @@
use crate::db::{
models::{
accounts::{AccountBeforeTx, TxNumberAddress},
blocks::{BlockNumHash, HeaderHash, NumTransactions, NumTxesInBlock},
blocks::{BlockNumHash, HeaderHash, NumTransactions, StoredBlockBody},
ShardedKey,
},
DupSort,
@@ -13,7 +13,7 @@ use reth_primitives::{
TransactionSigned, TxNumber, H256,
};
/// Enum for the type of table present in libmdbx.
/// Enum for the types of tables present in libmdbx.
#[derive(Debug)]
pub enum TableType {
/// key value table
@@ -119,8 +119,10 @@ table!(
Headers => BlockNumHash => Header);
table!(
/// Stores the number of transactions of a block.
BlockBodies => BlockNumHash => NumTxesInBlock);
/// Stores a pointer to the first transaction in the block, the number of transactions in the block, and the uncles/ommers of the block.
///
/// The transaction IDs point to the [`Transactions`] table.
BlockBodies => BlockNumHash => StoredBlockBody);
table!(
/// Stores the maximum [`TxNumber`] from which this particular block starts.
@@ -131,19 +133,19 @@ table!(
NonCanonicalTransactions => BlockNumHashTxNumber => TransactionSigned);
table!(
/// Stores the transaction body from canonical transactions. Canonical only
/// (Canonical only) Stores the transaction body for canonical transactions.
Transactions => TxNumber => TransactionSigned);
table!(
/// Stores transaction receipts. Canonical only
/// (Canonical only) Stores transaction receipts.
Receipts => TxNumber => Receipt);
table!(
/// Stores transaction logs. Canonical only
/// (Canonical only) Stores transaction logs.
Logs => TxNumber => Receipt);
table!(
/// Stores the current state of an Account.
/// Stores the current state of an [`Account`].
PlainAccountState => Address => Account);
table!(
@@ -200,27 +202,27 @@ table!(
AccountHistory => ShardedKey<Address> => TxNumberList);
table!(
/// Stores the transaction numbers that changed each storage key.
/// Stores pointers to transactions that changed each storage key.
StorageHistory => AddressStorageKey => TxNumberList);
dupsort!(
/// Stores state of an account before a certain transaction changed it.
/// Stores the state of an account before a certain transaction changed it.
AccountChangeSet => TxNumber => [Address] AccountBeforeTx);
dupsort!(
/// Stores state of a storage key before a certain transaction changed it.
/// Stores the state of a storage key before a certain transaction changed it.
StorageChangeSet => TxNumberAddress => [H256] StorageEntry);
table!(
/// Stores the transaction sender from each transaction.
/// Stores the transaction sender for each transaction.
TxSenders => TxNumber => Address); // Is it necessary? if so, inverted index index so we dont repeat addresses?
table!(
/// Config.
/// Configuration values.
Config => ConfigKey => ConfigValue);
table!(
/// Stores the block number of each stage id.
/// Stores the highest synced block number of each stage.
SyncStage => StageId => BlockNumber);
///

View File

@@ -0,0 +1,14 @@
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
use crate::p2p::bodies::error::BodiesClientError;
use async_trait::async_trait;
use std::fmt::Debug;
/// A client capable of downloading block bodies.
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BodiesClient: Send + Sync + Debug {
/// Fetches the block body for the requested block.
async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError>;
}

View File

@@ -0,0 +1,44 @@
use super::client::BodiesClient;
use crate::p2p::bodies::error::DownloadError;
use reth_eth_wire::BlockBody;
use reth_primitives::{BlockNumber, H256};
use std::{pin::Pin, time::Duration};
use tokio_stream::Stream;
/// A downloader capable of fetching block bodies from header hashes.
///
/// A downloader represents a distinct strategy for submitting requests to download block bodies,
/// while a [BodiesClient] represents a client capable of fulfilling these requests.
pub trait BodyDownloader: Sync + Send {
/// The [BodiesClient] used to fetch the block bodies
type Client: BodiesClient;
/// The request timeout duration
fn timeout(&self) -> Duration;
/// The block bodies client
fn client(&self) -> &Self::Client;
/// Download the bodies from `starting_block` (inclusive) up until `target_block` (inclusive).
///
/// The returned stream will always emit bodies in the order they were requested, but multiple
/// requests may be in flight at the same time.
///
/// The stream may exit early in some cases. Thus, a downloader can only at a minimum guarantee:
///
/// - All emitted bodies map onto a request
/// - The emitted bodies are emitted in order: i.e. the body for the first block is emitted
/// first, even if it was not fetched first.
///
/// It is *not* guaranteed that all the requested bodies are fetched: the downloader may close
/// the stream before the entire range has been fetched for any reason
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a>
where
I: IntoIterator<Item = &'b (BlockNumber, H256)>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a;
}
/// A stream of block bodies.
pub type BodiesStream<'a> =
Pin<Box<dyn Stream<Item = Result<(BlockNumber, H256, BlockBody), DownloadError>> + Send + 'a>>;

View File

@@ -0,0 +1,51 @@
use crate::p2p::error::RequestError;
use reth_primitives::H256;
use thiserror::Error;
/// Body client errors.
#[derive(Error, Debug, Clone)]
pub enum BodiesClientError {
/// Timed out while waiting for a response.
#[error("Timed out while getting bodies for block {header_hash}.")]
Timeout {
/// The header hash of the block that timed out.
header_hash: H256,
},
/// The client encountered an internal error.
#[error(transparent)]
Internal(#[from] RequestError),
}
/// Body downloader errors.
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Timed out while waiting for a response.
#[error("Timed out while getting bodies for block {header_hash}.")]
Timeout {
/// The header hash of the block that timed out.
header_hash: H256,
},
/// The [BodiesClient] used by the downloader experienced an error.
#[error("The downloader client encountered an error.")]
Client {
/// The underlying client error.
#[source]
source: BodiesClientError,
},
}
impl From<BodiesClientError> for DownloadError {
fn from(error: BodiesClientError) -> Self {
match error {
BodiesClientError::Timeout { header_hash } => DownloadError::Timeout { header_hash },
_ => DownloadError::Client { source: error },
}
}
}
impl DownloadError {
/// Indicates whether this error is retryable or fatal.
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::Timeout { .. })
}
}

View File

@@ -0,0 +1,8 @@
/// Traits and types for block body clients.
pub mod client;
/// Block body downloaders.
pub mod downloader;
/// Error types.
pub mod error;

View File

@@ -4,13 +4,15 @@ use tokio::sync::{mpsc, oneshot};
pub type RequestResult<T> = Result<T, RequestError>;
/// Error variants that can happen when sending requests to a session.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Clone)]
#[allow(missing_docs)]
pub enum RequestError {
#[error("Closed channel to the peer.")]
ChannelClosed,
#[error("Not connected to the peer.")]
NotConnected,
#[error("Connection to a peer dropped while handling the request.")]
ConnectionDropped,
#[error("Capability Message is not supported by remote peer.")]
UnsupportedCapability,
#[error("Request timed out while awaiting response.")]

View File

@@ -1,8 +1,9 @@
use crate::p2p::MessageStream;
use reth_primitives::{rpc::BlockId, Header, H256, H512};
use reth_primitives::{Header, H256, H512};
use async_trait::async_trait;
use reth_primitives::BlockHashOrNumber;
use std::{collections::HashSet, fmt::Debug};
/// Each peer returns a list of headers and the request id corresponding
@@ -31,7 +32,7 @@ impl From<(u64, Vec<Header>)> for HeadersResponse {
#[derive(Clone, Debug)]
pub struct HeadersRequest {
/// The starting block
pub start: BlockId,
pub start: BlockHashOrNumber,
/// The response max size
pub limit: u64,
/// Flag indicating whether the blocks should

View File

@@ -1,60 +1,18 @@
use super::client::{HeadersClient, HeadersRequest, HeadersStream};
use crate::consensus::Consensus;
use crate::{consensus::Consensus, p2p::headers::error::DownloadError};
use async_trait::async_trait;
use reth_primitives::{
rpc::{BlockId, BlockNumber},
Header, SealedHeader, H256,
};
use reth_primitives::{BlockHashOrNumber, Header, SealedHeader};
use reth_rpc_types::engine::ForkchoiceState;
use std::{fmt::Debug, time::Duration};
use thiserror::Error;
use std::time::Duration;
use tokio_stream::StreamExt;
/// The downloader error type
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Header validation failed
#[error("Failed to validate header {hash}. Details: {details}.")]
HeaderValidation {
/// Hash of header failing validation
hash: H256,
/// The details of validation failure
details: String,
},
/// Timed out while waiting for request id response.
#[error("Timed out while getting headers for request {request_id}.")]
Timeout {
/// The request id that timed out
request_id: u64,
},
/// Error when checking that the current [`Header`] has the parent's hash as the parent_hash
/// field, and that they have sequential block numbers.
#[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")]
MismatchedHeaders {
/// The header number being evaluated
header_number: BlockNumber,
/// The header hash being evaluated
header_hash: H256,
/// The parent number being evaluated
parent_number: BlockNumber,
/// The parent hash being evaluated
parent_hash: H256,
},
}
impl DownloadError {
/// Returns bool indicating whether this error is retryable or fatal, in the cases
/// where the peer responds with no headers, or times out.
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::Timeout { .. })
}
}
/// The header downloading strategy
/// A downloader capable of fetching block headers.
///
/// A downloader represents a distinct strategy for submitting requests to download block headers,
/// while a [HeadersClient] represents a client capable of fulfilling these requests.
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait Downloader: Sync + Send {
pub trait HeaderDownloader: Sync + Send {
/// The Consensus used to verify block validity when
/// downloading
type Consensus: Consensus;
@@ -86,7 +44,7 @@ pub trait Downloader: Sync + Send {
async fn download_headers(
&self,
stream: &mut HeadersStream,
start: BlockId,
start: BlockHashOrNumber,
limit: u64,
) -> Result<Vec<Header>, DownloadError> {
let request_id = rand::random();
@@ -118,9 +76,9 @@ pub trait Downloader: Sync + Send {
})
}
self.consensus().validate_header(header, parent).map_err(|e| {
DownloadError::HeaderValidation { hash: parent.hash(), details: e.to_string() }
})?;
self.consensus()
.validate_header(header, parent)
.map_err(|error| DownloadError::HeaderValidation { hash: parent.hash(), error })?;
Ok(())
}
}

View File

@@ -0,0 +1,44 @@
use crate::consensus;
use reth_primitives::{rpc::BlockNumber, H256};
use thiserror::Error;
/// The downloader error type
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Header validation failed
#[error("Failed to validate header {hash}. Details: {error}.")]
HeaderValidation {
/// Hash of header failing validation
hash: H256,
/// The details of validation failure
#[source]
error: consensus::Error,
},
/// Timed out while waiting for request id response.
#[error("Timed out while getting headers for request {request_id}.")]
Timeout {
/// The request id that timed out
request_id: u64,
},
/// Error when checking that the current [`Header`] has the parent's hash as the parent_hash
/// field, and that they have sequential block numbers.
#[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")]
MismatchedHeaders {
/// The header number being evaluated
header_number: BlockNumber,
/// The header hash being evaluated
header_hash: H256,
/// The parent number being evaluated
parent_number: BlockNumber,
/// The parent hash being evaluated
parent_hash: H256,
},
}
impl DownloadError {
/// Returns bool indicating whether this error is retryable or fatal, in the cases
/// where the peer responds with no headers, or times out.
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::Timeout { .. })
}
}

View File

@@ -9,3 +9,6 @@ pub mod client;
/// [`Consensus`]: crate::consensus::Consensus
/// [`HeadersClient`]: client::HeadersClient
pub mod downloader;
/// Error types.
pub mod error;

View File

@@ -1,3 +1,6 @@
/// Traits for implementing P2P block body clients.
pub mod bodies;
/// Traits for implementing P2P Header Clients. Also includes implementations
/// of a Linear and a Parallel downloader generic over the [`Consensus`] and
/// [`HeadersClient`].

View File

@@ -0,0 +1,33 @@
use crate::p2p::bodies::{client::BodiesClient, error::BodiesClientError};
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
use std::fmt::{Debug, Formatter};
/// A test client for fetching bodies
pub struct TestBodiesClient<F>
where
F: Fn(H256) -> Result<BlockBody, BodiesClientError>,
{
/// The function that is called on each body request.
pub responder: F,
}
impl<F> Debug for TestBodiesClient<F>
where
F: Fn(H256) -> Result<BlockBody, BodiesClientError>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestBodiesClient").finish()
}
}
#[async_trait]
impl<F> BodiesClient for TestBodiesClient<F>
where
F: Fn(H256) -> Result<BlockBody, BodiesClientError> + Send + Sync,
{
async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError> {
(self.responder)(hash)
}
}

View File

@@ -0,0 +1,142 @@
use rand::{thread_rng, Rng};
use reth_primitives::{
proofs, Address, BlockLocked, Bytes, Header, SealedHeader, Signature, Transaction,
TransactionKind, TransactionSigned, H256, U256,
};
// TODO(onbjerg): Maybe we should split this off to its own crate, or move the helpers to the
// relevant crates?
/// Generates a range of random [SealedHeader]s.
///
/// The parent hash of the first header
/// in the result will be equal to `head`.
///
/// The headers are assumed to not be correct if validated.
pub fn random_header_range(rng: std::ops::Range<u64>, head: H256) -> Vec<SealedHeader> {
let mut headers = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize);
for idx in rng {
headers.push(random_header(
idx,
Some(headers.last().map(|h: &SealedHeader| h.hash()).unwrap_or(head)),
));
}
headers
}
/// Generate a random [SealedHeader].
///
/// The header is assumed to not be correct if validated.
pub fn random_header(number: u64, parent: Option<H256>) -> SealedHeader {
let header = reth_primitives::Header {
number,
nonce: rand::random(),
difficulty: U256::from(rand::random::<u32>()),
parent_hash: parent.unwrap_or_default(),
..Default::default()
};
header.seal()
}
/// Generates a random legacy [Transaction].
///
/// Every field is random, except:
///
/// - The chain ID, which is always 1
/// - The input, which is always nothing
pub fn random_tx() -> Transaction {
Transaction::Legacy {
chain_id: Some(1),
nonce: rand::random::<u16>().into(),
gas_price: rand::random::<u16>().into(),
gas_limit: rand::random::<u16>().into(),
to: TransactionKind::Call(Address::random()),
value: rand::random::<u16>().into(),
input: Bytes::default(),
}
}
/// Generates a random legacy [Transaction] that is signed.
///
/// On top of the considerations of [gen_random_tx], these apply as well:
///
/// - There is no guarantee that the nonce is not used twice for the same account
pub fn random_signed_tx() -> TransactionSigned {
let tx = random_tx();
let hash = tx.signature_hash();
TransactionSigned {
transaction: tx,
hash,
signature: Signature {
// TODO
r: Default::default(),
s: Default::default(),
odd_y_parity: false,
},
}
}
/// Generate a random block filled with a random number of signed transactions (generated using
/// [random_signed_tx]).
///
/// All fields use the default values (and are assumed to be invalid) except for:
///
/// - `parent_hash`
/// - `transactions_root`
/// - `ommers_hash`
///
/// Additionally, `gas_used` and `gas_limit` always exactly match the total `gas_limit` of all
/// transactions in the block.
///
/// The ommer headers are not assumed to be valid.
pub fn random_block(number: u64, parent: Option<H256>) -> BlockLocked {
let mut rng = thread_rng();
// Generate transactions
let transactions: Vec<TransactionSigned> =
(0..rand::random::<u8>()).into_iter().map(|_| random_signed_tx()).collect();
let total_gas = transactions.iter().fold(0, |sum, tx| sum + tx.transaction.gas_limit());
// Generate ommers
let mut ommers = Vec::new();
for _ in 0..rng.gen_range(0..2) {
ommers.push(random_header(number, parent).unseal());
}
// Calculate roots
let transactions_root = proofs::calculate_transaction_root(transactions.iter());
let ommers_hash = proofs::calculate_ommers_root(ommers.iter());
BlockLocked {
header: Header {
parent_hash: parent.unwrap_or_default(),
number,
gas_used: total_gas,
gas_limit: total_gas,
transactions_root,
ommers_hash,
..Default::default()
}
.seal(),
body: transactions,
ommers: ommers.into_iter().map(|ommer| ommer.seal()).collect(),
..Default::default()
}
}
/// Generate a range of random blocks.
///
/// The parent hash of the first block
/// in the result will be equal to `head`.
///
/// See [random_block] for considerations when validating the generated blocks.
pub fn random_block_range(rng: std::ops::Range<u64>, head: H256) -> Vec<BlockLocked> {
let mut blocks = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize);
for idx in rng {
blocks.push(random_block(
idx,
Some(blocks.last().map(|block: &BlockLocked| block.header.hash()).unwrap_or(head)),
));
}
blocks
}

View File

@@ -1,32 +1,33 @@
//! Testing support for headers related interfaces.
use crate::{
consensus::{self, Consensus},
consensus::{self, Consensus, Error},
p2p::headers::{
client::{HeadersClient, HeadersRequest, HeadersResponse, HeadersStream},
downloader::{DownloadError, Downloader},
downloader::HeaderDownloader,
error::DownloadError,
},
};
use reth_primitives::{Header, SealedHeader, H256, H512, U256};
use reth_primitives::{BlockLocked, Header, SealedHeader, H256, H512};
use reth_rpc_types::engine::ForkchoiceState;
use std::{collections::HashSet, sync::Arc, time::Duration};
use tokio::sync::{broadcast, mpsc, watch};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
#[derive(Debug)]
/// A test downloader which just returns the values that have been pushed to it.
pub struct TestDownloader {
result: Result<Vec<SealedHeader>, DownloadError>,
#[derive(Debug)]
pub struct TestHeaderDownloader {
client: Arc<TestHeadersClient>,
}
impl TestDownloader {
impl TestHeaderDownloader {
/// Instantiates the downloader with the mock responses
pub fn new(result: Result<Vec<SealedHeader>, DownloadError>) -> Self {
Self { result }
pub fn new(client: Arc<TestHeadersClient>) -> Self {
Self { client }
}
}
#[async_trait::async_trait]
impl Downloader for TestDownloader {
impl HeaderDownloader for TestHeaderDownloader {
type Consensus = TestConsensus;
type Client = TestHeadersClient;
@@ -39,7 +40,7 @@ impl Downloader for TestDownloader {
}
fn client(&self) -> &Self::Client {
unimplemented!()
&self.client
}
async fn download(
@@ -47,12 +48,26 @@ impl Downloader for TestDownloader {
_: &SealedHeader,
_: &ForkchoiceState,
) -> Result<Vec<SealedHeader>, DownloadError> {
self.result.clone()
let stream = self.client.stream_headers().await;
let stream = stream.timeout(Duration::from_secs(1));
match Box::pin(stream).try_next().await {
Ok(Some(res)) => {
let mut headers = res.headers.iter().map(|h| h.clone().seal()).collect::<Vec<_>>();
if !headers.is_empty() {
headers.sort_unstable_by_key(|h| h.number);
headers.remove(0); // remove head from response
headers.reverse();
}
Ok(headers)
}
_ => Err(DownloadError::Timeout { request_id: 0 }),
}
}
}
#[derive(Debug)]
/// A test client for fetching headers
#[derive(Debug)]
pub struct TestHeadersClient {
req_tx: mpsc::Sender<(u64, HeadersRequest)>,
req_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<(u64, HeadersRequest)>>>,
@@ -118,7 +133,7 @@ impl HeadersClient for TestHeadersClient {
}
}
/// Consensus client impl for testing
/// Consensus engine implementation for testing
#[derive(Debug)]
pub struct TestConsensus {
/// Watcher over the forkchoice state
@@ -141,14 +156,14 @@ impl Default for TestConsensus {
}
impl TestConsensus {
/// Update the forkchoice state
/// Update the fork choice state
pub fn update_tip(&self, tip: H256) {
let state = ForkchoiceState {
head_block_hash: tip,
finalized_block_hash: H256::zero(),
safe_block_hash: H256::zero(),
};
self.channel.0.send(state).expect("updating forkchoice state failed");
self.channel.0.send(state).expect("updating fork choice state failed");
}
/// Update the validation flag
@@ -174,29 +189,12 @@ impl Consensus for TestConsensus {
Ok(())
}
}
}
/// Generate a range of random header. The parent hash of the first header
/// in the result will be equal to head
pub fn gen_random_header_range(rng: std::ops::Range<u64>, head: H256) -> Vec<SealedHeader> {
let mut headers = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize);
for idx in rng {
headers.push(gen_random_header(
idx,
Some(headers.last().map(|h: &SealedHeader| h.hash()).unwrap_or(head)),
));
fn pre_validate_block(&self, _block: &BlockLocked) -> Result<(), Error> {
if self.fail_validation {
Err(consensus::Error::BaseFeeMissing)
} else {
Ok(())
}
}
headers
}
/// Generate a random header
pub fn gen_random_header(number: u64, parent: Option<H256>) -> SealedHeader {
let header = reth_primitives::Header {
number,
nonce: rand::random(),
difficulty: U256::from(rand::random::<u32>()),
parent_hash: parent.unwrap_or_default(),
..Default::default()
};
header.seal()
}

View File

@@ -1,5 +1,10 @@
mod api;
mod bodies;
mod headers;
/// Generators for different data structures like block headers, block bodies and ranges of those.
pub mod generators;
pub use api::TestApi;
pub use bodies::*;
pub use headers::*;