diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 167e0cf8da..63ec1a6c1b 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -54,6 +54,7 @@ use reth_transaction_pool::{ PropagatedTransactions, TransactionPool, ValidPoolTransaction, }; use std::{ + cmp::max, collections::{hash_map::Entry, HashMap, HashSet}, num::NonZeroUsize, pin::Pin, @@ -83,7 +84,8 @@ const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096; const POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE: usize = 2 * 1024 * 1024; /// The future for inserting a function into the pool -pub type PoolImportFuture = Pin> + Send + 'static>>; +pub type PoolImportFuture = + Pin>>> + Send + 'static>>; /// Api to interact with [`TransactionsManager`] task. #[derive(Debug, Clone)] @@ -277,11 +279,6 @@ impl TransactionsManager where Pool: TransactionPool + 'static, { - #[inline] - fn update_import_metrics(&self) { - self.metrics.pending_pool_imports.set(self.pool_imports.len() as f64); - } - #[inline] fn update_request_metrics(&self) { self.metrics @@ -965,6 +962,9 @@ where let mut num_already_seen = 0; if let Some(peer) = self.peers.get_mut(&peer_id) { + // pre-size to avoid reallocations, assuming ~50% of the transactions are new + let mut new_txs = Vec::with_capacity(max(1, transactions.len() / 2)); + for tx in transactions { // recover transaction let tx = if let Ok(tx) = tx.try_into_ecrecovered() { @@ -990,19 +990,32 @@ where Entry::Vacant(entry) => { // this is a new transaction that should be imported into the pool let pool_transaction = ::from_recovered_pooled_transaction(tx); + new_txs.push(pool_transaction); - let pool = self.pool.clone(); - - let import = Box::pin(async move { - pool.add_external_transaction(pool_transaction).await - }); - - self.pool_imports.push(import); entry.insert(vec![peer_id]); } } } + // import new transactions as a batch to minimize lock contention on the underlying pool + if !new_txs.is_empty() { + let pool = self.pool.clone(); + let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone(); + + metric_pending_pool_imports.increment(new_txs.len() as f64); + + let import = Box::pin(async move { + let added = new_txs.len(); + let res = pool.add_external_transactions(new_txs).await; + + metric_pending_pool_imports.decrement(added as f64); + + res + }); + + self.pool_imports.push(import); + } + if num_already_seen > 0 { self.metrics.messages_with_already_seen_transactions.increment(1); trace!(target: "net::tx", num_txs=%num_already_seen, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions"); @@ -1103,32 +1116,39 @@ where this.request_buffered_hashes(); this.update_request_metrics(); - this.update_import_metrics(); // Advance all imports while let Poll::Ready(Some(import_res)) = this.pool_imports.poll_next_unpin(cx) { - match import_res { - Ok(hash) => { - this.on_good_import(hash); - } + let import_res = match import_res { + Ok(res) => res, Err(err) => { - // if we're _currently_ syncing and the transaction is bad we ignore it, - // otherwise we penalize the peer that sent the bad - // transaction with the assumption that the peer should have - // known that this transaction is bad. (e.g. consensus - // rules) - if err.is_bad_transaction() && !this.network.is_syncing() { - debug!(target: "net::tx", ?err, "bad pool transaction import"); - this.on_bad_import(err.hash); - continue + debug!(target: "net::tx", ?err, "bad pool transaction batch import"); + continue + } + }; + + for res in import_res { + match res { + Ok(hash) => { + this.on_good_import(hash); + } + Err(err) => { + // if we're _currently_ syncing and the transaction is bad we ignore it, + // otherwise we penalize the peer that sent the bad + // transaction with the assumption that the peer should have + // known that this transaction is bad. (e.g. consensus + // rules) + if err.is_bad_transaction() && !this.network.is_syncing() { + debug!(target: "net::tx", ?err, "bad pool transaction import"); + this.on_bad_import(err.hash); + continue + } + this.on_good_import(err.hash); } - this.on_good_import(err.hash); } } } - this.update_import_metrics(); - // handle and propagate new transactions let mut new_txs = Vec::new(); while let Poll::Ready(Some(hash)) = this.pending_transactions.poll_next_unpin(cx) {