mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
fix(net): limit pending pool imports for broadcast transactions (#21254)
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
This commit is contained in:
committed by
GitHub
parent
28a31cd579
commit
0c854b6f14
@@ -131,6 +131,8 @@ pub struct TransactionsManagerMetrics {
|
||||
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
|
||||
/// measure.
|
||||
pub(crate) capacity_pending_pool_imports: Counter,
|
||||
/// Total number of transactions ignored because pending pool imports are at capacity.
|
||||
pub(crate) skipped_transactions_pending_pool_imports_at_capacity: Counter,
|
||||
/// The time it took to prepare transactions for import. This is mostly sender recovery.
|
||||
pub(crate) pool_import_prepare_duration: Histogram,
|
||||
|
||||
|
||||
@@ -429,11 +429,22 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
|
||||
/// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
|
||||
/// `false` if [`TransactionsManager`] is operating close to full capacity.
|
||||
fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
|
||||
self.pending_pool_imports_info
|
||||
.has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
|
||||
self.has_capacity_for_pending_pool_imports() &&
|
||||
self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
|
||||
}
|
||||
|
||||
/// Returns `true` if [`TransactionsManager`] has capacity for more pending pool imports.
|
||||
fn has_capacity_for_pending_pool_imports(&self) -> bool {
|
||||
self.remaining_pool_import_capacity() > 0
|
||||
}
|
||||
|
||||
/// Returns the remaining capacity for pending pool imports.
|
||||
fn remaining_pool_import_capacity(&self) -> usize {
|
||||
self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
|
||||
self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
|
||||
)
|
||||
}
|
||||
|
||||
fn report_peer_bad_transactions(&self, peer_id: PeerId) {
|
||||
self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
|
||||
self.metrics.reported_bad_transactions.increment(1);
|
||||
@@ -1285,6 +1296,7 @@ where
|
||||
trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
|
||||
return;
|
||||
}
|
||||
|
||||
// ensure we didn't receive any blob transactions as these are disallowed to be
|
||||
// broadcasted in full
|
||||
|
||||
@@ -1335,7 +1347,13 @@ where
|
||||
return
|
||||
}
|
||||
|
||||
// Early return if we don't have capacity for any imports
|
||||
if !self.has_capacity_for_pending_pool_imports() {
|
||||
return
|
||||
}
|
||||
|
||||
let Some(peer) = self.peers.get_mut(&peer_id) else { return };
|
||||
let client_version = peer.client_version.clone();
|
||||
let mut transactions = transactions.0;
|
||||
|
||||
let start = Instant::now();
|
||||
@@ -1378,7 +1396,7 @@ where
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%tx.tx_hash(),
|
||||
client_version=%peer.client_version,
|
||||
%client_version,
|
||||
"received a known bad transaction from peer"
|
||||
);
|
||||
has_bad_transactions = true;
|
||||
@@ -1387,6 +1405,18 @@ where
|
||||
true
|
||||
});
|
||||
|
||||
// Truncate to remaining capacity before recovery to avoid wasting CPU on transactions
|
||||
// that won't be imported anyway.
|
||||
let capacity = self.remaining_pool_import_capacity();
|
||||
if transactions.len() > capacity {
|
||||
let skipped = transactions.len() - capacity;
|
||||
transactions.truncate(capacity);
|
||||
self.metrics
|
||||
.skipped_transactions_pending_pool_imports_at_capacity
|
||||
.increment(skipped as u64);
|
||||
trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
|
||||
}
|
||||
|
||||
let txs_len = transactions.len();
|
||||
|
||||
let new_txs = transactions
|
||||
@@ -1397,7 +1427,7 @@ where
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%badtx.tx_hash(),
|
||||
client_version=%peer.client_version,
|
||||
client_version=%client_version,
|
||||
"failed ecrecovery for transaction"
|
||||
);
|
||||
None
|
||||
@@ -1448,7 +1478,7 @@ where
|
||||
self.metrics
|
||||
.occurrences_of_transaction_already_seen_by_peer
|
||||
.increment(num_already_seen_by_peer);
|
||||
trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
|
||||
trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
|
||||
}
|
||||
|
||||
if has_bad_transactions {
|
||||
|
||||
Reference in New Issue
Block a user