From 4a386d57828b0573ac21e607a4404ce5517906db Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 21 Feb 2024 20:06:08 +0100 Subject: [PATCH] perf: track last sender's submission id (#6718) --- crates/transaction-pool/src/pool/parked.rs | 158 ++++++++++++--------- 1 file changed, 94 insertions(+), 64 deletions(-) diff --git a/crates/transaction-pool/src/pool/parked.rs b/crates/transaction-pool/src/pool/parked.rs index e57548a466..b569c1fd53 100644 --- a/crates/transaction-pool/src/pool/parked.rs +++ b/crates/transaction-pool/src/pool/parked.rs @@ -1,11 +1,12 @@ use crate::{ identifier::{SenderId, TransactionId}, pool::size::SizeTracker, - PoolTransaction, SubPoolLimit, ValidPoolTransaction, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER, + PoolTransaction, SubPoolLimit, ValidPoolTransaction, }; +use fnv::FnvHashMap; use std::{ cmp::Ordering, - collections::{BTreeMap, BTreeSet, BinaryHeap}, + collections::{hash_map::Entry, BTreeMap, BTreeSet}, ops::{Bound::Unbounded, Deref}, sync::Arc, }; @@ -31,6 +32,10 @@ pub struct ParkedPool { /// /// The higher, the better. best: BTreeSet>, + /// Tracks the last submission id of every sender that has transactions in this pool. + last_sender_transaction: BTreeSet, + /// Keeps track of the number of transactions by the sender and the last submission id. + sender_to_last_transaction: FnvHashMap, /// Keeps track of the size of this pool. /// /// See also [`PoolTransaction::size`]. @@ -58,12 +63,66 @@ impl ParkedPool { self.size_of += tx.size(); // update or create sender entry + self.add_sender_count(tx.sender_id(), submission_id); let transaction = ParkedPoolTransaction { submission_id, transaction: tx.into() }; self.by_id.insert(id, transaction.clone()); self.best.insert(transaction); } + /// Increments the count of transactions for the given sender and updates the tracked submission + /// id. 
+ fn add_sender_count(&mut self, sender: SenderId, submission_id: u64) { + match self.sender_to_last_transaction.entry(sender) { + Entry::Occupied(mut entry) => { + let value = entry.get_mut(); + // remove the __currently__ tracked submission id + self.last_sender_transaction + .remove(&SubmissionSenderId::new(sender, value.last_submission_id)); + + value.count += 1; + value.last_submission_id = submission_id; + } + Entry::Vacant(entry) => { + entry + .insert(SenderTransactionCount { count: 1, last_submission_id: submission_id }); + } + } + // insert a new entry + self.last_sender_transaction.insert(SubmissionSenderId::new(sender, submission_id)); + } + + /// Decrements the count of transactions for the given sender. + /// + /// If the count reaches zero, the sender is removed from the map. + /// + /// Note: this does not update the tracked submission id for the sender, because we're only + /// interested in the __last__ submission id when truncating the pool. + fn remove_sender_count(&mut self, sender_id: SenderId) { + let removed_sender = match self.sender_to_last_transaction.entry(sender_id) { + Entry::Occupied(mut entry) => { + let value = entry.get_mut(); + value.count -= 1; + if value.count == 0 { + entry.remove() + } else { + return + } + } + Entry::Vacant(_) => { + unreachable!("sender count not found {:?}", sender_id); + } + }; + + // all transactions for this sender have been removed + assert!( + self.last_sender_transaction + .remove(&SubmissionSenderId::new(sender_id, removed_sender.last_submission_id)), + "last sender transaction not found {:?}", + sender_id + ); + } + /// Returns an iterator over all transactions in the pool pub(crate) fn all( &self, @@ -79,6 +138,7 @@ impl ParkedPool { // remove from queues let tx = self.by_id.remove(id)?; self.best.remove(&tx); + self.remove_sender_count(tx.transaction.sender_id()); // keep track of size self.size_of -= tx.transaction.size(); @@ -95,58 +155,21 @@ impl ParkedPool { .collect() } - /// Returns sender 
ids sorted by each sender's last submission id. Senders with older last - /// submission ids are first. Note that _last_ submission ids are the newest submission id for - /// that sender, so this sorts senders by the last time they submitted a transaction in - /// descending order. Senders that have least recently submitted a transaction are first. - /// - /// Similar to `Heartbeat` in Geth - pub fn get_senders_by_submission_id(&self) -> Vec { - // iterate through by_id, and get the last submission id for each sender - let senders = self - .by_id - .iter() - .fold( - // pre-allocate some capacity for unique senders, targeting 4 slots per sender - Vec::with_capacity(self.by_id.len() / (TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER / 4)), - |mut set: Vec, (_, tx)| { - if let Some(last) = set.last_mut() { - // sort by last - if last.sender_id == tx.transaction.sender_id() { - if last.submission_id < tx.submission_id { - // update last submission id - last.submission_id = tx.submission_id; - } - } else { - // new entry - set.push(SubmissionSenderId::new( - tx.transaction.sender_id(), - tx.submission_id, - )); - } - } else { - // first entry - set.push(SubmissionSenderId::new( - tx.transaction.sender_id(), - tx.submission_id, - )); - } - set - }, - ) - .into_iter() - // sort by submission id - .collect::>(); - - // sort s.t. senders with older submission ids are first - senders.into_sorted_vec() + #[cfg(test)] + pub(crate) fn get_senders_by_submission_id( + &self, + ) -> impl Iterator + '_ { + self.last_sender_transaction.iter().cloned() } /// Truncates the pool by removing transactions, until the given [SubPoolLimit] has been met. /// - /// This is done by first ordering senders by the last time they have submitted a transaction, - /// using [get_senders_by_submission_id](ParkedPool::get_senders_by_submission_id) to determine - /// this ordering. 
+ /// This is done by first ordering senders by the last time they have submitted a transaction. + /// + /// Uses sender ids sorted by each sender's last submission id. Senders with older last + /// submission ids are first. Note that _last_ submission ids are the newest submission id for + /// that sender, so this sorts senders by the last time they submitted a transaction in + /// descending order. Senders that have least recently submitted a transaction are first. /// /// Then, for each sender, all transactions for that sender are removed, until the pool limits /// have been met. @@ -162,11 +185,11 @@ impl ParkedPool { } let mut removed = Vec::new(); - let mut sender_ids = self.get_senders_by_submission_id(); - while limit.is_exceeded(self.len(), self.size()) && !sender_ids.is_empty() { + while limit.is_exceeded(self.len(), self.size()) && !self.last_sender_transaction.is_empty() + { // SAFETY: This will not panic due to `!addresses.is_empty()` - let sender_id = sender_ids.pop().unwrap().sender_id; + let sender_id = self.last_sender_transaction.last().expect("no empty").sender_id; let list = self.get_txs_by_sender(sender_id); // Drop transactions from this sender until the pool is under limits @@ -227,6 +250,12 @@ impl ParkedPool { #[cfg(any(test, feature = "test-utils"))] pub(crate) fn assert_invariants(&self) { assert_eq!(self.by_id.len(), self.best.len(), "by_id.len() != best.len()"); + + assert_eq!( + self.last_sender_transaction.len(), + self.sender_to_last_transaction.len(), + "last_sender_transaction.len() != sender_to_last_transaction.len()" + ); } } @@ -292,11 +321,20 @@ impl Default for ParkedPool { submission_id: 0, by_id: Default::default(), best: Default::default(), + last_sender_transaction: Default::default(), + sender_to_last_transaction: Default::default(), size_of: Default::default(), } } } +/// Keeps track of the number of transactions and the latest submission id for each sender. 
+#[derive(Debug, Clone, Default, PartialEq, Eq)] +struct SenderTransactionCount { + count: u64, + last_submission_id: u64, +} + /// Represents a transaction in this pool. #[derive(Debug)] struct ParkedPoolTransaction { @@ -340,7 +378,7 @@ impl Ord for ParkedPoolTransaction { /// Includes a [SenderId] and `submission_id`. This is used to sort senders by their last /// submission id. #[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub struct SubmissionSenderId { +pub(crate) struct SubmissionSenderId { /// The sender id pub(crate) sender_id: SenderId, /// The submission id @@ -672,11 +710,7 @@ mod tests { } // get senders by submission id - a4, b3, c3, d1, reversed - let senders = pool - .get_senders_by_submission_id() - .into_iter() - .map(|s| s.sender_id) - .collect::>(); + let senders = pool.get_senders_by_submission_id().map(|s| s.sender_id).collect::>(); assert_eq!(senders.len(), 4); let expected_senders = vec![d_sender, c_sender, b_sender, a_sender] .into_iter() @@ -693,11 +727,7 @@ mod tests { pool.add_transaction(f.validated_arc(tx)); } - let senders = pool - .get_senders_by_submission_id() - .into_iter() - .map(|s| s.sender_id) - .collect::>(); + let senders = pool.get_senders_by_submission_id().map(|s| s.sender_id).collect::>(); assert_eq!(senders.len(), 4); let expected_senders = vec![a_sender, c_sender, b_sender, d_sender] .into_iter()