From 0c854b6f141f0ebfec577aebe2abde3a569ee361 Mon Sep 17 00:00:00 2001
From: Georgios Konstantopoulos
Date: Thu, 22 Jan 2026 10:32:07 -0800
Subject: [PATCH] fix(net): limit pending pool imports for broadcast transactions (#21254)

Co-authored-by: Arsenii Kulikov
---
 crates/net/network/src/metrics.rs          |  2 ++
 crates/net/network/src/transactions/mod.rs | 40 +++++++++++++++++++---
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs
index ba9efdff54..6da8deced2 100644
--- a/crates/net/network/src/metrics.rs
+++ b/crates/net/network/src/metrics.rs
@@ -131,6 +131,8 @@ pub struct TransactionsManagerMetrics {
     /// capacity. Note, this is not a limit to the number of inflight requests, but a health
     /// measure.
     pub(crate) capacity_pending_pool_imports: Counter,
+    /// Total number of transactions ignored because pending pool imports are at capacity.
+    pub(crate) skipped_transactions_pending_pool_imports_at_capacity: Counter,
 
     /// The time it took to prepare transactions for import. This is mostly sender recovery.
     pub(crate) pool_import_prepare_duration: Histogram,
diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs
index a6cf3c4d09..5ed91e80f3 100644
--- a/crates/net/network/src/transactions/mod.rs
+++ b/crates/net/network/src/transactions/mod.rs
@@ -429,11 +429,22 @@ impl TransactionsManager {
     /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
     /// `false` if [`TransactionsManager`] is operating close to full capacity.
     fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
-        self.pending_pool_imports_info
-            .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
+        self.has_capacity_for_pending_pool_imports() &&
             self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
     }
 
+    /// Returns `true` if [`TransactionsManager`] has capacity for more pending pool imports.
+    fn has_capacity_for_pending_pool_imports(&self) -> bool {
+        self.remaining_pool_import_capacity() > 0
+    }
+
+    /// Returns the remaining capacity for pending pool imports.
+    fn remaining_pool_import_capacity(&self) -> usize {
+        self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
+            self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
+        )
+    }
+
     fn report_peer_bad_transactions(&self, peer_id: PeerId) {
         self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
         self.metrics.reported_bad_transactions.increment(1);
@@ -1285,6 +1296,7 @@ where
             trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
             return;
         }
+
         // ensure we didn't receive any blob transactions as these are disallowed to be
         // broadcasted in full
 
@@ -1335,7 +1347,13 @@ where
             return
         }
 
+        // Early return if we don't have capacity for any imports
+        if !self.has_capacity_for_pending_pool_imports() {
+            return
+        }
+
         let Some(peer) = self.peers.get_mut(&peer_id) else { return };
+        let client_version = peer.client_version.clone();
         let mut transactions = transactions.0;
         let start = Instant::now();
 
@@ -1378,7 +1396,7 @@ where
                 trace!(target: "net::tx",
                     peer_id=format!("{peer_id:#}"),
                     hash=%tx.tx_hash(),
-                    client_version=%peer.client_version,
+                    %client_version,
                     "received a known bad transaction from peer"
                 );
                 has_bad_transactions = true;
@@ -1387,6 +1405,18 @@ where
             true
         });
 
+        // Truncate to remaining capacity before recovery to avoid wasting CPU on transactions
+        // that won't be imported anyway.
+        let capacity = self.remaining_pool_import_capacity();
+        if transactions.len() > capacity {
+            let skipped = transactions.len() - capacity;
+            transactions.truncate(capacity);
+            self.metrics
+                .skipped_transactions_pending_pool_imports_at_capacity
+                .increment(skipped as u64);
+            trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
+        }
+
         let txs_len = transactions.len();
 
         let new_txs = transactions
@@ -1397,7 +1427,7 @@ where
                         trace!(target: "net::tx",
                             peer_id=format!("{peer_id:#}"),
                             hash=%badtx.tx_hash(),
-                            client_version=%peer.client_version,
+                            client_version=%client_version,
                             "failed ecrecovery for transaction"
                         );
                         None
@@ -1448,7 +1478,7 @@ where
             self.metrics
                 .occurrences_of_transaction_already_seen_by_peer
                 .increment(num_already_seen_by_peer);
-            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
+            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
         }
 
         if has_bad_transactions {