From 3a56b92515d01e5c882cef3205e38dce35a0a832 Mon Sep 17 00:00:00 2001 From: Sydhds Date: Thu, 4 Sep 2025 08:42:42 +0200 Subject: [PATCH] Add new benchmark (#35) * Add new benchmark * Add black_box --- .github/workflows/ci.yml | 2 +- Readme.md | 4 + prover/Cargo.toml | 4 + prover/benches/prover_bench.rs | 8 +- prover/benches/prover_many_subscribers.rs | 243 ++++++++++++++++++++++ 5 files changed, 257 insertions(+), 4 deletions(-) create mode 100644 prover/benches/prover_many_subscribers.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 100022d..6615e28 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,7 +60,7 @@ jobs: - name: Install foundry uses: foundry-rs/foundry-toolchain@v1 - name: Test - run: cargo test --all-targets --all-features --release + run: cargo test --all-features --release unused-deps: needs: [lint, test] diff --git a/Readme.md b/Readme.md index 8c8801f..56c7253 100644 --- a/Readme.md +++ b/Readme.md @@ -31,3 +31,7 @@ RUST_LOG=debug cargo run -p prover_cli -- --ip 127.0.0.1 --metrics-ip 127.0.0.1 * grpcurl -plaintext -d '{"sender": "Alice", "tx_id": "42"}' '[::1]:50051' prover.RlnProver/SendTransaction * grpcurl -plaintext '[::1]:50051' prover.RlnProver/GetProofs + +## Bench + +* SUBSCRIBER_COUNT=2 SUBSCRIBER_IP=10.235.185.198 RAYON_NUM_THREADS=4 PROOF_SERVICE_COUNT=4 PROOF_COUNT=6 cargo bench -p prover --bench prover_many_subscribers \ No newline at end of file diff --git a/prover/Cargo.toml b/prover/Cargo.toml index b4e104a..663cf3c 100644 --- a/prover/Cargo.toml +++ b/prover/Cargo.toml @@ -58,3 +58,7 @@ tracing-test = "0.2.5" [[bench]] name = "prover_bench" harness = false + +[[bench]] +name = "prover_many_subscribers" +harness = false diff --git a/prover/benches/prover_bench.rs b/prover/benches/prover_bench.rs index e277990..96f060b 100644 --- a/prover/benches/prover_bench.rs +++ b/prover/benches/prover_bench.rs @@ -64,7 +64,7 @@ async fn proof_sender(port: u16, addresses: Vec
, proof_count: usize) { } async fn proof_collector(port: u16, proof_count: usize) -> Vec { - let result = Arc::new(RwLock::new(vec![])); + let result = Arc::new(RwLock::new(Vec::with_capacity(proof_count))); let url = format!("http://127.0.0.1:{port}"); let mut client = RlnProverClient::connect(url).await.unwrap(); @@ -194,8 +194,10 @@ fn proof_generation_bench(c: &mut Criterion) { set.spawn(proof_sender(port, addresses.clone(), proof_count).map(|_r| vec![])); // Wait for proof_sender + proof_collector to complete let res = set.join_all().await; - // Check we receive enough proof - assert_eq!(res[1].len(), proof_count); + // Check proof_sender return an empty vec + assert_eq!(res.iter().filter(|r| r.is_empty()).count(), 1); + // Check we receive enough proofs + assert_eq!(res.iter().filter(|r| r.len() == proof_count).count(), 1); } }); }, diff --git a/prover/benches/prover_many_subscribers.rs b/prover/benches/prover_many_subscribers.rs new file mode 100644 index 0000000..276015c --- /dev/null +++ b/prover/benches/prover_many_subscribers.rs @@ -0,0 +1,243 @@ +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; + +// std +use std::hint::black_box; +use std::io::Write; +use std::net::{IpAddr, Ipv4Addr}; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +// third-party +use alloy::primitives::{Address, U256}; +use futures::FutureExt; +use parking_lot::RwLock; +use tempfile::NamedTempFile; +use tokio::sync::Notify; +use tokio::task::JoinSet; +use tonic::Response; +// internal +use prover::{AppArgs, MockUser, run_prover}; + +// grpc +pub mod prover_proto { + // Include generated code (see build.rs) + tonic::include_proto!("prover"); +} +use prover_proto::{ + Address as GrpcAddress, RlnProofFilter, RlnProofReply, SendTransactionReply, + SendTransactionRequest, U256 as GrpcU256, Wei as GrpcWei, rln_prover_client::RlnProverClient, +}; + +async fn proof_sender(ip: IpAddr, port: u16, addresses: Vec
, proof_count: usize) { + let chain_id = GrpcU256 { + // FIXME: LE or BE? + value: U256::from(1).to_le_bytes::<32>().to_vec(), + }; + + let url = format!("http://{ip}:{port}"); + let mut client = RlnProverClient::connect(url).await.unwrap(); + + let addr = GrpcAddress { + value: addresses[0].to_vec(), + }; + let wei = GrpcWei { + // FIXME: LE or BE? + value: U256::from(1000).to_le_bytes::<32>().to_vec(), + }; + + for i in 0..proof_count { + let tx_hash = U256::from(42 + i).to_le_bytes::<32>().to_vec(); + + let request_0 = SendTransactionRequest { + gas_price: Some(wei.clone()), + sender: Some(addr.clone()), + chain_id: Some(chain_id.clone()), + transaction_hash: tx_hash, + }; + + let request = tonic::Request::new(request_0); + let response: Response = + client.send_transaction(request).await.unwrap(); + assert!(response.into_inner().result); + } +} + +async fn proof_collector(ip: IpAddr, port: u16, proof_count: usize) -> Vec { + let result = Arc::new(RwLock::new(Vec::with_capacity(proof_count))); + + let url = format!("http://{ip}:{port}"); + let mut client = RlnProverClient::connect(url).await.unwrap(); + + let request_0 = RlnProofFilter { address: None }; + + let request = tonic::Request::new(request_0); + let stream_ = client.get_proofs(request).await.unwrap(); + let mut stream = stream_.into_inner(); + let result_2 = result.clone(); + let mut proof_received = 0; + while let Some(response) = stream.message().await.unwrap() { + result_2.write().push(response); + proof_received += 1; + if proof_received >= proof_count { + break; + } + } + + std::mem::take(&mut *result.write()) +} + +fn proof_generation_bench(c: &mut Criterion) { + let rayon_num_threads = std::env::var("RAYON_NUM_THREADS").unwrap_or("".to_string()); + let proof_service_count_default = 4; + let proof_service_count = std::env::var("PROOF_SERVICE_COUNT") + .map(|c| u16::from_str(c.as_str()).unwrap_or(proof_service_count_default)) + .unwrap_or(proof_service_count_default); + let proof_count_default = 5; + let proof_count = std::env::var("PROOF_COUNT") + .map(|c| u32::from_str(c.as_str()).unwrap_or(proof_count_default)) + .unwrap_or(proof_count_default); + let subscriber_count_default = 4; + let subscriber_count = std::env::var("SUBSCRIBER_COUNT") + .map(|c| u32::from_str(c.as_str()).unwrap_or(subscriber_count_default)) + .unwrap_or(subscriber_count_default); + let subscriber_ip = std::env::var("SUBSCRIBER_IP") + .map(|c| IpAddr::from_str(c.as_str()).expect("Invalid SUBSCRIBER_IP")) + .expect("Please provide an IP for subscriber to connect to"); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // Write mock users to tempfile + let mock_users = vec![ + MockUser { + address: Address::from_str("0xd8da6bf26964af9d7eed9e03e53415d37aa96045").unwrap(), + tx_count: 0, + }, + MockUser { + address: Address::from_str("0xb20a608c624Ca5003905aA834De7156C68b2E1d0").unwrap(), + tx_count: 0, + }, + ]; + let addresses: Vec
= mock_users.iter().map(|u| u.address).collect(); + let mock_users_as_str = serde_json::to_string(&mock_users).unwrap(); + let mut temp_file = NamedTempFile::new().unwrap(); + let temp_file_path = temp_file.path().to_path_buf(); + temp_file.write_all(mock_users_as_str.as_bytes()).unwrap(); + temp_file.flush().unwrap(); + + let port = 50051; + let temp_folder = tempfile::tempdir().unwrap(); + let temp_folder_tree = tempfile::tempdir().unwrap(); + // let proof_service_count = 4; + let app_args = AppArgs { + ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + port, + ws_rpc_url: None, + db_path: temp_folder.path().to_path_buf(), + merkle_tree_path: temp_folder_tree.path().to_path_buf(), + ksc_address: None, + rlnsc_address: None, + tsc_address: None, + mock_sc: Some(true), + mock_user: Some(temp_file_path), + config_path: Default::default(), + no_config: true, + metrics_ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + metrics_port: 30051, + broadcast_channel_size: 100, + proof_service_count, + transaction_channel_size: 100, + proof_sender_channel_size: 100, + }; + + // Tokio notify - wait for some time after spawning run_prover then notify it's ready to accept + // connections + let notify_start = Arc::new(Notify::new()); + + // Spawn prover + let notify_start_1 = notify_start.clone(); + rt.spawn(async move { + tokio::spawn(run_prover(app_args)); + tokio::time::sleep(Duration::from_secs(10)).await; + println!("Prover is ready, notifying it..."); + notify_start_1.clone().notify_one(); + }); + + let notify_start_2 = notify_start.clone(); + + // Wait for proof_collector to be connected and waiting for some proofs + rt.block_on(async move { + notify_start_2.notified().await; + println!("Prover is ready..."); + // register_users(port, addresses_0).await; + }); + + println!("Starting benchmark..."); + // let size: usize = 1024; + + let mut group = c.benchmark_group("prover_bench"); + // group.sampling_mode(criterion::SamplingMode::Flat); + + // let proof_count = 5; + let proof_count = proof_count as usize; + + group.throughput(Throughput::Elements(proof_count as u64)); + #[allow(clippy::uninlined_format_args)] + let benchmark_name = format!( + "prover_many_subscribers_{}_proof_service_{}_rt_{}", + proof_count, proof_service_count, rayon_num_threads + ); + group.bench_with_input( + BenchmarkId::new(benchmark_name, proof_count), + &proof_count, + |b, &_s| { + b.to_async(&rt).iter(|| { + async { + let mut set = JoinSet::new(); + + for _i in 0..subscriber_count { + set.spawn(proof_collector( + black_box(subscriber_ip), + black_box(port), + black_box(proof_count), + )); + } + + set.spawn( + proof_sender( + black_box(subscriber_ip), + black_box(port), + black_box(addresses.clone()), + black_box(proof_count), + ) + .map(|_r| vec![]), + ); + // Wait for proof_sender + proof_collector to complete + let res = set.join_all().await; + + assert_eq!(res.len(), subscriber_count as usize + 1); + // println!("res: {:?}", res); + assert_eq!(res.iter().filter(|r| r.is_empty()).count(), 1); + assert_eq!( + res.iter().filter(|r| r.len() == proof_count).count(), + subscriber_count as usize + ); + } + }); + }, + ); + + group.finish(); +} + +criterion_group!( + name = benches; + config = Criterion::default() + .sample_size(10) + .measurement_time(Duration::from_secs(500)) + ; + targets = proof_generation_bench +); +criterion_main!(benches);