diff --git a/Cargo.lock b/Cargo.lock index 5cfc730416..ea47d79514 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8945,6 +8945,7 @@ dependencies = [ "pin-project", "rand 0.8.5", "rand 0.9.2", + "rayon", "reth-chainspec", "reth-consensus", "reth-discv4", diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 54902ef478..cbe93a2386 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -66,6 +66,7 @@ tracing.workspace = true rustc-hash.workspace = true thiserror.workspace = true parking_lot.workspace = true +rayon.workspace = true rand.workspace = true rand_08.workspace = true secp256k1 = { workspace = true, features = ["global-context", "std", "recovery"] } diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 45b68617b5..9a313a228a 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1,6 +1,7 @@ //! Transactions management for the p2p network. use alloy_consensus::transaction::TxHashRef; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; /// Aggregation on configurable parameters for [`TransactionsManager`]. pub mod config; @@ -1368,51 +1369,49 @@ where // tracks the quality of the given transactions let mut has_bad_transactions = false; - // 2. filter out transactions that are invalid or already pending import pre-size to avoid - // reallocations - let mut new_txs = Vec::with_capacity(transactions.len()); - for tx in transactions { - match self.transactions_by_peers.entry(*tx.tx_hash()) { - Entry::Occupied(mut entry) => { - // transaction was already inserted - entry.get_mut().insert(peer_id); - } - Entry::Vacant(entry) => { - if self.bad_imports.contains(tx.tx_hash()) { - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%tx.tx_hash(), - client_version=%peer.client_version, - "received a known bad transaction from peer" - ); - has_bad_transactions = true; - } else { - // this is a new transaction that should be imported into the pool - - // recover transaction - let tx = match tx.try_into_recovered() { - Ok(tx) => tx, - Err(badtx) => { - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%badtx.tx_hash(), - client_version=%peer.client_version, - "failed ecrecovery for transaction" - ); - has_bad_transactions = true; - continue - } - }; - - let pool_transaction = Pool::Transaction::from_pooled(tx); - new_txs.push(pool_transaction); - - entry.insert(HashSet::from([peer_id])); - } - } + // Remove known and invalid transactions + transactions.retain(|tx| { + if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) { + entry.get_mut().insert(peer_id); + return false } + if self.bad_imports.contains(tx.tx_hash()) { + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=%tx.tx_hash(), + client_version=%peer.client_version, + "received a known bad transaction from peer" + ); + has_bad_transactions = true; + return false; + } + true + }); + + let txs_len = transactions.len(); + + let new_txs = transactions + .into_par_iter() + .filter_map(|tx| match tx.try_into_recovered() { + Ok(tx) => Some(Pool::Transaction::from_pooled(tx)), + Err(badtx) => { + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=%badtx.tx_hash(), + client_version=%peer.client_version, + "failed ecrecovery for transaction" + ); + None + } + }) + .collect::>(); + + has_bad_transactions |= new_txs.len() != txs_len; + + // Record the transactions as seen by the peer + for tx in &new_txs { + self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id])); } - new_txs.shrink_to_fit(); // 3. import new transactions as a batch to minimize lock contention on the underlying // pool