mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 23:38:10 -05:00
perf: recover transactions in parallel during network import (#20385)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -8945,6 +8945,7 @@ dependencies = [
|
||||
"pin-project",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.2",
|
||||
"rayon",
|
||||
"reth-chainspec",
|
||||
"reth-consensus",
|
||||
"reth-discv4",
|
||||
|
||||
@@ -66,6 +66,7 @@ tracing.workspace = true
|
||||
rustc-hash.workspace = true
|
||||
thiserror.workspace = true
|
||||
parking_lot.workspace = true
|
||||
rayon.workspace = true
|
||||
rand.workspace = true
|
||||
rand_08.workspace = true
|
||||
secp256k1 = { workspace = true, features = ["global-context", "std", "recovery"] }
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Transactions management for the p2p network.
|
||||
|
||||
use alloy_consensus::transaction::TxHashRef;
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
|
||||
/// Aggregation on configurable parameters for [`TransactionsManager`].
|
||||
pub mod config;
|
||||
@@ -1368,51 +1369,49 @@ where
|
||||
// tracks the quality of the given transactions
|
||||
let mut has_bad_transactions = false;
|
||||
|
||||
// 2. filter out transactions that are invalid or already pending import pre-size to avoid
|
||||
// reallocations
|
||||
let mut new_txs = Vec::with_capacity(transactions.len());
|
||||
for tx in transactions {
|
||||
match self.transactions_by_peers.entry(*tx.tx_hash()) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
// transaction was already inserted
|
||||
entry.get_mut().insert(peer_id);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
if self.bad_imports.contains(tx.tx_hash()) {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%tx.tx_hash(),
|
||||
client_version=%peer.client_version,
|
||||
"received a known bad transaction from peer"
|
||||
);
|
||||
has_bad_transactions = true;
|
||||
} else {
|
||||
// this is a new transaction that should be imported into the pool
|
||||
|
||||
// recover transaction
|
||||
let tx = match tx.try_into_recovered() {
|
||||
Ok(tx) => tx,
|
||||
Err(badtx) => {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%badtx.tx_hash(),
|
||||
client_version=%peer.client_version,
|
||||
"failed ecrecovery for transaction"
|
||||
);
|
||||
has_bad_transactions = true;
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
let pool_transaction = Pool::Transaction::from_pooled(tx);
|
||||
new_txs.push(pool_transaction);
|
||||
|
||||
entry.insert(HashSet::from([peer_id]));
|
||||
}
|
||||
}
|
||||
// Remove known and invalid transactions
|
||||
transactions.retain(|tx| {
|
||||
if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
|
||||
entry.get_mut().insert(peer_id);
|
||||
return false
|
||||
}
|
||||
if self.bad_imports.contains(tx.tx_hash()) {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%tx.tx_hash(),
|
||||
client_version=%peer.client_version,
|
||||
"received a known bad transaction from peer"
|
||||
);
|
||||
has_bad_transactions = true;
|
||||
return false;
|
||||
}
|
||||
true
|
||||
});
|
||||
|
||||
let txs_len = transactions.len();
|
||||
|
||||
let new_txs = transactions
|
||||
.into_par_iter()
|
||||
.filter_map(|tx| match tx.try_into_recovered() {
|
||||
Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
|
||||
Err(badtx) => {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%badtx.tx_hash(),
|
||||
client_version=%peer.client_version,
|
||||
"failed ecrecovery for transaction"
|
||||
);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
has_bad_transactions |= new_txs.len() != txs_len;
|
||||
|
||||
// Record the transactions as seen by the peer
|
||||
for tx in &new_txs {
|
||||
self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
|
||||
}
|
||||
new_txs.shrink_to_fit();
|
||||
|
||||
// 3. import new transactions as a batch to minimize lock contention on the underlying
|
||||
// pool
|
||||
|
||||
Reference in New Issue
Block a user