From 0d20d34eaf0f1a1b5767db800e8d2fa4fa56a17f Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 27 Apr 2023 16:04:18 +0200 Subject: [PATCH] feat(txpool): add missing txpool update checks (#2366) --- crates/transaction-pool/src/pool/mod.rs | 59 ++++-- crates/transaction-pool/src/pool/parked.rs | 1 + crates/transaction-pool/src/pool/pending.rs | 25 +-- crates/transaction-pool/src/pool/state.rs | 6 - crates/transaction-pool/src/pool/txpool.rs | 200 +++++++++++++------- 5 files changed, 185 insertions(+), 106 deletions(-) diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index ace1465746..9dbfeb3204 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -65,24 +65,31 @@ //! transactions are _currently_ waiting for state changes that eventually move them into //! category (2.) and become pending. -#![allow(dead_code)] // TODO(mattsse): remove once remaining checks implemented - use crate::{ error::{PoolError, PoolResult}, identifier::{SenderId, SenderIdentifiers, TransactionId}, - pool::{listener::PoolEventBroadcast, state::SubPool, txpool::TxPool}, + pool::{ + listener::PoolEventBroadcast, + state::SubPool, + txpool::{SenderInfo, TxPool}, + }, traits::{ BlockInfo, NewTransactionEvent, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin, }, validate::{TransactionValidationOutcome, ValidPoolTransaction}, - CanonicalStateUpdate, PoolConfig, TransactionOrdering, TransactionValidator, + CanonicalStateUpdate, ChangedAccount, PoolConfig, TransactionOrdering, TransactionValidator, }; use best::BestTransactions; pub use events::TransactionEvent; use parking_lot::{Mutex, RwLock}; use reth_primitives::{Address, TxHash, H256}; -use std::{collections::HashSet, fmt, sync::Arc, time::Instant}; +use std::{ + collections::{HashMap, HashSet}, + fmt, + sync::Arc, + time::Instant, +}; use tokio::sync::mpsc; use tracing::warn; @@ -149,6 +156,22 @@ where self.identifiers.write().sender_id_or_create(addr) } + /// Converts the changed accounts to a map of sender ids to sender info (internal identifier + /// used for accounts) + fn changed_senders( + &self, + accs: impl Iterator, + ) -> HashMap { + let mut identifiers = self.identifiers.write(); + accs.into_iter() + .map(|acc| { + let ChangedAccount { address, nonce, balance } = acc; + let sender_id = identifiers.sender_id_or_create(address); + (sender_id, SenderInfo { state_nonce: nonce, balance }) + }) + .collect() + } + /// Get the config the pool was configured with. pub fn config(&self) -> &PoolConfig { &self.config @@ -190,7 +213,24 @@ where /// Updates the entire pool after a new block was executed. pub(crate) fn on_canonical_state_change(&self, update: CanonicalStateUpdate) { - let outcome = self.pool.write().on_canonical_state_change(update); + let CanonicalStateUpdate { + hash, + number, + pending_block_base_fee, + changed_accounts, + mined_transactions, + } = update; + let changed_senders = self.changed_senders(changed_accounts.into_iter()); + let block_info = BlockInfo { + last_seen_block_hash: hash, + last_seen_block_number: number, + pending_basefee: pending_block_base_fee, + }; + let outcome = self.pool.write().on_canonical_state_change( + block_info, + mined_transactions, + changed_senders, + ); self.notify_on_new_state(outcome); } @@ -439,13 +479,6 @@ pub struct AddedPendingTransaction { discarded: Vec, } -impl AddedPendingTransaction { - /// Create a new, empty transaction. - fn new(transaction: Arc>) -> Self { - Self { transaction, promoted: Default::default(), discarded: Default::default() } - } -} - /// Represents a transaction that was added into the pool and its state #[derive(Debug, Clone)] pub enum AddedTransaction { diff --git a/crates/transaction-pool/src/pool/parked.rs b/crates/transaction-pool/src/pool/parked.rs index cd8d94f8e9..2aade3e7dd 100644 --- a/crates/transaction-pool/src/pool/parked.rs +++ b/crates/transaction-pool/src/pool/parked.rs @@ -95,6 +95,7 @@ impl ParkedPool { /// Whether the pool is empty #[cfg(test)] + #[allow(unused)] pub(crate) fn is_empty(&self) -> bool { self.by_id.is_empty() } diff --git a/crates/transaction-pool/src/pool/pending.rs b/crates/transaction-pool/src/pool/pending.rs index 1a27ba6687..c8a03aa900 100644 --- a/crates/transaction-pool/src/pool/pending.rs +++ b/crates/transaction-pool/src/pool/pending.rs @@ -3,7 +3,7 @@ use crate::{ pool::{best::BestTransactions, size::SizeTracker}, TransactionOrdering, ValidPoolTransaction, }; -use reth_primitives::TxHash; + use std::{ cmp::Ordering, collections::{BTreeMap, BTreeSet}, @@ -128,7 +128,7 @@ impl PendingPool { /// Removes a _mined_ transaction from the pool. /// /// If the transactions has a descendant transaction it will advance it to the best queue. - pub(crate) fn remove_mined( + pub(crate) fn prune_transaction( &mut self, id: &TransactionId, ) -> Option>> { @@ -160,11 +160,6 @@ impl PendingPool { id } - /// Returns the transaction for the id if it's in the pool but not yet mined. - pub(crate) fn get(&self, id: &TransactionId) -> Option>> { - self.by_id.get(id).cloned() - } - /// Removes the worst transaction from this pool. pub(crate) fn pop_worst(&mut self) -> Option>> { let worst = self.all.iter().next_back().map(|tx| *tx.transaction.id())?; @@ -182,6 +177,8 @@ impl PendingPool { } /// Whether the pool is empty + #[cfg(test)] + #[allow(unused)] pub(crate) fn is_empty(&self) -> bool { self.by_id.is_empty() } @@ -193,15 +190,6 @@ pub(crate) struct PendingTransaction { pub(crate) transaction: PendingTransactionRef, } -// == impl PendingTransaction === - -impl PendingTransaction { - /// Returns all ids this transaction satisfies. - pub(crate) fn id(&self) -> &TransactionId { - &self.transaction.transaction.transaction_id - } -} - impl Clone for PendingTransaction { fn clone(&self) -> Self { Self { transaction: self.transaction.clone() } @@ -223,11 +211,6 @@ impl PendingTransactionRef { pub(crate) fn unlocks(&self) -> TransactionId { self.transaction.transaction_id.descendant() } - - /// The hash for this transaction - pub(crate) fn hash(&self) -> &TxHash { - self.transaction.hash() - } } impl Clone for PendingTransactionRef { diff --git a/crates/transaction-pool/src/pool/state.rs b/crates/transaction-pool/src/pool/state.rs index e5a759997f..87be686ccd 100644 --- a/crates/transaction-pool/src/pool/state.rs +++ b/crates/transaction-pool/src/pool/state.rs @@ -41,12 +41,6 @@ impl TxState { *self >= TxState::PENDING_POOL_BITS } - /// Returns `true` if the `ENOUGH_FEE_CAP_BLOCK` bit is set. - #[inline] - pub(crate) fn has_enough_fee_cap(&self) -> bool { - self.intersects(TxState::ENOUGH_FEE_CAP_BLOCK) - } - /// Returns `true` if the transaction has a nonce gap. #[inline] pub(crate) fn has_nonce_gap(&self) -> bool { diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 7728252c93..87e5900a76 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -13,8 +13,7 @@ use crate::{ AddedPendingTransaction, AddedTransaction, OnNewCanonicalStateOutcome, }, traits::{BlockInfo, PoolSize}, - CanonicalStateUpdate, ChangedAccount, PoolConfig, PoolResult, PoolTransaction, - TransactionOrdering, ValidPoolTransaction, U256, + PoolConfig, PoolResult, PoolTransaction, TransactionOrdering, ValidPoolTransaction, U256, }; use fnv::FnvHashMap; use reth_primitives::{constants::MIN_PROTOCOL_BASE_FEE, TxHash, H256}; @@ -129,14 +128,6 @@ impl TxPool { } } - /// Updates the pool based on the changed base fee. - /// - /// This enforces the dynamic fee requirement. - pub(crate) fn update_base_fee(&mut self, _new_base_fee: U256) { - // TODO update according to the changed base_fee - todo!() - } - /// Returns an iterator that yields transactions that are ready to be included in the block. pub(crate) fn best_transactions(&self) -> BestTransactions { self.pending_pool.best() @@ -177,29 +168,32 @@ impl TxPool { /// sender allowance. pub(crate) fn on_canonical_state_change( &mut self, - event: CanonicalStateUpdate, + block_info: BlockInfo, + mined_transactions: Vec, + changed_senders: HashMap, ) -> OnNewCanonicalStateOutcome { + // track changed accounts + self.sender_info.extend(changed_senders.clone()); + + // update block info + let block_hash = block_info.last_seen_block_hash; + self.all_transactions.set_block_info(block_info); + // Remove all transaction that were included in the block - for tx_hash in &event.mined_transactions { - if self.remove_transaction_by_hash(tx_hash).is_some() { + for tx_hash in mined_transactions.iter() { + if self.prune_transaction_by_hash(tx_hash).is_some() { // Update removed transactions metric self.metrics.removed_transactions.increment(1); } } // Apply the state changes to the total set of transactions which triggers sub-pool updates. - let updates = - self.all_transactions.update(event.pending_block_base_fee, &event.changed_accounts); + let updates = self.all_transactions.update(changed_senders); // Process the sub-pool updates let UpdateOutcome { promoted, discarded } = self.process_updates(updates); - OnNewCanonicalStateOutcome { - block_hash: event.hash, - mined: event.mined_transactions, - promoted, - discarded, - } + OnNewCanonicalStateOutcome { block_hash, mined: mined_transactions, promoted, discarded } } /// Adds the transaction into the pool. @@ -262,7 +256,7 @@ impl TxPool { // Update invalid transactions metric self.metrics.invalid_transactions.increment(1); match e { - InsertErr::Underpriced { existing, .. } => { + InsertErr::Underpriced { existing, transaction: _ } => { Err(PoolError::ReplacementUnderpriced(existing)) } InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap } => Err( @@ -321,6 +315,9 @@ impl TxPool { } /// Removes and returns all matching transactions from the pool. + /// + /// Note: this does not advance any descendants of the removed transactions and does not apply + /// any additional updates. pub(crate) fn remove_transactions( &mut self, hashes: impl IntoIterator, @@ -350,6 +347,19 @@ impl TxPool { self.remove_from_subpool(pool, tx.id()) } + /// This removes the transaction from the pool and advances any descendant state inside the + /// subpool. + /// + /// This is intended to be used when a transaction is included in a block, + /// [Self::on_canonical_state_change] + fn prune_transaction_by_hash( + &mut self, + tx_hash: &H256, + ) -> Option>> { + let (tx, pool) = self.all_transactions.remove_transaction_by_hash(tx_hash)?; + self.prune_from_subpool(pool, tx.id()) + } + /// Removes the transaction from the given pool. /// /// Caution: this only removes the tx from the sub-pool and not from the pool itself @@ -365,6 +375,20 @@ impl TxPool { } } + /// Removes the transaction from the given pool and advance sub-pool internal state, with the + /// expectation that the given transaction is included in a block. + fn prune_from_subpool( + &mut self, + pool: SubPool, + tx: &TransactionId, + ) -> Option>> { + match pool { + SubPool::Queued => self.queued_pool.remove_transaction(tx), + SubPool::Pending => self.pending_pool.prune_transaction(tx), + SubPool::BaseFee => self.basefee_pool.remove_transaction(tx), + } + } + /// Removes _only_ the descendants of the given transaction from the entire pool. /// /// All removed transactions are added to the `removed` vec. @@ -571,6 +595,15 @@ impl AllTransactions { } } + /// Updates the block specific info + fn set_block_info(&mut self, block_info: BlockInfo) { + let BlockInfo { last_seen_block_hash, last_seen_block_number, pending_basefee } = + block_info; + self.last_seen_block_number = last_seen_block_number; + self.last_seen_block_hash = last_seen_block_hash; + self.pending_basefee = pending_basefee; + } + /// Rechecks all transactions in the pool against the changes. /// /// Possible changes are: @@ -588,14 +621,10 @@ impl AllTransactions { /// that got transaction included in the block. pub(crate) fn update( &mut self, - pending_block_base_fee: u128, - _changed_accounts: &[ChangedAccount], + changed_accounts: HashMap, ) -> Vec { - // update new basefee - self.pending_basefee = pending_block_base_fee; - - // TODO(mattsse): probably good idea to allocate some capacity here. - let mut updates = Vec::new(); + // pre-allocate a few updates + let mut updates = Vec::with_capacity(64); let mut iter = self.txs.iter_mut().peekable(); @@ -608,7 +637,7 @@ impl AllTransactions { // The `unique_sender` loop will process the first transaction of all senders, update its // state and internally update all consecutive transactions - 'unique_sender: while let Some((id, tx)) = iter.next() { + 'transactions: while let Some((id, tx)) = iter.next() { // Advances the iterator to the next sender macro_rules! next_sender { ($iter:ident) => { @@ -620,36 +649,85 @@ impl AllTransactions { } }; } - // If there's a nonce gap, we can shortcircuit, because there's nothing to update. - if tx.state.has_nonce_gap() { - next_sender!(iter); - continue + + // tracks the balance if the sender was changed in the block + let mut changed_balance = None; + + // check if this is a changed account + if let Some(info) = changed_accounts.get(&id.sender) { + // discard all transactions with a nonce lower than the current state nonce + if id.nonce < info.state_nonce { + updates.push(PoolUpdate { + id: *tx.transaction.id(), + hash: *tx.transaction.hash(), + current: tx.subpool, + destination: Destination::Discard, + }); + continue 'transactions + } + + let ancestor = TransactionId::ancestor(id.nonce, info.state_nonce, id.sender); + // If there's no ancestor then this is the next transaction. + if ancestor.is_none() { + tx.state.insert(TxState::NO_NONCE_GAPS); + tx.state.insert(TxState::NO_PARKED_ANCESTORS); + tx.cumulative_cost = U256::ZERO; + if tx.transaction.cost > info.balance { + // sender lacks sufficient funds to pay for this transaction + tx.state.remove(TxState::ENOUGH_BALANCE); + } else { + tx.state.insert(TxState::ENOUGH_BALANCE); + } + } + + changed_balance = Some(info.balance); } - // TODO(mattsse): if account has balance changes or mined transactions the balance needs - // to be checked here + // If there's a nonce gap, we can shortcircuit, because there's nothing to update yet. + if tx.state.has_nonce_gap() { + next_sender!(iter); + continue 'transactions + } // Since this is the first transaction of the sender, it has no parked ancestors tx.state.insert(TxState::NO_PARKED_ANCESTORS); // Update the first transaction of this sender. - Self::update_base_fee(&pending_block_base_fee, tx); + Self::update_tx_base_fee(&self.pending_basefee, tx); // Track if the transaction's sub-pool changed. Self::record_subpool_update(&mut updates, tx); // Track blocking transactions. let mut has_parked_ancestor = !tx.state.is_pending(); + let mut cumulative_cost = tx.next_cumulative_cost(); + // Update all consecutive transaction of this sender while let Some((peek, ref mut tx)) = iter.peek_mut() { if peek.sender != id.sender { // Found the next sender - continue 'unique_sender + continue 'transactions } + // can short circuit if tx.state.has_nonce_gap() { next_sender!(iter); - continue 'unique_sender + continue 'transactions + } + + // update cumulative cost + tx.cumulative_cost = cumulative_cost; + // Update for next transaction + cumulative_cost = tx.next_cumulative_cost(); + + // If the account changed in the block, check the balance. + if let Some(changed_balance) = changed_balance { + if cumulative_cost > changed_balance { + // sender lacks sufficient funds to pay for this transaction + tx.state.remove(TxState::ENOUGH_BALANCE); + } else { + tx.state.insert(TxState::ENOUGH_BALANCE); + } } // Update ancestor condition. @@ -661,7 +739,7 @@ impl AllTransactions { has_parked_ancestor = !tx.state.is_pending(); // Update and record sub-pool changes. - Self::update_base_fee(&pending_block_base_fee, tx); + Self::update_tx_base_fee(&self.pending_basefee, tx); Self::record_subpool_update(&mut updates, tx); // Advance iterator @@ -690,7 +768,7 @@ impl AllTransactions { } /// Rechecks the transaction's dynamic fee condition. - fn update_base_fee(pending_block_base_fee: &u128, tx: &mut PoolInternalTransaction) { + fn update_tx_base_fee(pending_block_base_fee: &u128, tx: &mut PoolInternalTransaction) { // Recheck dynamic fee condition. if let Some(fee_cap) = tx.transaction.max_fee_per_gas() { match fee_cap.cmp(pending_block_base_fee) { @@ -738,22 +816,12 @@ impl AllTransactions { self.txs.range((Excluded(id), Unbounded)).take_while(|(other, _)| id.sender == other.sender) } - /// Returns all transactions that _follow_ after the given id but have the same sender. - /// - /// NOTE: The range is _exclusive_ - pub(crate) fn descendant_txs_exclusive_mut<'a, 'b: 'a>( - &'a mut self, - id: &'b TransactionId, - ) -> impl Iterator)> + '_ { - self.txs - .range_mut((Excluded(id), Unbounded)) - .take_while(|(other, _)| id.sender == other.sender) - } - /// Returns all transactions that _follow_ after the given id but have the same sender. /// /// NOTE: The range is _inclusive_: if the transaction that belongs to `id` it field be the /// first value. + #[cfg(test)] + #[allow(unused)] pub(crate) fn descendant_txs<'a, 'b: 'a>( &'a self, id: &'b TransactionId, @@ -854,11 +922,11 @@ impl AllTransactions { let mut cumulative_cost = U256::ZERO; let mut updates = Vec::new(); - let predecessor = + let ancestor = TransactionId::ancestor(transaction.transaction.nonce(), on_chain_nonce, tx_id.sender); - // If there's no predecessor then this is the next transaction. - if predecessor.is_none() { + // If there's no ancestor tx then this is the next transaction. + if ancestor.is_none() { state.insert(TxState::NO_NONCE_GAPS); state.insert(TxState::NO_PARKED_ANCESTORS); } @@ -926,7 +994,7 @@ impl AllTransactions { // We need to find out if the next transaction of the sender is considered pending // - let mut has_parked_ancestor = if predecessor.is_none() { + let mut has_parked_ancestor = if ancestor.is_none() { // the new transaction is the next one false } else { @@ -1046,7 +1114,11 @@ pub(crate) type InsertResult = Result, InsertErr>; #[derive(Debug)] pub(crate) enum InsertErr { /// Attempted to replace existing transaction, but was underpriced - Underpriced { transaction: Arc>, existing: TxHash }, + Underpriced { + #[allow(unused)] + transaction: Arc>, + existing: TxHash, + }, /// The transactions feeCap is lower than the chain's minimum fee requirement. /// /// See also [`MIN_PROTOCOL_BASE_FEE`] @@ -1071,6 +1143,7 @@ pub(crate) struct InsertOk { /// Where to move the transaction to. move_to: SubPool, /// Current state of the inserted tx. + #[allow(unused)] state: TxState, /// The transaction that was replaced by this. replaced_tx: Option<(Arc>, SubPool)>, @@ -1143,21 +1216,16 @@ impl fmt::Debug for PruneResult { /// Stores relevant context about a sender. #[derive(Debug, Clone, Default)] -struct SenderInfo { +pub(crate) struct SenderInfo { /// current nonce of the sender. - state_nonce: u64, + pub(crate) state_nonce: u64, /// Balance of the sender at the current point. - balance: U256, + pub(crate) balance: U256, } // === impl SenderInfo === impl SenderInfo { - /// Creates a new entry for an incoming, not yet tracked sender. - fn new_incoming(state_nonce: u64, balance: U256) -> Self { - Self { state_nonce, balance } - } - /// Updates the info with the new values. fn update(&mut self, state_nonce: u64, balance: U256) { *self = Self { state_nonce, balance };