mirror of
https://github.com/vacp2p/status-rln-prover.git
synced 2026-01-07 20:54:07 -05:00
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@@ -60,7 +60,7 @@ jobs:
|
||||
- name: Install foundry
|
||||
uses: foundry-rs/foundry-toolchain@v1
|
||||
- name: Test
|
||||
run: cargo test --all-targets --all-features --release
|
||||
run: cargo test --all-features --release
|
||||
|
||||
unused-deps:
|
||||
needs: [lint, test]
|
||||
|
||||
@@ -31,3 +31,7 @@ RUST_LOG=debug cargo run -p prover_cli -- --ip 127.0.0.1 --metrics-ip 127.0.0.1
|
||||
|
||||
* grpcurl -plaintext -d '{"sender": "Alice", "tx_id": "42"}' '[::1]:50051' prover.RlnProver/SendTransaction
|
||||
* grpcurl -plaintext '[::1]:50051' prover.RlnProver/GetProofs
|
||||
|
||||
## Bench
|
||||
|
||||
* SUBSCRIBER_COUNT=2 SUBSCRIBER_IP=10.235.185.198 RAYON_NUM_THREADS=4 PROOF_SERVICE_COUNT=4 PROOF_COUNT=6 cargo bench -p prover --bench prover_many_subscribers
|
||||
@@ -58,3 +58,7 @@ tracing-test = "0.2.5"
|
||||
[[bench]]
|
||||
name = "prover_bench"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "prover_many_subscribers"
|
||||
harness = false
|
||||
|
||||
@@ -64,7 +64,7 @@ async fn proof_sender(port: u16, addresses: Vec<Address>, proof_count: usize) {
|
||||
}
|
||||
|
||||
async fn proof_collector(port: u16, proof_count: usize) -> Vec<RlnProofReply> {
|
||||
let result = Arc::new(RwLock::new(vec![]));
|
||||
let result = Arc::new(RwLock::new(Vec::with_capacity(proof_count)));
|
||||
|
||||
let url = format!("http://127.0.0.1:{port}");
|
||||
let mut client = RlnProverClient::connect(url).await.unwrap();
|
||||
@@ -194,8 +194,10 @@ fn proof_generation_bench(c: &mut Criterion) {
|
||||
set.spawn(proof_sender(port, addresses.clone(), proof_count).map(|_r| vec![]));
|
||||
// Wait for proof_sender + proof_collector to complete
|
||||
let res = set.join_all().await;
|
||||
// Check we receive enough proof
|
||||
assert_eq!(res[1].len(), proof_count);
|
||||
// Check proof_sender return an empty vec
|
||||
assert_eq!(res.iter().filter(|r| r.is_empty()).count(), 1);
|
||||
// Check we receive enough proofs
|
||||
assert_eq!(res.iter().filter(|r| r.len() == proof_count).count(), 1);
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
243
prover/benches/prover_many_subscribers.rs
Normal file
243
prover/benches/prover_many_subscribers.rs
Normal file
@@ -0,0 +1,243 @@
|
||||
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
|
||||
|
||||
// std
|
||||
use std::hint::black_box;
|
||||
use std::io::Write;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
// third-party
|
||||
use alloy::primitives::{Address, U256};
|
||||
use futures::FutureExt;
|
||||
use parking_lot::RwLock;
|
||||
use tempfile::NamedTempFile;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::task::JoinSet;
|
||||
use tonic::Response;
|
||||
// internal
|
||||
use prover::{AppArgs, MockUser, run_prover};
|
||||
|
||||
// grpc
|
||||
pub mod prover_proto {
|
||||
// Include generated code (see build.rs)
|
||||
tonic::include_proto!("prover");
|
||||
}
|
||||
use prover_proto::{
|
||||
Address as GrpcAddress, RlnProofFilter, RlnProofReply, SendTransactionReply,
|
||||
SendTransactionRequest, U256 as GrpcU256, Wei as GrpcWei, rln_prover_client::RlnProverClient,
|
||||
};
|
||||
|
||||
async fn proof_sender(ip: IpAddr, port: u16, addresses: Vec<Address>, proof_count: usize) {
|
||||
let chain_id = GrpcU256 {
|
||||
// FIXME: LE or BE?
|
||||
value: U256::from(1).to_le_bytes::<32>().to_vec(),
|
||||
};
|
||||
|
||||
let url = format!("http://{ip}:{port}");
|
||||
let mut client = RlnProverClient::connect(url).await.unwrap();
|
||||
|
||||
let addr = GrpcAddress {
|
||||
value: addresses[0].to_vec(),
|
||||
};
|
||||
let wei = GrpcWei {
|
||||
// FIXME: LE or BE?
|
||||
value: U256::from(1000).to_le_bytes::<32>().to_vec(),
|
||||
};
|
||||
|
||||
for i in 0..proof_count {
|
||||
let tx_hash = U256::from(42 + i).to_le_bytes::<32>().to_vec();
|
||||
|
||||
let request_0 = SendTransactionRequest {
|
||||
gas_price: Some(wei.clone()),
|
||||
sender: Some(addr.clone()),
|
||||
chain_id: Some(chain_id.clone()),
|
||||
transaction_hash: tx_hash,
|
||||
};
|
||||
|
||||
let request = tonic::Request::new(request_0);
|
||||
let response: Response<SendTransactionReply> =
|
||||
client.send_transaction(request).await.unwrap();
|
||||
assert!(response.into_inner().result);
|
||||
}
|
||||
}
|
||||
|
||||
async fn proof_collector(ip: IpAddr, port: u16, proof_count: usize) -> Vec<RlnProofReply> {
|
||||
let result = Arc::new(RwLock::new(Vec::with_capacity(proof_count)));
|
||||
|
||||
let url = format!("http://{ip}:{port}");
|
||||
let mut client = RlnProverClient::connect(url).await.unwrap();
|
||||
|
||||
let request_0 = RlnProofFilter { address: None };
|
||||
|
||||
let request = tonic::Request::new(request_0);
|
||||
let stream_ = client.get_proofs(request).await.unwrap();
|
||||
let mut stream = stream_.into_inner();
|
||||
let result_2 = result.clone();
|
||||
let mut proof_received = 0;
|
||||
while let Some(response) = stream.message().await.unwrap() {
|
||||
result_2.write().push(response);
|
||||
proof_received += 1;
|
||||
if proof_received >= proof_count {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
std::mem::take(&mut *result.write())
|
||||
}
|
||||
|
||||
fn proof_generation_bench(c: &mut Criterion) {
|
||||
let rayon_num_threads = std::env::var("RAYON_NUM_THREADS").unwrap_or("".to_string());
|
||||
let proof_service_count_default = 4;
|
||||
let proof_service_count = std::env::var("PROOF_SERVICE_COUNT")
|
||||
.map(|c| u16::from_str(c.as_str()).unwrap_or(proof_service_count_default))
|
||||
.unwrap_or(proof_service_count_default);
|
||||
let proof_count_default = 5;
|
||||
let proof_count = std::env::var("PROOF_COUNT")
|
||||
.map(|c| u32::from_str(c.as_str()).unwrap_or(proof_count_default))
|
||||
.unwrap_or(proof_count_default);
|
||||
let subscriber_count_default = 4;
|
||||
let subscriber_count = std::env::var("SUBSCRIBER_COUNT")
|
||||
.map(|c| u32::from_str(c.as_str()).unwrap_or(subscriber_count_default))
|
||||
.unwrap_or(subscriber_count_default);
|
||||
let subscriber_ip = std::env::var("SUBSCRIBER_IP")
|
||||
.map(|c| IpAddr::from_str(c.as_str()).expect("Invalid SUBSCRIBER_IP"))
|
||||
.expect("Please provide an IP for subscriber to connect to");
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
// Write mock users to tempfile
|
||||
let mock_users = vec![
|
||||
MockUser {
|
||||
address: Address::from_str("0xd8da6bf26964af9d7eed9e03e53415d37aa96045").unwrap(),
|
||||
tx_count: 0,
|
||||
},
|
||||
MockUser {
|
||||
address: Address::from_str("0xb20a608c624Ca5003905aA834De7156C68b2E1d0").unwrap(),
|
||||
tx_count: 0,
|
||||
},
|
||||
];
|
||||
let addresses: Vec<Address> = mock_users.iter().map(|u| u.address).collect();
|
||||
let mock_users_as_str = serde_json::to_string(&mock_users).unwrap();
|
||||
let mut temp_file = NamedTempFile::new().unwrap();
|
||||
let temp_file_path = temp_file.path().to_path_buf();
|
||||
temp_file.write_all(mock_users_as_str.as_bytes()).unwrap();
|
||||
temp_file.flush().unwrap();
|
||||
|
||||
let port = 50051;
|
||||
let temp_folder = tempfile::tempdir().unwrap();
|
||||
let temp_folder_tree = tempfile::tempdir().unwrap();
|
||||
// let proof_service_count = 4;
|
||||
let app_args = AppArgs {
|
||||
ip: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||
port,
|
||||
ws_rpc_url: None,
|
||||
db_path: temp_folder.path().to_path_buf(),
|
||||
merkle_tree_path: temp_folder_tree.path().to_path_buf(),
|
||||
ksc_address: None,
|
||||
rlnsc_address: None,
|
||||
tsc_address: None,
|
||||
mock_sc: Some(true),
|
||||
mock_user: Some(temp_file_path),
|
||||
config_path: Default::default(),
|
||||
no_config: true,
|
||||
metrics_ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
||||
metrics_port: 30051,
|
||||
broadcast_channel_size: 100,
|
||||
proof_service_count,
|
||||
transaction_channel_size: 100,
|
||||
proof_sender_channel_size: 100,
|
||||
};
|
||||
|
||||
// Tokio notify - wait for some time after spawning run_prover then notify it's ready to accept
|
||||
// connections
|
||||
let notify_start = Arc::new(Notify::new());
|
||||
|
||||
// Spawn prover
|
||||
let notify_start_1 = notify_start.clone();
|
||||
rt.spawn(async move {
|
||||
tokio::spawn(run_prover(app_args));
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
println!("Prover is ready, notifying it...");
|
||||
notify_start_1.clone().notify_one();
|
||||
});
|
||||
|
||||
let notify_start_2 = notify_start.clone();
|
||||
|
||||
// Wait for proof_collector to be connected and waiting for some proofs
|
||||
rt.block_on(async move {
|
||||
notify_start_2.notified().await;
|
||||
println!("Prover is ready...");
|
||||
// register_users(port, addresses_0).await;
|
||||
});
|
||||
|
||||
println!("Starting benchmark...");
|
||||
// let size: usize = 1024;
|
||||
|
||||
let mut group = c.benchmark_group("prover_bench");
|
||||
// group.sampling_mode(criterion::SamplingMode::Flat);
|
||||
|
||||
// let proof_count = 5;
|
||||
let proof_count = proof_count as usize;
|
||||
|
||||
group.throughput(Throughput::Elements(proof_count as u64));
|
||||
#[allow(clippy::uninlined_format_args)]
|
||||
let benchmark_name = format!(
|
||||
"prover_many_subscribers_{}_proof_service_{}_rt_{}",
|
||||
proof_count, proof_service_count, rayon_num_threads
|
||||
);
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new(benchmark_name, proof_count),
|
||||
&proof_count,
|
||||
|b, &_s| {
|
||||
b.to_async(&rt).iter(|| {
|
||||
async {
|
||||
let mut set = JoinSet::new();
|
||||
|
||||
for _i in 0..subscriber_count {
|
||||
set.spawn(proof_collector(
|
||||
black_box(subscriber_ip),
|
||||
black_box(port),
|
||||
black_box(proof_count),
|
||||
));
|
||||
}
|
||||
|
||||
set.spawn(
|
||||
proof_sender(
|
||||
black_box(subscriber_ip),
|
||||
black_box(port),
|
||||
black_box(addresses.clone()),
|
||||
black_box(proof_count),
|
||||
)
|
||||
.map(|_r| vec![]),
|
||||
);
|
||||
// Wait for proof_sender + proof_collector to complete
|
||||
let res = set.join_all().await;
|
||||
|
||||
assert_eq!(res.len(), subscriber_count as usize + 1);
|
||||
// println!("res: {:?}", res);
|
||||
assert_eq!(res.iter().filter(|r| r.is_empty()).count(), 1);
|
||||
assert_eq!(
|
||||
res.iter().filter(|r| r.len() == proof_count).count(),
|
||||
subscriber_count as usize
|
||||
);
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(
|
||||
name = benches;
|
||||
config = Criterion::default()
|
||||
.sample_size(10)
|
||||
.measurement_time(Duration::from_secs(500))
|
||||
;
|
||||
targets = proof_generation_bench
|
||||
);
|
||||
criterion_main!(benches);
|
||||
Reference in New Issue
Block a user