From c4c07b86bb08451ab21a881971d3a5759cc32648 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 11 Dec 2023 15:20:45 +0100 Subject: [PATCH] feat: add network bench (#5728) --- Cargo.lock | 3 + crates/net/network/Cargo.toml | 9 ++ crates/net/network/benches/bench.rs | 89 ++++++++++++++++++++ crates/net/network/src/test_utils/testnet.rs | 18 ++++ 4 files changed, 119 insertions(+) create mode 100644 crates/net/network/benches/bench.rs diff --git a/Cargo.lock b/Cargo.lock index 826d1c2d6e..831dcf51bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1521,6 +1521,7 @@ dependencies = [ "serde_derive", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -6062,6 +6063,7 @@ dependencies = [ "aquamarine", "async-trait", "auto_impl", + "criterion", "enr", "ethers-core", "ethers-middleware", @@ -6075,6 +6077,7 @@ dependencies = [ "metrics", "parking_lot 0.12.1", "pin-project", + "pprof", "rand 0.8.5", "reth-discv4", "reth-dns-discovery", diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index b63b6637f7..d861b2cfe8 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -89,6 +89,10 @@ enr = { workspace = true, features = ["serde", "rust-secp256k1"] } serial_test.workspace = true tempfile.workspace = true +## Benchmarks +pprof = { workspace = true, features = ["criterion", "flamegraph"] } +criterion = { workspace = true, features = ["async_tokio", "html_reports"] } + [features] default = ["serde"] serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr?/serde", "dep:serde_json"] @@ -101,3 +105,8 @@ optimism = [ "reth-network-api/optimism", "reth-rpc-types/optimism", ] + +[[bench]] +name = "bench" +required-features = ["test-utils"] +harness = false diff --git a/crates/net/network/benches/bench.rs b/crates/net/network/benches/bench.rs new file mode 100644 index 0000000000..9f4fb56659 --- /dev/null +++ b/crates/net/network/benches/bench.rs @@ -0,0 +1,89 @@ +use criterion::*; +use futures::StreamExt; +use pprof::criterion::{Output, PProfProfiler}; +use rand::thread_rng; +use reth_network::{test_utils::Testnet, NetworkEvents}; +use reth_network_api::Peers; +use reth_primitives::U256; +use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; +use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction}; +use std::sync::Arc; +use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc::unbounded_channel}; + +criterion_group!( + name = brodcast_benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = broadcast_ingress_bench +); + +pub fn broadcast_ingress_bench(c: &mut Criterion) { + let rt = TokioRuntime::new().unwrap(); + + let mut group = c.benchmark_group("Broadcast Ingress"); + group.sample_size(10); + group.bench_function("receive_broadcasts", move |b| { + b.to_async(&rt).iter_with_setup( + || { + // `b.to_async(rt)` automatically enters the + // runtime context and simply calling `block_on` here will cause the code to panic. + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + let provider = MockEthProvider::default(); + let mut net = Testnet::create_with(2, provider.clone()).await; + + let mut peer0 = net.remove_peer(0); + let (tx, transactions_rx) = unbounded_channel(); + peer0.network_mut().set_transactions(tx); + let mut events0 = peer0.handle().event_listener(); + let net = net.with_eth_pool(); + let handle = net.spawn(); + let peer1 = handle.peers()[0].network().clone(); + let peer0_id = peer0.peer_id(); + peer1.add_peer(peer0_id, peer0.local_addr()); + + // await connection + tokio::select! { + _ = events0.next() => {} + _ = &mut peer0 => {} + } + + // prepare some transactions + let mut gen = TransactionGenerator::new(thread_rng()); + let num_broadcasts = 10; + for _ in 0..num_broadcasts { + for _ in 0..2 { + let mut txs = Vec::new(); + let tx = gen.gen_eip1559_pooled(); + // ensure the sender has balance + provider.add_account( + tx.sender(), + ExtendedAccount::new(0, U256::from(100_000_000)), + ); + txs.push(Arc::new(tx.transaction().clone().into_signed())); + peer1.send_transactions(peer0_id, txs); + } + } + (num_broadcasts, transactions_rx, peer0, handle) + }) + }) + }, + |(num_txs, mut transactions_rx, mut peer0, _handle)| async move { + let mut count = 0; + loop { + tokio::select! { + _ = transactions_rx.recv() => { + count += 1; + if count == num_txs { + break; + } + }, + _ = &mut peer0 => { + } + } + } + }, + ) + }); +} + +criterion_main!(brodcast_benches); diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 7ad4ae867e..1ac94ae335 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -96,6 +96,14 @@ where &self.peers } + /// Remove a peer from the [`Testnet`] and return it. + /// + /// # Panics + /// If the index is out of bounds. + pub fn remove_peer(&mut self, index: usize) -> Peer { + self.peers.remove(index) + } + /// Return a mutable iterator over all peers. pub fn peers_iter_mut(&mut self) -> impl Iterator> + '_ { self.peers.iter_mut() @@ -346,6 +354,16 @@ where self.network.local_addr() } + /// The [PeerId] of this peer. + pub fn peer_id(&self) -> PeerId { + *self.network.peer_id() + } + + /// Returns mutable access to the network. + pub fn network_mut(&mut self) -> &mut NetworkManager { + &mut self.network + } + /// Returns the [`NetworkHandle`] of this peer. pub fn handle(&self) -> NetworkHandle { self.network.handle().clone()