diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index cff266160a..e39f9ae314 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -6,12 +6,86 @@ attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] -//! Reth's transaction pool implementation +//! Reth's transaction pool implementation. +//! +//! This crate provides a generic transaction pool implementation. +//! +//! ## Functionality +//! +//! The transaction pool is responsible for +//! +//! - recording incoming transactions +//! - providing existing transactions +//! - ordering and providing the best transactions for block production +//! - monitoring memory footprint and enforce pool size limits +//! +//! ## Assumptions +//! +//! ### Transaction type +//! +//! The pool expects certain ethereum related information from the generic transaction type of the +//! pool ([`PoolTransaction`]), this includes gas price, base fee (EIP-1559 transactions), nonce +//! etc. It makes no assumptions about the encoding format, but the transaction type must report its +//! size so pool size limits (memory) can be enforced. +//! +//! ### Transaction ordering +//! +//! The pending pool contains transactions that can be mined on the current state. +//! The order in which they're returned are determined by a `Priority` value returned by the +//! `TransactionOrdering` type this pool is configured with. +//! +//! This is only used in the _pending_ pool to yield the best transactions for block production. The +//! _base pool_ is ordered by base fee, and the _queued pool_ by current distance. +//! +//! ### Validation +//! +//! The pool itself does not validate incoming transactions, instead this should be provided by +//! implementing `TransactionsValidator`. Only transactions that the validator returns as valid are +//! included in the pool. It is assumed that transaction that are in the pool are either valid on +//! the current state or could become valid after certain state changes. transaction that can never +//! become valid (e.g. nonce lower than current on chain nonce) will never be added to the pool and +//! instead are discarded right away. +//! +//! ### State Changes +//! +//! Once a new block is mined, the pool needs to be updated with a changeset in order to: +//! +//! - remove mined transactions +//! - update using account changes: balance changes +//! - base fee updates +//! +//! ## Implementation details +//! +//! The `TransactionPool` trait exposes all externally used functionality of the pool, such as +//! inserting, querying specific transactions by hash or retrieving the best transactions. +//! Additionally, it allows to register event listeners for new ready transactions or state changes. +//! Events are communicated via channels. +//! +//! ### Architecture +//! +//! The final `TransactionPool` is made up of two layers: +//! +//! The lowest layer is the actual pool implementations that manages (validated) transactions: +//! [`TxPool`](crate::pool::TxPool). This is contained in a higher level pool type that guards the +//! low level pool and handles additional listeners or metrics: +//! [`PoolInner`](crate::pool::PoolInner) +//! +//! The transaction pool will be used by separate consumers (RPC, P2P), to make sharing easier, the +//! [`Pool`](crate::Pool) type is just an `Arc` wrapper around `PoolInner`. This is the usable type +//! that provides the `TransactionPool` interface. +pub use crate::{ + client::PoolClient, + config::PoolConfig, + ordering::TransactionOrdering, + traits::{BestTransactions, NewBlockEvent, PoolTransaction, TransactionPool}, + validate::{TransactionValidationOutcome, TransactionValidator}, +}; +use crate::{error::PoolResult, pool::PoolInner, validate::ValidPoolTransaction}; use futures::channel::mpsc::Receiver; use parking_lot::Mutex; use reth_primitives::{BlockID, TxHash, U256, U64}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; mod client; mod config; @@ -25,24 +99,10 @@ mod validate; #[cfg(test)] mod test_util; -pub use crate::{ - client::PoolClient, - config::PoolConfig, - ordering::TransactionOrdering, - pool::BasicPool, - traits::{BestTransactions, NewBlockEvent, PoolTransaction, TransactionPool}, - validate::{TransactionValidationOutcome, TransactionValidator}, -}; -use crate::{error::PoolResult, validate::ValidPoolTransaction}; - -/// A generic, customizable `TransactionPool` implementation. +/// A shareable, generic, customizable `TransactionPool` implementation. pub struct Pool { - /// The actual transaction pool where transactions and subscriptions are handled. - pool: BasicPool, - /// Tracks status updates linked to chain events. - update_status: Arc>, - /// Chain/Storage access. - client: Arc

, + /// Arc'ed instance of the pool internals + pool: Arc>, } // === impl Pool === @@ -52,10 +112,55 @@ where P: PoolClient, T: TransactionOrdering::Transaction>, { - /// Creates a new `Pool` with the given config and client and ordering. + /// Create a new transaction pool instance. pub fn new(client: Arc

, ordering: Arc, config: PoolConfig) -> Self { - let pool = BasicPool::new(Arc::clone(&client), ordering, config); - Self { pool, update_status: Arc::new(Default::default()), client } + Self { pool: Arc::new(PoolInner::new(client, ordering, config)) } + } + + /// Returns the wrapped pool + pub(crate) fn inner(&self) -> &PoolInner { + &self.pool + } + + /// Returns the actual block number for the block id + fn resolve_block_number(&self, block_id: &BlockID) -> PoolResult { + self.pool.client().ensure_block_number(block_id) + } + + /// Returns future that validates all transaction in the given iterator at the block the + /// `block_id` points to. + async fn validate_all( + &self, + block_id: &BlockID, + transactions: impl IntoIterator, + ) -> PoolResult>> { + // get the actual block number which is required to validate the transactions + let block_number = self.resolve_block_number(block_id)?; + + let outcome = futures::future::join_all( + transactions.into_iter().map(|tx| self.validate(block_id, block_number, tx)), + ) + .await + .into_iter() + .collect::>(); + + Ok(outcome) + } + + /// Validates the given transaction at the given block + async fn validate( + &self, + block_id: &BlockID, + _block_number: U64, + transaction: P::Transaction, + ) -> (TxHash, TransactionValidationOutcome) { + let _hash = *transaction.hash(); + // TODO this is where additional validate checks would go, like banned senders etc... + let _res = self.pool.client().validate_transaction(block_id, transaction).await; + + // TODO blockstamp the transaction + + todo!() } } @@ -78,7 +183,10 @@ where block_id: BlockID, transaction: Self::Transaction, ) -> PoolResult { - self.pool.clone().add_transaction(&block_id, transaction).await + self.add_transactions(block_id, vec![transaction]) + .await? + .pop() + .expect("transaction exists; qed") } async fn add_transactions( @@ -86,17 +194,19 @@ where block_id: BlockID, transactions: Vec, ) -> PoolResult>> { - self.pool.clone().add_transactions(&block_id, transactions).await + let validated = self.validate_all(&block_id, transactions).await?; + let transactions = self.pool.add_transactions(validated.into_values()); + Ok(transactions) } fn ready_transactions_listener(&self) -> Receiver { - self.pool.ready_transactions_listener() + self.pool.add_ready_listener() } fn best_transactions( &self, ) -> Box>>> { - Box::new(self.pool.inner().ready_transactions()) + Box::new(self.pool.ready_transactions()) } fn remove_invalid( @@ -115,3 +225,9 @@ struct UpdateStatus { /// Current base fee that needs to be enforced base_fee: U256, } + +impl Clone for Pool { + fn clone(&self) -> Self { + Self { pool: Arc::clone(&self.pool) } + } +} diff --git a/crates/transaction-pool/src/ordering.rs b/crates/transaction-pool/src/ordering.rs index 164cb6780d..f5bf68eb95 100644 --- a/crates/transaction-pool/src/ordering.rs +++ b/crates/transaction-pool/src/ordering.rs @@ -1,18 +1,19 @@ use crate::traits::PoolTransaction; use std::fmt; + /// Transaction ordering trait to determine the order of transactions. /// -/// Decides how transactions should be ordered within the pool. +/// Decides how transactions should be ordered within the pool, depending on a `Priority` value. /// -/// The returned priority must reflect natural `Ordering` -// TODO(mattsse) this should be extended so it provides a way to rank transaction in relation to -// each other. +/// The returned priority must reflect [total order](https://en.wikipedia.org/wiki/Total_order). pub trait TransactionOrdering: Send + Sync + 'static { /// Priority of a transaction. + /// + /// Higher is better. type Priority: Ord + Clone + Default + fmt::Debug + Send + Sync; - /// The transaction type to score. - type Transaction: PoolTransaction + Send + Sync + 'static; + /// The transaction type to determine the priority of. + type Transaction: PoolTransaction; /// Returns the priority score for the given transaction. fn priority(&self, transaction: &Self::Transaction) -> Self::Priority; diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index ea0006d3d2..4c6990746b 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -88,110 +88,11 @@ pub mod txpool; use crate::{pool::txpool::TxPool, validate::TransactionValidationOutcome}; pub use events::TransactionEvent; -/// Shareable Transaction pool. -pub struct BasicPool { - /// Arc'ed instance of the pool internals - pool: Arc>, -} - -// === impl Pool === - -impl BasicPool -where - P: PoolClient, - T: TransactionOrdering::Transaction>, -{ - /// Create a new transaction pool instance. - pub fn new(client: Arc

, ordering: Arc, config: PoolConfig) -> Self { - Self { pool: Arc::new(PoolInner::new(client, ordering, config)) } - } - - /// Returns the wrapped pool - pub(crate) fn inner(&self) -> &PoolInner { - &self.pool - } - - /// Returns the actual block number for the block id - fn resolve_block_number(&self, block_id: &BlockID) -> PoolResult { - self.pool.client().ensure_block_number(block_id) - } - - /// Add a single _unverified_ transaction into the pool. - pub async fn add_transaction( - &self, - block_id: &BlockID, - transaction: P::Transaction, - ) -> PoolResult { - self.add_transactions(block_id, Some(transaction)) - .await? - .pop() - .expect("transaction exists; qed") - } - - /// Adds all given transactions into the pool - pub async fn add_transactions( - &self, - block_id: &BlockID, - transactions: impl IntoIterator, - ) -> PoolResult>> { - let validated = self.validate_all(block_id, transactions).await?; - let transactions = self.pool.add_transactions(validated.into_values()); - Ok(transactions) - } - - /// Returns future that validates all transaction in the given iterator at the block the - /// `block_id` points to. - async fn validate_all( - &self, - block_id: &BlockID, - transactions: impl IntoIterator, - ) -> PoolResult>> { - // get the actual block number which is required to validate the transactions - let block_number = self.resolve_block_number(block_id)?; - - let outcome = futures::future::join_all( - transactions.into_iter().map(|tx| self.validate(block_id, block_number, tx)), - ) - .await - .into_iter() - .collect::>(); - - Ok(outcome) - } - - /// Validates the given transaction at the given block - async fn validate( - &self, - block_id: &BlockID, - _block_number: U64, - transaction: P::Transaction, - ) -> (TxHash, TransactionValidationOutcome) { - let _hash = *transaction.hash(); - // TODO this is where additional validate checks would go, like banned senders etc... - let _res = self.pool.client().validate_transaction(block_id, transaction).await; - - // TODO blockstamp the transaction - - todo!() - } - - /// Registers a new transaction listener and returns the receiver stream. - pub fn ready_transactions_listener(&self) -> Receiver { - self.pool.add_ready_listener() - } -} - -impl Clone for BasicPool { - fn clone(&self) -> Self { - Self { pool: Arc::clone(&self.pool) } - } -} - /// Transaction pool internals. pub struct PoolInner { /// Chain/Storage access. client: Arc

, - /// The internal pool that manages + /// The internal pool that manages all transactions. pool: RwLock>, /// Pool settings. config: PoolConfig, @@ -310,7 +211,7 @@ where listener.ready(&tx.hash, None); // TODO more listeners for discarded, removed etc... } - AddedTransaction::Queued { hash } => { + AddedTransaction::Parked { hash } => { listener.queued(hash); } } @@ -352,17 +253,17 @@ impl AddedPendingTransaction { pub enum AddedTransaction { /// Transaction was successfully added and moved to the pending pool. Pending(AddedPendingTransaction), - /// Transaction was successfully added but not yet queued for processing and moved to the - /// queued pool instead. - Queued { - /// the hash of the submitted transaction + /// Transaction was successfully added but not yet ready for processing and moved to a + /// parked pool instead. + Parked { + /// Hash of the submitted transaction that is currently parked. hash: TxHash, }, } impl AddedTransaction { - /// Returns the hash of the transaction if it's ready - pub fn as_ready(&self) -> Option<&TxHash> { + /// Returns the hash of the transaction if it's pending + pub fn as_pending(&self) -> Option<&TxHash> { if let AddedTransaction::Pending(tx) = self { Some(&tx.hash) } else { @@ -374,7 +275,7 @@ impl AddedTransaction { pub fn hash(&self) -> &TxHash { match self { AddedTransaction::Pending(tx) => &tx.hash, - AddedTransaction::Queued { hash } => hash, + AddedTransaction::Parked { hash } => hash, } } } diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index edd5a47814..b352ee7115 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -162,7 +162,7 @@ impl TxPool { removed, }) } else { - AddedTransaction::Queued { hash } + AddedTransaction::Parked { hash } }; Ok(res) diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 4efe83bb20..f07d883788 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -11,7 +11,7 @@ use std::{fmt, hash::Hash, sync::Arc}; #[async_trait::async_trait] pub trait TransactionPool: Send + Sync { /// The transaction type of the pool - type Transaction: PoolTransaction + Send + Sync + 'static; + type Transaction: PoolTransaction; /// Event listener for when a new block was mined. /// diff --git a/crates/transaction-pool/src/validate.rs b/crates/transaction-pool/src/validate.rs index 859aed6928..4cbdf92e5c 100644 --- a/crates/transaction-pool/src/validate.rs +++ b/crates/transaction-pool/src/validate.rs @@ -29,7 +29,7 @@ pub enum TransactionValidationOutcome { #[async_trait::async_trait] pub trait TransactionValidator: Send + Sync { /// The transaction type to validate. - type Transaction: PoolTransaction + Send + Sync + 'static; + type Transaction: PoolTransaction; /// Validates the transaction and returns a validated outcome ///