diff --git a/crates/net/network/benches/tx_manager_hash_fetching.rs b/crates/net/network/benches/tx_manager_hash_fetching.rs index f00f5cf69d..ff862b91d0 100644 --- a/crates/net/network/benches/tx_manager_hash_fetching.rs +++ b/crates/net/network/benches/tx_manager_hash_fetching.rs @@ -1,24 +1,71 @@ #![allow(missing_docs)] -use alloy_primitives::U256; -use criterion::*; +use alloy_primitives::{B256, U256}; +use criterion::{measurement::WallTime, *}; use rand::SeedableRng; +use reth_eth_wire::EthVersion; +use reth_eth_wire_types::EthNetworkPrimitives; use reth_network::{ - test_utils::Testnet, + test_utils::{ + transactions::{buffer_hash_to_tx_fetcher, new_mock_session}, + Testnet, + }, transactions::{ - TransactionFetcherConfig, TransactionPropagationMode::Max, TransactionsManagerConfig, + fetcher::TransactionFetcher, TransactionFetcherConfig, TransactionPropagationMode::Max, + TransactionsManagerConfig, }, }; +use reth_network_peers::PeerId; use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool}; +use std::collections::HashMap; use tokio::runtime::Runtime as TokioRuntime; criterion_group!( name = tx_fetch_benches; config = Criterion::default(); - targets = tx_fetch_bench + targets = tx_fetch_bench, fetch_pending_hashes, ); +pub fn benchmark_fetch_pending_hashes(group: &mut BenchmarkGroup<'_, WallTime>, peers_num: usize) { + let setup = || { + let mut tx_fetcher = TransactionFetcher::::default(); + let mut peers = HashMap::default(); + + for _i in 0..peers_num { + // NOTE: the worst case, each tx in the cache belongs to a differenct peer. + let peer = PeerId::random(); + let hash = B256::random(); + + let (mut peer_data, _) = new_mock_session(peer, EthVersion::Eth66); + peer_data.seen_transactions_mut().insert(hash); + peers.insert(peer, peer_data); + + buffer_hash_to_tx_fetcher(&mut tx_fetcher, hash, peer, 0, None); + } + + (tx_fetcher, peers) + }; + + let group_id = format!("fetch pending hashes, peers num: {}", peers_num); + + group.bench_function(group_id, |b| { + b.iter_with_setup(setup, |(mut tx_fetcher, peers)| { + tx_fetcher.on_fetch_pending_hashes(&peers, |_| true); + }); + }); +} + +pub fn fetch_pending_hashes(c: &mut Criterion) { + let mut group = c.benchmark_group("Fetch Pending Hashes"); + + for peers in [5, 10, 20, 100, 1000, 10000, 100000] { + benchmark_fetch_pending_hashes(&mut group, peers); + } + + group.finish(); +} + pub fn tx_fetch_bench(c: &mut Criterion) { let rt = TokioRuntime::new().unwrap(); diff --git a/crates/net/network/src/test_utils/mod.rs b/crates/net/network/src/test_utils/mod.rs index 71639bc715..5323547a6f 100644 --- a/crates/net/network/src/test_utils/mod.rs +++ b/crates/net/network/src/test_utils/mod.rs @@ -2,9 +2,11 @@ mod init; mod testnet; +pub mod transactions; pub use init::{ enr_to_peer_id, unused_port, unused_tcp_addr, unused_tcp_and_udp_port, unused_tcp_udp, unused_udp_addr, unused_udp_port, GETH_TIMEOUT, }; pub use testnet::{NetworkEventStream, Peer, PeerConfig, PeerHandle, Testnet, TestnetHandle}; +pub use transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager}; diff --git a/crates/net/network/src/test_utils/transactions.rs b/crates/net/network/src/test_utils/transactions.rs new file mode 100644 index 0000000000..c3c38e3f1c --- /dev/null +++ b/crates/net/network/src/test_utils/transactions.rs @@ -0,0 +1,101 @@ +//! Test helper impls for transactions + +#![allow(dead_code)] + +use crate::{ + cache::LruCache, + transactions::{ + constants::{ + tx_fetcher::DEFAULT_MAX_COUNT_FALLBACK_PEERS, + tx_manager::DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER, + }, + fetcher::{TransactionFetcher, TxFetchMetadata}, + PeerMetadata, TransactionsManager, + }, + NetworkConfigBuilder, NetworkManager, +}; +use alloy_primitives::TxHash; +use reth_eth_wire::EthVersion; +use reth_eth_wire_types::EthNetworkPrimitives; +use reth_network_api::{PeerKind, PeerRequest, PeerRequestSender}; +use reth_network_peers::PeerId; +use reth_storage_api::noop::NoopProvider; +use reth_transaction_pool::test_utils::{testing_pool, TestPool}; +use secp256k1::SecretKey; +use std::sync::Arc; +use tokio::sync::mpsc; +use tracing::trace; + +/// A new tx manager for testing. +pub async fn new_tx_manager( +) -> (TransactionsManager, NetworkManager) { + let secret_key = SecretKey::new(&mut rand_08::thread_rng()); + let client = NoopProvider::default(); + + let config = NetworkConfigBuilder::new(secret_key) + // let OS choose port + .listener_port(0) + .disable_discovery() + .build(client); + + let pool = testing_pool(); + + let transactions_manager_config = config.transactions_manager_config.clone(); + let (_network_handle, network, transactions, _) = NetworkManager::new(config) + .await + .unwrap() + .into_builder() + .transactions(pool.clone(), transactions_manager_config) + .split_with_handle(); + + (transactions, network) +} + +/// Directly buffer hahs into tx fetcher for testing. +pub fn buffer_hash_to_tx_fetcher( + tx_fetcher: &mut TransactionFetcher, + hash: TxHash, + peer_id: PeerId, + retries: u8, + tx_encoded_length: Option, +) { + match tx_fetcher.hashes_fetch_inflight_and_pending_fetch.get_or_insert(hash, || { + TxFetchMetadata::new( + retries, + LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), + tx_encoded_length, + ) + }) { + Some(metadata) => { + metadata.fallback_peers_mut().insert(peer_id); + } + None => { + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + %hash, + "failed to insert hash from peer in schnellru::LruMap, dropping hash" + ) + } + } + + tx_fetcher.hashes_pending_fetch.insert(hash); +} + +/// Mock a new session, returns (peer, channel-to-send-get-pooled-tx-response-on). +pub fn new_mock_session( + peer_id: PeerId, + version: EthVersion, +) -> (PeerMetadata, mpsc::Receiver) { + let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1); + + ( + PeerMetadata::new( + PeerRequestSender::new(peer_id, to_mock_session_tx), + version, + Arc::from(""), + DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER, + PeerKind::Trusted, + ), + to_mock_session_rx, + ) +} diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 924080ac50..151b862c9c 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -84,7 +84,7 @@ pub struct TransactionFetcher { /// which a [`GetPooledTransactions`] request is inflight. pub hashes_pending_fetch: LruCache, /// Tracks all hashes in the transaction fetcher. - pub(super) hashes_fetch_inflight_and_pending_fetch: LruMap, + pub hashes_fetch_inflight_and_pending_fetch: LruMap, /// Filter for valid announcement and response data. pub(super) filter_valid_message: MessageFilter, /// Info on capacity of the transaction fetcher. @@ -1301,7 +1301,7 @@ struct TxFetcherSearchDurations { #[cfg(test)] mod test { use super::*; - use crate::transactions::tests::{buffer_hash_to_tx_fetcher, new_mock_session}; + use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session}; use alloy_primitives::{hex, B256}; use alloy_rlp::Decodable; use derive_more::IntoIterator; diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 7b32722a35..36e18ff759 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1846,7 +1846,7 @@ pub struct PeerMetadata { impl PeerMetadata { /// Returns a new instance of [`PeerMetadata`]. - fn new( + pub fn new( request_tx: PeerRequestSender>, version: EthVersion, client_version: Arc, @@ -1867,6 +1867,11 @@ impl PeerMetadata { &self.request_tx } + /// Return a + pub fn seen_transactions_mut(&mut self) -> &mut LruCache { + &mut self.seen_transactions + } + /// Returns the negotiated `EthVersion` of the session. pub const fn version(&self) -> EthVersion { self.version @@ -1983,11 +1988,16 @@ struct TxManagerPollDurations { #[cfg(test)] mod tests { use super::*; - use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager}; + use crate::{ + test_utils::{ + transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager}, + Testnet, + }, + NetworkConfigBuilder, NetworkManager, + }; use alloy_consensus::{transaction::PooledTransaction, TxEip1559, TxLegacy}; use alloy_primitives::{hex, Signature, TxKind, U256}; use alloy_rlp::Decodable; - use constants::tx_fetcher::DEFAULT_MAX_COUNT_FALLBACK_PEERS; use futures::FutureExt; use reth_chainspec::MIN_TRANSACTION_GAS; use reth_ethereum_primitives::{Transaction, TransactionSigned}; @@ -1998,7 +2008,7 @@ mod tests { }; use reth_storage_api::noop::NoopProvider; use reth_transaction_pool::test_utils::{ - testing_pool, MockTransaction, MockTransactionFactory, TestPool, + testing_pool, MockTransaction, MockTransactionFactory, }; use secp256k1::SecretKey; use std::{ @@ -2006,82 +2016,8 @@ mod tests { net::{IpAddr, Ipv4Addr, SocketAddr}, str::FromStr, }; - use tests::fetcher::TxFetchMetadata; use tracing::error; - async fn new_tx_manager( - ) -> (TransactionsManager, NetworkManager) - { - let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - let client = NoopProvider::default(); - - let config = NetworkConfigBuilder::new(secret_key) - // let OS choose port - .listener_port(0) - .disable_discovery() - .build(client); - - let pool = testing_pool(); - - let transactions_manager_config = config.transactions_manager_config.clone(); - let (_network_handle, network, transactions, _) = NetworkManager::new(config) - .await - .unwrap() - .into_builder() - .transactions(pool.clone(), transactions_manager_config) - .split_with_handle(); - - (transactions, network) - } - - pub(super) fn buffer_hash_to_tx_fetcher( - tx_fetcher: &mut TransactionFetcher, - hash: TxHash, - peer_id: PeerId, - retries: u8, - tx_encoded_length: Option, - ) { - match tx_fetcher.hashes_fetch_inflight_and_pending_fetch.get_or_insert(hash, || { - TxFetchMetadata::new( - retries, - LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), - tx_encoded_length, - ) - }) { - Some(metadata) => { - metadata.fallback_peers_mut().insert(peer_id); - } - None => { - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - %hash, - "failed to insert hash from peer in schnellru::LruMap, dropping hash" - ) - } - } - - tx_fetcher.hashes_pending_fetch.insert(hash); - } - - // Returns (peer, channel-to-send-get-pooled-tx-response-on). - pub(super) fn new_mock_session( - peer_id: PeerId, - version: EthVersion, - ) -> (PeerMetadata, mpsc::Receiver) { - let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1); - - ( - PeerMetadata::new( - PeerRequestSender::new(peer_id, to_mock_session_tx), - version, - Arc::from(""), - DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER, - PeerKind::Trusted, - ), - to_mock_session_rx, - ) - } - #[tokio::test(flavor = "multi_thread")] async fn test_ignored_tx_broadcasts_while_initially_syncing() { reth_tracing::init_test_tracing();