From 15992d9bdf432ce6860a6a728db05bfeeb80b48d Mon Sep 17 00:00:00 2001 From: chirag-bgh <76247491+chirag-bgh@users.noreply.github.com> Date: Thu, 30 Nov 2023 00:14:50 +0530 Subject: [PATCH] feat: discard txs by tx_count of sender (#4520) Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> --- crates/transaction-pool/Cargo.toml | 5 + crates/transaction-pool/benches/truncate.rs | 198 ++++++++++ crates/transaction-pool/src/pool/mod.rs | 2 + crates/transaction-pool/src/pool/parked.rs | 332 +++++++++++++++-- crates/transaction-pool/src/pool/pending.rs | 337 ++++++++++++++++-- crates/transaction-pool/src/pool/txpool.rs | 15 +- .../transaction-pool/src/test_utils/mock.rs | 8 +- 7 files changed, 842 insertions(+), 55 deletions(-) create mode 100644 crates/transaction-pool/benches/truncate.rs diff --git a/crates/transaction-pool/Cargo.toml b/crates/transaction-pool/Cargo.toml index 52ceaf9374..5d00994116 100644 --- a/crates/transaction-pool/Cargo.toml +++ b/crates/transaction-pool/Cargo.toml @@ -74,6 +74,11 @@ optimism = [ "revm/optimism", ] +[[bench]] +name = "truncate" +required-features = ["test-utils", "arbitrary"] +harness = false + [[bench]] name = "reorder" required-features = ["test-utils", "arbitrary"] diff --git a/crates/transaction-pool/benches/truncate.rs b/crates/transaction-pool/benches/truncate.rs new file mode 100644 index 0000000000..e96ed753b3 --- /dev/null +++ b/crates/transaction-pool/benches/truncate.rs @@ -0,0 +1,198 @@ +use criterion::{ + criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, +}; +use proptest::{ + prelude::*, + strategy::{Strategy, ValueTree}, + test_runner::{RngAlgorithm, TestRng, TestRunner}, +}; +use reth_primitives::{hex_literal::hex, Address}; +use reth_transaction_pool::{ + pool::{BasefeeOrd, ParkedPool, PendingPool}, + test_utils::{MockOrdering, MockTransaction, MockTransactionFactory}, + SubPoolLimit, +}; + +// constant seed to use for the rng +const SEED: [u8; 32] = hex!("1337133713371337133713371337133713371337133713371337133713371337"); + +/// Generates a set of `depth` dependent transactions, with the specified sender. Its values are +/// generated using [Arbitrary]. +fn create_transactions_for_sender( + mut runner: TestRunner, + sender: Address, + depth: usize, +) -> Vec { + // TODO: for blob truncate, this would need a flag for _only_ generating 4844 mock transactions + + // assert that depth is always greater than zero, since empty vecs do not really make sense in + // this context + assert!(depth > 0); + + // make sure these are all post-eip-1559 transactions + let mut txs = prop::collection::vec(any::(), depth) + .new_tree(&mut runner) + .unwrap() + .current(); + + let mut nonce = 0; + for tx in txs.iter_mut() { + // reject pre-eip1559 tx types, if there is a legacy tx, replace it with an eip1559 tx + if tx.is_legacy() || tx.is_eip2930() { + *tx = MockTransaction::eip1559(); + + // set fee values using arbitrary + tx.set_priority_fee(any::().new_tree(&mut runner).unwrap().current()); + tx.set_max_fee(any::().new_tree(&mut runner).unwrap().current()); + } + + tx.set_sender(sender); + tx.set_nonce(nonce); + nonce += 1; + } + + txs +} + +/// Generates many transactions, each with a different sender. The number of transactions per +/// sender is generated using [Arbitrary]. The number of senders is specified by `senders`. +/// +/// Because this uses [Arbitrary], the number of transactions per sender needs to be bounded. This +/// is done by using the `max_depth` parameter. +/// +/// This uses [create_transactions_for_sender] to generate the transactions. +fn generate_many_transactions(senders: usize, max_depth: usize) -> Vec { + let config = ProptestConfig::default(); + let rng = TestRng::from_seed(RngAlgorithm::ChaCha, &SEED); + let mut runner = TestRunner::new_with_rng(config, rng); + + let mut txs = Vec::new(); + for idx in 0..senders { + // modulo max_depth so we know it is bounded, plus one so the minimum is always 1 + let depth = any::().new_tree(&mut runner).unwrap().current() % max_depth + 1; + + // set sender to an Address determined by the sender index. This should make any necessary + // debugging easier. + let idx_slice = idx.to_be_bytes(); + + // pad with 12 bytes of zeros before rest + let addr_slice = [0u8; 12].into_iter().chain(idx_slice.into_iter()).collect::>(); + + let sender = Address::from_slice(&addr_slice); + txs.extend(create_transactions_for_sender(runner.clone(), sender, depth)); + } + + txs +} + +fn txpool_truncate(c: &mut Criterion) { + let mut group = c.benchmark_group("Transaction Pool Truncate"); + + // the first few benchmarks (5, 10, 20, 100) should cause the txpool to hit the max tx limit, + // so they are there to make sure we do not regress on best-case performance. + // + // the last few benchmarks (1000, 2000) should hit the max tx limit, at least for large enough + // depth, so these should benchmark closer to real-world performance + for senders in [5, 10, 20, 100, 1000, 2000] { + // the max we'll be benching is 20, because MAX_ACCOUNT_SLOTS so far is 16. So 20 should be + // a reasonable worst-case benchmark + for max_depth in [5, 10, 20] { + println!("Generating transactions for benchmark with {senders} unique senders and a max depth of {max_depth}..."); + let txs = generate_many_transactions(senders, max_depth); + + // benchmark parked pool + truncate_parked(&mut group, "ParkedPool", txs.clone(), senders, max_depth); + + // benchmark pending pool + truncate_pending(&mut group, "PendingPool", txs, senders, max_depth); + + // TODO: benchmark blob truncate + } + } + + let large_senders = 5000; + let max_depth = 16; + + // let's run a benchmark that includes a large number of senders and max_depth of 16 to ensure + // we hit the TXPOOL_SUBPOOL_MAX_TXS_DEFAULT limit, which is currently 10k + println!("Generating transactions for large benchmark with {large_senders} unique senders and a max depth of {max_depth}..."); + let txs = generate_many_transactions(large_senders, max_depth); + + // benchmark parked + truncate_parked(&mut group, "ParkedPool", txs.clone(), large_senders, max_depth); + + // benchmark pending + truncate_pending(&mut group, "PendingPool", txs, large_senders, max_depth); +} + +fn truncate_pending( + group: &mut BenchmarkGroup, + description: &str, + seed: Vec, + senders: usize, + max_depth: usize, +) { + let setup = || { + let mut txpool = PendingPool::new(MockOrdering::default()); + let mut f = MockTransactionFactory::default(); + + for tx in seed.iter() { + // add transactions with a basefee of zero, so they are not immediately removed + txpool.add_transaction(f.validated_arc(tx.clone()), 0); + } + txpool + }; + + let group_id = format!( + "txpool | total txs: {} | total senders: {} | max depth: {} | {}", + seed.len(), + senders, + max_depth, + description, + ); + + // for now we just use the default SubPoolLimit + group.bench_function(group_id, |b| { + b.iter_with_setup(setup, |mut txpool| { + txpool.truncate_pool(SubPoolLimit::default()); + std::hint::black_box(()); + }); + }); +} + +fn truncate_parked( + group: &mut BenchmarkGroup, + description: &str, + seed: Vec, + senders: usize, + max_depth: usize, +) { + let setup = || { + let mut txpool = ParkedPool::>::default(); + let mut f = MockTransactionFactory::default(); + + for tx in seed.iter() { + txpool.add_transaction(f.validated_arc(tx.clone())); + } + txpool + }; + + let group_id = format!( + "txpool | total txs: {} | total senders: {} | max depth: {} | {}", + seed.len(), + senders, + max_depth, + description, + ); + + // for now we just use the default SubPoolLimit + group.bench_function(group_id, |b| { + b.iter_with_setup(setup, |mut txpool| { + txpool.truncate_pool(SubPoolLimit::default()); + std::hint::black_box(()); + }); + }); +} + +criterion_group!(truncate, txpool_truncate); +criterion_main!(truncate); diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index cc60955470..1b7717a8fc 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -107,6 +107,8 @@ use crate::{ }; use alloy_rlp::Encodable; pub use listener::{AllTransactionsEvents, TransactionEvents}; +pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool}; +pub use pending::PendingPool; mod best; mod blob; diff --git a/crates/transaction-pool/src/pool/parked.rs b/crates/transaction-pool/src/pool/parked.rs index fa741d32f2..da65638134 100644 --- a/crates/transaction-pool/src/pool/parked.rs +++ b/crates/transaction-pool/src/pool/parked.rs @@ -1,8 +1,14 @@ use crate::{ - identifier::TransactionId, pool::size::SizeTracker, PoolTransaction, ValidPoolTransaction, + identifier::{SenderId, TransactionId}, + pool::size::SizeTracker, + PoolTransaction, SubPoolLimit, ValidPoolTransaction, +}; +use std::{ + cmp::Ordering, + collections::{BTreeMap, BTreeSet, BinaryHeap}, + ops::{Bound::Unbounded, Deref}, + sync::Arc, }; -use fnv::FnvHashMap; -use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc}; /// A pool of transactions that are currently parked and are waiting for external changes (e.g. /// basefee, ancestor transactions, balance) that eventually move the transaction into the pending @@ -13,14 +19,15 @@ use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc}; /// /// Note: This type is generic over [ParkedPool] which enforces that the underlying transaction type /// is [ValidPoolTransaction] wrapped in an [Arc]. +#[allow(missing_debug_implementations)] #[derive(Clone)] -pub(crate) struct ParkedPool { +pub struct ParkedPool { /// Keeps track of transactions inserted in the pool. /// /// This way we can determine when transactions were submitted to the pool. submission_id: u64, /// _All_ Transactions that are currently inside the pool grouped by their identifier. - by_id: FnvHashMap>, + by_id: BTreeMap>, /// All transactions sorted by their order function. /// /// The higher, the better. @@ -39,7 +46,7 @@ impl ParkedPool { /// # Panics /// /// If the transaction is already included. - pub(crate) fn add_transaction(&mut self, tx: Arc>) { + pub fn add_transaction(&mut self, tx: Arc>) { let id = *tx.id(); assert!( !self.by_id.contains_key(&id), @@ -51,6 +58,7 @@ impl ParkedPool { // keep track of size self.size_of += tx.size(); + // update or create sender entry let transaction = ParkedPoolTransaction { submission_id, transaction: tx.into() }; self.by_id.insert(id, transaction.clone()); @@ -79,10 +87,106 @@ impl ParkedPool { Some(tx.transaction.into()) } - /// Removes the worst transaction from this pool. - pub(crate) fn pop_worst(&mut self) -> Option>> { - let worst = self.best.iter().next().map(|tx| *tx.transaction.id())?; - self.remove_transaction(&worst) + /// Get transactions by sender + pub(crate) fn get_txs_by_sender(&self, sender: SenderId) -> Vec { + self.by_id + .range((sender.start_bound(), Unbounded)) + .take_while(move |(other, _)| sender == other.sender) + .map(|(_, tx)| *tx.transaction.id()) + .collect() + } + + /// Returns sender ids sorted by each sender's last submission id. Senders with older last + /// submission ids are first. Note that _last_ submission ids are the newest submission id for + /// that sender, so this sorts senders by the last time they submitted a transaction in + /// descending order. Senders that have least recently submitted a transaction are first. + /// + /// Similar to `Heartbeat` in Geth + pub fn get_senders_by_submission_id(&self) -> Vec { + // iterate through by_id, and get the last submission id for each sender + let senders = self + .by_id + .iter() + .fold(Vec::new(), |mut set: Vec, (_, tx)| { + if let Some(last) = set.last_mut() { + // sort by last + if last.sender_id == tx.transaction.sender_id() { + if last.submission_id < tx.submission_id { + // update last submission id + last.submission_id = tx.submission_id; + } + } else { + // new entry + set.push(SubmissionSenderId::new( + tx.transaction.sender_id(), + tx.submission_id, + )); + } + } else { + // first entry + set.push(SubmissionSenderId::new(tx.transaction.sender_id(), tx.submission_id)); + } + set + }) + .into_iter() + // sort by submission id + .collect::>(); + + // sort s.t. senders with older submission ids are first + senders.into_sorted_vec() + } + + /// Truncates the pool by removing transactions, until the given [SubPoolLimit] has been met. + /// + /// This is done by first ordering senders by the last time they have submitted a transaction, + /// using [get_senders_by_submission_id](ParkedPool::get_senders_by_submission_id) to determine + /// this ordering. + /// + /// Then, for each sender, all transactions for that sender are removed, until the pool limits + /// have been met. + /// + /// Any removed transactions are returned. + pub fn truncate_pool( + &mut self, + limit: SubPoolLimit, + ) -> Vec>> { + if self.len() <= limit.max_txs { + // if we are below the limits, we don't need to drop anything + return Vec::new() + } + + let mut removed = Vec::new(); + let mut sender_ids = self.get_senders_by_submission_id(); + let queued = self.len(); + let mut drop = queued - limit.max_txs; + + while drop > 0 && !sender_ids.is_empty() { + // SAFETY: This will not panic due to `!addresses.is_empty()` + let sender_id = sender_ids.pop().unwrap().sender_id; + let mut list = self.get_txs_by_sender(sender_id); + + // Drop all transactions if they are less than the overflow + if list.len() <= drop { + for txid in &list { + if let Some(tx) = self.remove_transaction(txid) { + removed.push(tx); + } + } + drop -= list.len(); + continue + } + + // Otherwise drop only last few transactions + // SAFETY: This will not panic because `list.len() > drop` + for txid in list.split_off(drop) { + if let Some(tx) = self.remove_transaction(&txid) { + removed.push(tx); + } + drop -= 1; + } + } + + removed } fn next_id(&mut self) -> u64 { @@ -226,10 +330,40 @@ impl Ord for ParkedPoolTransaction { } } +/// Includes a [SenderId] and `submission_id`. This is used to sort senders by their last +/// submission id. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct SubmissionSenderId { + /// The sender id + pub(crate) sender_id: SenderId, + /// The submission id + pub(crate) submission_id: u64, +} + +impl SubmissionSenderId { + /// Creates a new [SubmissionSenderId] based on the [SenderId] and `submission_id`. + fn new(sender_id: SenderId, submission_id: u64) -> Self { + Self { sender_id, submission_id } + } +} + +impl Ord for SubmissionSenderId { + fn cmp(&self, other: &Self) -> Ordering { + // Reverse ordering for `submission_id` + other.submission_id.cmp(&self.submission_id) + } +} + +impl PartialOrd for SubmissionSenderId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// Helper trait used for custom `Ord` wrappers around a transaction. /// /// This is effectively a wrapper for `Arc` with custom `Ord` implementation. -pub(crate) trait ParkedOrd: +pub trait ParkedOrd: Ord + Clone + From>> @@ -294,7 +428,7 @@ macro_rules! impl_ord_wrapper { /// /// Caution: This assumes all transaction in the `BaseFee` sub-pool have a fee value. #[derive(Debug)] -pub(crate) struct BasefeeOrd(Arc>); +pub struct BasefeeOrd(Arc>); impl_ord_wrapper!(BasefeeOrd); @@ -332,6 +466,8 @@ impl Ord for QueuedOrd { mod tests { use super::*; use crate::test_utils::{MockTransaction, MockTransactionFactory}; + use reth_primitives::address; + use std::collections::HashSet; #[test] fn test_enforce_parked_basefee() { @@ -359,7 +495,7 @@ mod tests { let root_tx = f.validated_arc(t.clone()); pool.add_transaction(root_tx.clone()); - let descendant_tx = f.validated_arc(t.inc_nonce().inc_price()); + let descendant_tx = f.validated_arc(t.inc_nonce().decr_price()); pool.add_transaction(descendant_tx.clone()); assert!(pool.by_id.contains_key(root_tx.id())); @@ -368,22 +504,178 @@ mod tests { let removed = pool.enforce_basefee(u64::MAX); assert!(removed.is_empty()); - + assert_eq!(pool.len(), 2); // two dependent tx in the pool with decreasing fee { + // TODO: test change might not be intended, re review let mut pool2 = pool.clone(); - let removed = pool2.enforce_basefee(descendant_tx.max_fee_per_gas() as u64); + let removed = pool2.enforce_basefee(root_tx.max_fee_per_gas() as u64); assert_eq!(removed.len(), 1); assert_eq!(pool2.len(), 1); - // descendant got popped - assert!(pool2.by_id.contains_key(root_tx.id())); - assert!(!pool2.by_id.contains_key(descendant_tx.id())); + // root got popped - descendant should be skipped + assert!(!pool2.by_id.contains_key(root_tx.id())); + assert!(pool2.by_id.contains_key(descendant_tx.id())); } - // remove root transaction via root tx fee - let removed = pool.enforce_basefee(root_tx.max_fee_per_gas() as u64); + // remove root transaction via descendant tx fee + let removed = pool.enforce_basefee(descendant_tx.max_fee_per_gas() as u64); assert_eq!(removed.len(), 2); assert!(pool.is_empty()); } + + #[test] + fn truncate_parked_by_submission_id() { + // this test ensures that we evict from the pending pool by sender + let mut f = MockTransactionFactory::default(); + let mut pool = ParkedPool::>::default(); + + let a = address!("000000000000000000000000000000000000000a"); + let b = address!("000000000000000000000000000000000000000b"); + let c = address!("000000000000000000000000000000000000000c"); + let d = address!("000000000000000000000000000000000000000d"); + + // TODO: make creating these mock tx chains easier + // create a chain of transactions by sender A, B, C + let a1 = MockTransaction::eip1559().with_sender(a); + let a2 = a1.next(); + let a3 = a2.next(); + let a4 = a3.next(); + + let b1 = MockTransaction::eip1559().with_sender(b); + let b2 = b1.next(); + let b3 = b2.next(); + + // C has the same number of txs as B + let c1 = MockTransaction::eip1559().with_sender(c); + let c2 = c1.next(); + let c3 = c2.next(); + + let d1 = MockTransaction::eip1559().with_sender(d); + + // just construct a list of all txs to add + let expected_parked = vec![c1.clone(), c2.clone(), c3.clone(), d1.clone()] + .into_iter() + .map(|tx| (tx.sender(), tx.nonce())) + .collect::>(); + + // we expect the truncate operation to go through the senders with the most txs, removing + // txs based on when they were submitted, removing the oldest txs first, until the pool is + // not over the limit + let expected_removed = vec![ + a1.clone(), + a2.clone(), + a3.clone(), + a4.clone(), + b1.clone(), + b2.clone(), + b3.clone(), + ] + .into_iter() + .map(|tx| (tx.sender(), tx.nonce())) + .collect::>(); + let all_txs = vec![a1, a2, a3, a4, b1, b2, b3, c1, c2, c3, d1]; + + // add all the transactions to the pool + for tx in all_txs { + pool.add_transaction(f.validated_arc(tx)); + } + + // we should end up with the most recently submitted transactions + let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX }; + + // truncate the pool + let removed = pool.truncate_pool(pool_limit); + assert_eq!(removed.len(), expected_removed.len()); + + // get the inner txs from the removed txs + let removed = + removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::>(); + assert_eq!(removed, expected_removed); + + // get the parked pool + let parked = pool.all().collect::>(); + assert_eq!(parked.len(), expected_parked.len()); + + // get the inner txs from the parked txs + let parked = parked.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::>(); + assert_eq!(parked, expected_parked); + } + + #[test] + fn test_senders_by_submission_id() { + // this test ensures that we evict from the pending pool by sender + let mut f = MockTransactionFactory::default(); + let mut pool = ParkedPool::>::default(); + + let a = address!("000000000000000000000000000000000000000a"); + let b = address!("000000000000000000000000000000000000000b"); + let c = address!("000000000000000000000000000000000000000c"); + let d = address!("000000000000000000000000000000000000000d"); + + // create a chain of transactions by sender A, B, C + let a1 = MockTransaction::eip1559().with_sender(a); + let a2 = a1.next(); + let a3 = a2.next(); + let a4 = a3.next(); + + let b1 = MockTransaction::eip1559().with_sender(b); + let b2 = b1.next(); + let b3 = b2.next(); + + // C has the same number of txs as B + let c1 = MockTransaction::eip1559().with_sender(c); + let c2 = c1.next(); + let c3 = c2.next(); + + let d1 = MockTransaction::eip1559().with_sender(d); + + let all_txs = vec![ + a1.clone(), + a2.clone(), + a3.clone(), + a4.clone(), + b1.clone(), + b2.clone(), + b3.clone(), + c1.clone(), + c2.clone(), + c3.clone(), + d1.clone(), + ]; + + // add all the transactions to the pool + for tx in all_txs { + pool.add_transaction(f.validated_arc(tx)); + } + + // get senders by submission id - a4, b3, c3, d1, reversed + let senders = pool + .get_senders_by_submission_id() + .into_iter() + .map(|s| s.sender_id) + .collect::>(); + assert_eq!(senders.len(), 4); + let expected_senders = + vec![d, c, b, a].into_iter().map(|s| f.ids.sender_id(&s).unwrap()).collect::>(); + assert_eq!(senders, expected_senders); + + let mut pool = ParkedPool::>::default(); + let all_txs = vec![a1, b1, c1, d1, a2, b2, c2, a3, b3, c3, a4]; + + // add all the transactions to the pool + for tx in all_txs { + pool.add_transaction(f.validated_arc(tx)); + } + + let senders = pool + .get_senders_by_submission_id() + .into_iter() + .map(|s| s.sender_id) + .collect::>(); + assert_eq!(senders.len(), 4); + let expected_senders = + vec![a, c, b, d].into_iter().map(|s| f.ids.sender_id(&s).unwrap()).collect::>(); + assert_eq!(senders, expected_senders); + } } diff --git a/crates/transaction-pool/src/pool/pending.rs b/crates/transaction-pool/src/pool/pending.rs index 31acc91327..34c2d0f711 100644 --- a/crates/transaction-pool/src/pool/pending.rs +++ b/crates/transaction-pool/src/pool/pending.rs @@ -1,7 +1,7 @@ use crate::{ identifier::TransactionId, pool::{best::BestTransactions, size::SizeTracker}, - Priority, TransactionOrdering, ValidPoolTransaction, + Priority, SubPoolLimit, TransactionOrdering, ValidPoolTransaction, }; use crate::pool::best::BestTransactionsWithBasefee; @@ -22,8 +22,9 @@ use tokio::sync::broadcast; /// /// Once an `independent` transaction was executed it *unlocks* the next nonce, if this transaction /// is also pending, then this will be moved to the `independent` queue. +#[allow(missing_debug_implementations)] #[derive(Clone)] -pub(crate) struct PendingPool { +pub struct PendingPool { /// How to order transactions. ordering: T, /// Keeps track of transactions inserted in the pool. @@ -34,6 +35,11 @@ pub(crate) struct PendingPool { by_id: BTreeMap>, /// _All_ transactions sorted by priority all: BTreeSet>, + /// The highest nonce transactions for each sender - like the `independent` set, but the + /// highest instead of lowest nonce. + /// + /// Sorted by their scoring value. + highest_nonces: BTreeSet>, /// Independent transactions that can be included directly and don't require other /// transactions. /// @@ -52,7 +58,7 @@ pub(crate) struct PendingPool { impl PendingPool { /// Create a new pool instance. - pub(crate) fn new(ordering: T) -> Self { + pub fn new(ordering: T) -> Self { let (new_transaction_notifier, _) = broadcast::channel(200); Self { ordering, @@ -60,6 +66,7 @@ impl PendingPool { by_id: Default::default(), all: Default::default(), independent_transactions: Default::default(), + highest_nonces: Default::default(), size_of: Default::default(), new_transaction_notifier, } @@ -73,6 +80,7 @@ impl PendingPool { /// Returns all transactions by id. fn clear_transactions(&mut self) -> BTreeMap> { self.independent_transactions.clear(); + self.highest_nonces.clear(); self.all.clear(); self.size_of.reset(); std::mem::take(&mut self.by_id) @@ -184,9 +192,7 @@ impl PendingPool { } } else { self.size_of += tx.transaction.size(); - if self.ancestor(&id).is_none() { - self.independent_transactions.insert(tx.clone()); - } + self.update_independents_and_highest_nonces(&tx, &id); self.all.insert(tx.clone()); self.by_id.insert(id, tx); } @@ -232,9 +238,7 @@ impl PendingPool { tx.priority = self.ordering.priority(&tx.transaction.transaction, base_fee); self.size_of += tx.transaction.size(); - if self.ancestor(&id).is_none() { - self.independent_transactions.insert(tx.clone()); - } + self.update_independents_and_highest_nonces(&tx, &id); self.all.insert(tx.clone()); self.by_id.insert(id, tx); } @@ -243,6 +247,27 @@ impl PendingPool { removed } + /// Updates the independent transaction and highest nonces set, assuming the given transaction + /// is being _added_ to the pool. + fn update_independents_and_highest_nonces( + &mut self, + tx: &PendingTransaction, + tx_id: &TransactionId, + ) { + let ancestor_id = tx_id.unchecked_ancestor(); + if let Some(ancestor) = ancestor_id.and_then(|id| self.by_id.get(&id)) { + // the transaction already has an ancestor, so we only need to ensure that the + // highest nonces set actually contains the highest nonce for that sender + self.highest_nonces.remove(ancestor); + self.highest_nonces.insert(tx.clone()); + } else { + // If there's __no__ ancestor in the pool, then this transaction is independent, this is + // guaranteed because this pool is gapless. + self.independent_transactions.insert(tx.clone()); + self.highest_nonces.insert(tx.clone()); + } + } + /// Returns the ancestor the given transaction, the transaction with `nonce - 1`. /// /// Note: for a transaction with nonce higher than the current on chain nonce this will always @@ -256,7 +281,7 @@ impl PendingPool { /// # Panics /// /// if the transaction is already included - pub(crate) fn add_transaction( + pub fn add_transaction( &mut self, tx: Arc>, base_fee: u64, @@ -276,11 +301,7 @@ impl PendingPool { let priority = self.ordering.priority(&tx.transaction, base_fee); let tx = PendingTransaction { submission_id, transaction: tx, priority }; - // If there's __no__ ancestor in the pool, then this transaction is independent, this is - // guaranteed because this pool is gapless. - if self.ancestor(&tx_id).is_none() { - self.independent_transactions.insert(tx.clone()); - } + self.update_independents_and_highest_nonces(&tx, &tx_id); self.all.insert(tx.clone()); // send the new transaction to any existing pendingpool snapshot iterators @@ -316,6 +337,12 @@ impl PendingPool { self.size_of -= tx.transaction.size(); self.all.remove(&tx); self.independent_transactions.remove(&tx); + + // switch out for the next ancestor if there is one + self.highest_nonces.remove(&tx); + if let Some(ancestor) = self.ancestor(id) { + self.highest_nonces.insert(ancestor.clone()); + } Some(tx.transaction) } @@ -325,10 +352,121 @@ impl PendingPool { id } - /// Removes the worst transaction from this pool. - pub(crate) fn pop_worst(&mut self) -> Option>> { - let worst = self.all.iter().next().map(|tx| *tx.transaction.id())?; - self.remove_transaction(&worst) + /// Traverses the pool, starting at the highest nonce set, removing the transactions which + /// would put the pool under the specified limits. + /// + /// This attempts to remove transactions by roughly the same amount for each sender. This is + /// done by removing the highest-nonce transactions for each sender. + /// + /// If the `remove_locals` flag is unset, transactions will be removed per-sender until a + /// local transaction is the highest nonce transaction for that sender. If all senders have a + /// local highest-nonce transaction, the pool will not be truncated further. + /// + /// Otherwise, if the `remove_locals` flag is set, transactions will be removed per-sender + /// until the pool is under the given limits. + /// + /// Any removed transactions will be added to the `end_removed` vector. + pub fn remove_to_limit( + &mut self, + limit: &SubPoolLimit, + remove_locals: bool, + end_removed: &mut Vec>>, + ) { + // This serves as a termination condition for the loop - it represents the number of + // _valid_ unique senders that might have descendants in the pool. + // + // If `remove_locals` is false, a value of zero means that there are no non-local txs in the + // pool that can be removed. + // + // If `remove_locals` is true, a value of zero means that there are no txs in the pool that + // can be removed. + let mut non_local_senders = self.highest_nonces.len(); + + // keep track of unique senders from previous iterations, to understand how many unique + // senders were removed in the last iteration + let mut unique_senders = self.highest_nonces.len(); + + // keep track of transactions to remove and how many have been removed so far + let original_length = self.len(); + let mut removed = Vec::new(); + let mut total_removed = 0; + + // track total `size` of transactions to remove + let original_size = self.size(); + let mut total_size = 0; + + loop { + // check how many unique senders were removed last iteration + let unique_removed = unique_senders - self.highest_nonces.len(); + + // the new number of unique senders + unique_senders = self.highest_nonces.len(); + non_local_senders -= unique_removed; + + // we can re-use the temp array + removed.clear(); + + // loop through the highest nonces set, removing transactions until we reach the limit + for tx in self.highest_nonces.iter() { + // return early if the pool is under limits + if original_size - total_size <= limit.max_size && + original_length - total_removed <= limit.max_txs || + non_local_senders == 0 + { + // need to remove remaining transactions before exiting + for id in &removed { + if let Some(tx) = self.remove_transaction(id) { + end_removed.push(tx); + } + } + + return + } + + if !remove_locals && tx.transaction.is_local() { + non_local_senders -= 1; + continue + } + + total_size += tx.transaction.size(); + total_removed += 1; + removed.push(*tx.transaction.id()); + } + + // remove the transactions from this iteration + for id in &removed { + if let Some(tx) = self.remove_transaction(id) { + end_removed.push(tx); + } + } + } + } + + /// Truncates the pool to the given [SubPoolLimit], removing transactions until the subpool + /// limits are met. + /// + /// This attempts to remove transactions by rougly the same amount for each sender. For more + /// information on this exact process see docs for + /// [remove_to_limit](PendingPool::remove_to_limit). + /// + /// This first truncates all of the non-local transactions in the pool. If the subpool is still + /// not under the limit, this truncates the entire pool, including non-local transactions. The + /// removed transactions are returned. + pub fn truncate_pool( + &mut self, + limit: SubPoolLimit, + ) -> Vec>> { + let mut removed = Vec::new(); + self.remove_to_limit(&limit, false, &mut removed); + + if self.size() <= limit.max_size && self.len() <= limit.max_txs { + return removed + } + + // now repeat for local transactions + self.remove_to_limit(&limit, true, &mut removed); + + removed } /// The reported size of all transactions in this pool. @@ -361,6 +499,14 @@ impl PendingPool { self.independent_transactions.len() <= self.all.len(), "independent.len() > all.len()" ); + assert!( + self.highest_nonces.len() <= self.all.len(), + "independent_descendants.len() > all.len()" + ); + assert!( + self.highest_nonces.len() == self.independent_transactions.len(), + "independent.len() = independent_descendants.len()" + ); } } @@ -418,6 +564,10 @@ impl Ord for PendingTransaction { #[cfg(test)] mod tests { + use std::collections::HashSet; + + use reth_primitives::address; + use super::*; use crate::{ test_utils::{MockOrdering, MockTransaction, MockTransactionFactory}, @@ -458,6 +608,7 @@ mod tests { assert_eq!(pool.len(), 2); assert_eq!(pool.independent_transactions.len(), 1); + assert_eq!(pool.highest_nonces.len(), 1); let removed = pool.update_base_fee(0); assert!(removed.is_empty()); @@ -478,6 +629,7 @@ mod tests { let removed = pool.update_base_fee((root_tx.max_fee_per_gas() + 1) as u64); assert_eq!(removed.len(), 2); assert!(pool.is_empty()); + pool.assert_invariants(); } #[test] @@ -492,6 +644,151 @@ mod tests { pool.add_transaction(f.validated_arc(t2), 0); // First transaction should be evicted. - assert_eq!(pool.pop_worst().map(|tx| *tx.hash()), Some(*t.hash())); + assert_eq!( + pool.highest_nonces.iter().next().map(|tx| *tx.transaction.hash()), + Some(*t.hash()) + ); + + // truncate pool with max size = 1, ensure it's the same transaction + let removed = pool.truncate_pool(SubPoolLimit { max_txs: 1, max_size: usize::MAX }); + assert_eq!(removed.len(), 1); + assert_eq!(removed[0].hash(), t.hash()); + } + + #[test] + fn correct_independent_descendants() { + // this test ensures that we set the right highest nonces set for each sender + let mut f = MockTransactionFactory::default(); + let mut pool = PendingPool::new(MockOrdering::default()); + + let a = address!("000000000000000000000000000000000000000a"); + let b = address!("000000000000000000000000000000000000000b"); + let c = address!("000000000000000000000000000000000000000c"); + let d = address!("000000000000000000000000000000000000000d"); + + // create a chain of transactions by sender A, B, C + let a1 = MockTransaction::eip1559().with_sender(a); + let a2 = a1.next(); + let a3 = a2.next(); + let a4 = a3.next(); + + let b1 = MockTransaction::eip1559().with_sender(b); + let b2 = b1.next(); + let b3 = b2.next(); + + // C has the same number of txs as B + let c1 = MockTransaction::eip1559().with_sender(c); + let c2 = c1.next(); + let c3 = c2.next(); + + let d1 = MockTransaction::eip1559().with_sender(d); + + // add all the transactions to the pool + let all_txs = + vec![a1, a2, a3, a4.clone(), b1, b2, b3.clone(), c1, c2, c3.clone(), d1.clone()]; + for tx in all_txs { + pool.add_transaction(f.validated_arc(tx), 0); + } + + pool.assert_invariants(); + + // the independent set is the roots of each of these tx chains, these are the highest + // nonces for each sender + let expected_highest_nonces = + vec![d1, c3, b3, a4].iter().map(|tx| (tx.sender(), tx.nonce())).collect::>(); + let actual_highest_nonces = pool + .highest_nonces + .iter() + .map(|tx| (tx.transaction.sender(), tx.transaction.nonce())) + .collect::>(); + assert_eq!(expected_highest_nonces, actual_highest_nonces); + pool.assert_invariants(); + } + + #[test] + fn truncate_by_sender() { + // this test ensures that we evict from the pending pool by sender + let mut f = MockTransactionFactory::default(); + let mut pool = PendingPool::new(MockOrdering::default()); + + let a = address!("000000000000000000000000000000000000000a"); + let b = address!("000000000000000000000000000000000000000b"); + let c = address!("000000000000000000000000000000000000000c"); + let d = address!("000000000000000000000000000000000000000d"); + + // TODO: make creating these mock tx chains easier + // create a chain of transactions by sender A, B, C + let a1 = MockTransaction::eip1559().with_sender(a); + let a2 = a1.next(); + let a3 = a2.next(); + let a4 = a3.next(); + + let b1 = MockTransaction::eip1559().with_sender(b); + let b2 = b1.next(); + let b3 = b2.next(); + + // C has the same number of txs as B + let c1 = MockTransaction::eip1559().with_sender(c); + let c2 = c1.next(); + let c3 = c2.next(); + + let d1 = MockTransaction::eip1559().with_sender(d); + + // just construct a list of all txs to add + let expected_pending = vec![a1.clone(), b1.clone(), c1.clone(), a2.clone()] + .into_iter() + .map(|tx| (tx.sender(), tx.nonce())) + .collect::>(); + let expected_removed = vec![ + d1.clone(), + c3.clone(), + b3.clone(), + a4.clone(), + c2.clone(), + b2.clone(), + a3.clone(), + ] + .into_iter() + .map(|tx| (tx.sender(), tx.nonce())) + .collect::>(); + let all_txs = vec![a1, a2, a3, a4.clone(), b1, b2, b3, c1, c2, c3, d1]; + + // add all the transactions to the pool + for tx in all_txs { + pool.add_transaction(f.validated_arc(tx), 0); + } + + // sanity check, make sure everything checks out + pool.assert_invariants(); + + // let's set the max total txs to 4, since we remove txs for each sender first, we remove + // in this order: + // * d1, c3, b3, a4 + // * c2, b2, a3 + // + // and we are left with: + // * a1, a2 + // * b1 + // * c1 + let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX }; + + // truncate the pool + let removed = pool.truncate_pool(pool_limit); + pool.assert_invariants(); + assert_eq!(removed.len(), expected_removed.len()); + + // get the inner txs from the removed txs + let removed = + removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::>(); + assert_eq!(removed, expected_removed); + + // get the pending pool + let pending = pool.all().collect::>(); + assert_eq!(pending.len(), expected_pending.len()); + + // get the inner txs from the pending txs + let pending = + pending.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::>(); + assert_eq!(pending, expected_pending); } } diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index dfc2bf6c7c..e9b5acadbc 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -793,18 +793,9 @@ impl TxPool { .$limit .is_exceeded($this.$pool.len(), $this.$pool.size()) { - // pops the worst transaction from the sub-pool - if let Some(tx) = $this.$pool.pop_worst() { - let id = tx.transaction_id; - - // now that the tx is removed from the sub-pool, we need to remove it also from the total set - $this.all_transactions.remove_transaction(&id); - - // record the removed transaction - removed.push(tx); - - // this might have introduced a nonce gap, so we also discard any descendants - $this.remove_descendants(&id, &mut $removed); + removed = $this.$pool.truncate_pool($this.config.$limit.clone()); + for tx in removed.clone().iter() { + $this.remove_descendants(tx.id(), &mut $removed); } } diff --git a/crates/transaction-pool/src/test_utils/mock.rs b/crates/transaction-pool/src/test_utils/mock.rs index d289597d2b..ea1a0509d1 100644 --- a/crates/transaction-pool/src/test_utils/mock.rs +++ b/crates/transaction-pool/src/test_utils/mock.rs @@ -898,8 +898,8 @@ impl proptest::arbitrary::Arbitrary for MockTransaction { fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { use proptest::prelude::{any, Strategy}; - any::<(Transaction, Address, B256, BlobTransactionSidecar)>() - .prop_map(|(tx, sender, tx_hash, sidecar)| match &tx { + any::<(Transaction, Address, B256)>() + .prop_map(|(tx, sender, tx_hash)| match &tx { Transaction::Legacy(TxLegacy { nonce, gas_price, @@ -972,7 +972,9 @@ impl proptest::arbitrary::Arbitrary for MockTransaction { value: (*value).into(), input: (*input).clone(), accesslist: (*access_list).clone(), - sidecar, + // only generate a sidecar if it is a 4844 tx - also for the sake of + // performance just use a default sidecar + sidecar: BlobTransactionSidecar::default(), }, _ => unimplemented!(), })