perf(net): don't fetch or import recently mined transactions

This commit is contained in:
Arsenii Kulikov
2025-12-12 22:12:34 +04:00
parent daf6b88dc6
commit 15b0b45820
3 changed files with 51 additions and 6 deletions

View File

@@ -53,6 +53,12 @@ pub mod tx_manager {
///
/// Default is 100 KiB, i.e. 3 200 transaction hashes.
pub const DEFAULT_MAX_COUNT_BAD_IMPORTS: u32 = 100 * 1024 / 32;
/// Default limit for number of recently mined transactions to keep track of.
///
/// Used to prevent re-importing transactions that were recently included in blocks.
/// Default is 100 KiB, i.e. 3 200 transaction hashes.
pub const DEFAULT_MAX_COUNT_MINED_TRANSACTIONS: u32 = 100 * 1024 / 32;
}
/// Constants used by [`TransactionFetcher`](super::TransactionFetcher).

View File

@@ -504,7 +504,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
pub fn filter_unseen_and_pending_hashes(
&mut self,
new_announced_hashes: &mut ValidAnnouncementData,
is_tx_bad_import: impl Fn(&TxHash) -> bool,
should_skip: impl Fn(&TxHash) -> bool,
peer_id: &PeerId,
client_version: &str,
) {
@@ -546,7 +546,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
// vacant entry
if is_tx_bad_import(hash) {
if should_skip(hash) {
return false
}

View File

@@ -63,8 +63,8 @@ use reth_primitives_traits::SignedTransaction;
use reth_tokio_util::EventStream;
use reth_transaction_pool::{
error::{PoolError, PoolResult},
AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
PropagatedTransactions, TransactionPool, ValidPoolTransaction,
AddedTransactionOutcome, AllTransactionsEvents, FullTransactionEvent, GetPooledTransactionLimit,
PoolTransaction, PropagateKind, PropagatedTransactions, TransactionPool, ValidPoolTransaction,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
@@ -279,7 +279,7 @@ impl<N: NetworkPrimitives> TransactionsHandle<N> {
/// Rate limiting via reputation, bad transaction isolation, peer scoring.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
pub struct TransactionsManager<Pool: TransactionPool, N: NetworkPrimitives = EthNetworkPrimitives> {
/// Access to the transaction pool.
pool: Pool,
/// Network access.
@@ -311,6 +311,11 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
pending_pool_imports_info: PendingPoolImportsInfo,
/// Bad imports.
bad_imports: LruCache<TxHash>,
/// Recently mined transactions.
///
/// Tracks hashes of transactions that were recently included in blocks to prevent
/// re-importing them when peers announce them after they've been mined.
mined_transactions: LruCache<TxHash>,
/// All the connected peers.
peers: HashMap<PeerId, PeerMetadata<N>>,
/// Send half for the command channel.
@@ -333,6 +338,10 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
pending_transactions: mpsc::Receiver<TxHash>,
/// Incoming events from the [`NetworkManager`](crate::NetworkManager).
transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
/// Pool events for tracking mined transactions.
///
/// Used to clean up internal tracking structures when transactions are mined.
pool_events: AllTransactionsEvents<Pool::Transaction>,
/// How the `TransactionsManager` is configured.
config: TransactionsManagerConfig,
/// Network Policies
@@ -388,6 +397,8 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
// install a listener for new __pending__ transactions that are allowed to be propagated
// over the network
let pending = pool.pending_transactions_listener();
// install a listener for all pool events to track mined transactions
let pool_events = pool.all_transactions_event_listener();
let pending_pool_imports_info = PendingPoolImportsInfo::default();
let metrics = TransactionsManagerMetrics::default();
metrics
@@ -405,6 +416,7 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
),
bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
mined_transactions: LruCache::new(DEFAULT_MAX_COUNT_MINED_TRANSACTIONS),
peers: Default::default(),
command_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
@@ -413,6 +425,7 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
from_network,
NETWORK_POOL_TRANSACTIONS_SCOPE,
),
pool_events,
config: transactions_manager_config,
policies,
metrics,
@@ -730,9 +743,10 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
// for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
// fetcher, hence they should be valid at this point.
let bad_imports = &self.bad_imports;
let mined_transactions = &self.mined_transactions;
self.transaction_fetcher.filter_unseen_and_pending_hashes(
&mut valid_announcement_data,
|hash| bad_imports.contains(hash),
|hash| bad_imports.contains(hash) || mined_transactions.contains(hash),
&peer_id,
&client,
);
@@ -850,6 +864,16 @@ where
self.propagate_all(hashes);
}
/// Handles a mined transaction event.
///
/// Adds the transaction hash to the mined transactions cache to prevent re-importing
/// it when peers announce it after mining. Also cleans up internal tracking structures.
fn on_mined_transaction(&mut self, hash: TxHash) {
self.mined_transactions.insert(hash);
self.transactions_by_peers.remove(&hash);
self.transaction_fetcher.remove_hashes_from_transaction_fetcher(std::iter::once(&hash));
}
/// Propagate the full transactions to a specific peer.
///
/// Returns the propagated transactions.
@@ -1401,6 +1425,13 @@ where
"received a known bad transaction from peer"
);
has_bad_transactions = true;
} else if self.mined_transactions.contains(tx.tx_hash()) {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%tx.tx_hash(),
client_version=%peer.client_version,
"received a recently mined transaction from peer"
);
} else {
// this is a new transaction that should be imported into the pool
@@ -1650,6 +1681,14 @@ where
|cmd| this.on_command(cmd)
);
// Drain pool events to handle mined transactions.
// We only care about `Mined` events for cleanup purposes.
while let Poll::Ready(Some(event)) = this.pool_events.poll_next_unpin(cx) {
if let FullTransactionEvent::Mined { tx_hash, .. } = event {
this.on_mined_transaction(tx_hash);
}
}
this.transaction_fetcher.update_metrics();
// all channels are fully drained and import futures pending