From 2d1bcd17d8020aff3b4bebd16ec25183c0a58601 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 3 Mar 2023 20:50:49 +0100 Subject: [PATCH] perf: improve initial pooled tx exchange (#1615) --- crates/net/network/src/transactions.rs | 54 ++++++++++++++----------- crates/transaction-pool/src/lib.rs | 3 +- crates/transaction-pool/src/validate.rs | 5 +++ 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index e59d3c927c..39bf239df6 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -19,7 +19,8 @@ use reth_primitives::{ }; use reth_rlp::Encodable; use reth_transaction_pool::{ - error::PoolResult, PropagateKind, PropagatedTransactions, TransactionPool, + error::PoolResult, PoolTransaction, PropagateKind, PropagatedTransactions, TransactionPool, + ValidPoolTransaction, }; use std::{ collections::{hash_map::Entry, HashMap}, @@ -37,7 +38,7 @@ use tracing::trace; const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10; /// Soft limit for NewPooledTransactions -const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMT: usize = 4096; +const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096; /// The future for inserting a function into the pool pub type PoolImportFuture = Pin> + Send + 'static>>; @@ -340,7 +341,7 @@ where self.peers.remove(&peer_id); } NetworkEvent::SessionEstablished { peer_id, messages, version, .. } => { - // insert a new peer + // insert a new peer into the peerset self.peers.insert( peer_id, Peer { @@ -352,31 +353,26 @@ where }, ); - // Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the + // Send a `NewPooledTransactionHashes` to the peer with up to + // `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the // pool if !self.network.is_syncing() { - let mut hashes = PooledTransactionsHashesBuilder::new(version); - let to_propogate = self.pool.pooled_transactions().into_iter().map(|tx| { - let tx = Arc::new(tx.transaction.to_recovered_transaction().into_signed()); - PropagateTransaction::new(tx) - }); + let peer = self.peers.get_mut(&peer_id).expect("is present; qed"); - for tx in to_propogate.take(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMT) { - let peer = self.peers.get_mut(&peer_id).unwrap(); + let mut msg_builder = PooledTransactionsHashesBuilder::new(version); - if peer.transactions.insert(tx.hash()) { - hashes.push(&tx); - } + for pooled_tx in self + .pool + .pooled_transactions() + .into_iter() + .take(NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT) + { + peer.transactions.insert(*pooled_tx.hash()); + msg_builder.push_pooled(pooled_tx); } - let hashes = hashes.build(); - - self.network.send_transactions_hashes(peer_id, hashes); - // let msg = NewPooledTransactionHashes66(self.pool.pooled_transactions()); - // self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes { - // peer_id, - // msg, - // }) + let msg = msg_builder.build(); + self.network.send_transactions_hashes(peer_id, msg); } } // TODO Add remaining events @@ -536,7 +532,7 @@ where } } -/// A transaction that's about to be propagated +/// A transaction that's about to be propagated to multiple peers. struct PropagateTransaction { tx_type: u8, length: usize, @@ -565,6 +561,18 @@ enum PooledTransactionsHashesBuilder { // === impl PooledTransactionsHashesBuilder === impl PooledTransactionsHashesBuilder { + /// Push a transaction from the pool to the list. + fn push_pooled(&mut self, pooled_tx: Arc>) { + match self { + PooledTransactionsHashesBuilder::Eth66(msg) => msg.0.push(*pooled_tx.hash()), + PooledTransactionsHashesBuilder::Eth68(msg) => { + msg.hashes.push(*pooled_tx.hash()); + msg.sizes.push(pooled_tx.encoded_length); + msg.types.push(pooled_tx.transaction.tx_type()); + } + } + } + fn push(&mut self, tx: &PropagateTransaction) { match self { PooledTransactionsHashesBuilder::Eth66(msg) => msg.0.push(tx.hash()), diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 5b356d5830..f9d099a32d 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -86,13 +86,12 @@ pub use crate::{ BestTransactions, OnNewBlockEvent, PoolTransaction, PropagateKind, PropagatedTransactions, TransactionOrigin, TransactionPool, }, - validate::{TransactionValidationOutcome, TransactionValidator}, + validate::{TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction}, }; use crate::{ error::PoolResult, pool::PoolInner, traits::{NewTransactionEvent, PoolSize}, - validate::ValidPoolTransaction, }; use reth_primitives::{TxHash, U256}; use std::{collections::HashMap, sync::Arc}; diff --git a/crates/transaction-pool/src/validate.rs b/crates/transaction-pool/src/validate.rs index a5d855b3a0..7200d4f850 100644 --- a/crates/transaction-pool/src/validate.rs +++ b/crates/transaction-pool/src/validate.rs @@ -106,6 +106,11 @@ impl ValidPoolTransaction { self.transaction.hash() } + /// Returns the type identifier of the transaction + pub fn tx_type(&self) -> u8 { + self.transaction.tx_type() + } + /// Returns the address of the sender pub fn sender(&self) -> Address { self.transaction.sender()