diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 71c4b4b33f..0afa0573f8 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -103,6 +103,7 @@ use reth_primitives::{Address, TxHash, U256}; use reth_provider::StateProviderFactory; use std::{collections::HashMap, sync::Arc}; use tokio::sync::mpsc::Receiver; +use tracing::{instrument, trace}; mod config; pub mod error; @@ -165,6 +166,13 @@ where self.inner().config() } + /// Sets the current block info for the pool. + #[instrument(skip(self), target = "txpool")] + pub fn set_block_info(&self, info: BlockInfo) { + trace!(target: "txpool", "updating pool block info"); + self.pool.set_block_info(info) + } + /// Returns future that validates all transaction in the given iterator. async fn validate_all( &self, diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index 8d99570ce3..70dcee59df 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -2,10 +2,10 @@ use crate::{ traits::{CanonicalStateUpdate, ChangedAccount}, - Pool, TransactionOrdering, TransactionPool, TransactionValidator, + BlockInfo, Pool, TransactionOrdering, TransactionPool, TransactionValidator, }; use futures_util::{Stream, StreamExt}; -use reth_primitives::{Address, BlockHash, FromRecoveredTransaction}; +use reth_primitives::{Address, BlockHash, BlockNumberOrTag, FromRecoveredTransaction}; use reth_provider::{BlockProvider, CanonStateNotification, PostState, StateProviderFactory}; use std::{ borrow::Borrow, @@ -27,7 +27,16 @@ pub async fn maintain_transaction_pool( T: TransactionOrdering::Transaction>, St: Stream + Unpin, { - // TODO set current head for the pool + // ensure the pool points to latest state + if let Ok(Some(latest)) = client.block(BlockNumberOrTag::Latest.into()) { + let latest = latest.seal_slow(); + let info = BlockInfo { + last_seen_block_hash: latest.hash, + last_seen_block_number: latest.number, + pending_basefee: latest.next_block_base_fee().unwrap_or_default() as u128, + }; + pool.set_block_info(info); + } // keeps track of any dirty accounts that we failed to fetch the state for and need to retry let mut dirty = HashSet::new(); diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 9dbfeb3204..15ccd49228 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -150,6 +150,10 @@ where pub(crate) fn block_info(&self) -> BlockInfo { self.pool.read().block_info() } + /// Returns the currently tracked block + pub(crate) fn set_block_info(&self, info: BlockInfo) { + self.pool.write().set_block_info(info) + } /// Returns the internal `SenderId` for this address pub(crate) fn get_sender_id(&self, addr: Address) -> SenderId { diff --git a/crates/transaction-pool/src/pool/parked.rs b/crates/transaction-pool/src/pool/parked.rs index 2aade3e7dd..6f4c70db84 100644 --- a/crates/transaction-pool/src/pool/parked.rs +++ b/crates/transaction-pool/src/pool/parked.rs @@ -13,6 +13,7 @@ use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc}; /// /// Note: This type is generic over [ParkedPool] which enforces that the underlying transaction type /// is [ValidPoolTransaction] wrapped in an [Arc]. +#[derive(Clone)] pub(crate) struct ParkedPool { /// Keeps track of transactions inserted in the pool. /// @@ -101,6 +102,41 @@ impl ParkedPool { } } +impl ParkedPool> { + /// Removes all transactions and their dependent transaction from the subpool that no longer + /// satisfy the given basefee. + /// + /// Note: the transactions are not returned in a particular order. + pub(crate) fn enforce_basefee(&mut self, basefee: u128) -> Vec>> { + let mut to_remove = Vec::new(); + + { + let mut iter = self.by_id.iter().peekable(); + + while let Some((id, tx)) = iter.next() { + if tx.transaction.transaction.max_fee_per_gas() < basefee { + // still parked -> skip descendant transactions + 'this: while let Some((peek, _)) = iter.peek() { + if peek.sender != id.sender { + break 'this + } + iter.next(); + } + } else { + to_remove.push(*id); + } + } + } + + let mut removed = Vec::with_capacity(to_remove.len()); + for id in to_remove { + removed.push(self.remove_transaction(&id).expect("transaction exists")); + } + + removed + } +} + impl Default for ParkedPool { fn default() -> Self { Self { @@ -225,12 +261,7 @@ impl_ord_wrapper!(BasefeeOrd); impl Ord for BasefeeOrd { fn cmp(&self, other: &Self) -> Ordering { - match (self.0.transaction.max_fee_per_gas(), other.0.transaction.max_fee_per_gas()) { - (Some(fee), Some(other)) => fee.cmp(&other), - (None, Some(_)) => Ordering::Less, - (Some(_), None) => Ordering::Greater, - _ => Ordering::Equal, - } + self.0.transaction.max_fee_per_gas().cmp(&other.0.transaction.max_fee_per_gas()) } } @@ -256,3 +287,63 @@ impl Ord for QueuedOrd { other.timestamp.cmp(&self.timestamp)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{MockTransaction, MockTransactionFactory}; + + #[test] + fn test_enforce_parked_basefee() { + let mut f = MockTransactionFactory::default(); + let mut pool = ParkedPool::>::default(); + let tx = f.validated_arc(MockTransaction::eip1559().inc_price()); + pool.add_transaction(tx.clone()); + + assert!(pool.by_id.contains_key(tx.id())); + assert_eq!(pool.len(), 1); + + let removed = pool.enforce_basefee(u128::MAX); + assert!(removed.is_empty()); + + let removed = pool.enforce_basefee(tx.max_fee_per_gas() - 1); + assert_eq!(removed.len(), 1); + assert!(pool.is_empty()); + } + + #[test] + fn test_enforce_parked_basefee_descendant() { + let mut f = MockTransactionFactory::default(); + let mut pool = ParkedPool::>::default(); + let t = MockTransaction::eip1559().inc_price_by(10); + let root_tx = f.validated_arc(t.clone()); + pool.add_transaction(root_tx.clone()); + + let descendant_tx = f.validated_arc(t.inc_nonce().inc_price()); + pool.add_transaction(descendant_tx.clone()); + + assert!(pool.by_id.contains_key(root_tx.id())); + assert!(pool.by_id.contains_key(descendant_tx.id())); + assert_eq!(pool.len(), 2); + + let removed = pool.enforce_basefee(u128::MAX); + assert!(removed.is_empty()); + + // two dependent tx in the pool with decreasing fee + + { + let mut pool2 = pool.clone(); + let removed = pool2.enforce_basefee(descendant_tx.max_fee_per_gas()); + assert_eq!(removed.len(), 1); + assert_eq!(pool2.len(), 1); + // descendant got popped + assert!(pool2.by_id.contains_key(root_tx.id())); + assert!(!pool2.by_id.contains_key(descendant_tx.id())); + } + + // remove root transaction via root tx fee + let removed = pool.enforce_basefee(root_tx.max_fee_per_gas()); + assert_eq!(removed.len(), 2); + assert!(pool.is_empty()); + } +} diff --git a/crates/transaction-pool/src/pool/pending.rs b/crates/transaction-pool/src/pool/pending.rs index c8a03aa900..3212096e28 100644 --- a/crates/transaction-pool/src/pool/pending.rs +++ b/crates/transaction-pool/src/pool/pending.rs @@ -20,6 +20,7 @@ use std::{ /// /// Once an `independent` transaction was executed it *unlocks* the next nonce, if this transaction /// is also pending, then this will be moved to the `independent` queue. +#[derive(Clone)] pub(crate) struct PendingPool { /// How to order transactions. ordering: T, @@ -83,6 +84,44 @@ impl PendingPool { } } + /// Removes all transactions and their dependent transaction from the subpool that no longer + /// satisfy the given basefee (`tx.fee < basefee`) + /// + /// Note: the transactions are not returned in a particular order. + pub(crate) fn enforce_basefee( + &mut self, + basefee: u128, + ) -> Vec>> { + let mut to_remove = Vec::new(); + + { + let mut iter = self.by_id.iter().peekable(); + while let Some((id, tx)) = iter.next() { + if tx.transaction.transaction.max_fee_per_gas() < basefee { + // this transaction no longer satisfies the basefee: remove it and all its + // descendants + to_remove.push(*id); + 'this: while let Some((peek, _)) = iter.peek() { + if peek.sender != id.sender { + break 'this + } + to_remove.push(**peek); + iter.next(); + } + } + } + } + + dbg!(&to_remove); + + let mut removed = Vec::with_capacity(to_remove.len()); + for id in to_remove { + removed.push(self.remove_transaction(&id).expect("transaction exists")); + } + + removed + } + /// Returns the ancestor the given transaction, the transaction with `nonce - 1`. /// /// Note: for a transaction with nonce higher than the current on chain nonce this will always @@ -127,7 +166,7 @@ impl PendingPool { /// Removes a _mined_ transaction from the pool. /// - /// If the transactions has a descendant transaction it will advance it to the best queue. + /// If the transaction has a descendant transaction it will advance it to the best queue. pub(crate) fn prune_transaction( &mut self, id: &TransactionId, @@ -147,9 +186,8 @@ impl PendingPool { id: &TransactionId, ) -> Option>> { let tx = self.by_id.remove(id)?; - // keep track of size - self.size_of -= tx.transaction.transaction.size(); self.all.remove(&tx.transaction); + self.size_of -= tx.transaction.transaction.size(); self.independent_transactions.remove(&tx.transaction); Some(tx.transaction.transaction.clone()) } @@ -178,7 +216,6 @@ impl PendingPool { /// Whether the pool is empty #[cfg(test)] - #[allow(unused)] pub(crate) fn is_empty(&self) -> bool { self.by_id.is_empty() } @@ -247,3 +284,65 @@ impl Ord for PendingTransactionRef { .then_with(|| other.submission_id.cmp(&self.submission_id)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{MockOrdering, MockTransaction, MockTransactionFactory}; + + #[test] + fn test_enforce_basefee() { + let mut f = MockTransactionFactory::default(); + let mut pool = PendingPool::new(MockOrdering::default()); + let tx = f.validated_arc(MockTransaction::eip1559().inc_price()); + pool.add_transaction(tx.clone()); + + assert!(pool.by_id.contains_key(tx.id())); + assert_eq!(pool.len(), 1); + + let removed = pool.enforce_basefee(0); + assert!(removed.is_empty()); + + let removed = pool.enforce_basefee(tx.max_fee_per_gas() + 1); + assert_eq!(removed.len(), 1); + assert!(pool.is_empty()); + } + + #[test] + fn test_enforce_basefee_descendant() { + let mut f = MockTransactionFactory::default(); + let mut pool = PendingPool::new(MockOrdering::default()); + let t = MockTransaction::eip1559().inc_price_by(10); + let root_tx = f.validated_arc(t.clone()); + pool.add_transaction(root_tx.clone()); + + let descendant_tx = f.validated_arc(t.inc_nonce().decr_price()); + pool.add_transaction(descendant_tx.clone()); + + assert!(pool.by_id.contains_key(root_tx.id())); + assert!(pool.by_id.contains_key(descendant_tx.id())); + assert_eq!(pool.len(), 2); + + assert_eq!(pool.independent_transactions.len(), 1); + + let removed = pool.enforce_basefee(0); + assert!(removed.is_empty()); + + // two dependent tx in the pool with decreasing fee + + { + let mut pool2 = pool.clone(); + let removed = pool2.enforce_basefee(descendant_tx.max_fee_per_gas() + 1); + assert_eq!(removed.len(), 1); + assert_eq!(pool2.len(), 1); + // descendant got popped + assert!(pool2.by_id.contains_key(root_tx.id())); + assert!(!pool2.by_id.contains_key(descendant_tx.id())); + } + + // remove root transaction via fee + let removed = pool.enforce_basefee(root_tx.max_fee_per_gas() + 1); + assert_eq!(removed.len(), 2); + assert!(pool.is_empty()); + } +} diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 87e5900a76..b2583707a3 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -128,6 +128,40 @@ impl TxPool { } } + /// Updates the tracked basefee + /// + /// Depending on the change in direction of the basefee, this will promote or demote + /// transactions from the basefee pool. + fn update_basefee(&mut self, pending_basefee: u128) { + match pending_basefee.cmp(&self.all_transactions.pending_basefee) { + Ordering::Equal => { + // fee unchanged, nothing to update + } + Ordering::Greater => { + // increased base fee: recheck pending pool and remove all that are no longer valid + for tx in self.pending_pool.enforce_basefee(pending_basefee) { + self.basefee_pool.add_transaction(tx); + } + } + Ordering::Less => { + // decreased base fee: recheck basefee pool and promote all that are now valid + for tx in self.basefee_pool.enforce_basefee(pending_basefee) { + self.pending_pool.add_transaction(tx); + } + } + } + } + + /// Sets the current block info for the pool. + /// + /// This will also apply updates to the pool based on the new base fee + pub(crate) fn set_block_info(&mut self, info: BlockInfo) { + let BlockInfo { last_seen_block_hash, last_seen_block_number, pending_basefee } = info; + self.all_transactions.last_seen_block_hash = last_seen_block_hash; + self.all_transactions.last_seen_block_number = last_seen_block_number; + self.update_basefee(pending_basefee) + } + /// Returns an iterator that yields transactions that are ready to be included in the block. pub(crate) fn best_transactions(&self) -> BestTransactions { self.pending_pool.best() @@ -638,7 +672,6 @@ impl AllTransactions { // The `unique_sender` loop will process the first transaction of all senders, update its // state and internally update all consecutive transactions 'transactions: while let Some((id, tx)) = iter.next() { - // Advances the iterator to the next sender macro_rules! next_sender { ($iter:ident) => { 'this: while let Some((peek, _)) = iter.peek() { @@ -649,7 +682,6 @@ impl AllTransactions { } }; } - // tracks the balance if the sender was changed in the block let mut changed_balance = None; @@ -770,14 +802,12 @@ impl AllTransactions { /// Rechecks the transaction's dynamic fee condition. fn update_tx_base_fee(pending_block_base_fee: &u128, tx: &mut PoolInternalTransaction) { // Recheck dynamic fee condition. - if let Some(fee_cap) = tx.transaction.max_fee_per_gas() { - match fee_cap.cmp(pending_block_base_fee) { - Ordering::Greater | Ordering::Equal => { - tx.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK); - } - Ordering::Less => { - tx.state.remove(TxState::ENOUGH_FEE_CAP_BLOCK); - } + match tx.transaction.max_fee_per_gas().cmp(pending_block_base_fee) { + Ordering::Greater | Ordering::Equal => { + tx.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK); + } + Ordering::Less => { + tx.state.remove(TxState::ENOUGH_FEE_CAP_BLOCK); } } } @@ -932,15 +962,12 @@ impl AllTransactions { } // Check dynamic fee - if let Some(fee_cap) = transaction.max_fee_per_gas() { - if fee_cap < self.minimal_protocol_basefee { - return Err(InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap }) - } - if fee_cap >= self.pending_basefee { - state.insert(TxState::ENOUGH_FEE_CAP_BLOCK); - } - } else { - // legacy transactions always satisfy the condition + let fee_cap = transaction.max_fee_per_gas(); + + if fee_cap < self.minimal_protocol_basefee { + return Err(InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap }) + } + if fee_cap >= self.pending_basefee { state.insert(TxState::ENOUGH_FEE_CAP_BLOCK); } diff --git a/crates/transaction-pool/src/test_utils/mock.rs b/crates/transaction-pool/src/test_utils/mock.rs index dc67a310f1..735bb8d57b 100644 --- a/crates/transaction-pool/src/test_utils/mock.rs +++ b/crates/transaction-pool/src/test_utils/mock.rs @@ -251,8 +251,25 @@ impl MockTransaction { /// Returns a new transaction with a higher gas price +1 pub fn inc_price(&self) -> Self { + self.inc_price_by(1) + } + + /// Returns a new transaction with a higher gas price + pub fn inc_price_by(&self, value: u128) -> Self { let mut next = self.clone(); - let gas = self.get_gas_price().checked_add(1).unwrap(); + let gas = self.get_gas_price().checked_add(value).unwrap(); + next.with_gas_price(gas) + } + + /// Returns a new transaction with a lower gas price -1 + pub fn decr_price(&self) -> Self { + self.decr_price_by(1) + } + + /// Returns a new transaction with a lower gas price + pub fn decr_price_by(&self, value: u128) -> Self { + let mut next = self.clone(); + let gas = self.get_gas_price().checked_sub(value).unwrap(); next.with_gas_price(gas) } @@ -320,10 +337,10 @@ impl PoolTransaction for MockTransaction { self.get_gas_limit() } - fn max_fee_per_gas(&self) -> Option { + fn max_fee_per_gas(&self) -> u128 { match self { - MockTransaction::Legacy { .. } => None, - MockTransaction::Eip1559 { max_fee_per_gas, .. } => Some(*max_fee_per_gas), + MockTransaction::Legacy { gas_price, .. } => *gas_price, + MockTransaction::Eip1559 { max_fee_per_gas, .. } => *max_fee_per_gas, } } @@ -453,6 +470,9 @@ impl MockTransactionFactory { pub fn validated(&mut self, transaction: MockTransaction) -> MockValidTx { self.validated_with_origin(TransactionOrigin::External, transaction) } + pub fn validated_arc(&mut self, transaction: MockTransaction) -> Arc { + Arc::new(self.validated(transaction)) + } /// Converts the transaction into a validated transaction pub fn validated_with_origin( @@ -482,7 +502,7 @@ impl MockTransactionFactory { } } -#[derive(Default)] +#[derive(Clone, Default)] #[non_exhaustive] pub struct MockOrdering; diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 3815170e77..4a80a8a24b 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -335,8 +335,10 @@ pub trait PoolTransaction: /// Returns the EIP-1559 Max base fee the caller is willing to pay. /// - /// This will return `None` for non-EIP1559 transactions - fn max_fee_per_gas(&self) -> Option; + /// For legacy transactions this is gas_price. + /// + /// This is also commonly referred to as the "Gas Fee Cap" (`GasFeeCap`). + fn max_fee_per_gas(&self) -> u128; /// Returns the EIP-1559 Priority fee the caller is paying to the block author. /// @@ -425,12 +427,14 @@ impl PoolTransaction for PooledTransaction { /// Returns the EIP-1559 Max base fee the caller is willing to pay. /// - /// This will return `None` for non-EIP1559 transactions - fn max_fee_per_gas(&self) -> Option { + /// For legacy transactions this is gas_price. + /// + /// This is also commonly referred to as the "Gas Fee Cap" (`GasFeeCap`). + fn max_fee_per_gas(&self) -> u128 { match &self.transaction.transaction { - Transaction::Legacy(_) => None, - Transaction::Eip2930(_) => None, - Transaction::Eip1559(tx) => Some(tx.max_fee_per_gas), + Transaction::Legacy(tx) => tx.gas_price, + Transaction::Eip2930(tx) => tx.gas_price, + Transaction::Eip1559(tx) => tx.max_fee_per_gas, } } diff --git a/crates/transaction-pool/src/validate.rs b/crates/transaction-pool/src/validate.rs index f97aa19c36..cf079ae24c 100644 --- a/crates/transaction-pool/src/validate.rs +++ b/crates/transaction-pool/src/validate.rs @@ -206,7 +206,7 @@ where } // Ensure max_priority_fee_per_gas (if EIP1559) is less than max_fee_per_gas if any. - if transaction.max_priority_fee_per_gas() > transaction.max_fee_per_gas() { + if transaction.max_priority_fee_per_gas() > Some(transaction.max_fee_per_gas()) { return TransactionValidationOutcome::Invalid( transaction, InvalidTransactionError::TipAboveFeeCap.into(), @@ -335,10 +335,17 @@ impl ValidPoolTransaction { } /// Returns the EIP-1559 Max base fee the caller is willing to pay. - pub fn max_fee_per_gas(&self) -> Option { + /// + /// For legacy transactions this is gas_price. + pub fn max_fee_per_gas(&self) -> u128 { self.transaction.max_fee_per_gas() } + /// Returns the EIP-1559 Max base fee the caller is willing to pay. + pub fn effective_gas_price(&self) -> u128 { + self.transaction.effective_gas_price() + } + /// Amount of gas that should be used in executing this transaction. This is paid up-front. pub fn gas_limit(&self) -> u64 { self.transaction.gas_limit()