diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 0b1b98cc71..06c90465d4 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -264,14 +264,16 @@ impl NodeCommand { let factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)); let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?; + let blob_store = InMemoryBlobStore::default(); let transaction_pool = reth_transaction_pool::Pool::eth_pool( TransactionValidationTaskExecutor::eth_with_additional_tasks( blockchain_db.clone(), Arc::clone(&self.chain), + blob_store.clone(), ctx.task_executor.clone(), 1, ), - InMemoryBlobStore::default(), + blob_store, self.txpool.pool_config(), ); info!(target: "reth::cli", "Transaction pool initialized"); diff --git a/crates/transaction-pool/src/blobstore/mod.rs b/crates/transaction-pool/src/blobstore/mod.rs index 0bdd142180..dcc6764389 100644 --- a/crates/transaction-pool/src/blobstore/mod.rs +++ b/crates/transaction-pool/src/blobstore/mod.rs @@ -1,6 +1,7 @@ //! Storage for blob data of EIP4844 transactions. use reth_primitives::{BlobTransactionSidecar, H256}; +use std::fmt; mod maintain; mod mem; mod noop; @@ -15,7 +16,7 @@ pub use noop::NoopBlobStore; /// finalization). /// /// Note: this is Clone because it is expected to be wrapped in an Arc. -pub trait BlobStore: Send + Sync + 'static { +pub trait BlobStore: fmt::Debug + Send + Sync + 'static { /// Inserts the blob sidecar into the store fn insert(&self, tx: H256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError>; diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 296e676a0c..d7fc2f1de8 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -105,9 +105,10 @@ //! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool, TransactionPool}; //! use reth_transaction_pool::blobstore::InMemoryBlobStore; //! async fn t(client: C) where C: StateProviderFactory + ChainSpecProvider + Clone + 'static{ +//! let blob_store = InMemoryBlobStore::default(); //! let pool = Pool::eth_pool( -//! TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()), -//! InMemoryBlobStore::default(), +//! TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), blob_store.clone(), TokioTaskExecutor::default()), +//! blob_store, //! Default::default(), //! ); //! let mut transactions = pool.pending_transactions_listener(); @@ -136,9 +137,10 @@ //! where C: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static, //! St: Stream + Send + Unpin + 'static, //! { +//! let blob_store = InMemoryBlobStore::default(); //! let pool = Pool::eth_pool( -//! TransactionValidationTaskExecutor::eth(client.clone(), MAINNET.clone(), TokioTaskExecutor::default()), -//! InMemoryBlobStore::default(), +//! TransactionValidationTaskExecutor::eth(client.clone(), MAINNET.clone(), blob_store.clone(), TokioTaskExecutor::default()), +//! blob_store, //! Default::default(), //! ); //! @@ -296,10 +298,11 @@ where /// use reth_tasks::TokioTaskExecutor; /// use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool}; /// use reth_transaction_pool::blobstore::InMemoryBlobStore; - /// # fn t(client: C) where C: StateProviderFactory + Clone + 'static{ + /// # fn t(client: C) where C: StateProviderFactory + Clone + 'static { + /// let blob_store = InMemoryBlobStore::default(); /// let pool = Pool::eth_pool( - /// TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()), - /// InMemoryBlobStore::default(), + /// TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), blob_store.clone(), TokioTaskExecutor::default()), + /// blob_store, /// Default::default(), /// ); /// # } diff --git a/crates/transaction-pool/src/validate/eth.rs b/crates/transaction-pool/src/validate/eth.rs index 837dee8a64..efb0b9e0a5 100644 --- a/crates/transaction-pool/src/validate/eth.rs +++ b/crates/transaction-pool/src/validate/eth.rs @@ -1,6 +1,7 @@ //! Ethereum transaction validator. use crate::{ + blobstore::BlobStore, error::InvalidPoolTransactionError, traits::{PoolTransaction, TransactionOrigin}, validate::{ValidTransaction, ValidationTask, MAX_INIT_CODE_SIZE, TX_MAX_SIZE}, @@ -22,198 +23,16 @@ pub struct EthTransactionValidator { pub inner: Arc>, } -/// A builder for [TransactionValidationTaskExecutor] -#[derive(Debug, Clone)] -pub struct EthTransactionValidatorBuilder { - chain_spec: Arc, - /// Fork indicator whether we are in the Shanghai stage. - shanghai: bool, - /// Fork indicator whether we are in the Cancun hardfork. - cancun: bool, - /// Fork indicator whether we are using EIP-2718 type transactions. - eip2718: bool, - /// Fork indicator whether we are using EIP-1559 type transactions. - eip1559: bool, - /// Fork indicator whether we are using EIP-4844 blob transactions. - eip4844: bool, - /// The current max gas limit - block_gas_limit: u64, - /// Minimum priority fee to enforce for acceptance into the pool. - minimum_priority_fee: Option, - /// Determines how many additional tasks to spawn - /// - /// Default is 1 - additional_tasks: usize, - /// Toggle to determine if a local transaction should be propagated - propagate_local_transactions: bool, -} - -impl EthTransactionValidatorBuilder { - /// Creates a new builder for the given [ChainSpec] - pub fn new(chain_spec: Arc) -> Self { - Self { - chain_spec, - shanghai: true, - eip2718: true, - eip1559: true, - block_gas_limit: ETHEREUM_BLOCK_GAS_LIMIT, - minimum_priority_fee: None, - additional_tasks: 1, - // default to true, can potentially take this as a param in the future - propagate_local_transactions: true, - - // TODO: can hard enable by default once transitioned - cancun: false, - eip4844: false, - } - } - - /// Disables the Cancun fork. - pub fn no_cancun(self) -> Self { - self.set_cancun(false) - } - - /// Set the Cancun fork. - pub fn set_cancun(mut self, cancun: bool) -> Self { - self.cancun = cancun; - self - } - - /// Disables the Shanghai fork. - pub fn no_shanghai(self) -> Self { - self.set_shanghai(false) - } - - /// Set the Shanghai fork. - pub fn set_shanghai(mut self, shanghai: bool) -> Self { - self.shanghai = shanghai; - self - } - - /// Disables the eip2718 support. - pub fn no_eip2718(self) -> Self { - self.set_eip2718(false) - } - - /// Set eip2718 support. - pub fn set_eip2718(mut self, eip2718: bool) -> Self { - self.eip2718 = eip2718; - self - } - - /// Disables the eip1559 support. - pub fn no_eip1559(self) -> Self { - self.set_eip1559(false) - } - - /// Set the eip1559 support. - pub fn set_eip1559(mut self, eip1559: bool) -> Self { - self.eip1559 = eip1559; - self - } - /// Sets toggle to propagate transactions received locally by this client (e.g - /// transactions from eth_Sendtransaction to this nodes' RPC server) - /// - /// If set to false, only transactions received by network peers (via - /// p2p) will be marked as propagated in the local transaction pool and returned on a - /// GetPooledTransactions p2p request - pub fn set_propagate_local_transactions(mut self, propagate_local_txs: bool) -> Self { - self.propagate_local_transactions = propagate_local_txs; - self - } - /// Disables propagating transactions recieved locally by this client - /// - /// For more information, check docs for set_propagate_local_transactions - pub fn no_local_transaction_propagation(mut self) -> Self { - self.propagate_local_transactions = false; - self - } - - /// Sets a minimum priority fee that's enforced for acceptance into the pool. - pub fn with_minimum_priority_fee(mut self, minimum_priority_fee: u128) -> Self { - self.minimum_priority_fee = Some(minimum_priority_fee); - self - } - - /// Sets the number of additional tasks to spawn. - pub fn with_additional_tasks(mut self, additional_tasks: usize) -> Self { - self.additional_tasks = additional_tasks; - self - } - - /// Builds a [TransactionValidationTaskExecutor] - /// - /// The validator will spawn `additional_tasks` additional tasks for validation. - /// - /// By default this will spawn 1 additional task. - pub fn build( - self, - client: Client, - tasks: T, - ) -> TransactionValidationTaskExecutor> - where - T: TaskSpawner, - { - let Self { - chain_spec, - shanghai, - cancun, - eip2718, - eip1559, - eip4844, - block_gas_limit, - minimum_priority_fee, - additional_tasks, - propagate_local_transactions, - } = self; - - let inner = EthTransactionValidatorInner { - chain_spec, - client, - shanghai, - eip2718, - eip1559, - cancun, - eip4844, - block_gas_limit, - minimum_priority_fee, - propagate_local_transactions, - _marker: Default::default(), - }; - - let (tx, task) = ValidationTask::new(); - - // Spawn validation tasks, they are blocking because they perform db lookups - for _ in 0..additional_tasks { - let task = task.clone(); - tasks.spawn_blocking(Box::pin(async move { - task.run().await; - })); - } - - tasks.spawn_critical_blocking( - "transaction-validation-service", - Box::pin(async move { - task.run().await; - }), - ); - - let to_validation_task = Arc::new(Mutex::new(tx)); - - TransactionValidationTaskExecutor { - validator: EthTransactionValidator { inner: Arc::new(inner) }, - to_validation_task, - } - } -} - /// A [TransactionValidator] implementation that validates ethereum transaction. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct EthTransactionValidatorInner { /// Spec of the chain chain_spec: Arc, /// This type fetches account info from the db client: Client, + /// Blobstore used for fetching re-injected blob transactions. + #[allow(unused)] + blob_store: Box, /// Fork indicator whether we are in the Shanghai stage. shanghai: bool, /// Fork indicator whether we are in the Cancun hardfork. @@ -412,3 +231,192 @@ where } } } + +/// A builder for [TransactionValidationTaskExecutor] +#[derive(Debug, Clone)] +pub struct EthTransactionValidatorBuilder { + chain_spec: Arc, + /// Fork indicator whether we are in the Shanghai stage. + shanghai: bool, + /// Fork indicator whether we are in the Cancun hardfork. + cancun: bool, + /// Fork indicator whether we are using EIP-2718 type transactions. + eip2718: bool, + /// Fork indicator whether we are using EIP-1559 type transactions. + eip1559: bool, + /// Fork indicator whether we are using EIP-4844 blob transactions. + eip4844: bool, + /// The current max gas limit + block_gas_limit: u64, + /// Minimum priority fee to enforce for acceptance into the pool. + minimum_priority_fee: Option, + /// Determines how many additional tasks to spawn + /// + /// Default is 1 + additional_tasks: usize, + /// Toggle to determine if a local transaction should be propagated + propagate_local_transactions: bool, +} + +impl EthTransactionValidatorBuilder { + /// Creates a new builder for the given [ChainSpec] + pub fn new(chain_spec: Arc) -> Self { + Self { + chain_spec, + shanghai: true, + eip2718: true, + eip1559: true, + block_gas_limit: ETHEREUM_BLOCK_GAS_LIMIT, + minimum_priority_fee: None, + additional_tasks: 1, + // default to true, can potentially take this as a param in the future + propagate_local_transactions: true, + + // TODO: can hard enable by default once transitioned + cancun: false, + eip4844: false, + } + } + + /// Disables the Cancun fork. + pub fn no_cancun(self) -> Self { + self.set_cancun(false) + } + + /// Set the Cancun fork. + pub fn set_cancun(mut self, cancun: bool) -> Self { + self.cancun = cancun; + self + } + + /// Disables the Shanghai fork. + pub fn no_shanghai(self) -> Self { + self.set_shanghai(false) + } + + /// Set the Shanghai fork. + pub fn set_shanghai(mut self, shanghai: bool) -> Self { + self.shanghai = shanghai; + self + } + + /// Disables the eip2718 support. + pub fn no_eip2718(self) -> Self { + self.set_eip2718(false) + } + + /// Set eip2718 support. + pub fn set_eip2718(mut self, eip2718: bool) -> Self { + self.eip2718 = eip2718; + self + } + + /// Disables the eip1559 support. + pub fn no_eip1559(self) -> Self { + self.set_eip1559(false) + } + + /// Set the eip1559 support. + pub fn set_eip1559(mut self, eip1559: bool) -> Self { + self.eip1559 = eip1559; + self + } + /// Sets toggle to propagate transactions received locally by this client (e.g + /// transactions from eth_Sendtransaction to this nodes' RPC server) + /// + /// If set to false, only transactions received by network peers (via + /// p2p) will be marked as propagated in the local transaction pool and returned on a + /// GetPooledTransactions p2p request + pub fn set_propagate_local_transactions(mut self, propagate_local_txs: bool) -> Self { + self.propagate_local_transactions = propagate_local_txs; + self + } + /// Disables propagating transactions recieved locally by this client + /// + /// For more information, check docs for set_propagate_local_transactions + pub fn no_local_transaction_propagation(mut self) -> Self { + self.propagate_local_transactions = false; + self + } + + /// Sets a minimum priority fee that's enforced for acceptance into the pool. + pub fn with_minimum_priority_fee(mut self, minimum_priority_fee: u128) -> Self { + self.minimum_priority_fee = Some(minimum_priority_fee); + self + } + + /// Sets the number of additional tasks to spawn. + pub fn with_additional_tasks(mut self, additional_tasks: usize) -> Self { + self.additional_tasks = additional_tasks; + self + } + + /// Builds a the [EthTransactionValidator] and spawns validation tasks via the + /// [TransactionValidationTaskExecutor] + /// + /// The validator will spawn `additional_tasks` additional tasks for validation. + /// + /// By default this will spawn 1 additional task. + pub fn build_with_tasks( + self, + client: Client, + tasks: T, + blob_store: S, + ) -> TransactionValidationTaskExecutor> + where + T: TaskSpawner, + S: BlobStore, + { + let Self { + chain_spec, + shanghai, + cancun, + eip2718, + eip1559, + eip4844, + block_gas_limit, + minimum_priority_fee, + additional_tasks, + propagate_local_transactions, + } = self; + + let inner = EthTransactionValidatorInner { + chain_spec, + client, + shanghai, + eip2718, + eip1559, + cancun, + eip4844, + block_gas_limit, + minimum_priority_fee, + propagate_local_transactions, + blob_store: Box::new(blob_store), + _marker: Default::default(), + }; + + let (tx, task) = ValidationTask::new(); + + // Spawn validation tasks, they are blocking because they perform db lookups + for _ in 0..additional_tasks { + let task = task.clone(); + tasks.spawn_blocking(Box::pin(async move { + task.run().await; + })); + } + + tasks.spawn_critical_blocking( + "transaction-validation-service", + Box::pin(async move { + task.run().await; + }), + ); + + let to_validation_task = Arc::new(Mutex::new(tx)); + + TransactionValidationTaskExecutor { + validator: EthTransactionValidator { inner: Arc::new(inner) }, + to_validation_task, + } + } +} diff --git a/crates/transaction-pool/src/validate/task.rs b/crates/transaction-pool/src/validate/task.rs index 7e219157a3..007aa2568a 100644 --- a/crates/transaction-pool/src/validate/task.rs +++ b/crates/transaction-pool/src/validate/task.rs @@ -1,6 +1,7 @@ //! A validation service for transactions. use crate::{ + blobstore::BlobStore, validate::{EthTransactionValidatorBuilder, TransactionValidatorError}, EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome, TransactionValidator, @@ -96,11 +97,16 @@ impl TransactionValidationTaskExecutor(client: Client, chain_spec: Arc, tasks: T) -> Self + pub fn eth( + client: Client, + chain_spec: Arc, + blob_store: S, + tasks: T, + ) -> Self where T: TaskSpawner, { - Self::eth_with_additional_tasks(client, chain_spec, tasks, 0) + Self::eth_with_additional_tasks(client, chain_spec, blob_store, tasks, 0) } /// Creates a new instance for the given [ChainSpec] @@ -112,9 +118,10 @@ impl TransactionValidationTaskExecutor( + pub fn eth_with_additional_tasks( client: Client, chain_spec: Arc, + blob_store: S, tasks: T, num_additional_tasks: usize, ) -> Self @@ -123,7 +130,7 @@ impl TransactionValidationTaskExecutor(client, tasks) + .build_with_tasks::(client, tasks, blob_store) } /// Returns the configured chain id