From e5f88726ae753d9eac9ddc9991d01c240c657ba5 Mon Sep 17 00:00:00 2001 From: Sydhds Date: Fri, 11 Jul 2025 09:57:38 +0200 Subject: [PATCH] Improve test_notify unit test (more reliable) (#17) * Improve test_notify unit test (more reliable) --- prover/benches/prover_bench.rs | 78 ++++++++++++-------------------- prover/src/epoch_service.rs | 83 +++++++++++++++++++++++----------- prover/src/lib.rs | 75 ++++++++++++------------------ prover_cli/src/main.rs | 6 +-- 4 files changed, 115 insertions(+), 127 deletions(-) diff --git a/prover/benches/prover_bench.rs b/prover/benches/prover_bench.rs index 95d1a80..dea1a0f 100644 --- a/prover/benches/prover_bench.rs +++ b/prover/benches/prover_bench.rs @@ -4,23 +4,18 @@ use criterion::{criterion_group, criterion_main}; // std use std::net::{IpAddr, Ipv4Addr}; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use std::str::FromStr; // third-party -use alloy::{ - primitives::{Address, U256}, -}; +use alloy::primitives::{Address, U256}; +use futures::FutureExt; use parking_lot::RwLock; use tokio::sync::Notify; use tokio::task::JoinSet; use tonic::Response; -use futures::FutureExt; // internal -use prover::{ - AppArgs, - run_prover -}; +use prover::{AppArgs, run_prover}; // grpc pub mod prover_proto { @@ -28,40 +23,32 @@ pub mod prover_proto { tonic::include_proto!("prover"); } use prover_proto::{ - Address as GrpcAddress, - U256 as GrpcU256, - Wei as GrpcWei, - RegisterUserReply, RegisterUserRequest, RegistrationStatus, - SendTransactionRequest, SendTransactionReply, - RlnProofFilter, RlnProofReply, - rln_prover_client::RlnProverClient + Address as GrpcAddress, RegisterUserReply, RegisterUserRequest, RegistrationStatus, + RlnProofFilter, RlnProofReply, SendTransactionReply, SendTransactionRequest, U256 as GrpcU256, + Wei as GrpcWei, rln_prover_client::RlnProverClient, }; async fn register_users(port: u16, addresses: Vec
) { - let url = format!("http://127.0.0.1:{}", port); let mut client = RlnProverClient::connect(url).await.unwrap(); for address in addresses { - let addr = GrpcAddress { value: address.to_vec(), }; - let request_0 = RegisterUserRequest { - user: Some(addr), - }; + let request_0 = RegisterUserRequest { user: Some(addr) }; let request = tonic::Request::new(request_0); let response: Response = client.register_user(request).await.unwrap(); assert_eq!( RegistrationStatus::try_from(response.into_inner().status).unwrap(), - RegistrationStatus::Success); + RegistrationStatus::Success + ); } } async fn proof_sender(port: u16, addresses: Vec
, proof_count: usize) { - let chain_id = GrpcU256 { // FIXME: LE or BE? value: U256::from(1).to_le_bytes::<32>().to_vec(), @@ -79,7 +66,7 @@ async fn proof_sender(port: u16, addresses: Vec
, proof_count: usize) { }; for i in 0..proof_count { - let tx_hash = U256::from(42+i).to_le_bytes::<32>().to_vec(); + let tx_hash = U256::from(42 + i).to_le_bytes::<32>().to_vec(); let request_0 = SendTransactionRequest { gas_price: Some(wei.clone()), @@ -89,21 +76,19 @@ async fn proof_sender(port: u16, addresses: Vec
, proof_count: usize) { }; let request = tonic::Request::new(request_0); - let response: Response = client.send_transaction(request).await.unwrap(); + let response: Response = + client.send_transaction(request).await.unwrap(); assert_eq!(response.into_inner().result, true); } } async fn proof_collector(port: u16, proof_count: usize) -> Vec { - - let result= Arc::new(RwLock::new(vec![])); + let result = Arc::new(RwLock::new(vec![])); let url = format!("http://127.0.0.1:{}", port); let mut client = RlnProverClient::connect(url).await.unwrap(); - let request_0 = RlnProofFilter { - address: None, - }; + let request_0 = RlnProofFilter { address: None }; let request = tonic::Request::new(request_0); let stream_ = client.get_proofs(request).await.unwrap(); @@ -125,7 +110,6 @@ async fn proof_collector(port: u16, proof_count: usize) -> Vec { } fn proof_generation_bench(c: &mut Criterion) { - let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() @@ -163,26 +147,22 @@ fn proof_generation_bench(c: &mut Criterion) { // Spawn prover let notify_start_1 = notify_start.clone(); - rt.spawn( - async move { - tokio::spawn(run_prover(app_args)); - tokio::time::sleep(Duration::from_secs(10)).await; - println!("Prover is ready, notifying it..."); - notify_start_1.clone().notify_one(); - } - ); + rt.spawn(async move { + tokio::spawn(run_prover(app_args)); + tokio::time::sleep(Duration::from_secs(10)).await; + println!("Prover is ready, notifying it..."); + notify_start_1.clone().notify_one(); + }); let notify_start_2 = notify_start.clone(); let addresses_0 = addresses.clone(); // Wait for proof_collector to be connected and waiting for some proofs - let _res = rt.block_on( - async move { - notify_start_2.notified().await; - println!("Prover is ready, registering users..."); - register_users(port, addresses_0).await; - } - ); + let _res = rt.block_on(async move { + notify_start_2.notified().await; + println!("Prover is ready, registering users..."); + register_users(port, addresses_0).await; + }); println!("Starting benchmark..."); let size: usize = 1024; @@ -192,9 +172,7 @@ fn proof_generation_bench(c: &mut Criterion) { async { let mut set = JoinSet::new(); set.spawn(proof_collector(port, proof_count)); - set.spawn( - proof_sender(port, addresses.clone(), proof_count).map(|_r| vec![]) - ); + set.spawn(proof_sender(port, addresses.clone(), proof_count).map(|_r| vec![])); // Wait for proof_sender + proof_collector to complete let res = set.join_all().await; // Check we receive enough proof @@ -211,4 +189,4 @@ criterion_group!( .measurement_time(Duration::from_secs(150)); targets = proof_generation_bench ); -criterion_main!(benches); \ No newline at end of file +criterion_main!(benches); diff --git a/prover/src/epoch_service.rs b/prover/src/epoch_service.rs index 5678a41..9155602 100644 --- a/prover/src/epoch_service.rs +++ b/prover/src/epoch_service.rs @@ -6,19 +6,25 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, OutOfRangeError, TimeDelta, Utc use derive_more::{Deref, From, Into}; use parking_lot::RwLock; use tokio::sync::Notify; -use tracing::debug; +use tracing::{debug, error}; // internal use crate::error::AppError; /// Duration of an epoch (1 day) const EPOCH_DURATION: Duration = Duration::from_secs(TimeDelta::days(1).num_seconds() as u64); /// Minimum duration returned by EpochService::compute_wait_until() -const WAIT_UNTIL_MIN_DURATION: Duration = Duration::from_secs(5); +const WAIT_UNTIL_MIN_DURATION: Duration = Duration::from_secs(2); +/// EpochService::compute_wait_until() can return an error like TooLow (see WAIT_UNTIL_MIN_DURATION) +/// so the epoch service will retry X many times. +const WAIT_UNTIL_MAX_COMPUTE_ERROR: usize = 10; /// An Epoch tracking service /// /// The service keeps track of the current epoch (duration: 1 day) and the current epoch slice /// (duration: configurable, < 1 day, usually in minutes) +/// +/// Use TryFrom impl to initialize an EpochService. Note that initial epoch & epoch slice is +/// initialized to Default values. Calling listen_for_new_epoch will initialize these fields. pub struct EpochService { /// A subdivision of an epoch (in minutes or seconds) epoch_slice_duration: Duration, @@ -36,20 +42,45 @@ impl EpochService { Self::compute_epoch_slice_count(EPOCH_DURATION, self.epoch_slice_duration); debug!("epoch slice in an epoch: {}", epoch_slice_count); - let (mut current_epoch, mut current_epoch_slice, mut wait_until) = + let mut retry_counter = 0; + let (mut current_epoch, mut current_epoch_slice, mut wait_until): ( + i64, + i64, + tokio::time::Instant, + ) = loop { match self.compute_wait_until(&|| Utc::now(), &|| tokio::time::Instant::now()) { Ok((current_epoch, current_epoch_slice, wait_until)) => { - (current_epoch, current_epoch_slice, wait_until) + break (current_epoch, current_epoch_slice, wait_until); } - Err(_e) => { - // sleep and try again (only one retry) + Err(WaitUntilError::TooLow(d1, d2)) => { + // Wait until is too low (according to const WAIT_UNTIL_MIN_DURATION) + // so we will retry (WAIT_UNTIL_MAX_COMPUTE_ERROR many times) after a short sleep + + debug!("compute_wait_until return TooLow, will retry after a sleep..."); tokio::time::sleep(WAIT_UNTIL_MIN_DURATION).await; - self.compute_wait_until(&|| Utc::now(), &|| tokio::time::Instant::now())? + retry_counter += 1; + if retry_counter > WAIT_UNTIL_MAX_COMPUTE_ERROR { + error!( + "Too many errors while computing the initial wait until, aborting..." + ); + return Err(AppError::EpochError(WaitUntilError::TooLow(d1, d2))); + } + } + Err(e) => { + // Another error (like OutOfRange) - exiting... + + error!("Error computing the initial wait until: {}", e); + return Err(AppError::EpochError(e)); } }; + }; - debug!("wait until: {:?}", wait_until); + // debug!("wait until: {:?}", wait_until); *self.current_epoch.write() = (current_epoch.into(), current_epoch_slice.into()); + debug!( + "Initial epoch: {}, epoch slice: {}", + current_epoch, current_epoch_slice + ); loop { debug!("wait until: {:?}", wait_until); @@ -146,7 +177,7 @@ impl EpochService { fn compute_current_epoch_slice DateTime>( now_date: NaiveDate, epoch_slice_duration: Duration, - now: F, + now: &F, ) -> i64 { debug_assert!(epoch_slice_duration.as_secs() > 0); debug_assert!(i32::try_from(epoch_slice_duration.as_secs()).is_ok()); @@ -239,21 +270,11 @@ impl Add for EpochSlice { mod tests { use super::*; use chrono::{NaiveDate, NaiveDateTime, TimeDelta}; + use claims::assert_ge; use futures::TryFutureExt; use std::sync::atomic::{AtomicU64, Ordering}; use tracing_test::traced_test; - /* - #[tokio::test] - async fn test_wait_until_0() { - let wait_until = tokio::time::Instant::now() + Duration::from_secs(10); - println!("Should wait until: {:?}", wait_until); - tokio::time::sleep(Duration::from_secs(3)).await; - tokio::time::sleep_until(wait_until).await; - println!("Wake up at: {:?}", tokio::time::Instant::now()); - } - */ - #[test] fn test_wait_until() { // Check wait_until is correctly computed @@ -416,7 +437,7 @@ mod tests { // Note: 4-minute diff -> expect == 2 assert_eq!( - EpochService::compute_current_epoch_slice(now_date, epoch_slice_duration, now_f), + EpochService::compute_current_epoch_slice(now_date, epoch_slice_duration, &now_f), 2 ); // Note: 5 minutes and 59 seconds diff -> still expect == 2 @@ -424,7 +445,7 @@ mod tests { EpochService::compute_current_epoch_slice( now_date, epoch_slice_duration, - Box::new(now_f_2) + &Box::new(now_f_2) ), 2 ); @@ -433,7 +454,7 @@ mod tests { EpochService::compute_current_epoch_slice( now_date, epoch_slice_duration, - Box::new(now_f_3) + &Box::new(now_f_3) ), 3 ); @@ -458,17 +479,21 @@ mod tests { let counter_0 = Arc::new(AtomicU64::new(0)); let counter = counter_0.clone(); + let start = std::time::Instant::now(); let res = tokio::try_join!( epoch_service .listen_for_new_epoch() .map_err(|e| AppErrorExt::AppError(e)), - // Wait for 3 epoch slices + 100 ms (to wait to receive notif + counter incr) + // Wait for 3 epoch slices + // + WAIT_UNTIL_MIN_DURATION * 2 (expect a maximum of 2 retry) + // + 500 ms (to wait to receive notif + counter incr) + // Note: this might fail if there is more retry (see list_for_new_epoch code) tokio::time::timeout( - epoch_slice_duration * 3 + Duration::from_millis(500), + epoch_slice_duration * 3 + WAIT_UNTIL_MIN_DURATION * 2 + Duration::from_millis(500), async move { loop { notifier.notified().await; - debug!("[Notified] Epoch update..."); + // debug!("[Notified] Epoch update..."); let _v = counter.fetch_add(1, Ordering::SeqCst); } // Ok::<(), AppErrorExt>(()) @@ -476,7 +501,11 @@ mod tests { ) .map_err(|_e| AppErrorExt::Elapsed) ); + + debug!("Elapsed time: {}", start.elapsed().as_millis()); + // debug!("res: {:?}", res); assert!(matches!(res, Err(AppErrorExt::Elapsed))); - assert_eq!(counter_0.load(Ordering::SeqCst), 3); + // Because the timeout is quite large - it is expected that sometimes the counter == 4 + assert_ge!(counter_0.load(Ordering::SeqCst), 3); } } diff --git a/prover/src/lib.rs b/prover/src/lib.rs index ae45a72..78076a1 100644 --- a/prover/src/lib.rs +++ b/prover/src/lib.rs @@ -29,9 +29,6 @@ use tracing::{ // info }; // internal -use rln_proof::RlnIdentifier; -use smart_contract::KarmaTiersSC::KarmaTiersSCInstance; -use smart_contract::TIER_LIMITS; pub use crate::args::{AppArgs, AppArgsConfig}; use crate::epoch_service::EpochService; use crate::grpc_service::GrpcProverService; @@ -42,6 +39,9 @@ use crate::tier::TierLimits; use crate::tiers_listener::TiersListener; use crate::user_db_service::UserDbService; use crate::user_db_types::RateLimit; +use rln_proof::RlnIdentifier; +use smart_contract::KarmaTiersSC::KarmaTiersSCInstance; +use smart_contract::TIER_LIMITS; const RLN_IDENTIFIER_NAME: &[u8] = b"test-rln-identifier"; const PROVER_SPAM_LIMIT: RateLimit = RateLimit::new(10_000u64); @@ -49,8 +49,9 @@ const GENESIS: DateTime = DateTime::from_timestamp(1431648000, 0).unwrap(); const PROVER_MINIMAL_AMOUNT_FOR_REGISTRATION: U256 = U256::from_le_slice(10u64.to_le_bytes().as_slice()); -pub async fn run_prover(app_args: AppArgs) -> Result<(), Box> { - +pub async fn run_prover( + app_args: AppArgs, +) -> Result<(), Box> { // Epoch let epoch_service = EpochService::try_from((Duration::from_secs(60 * 2), GENESIS)) .expect("Failed to create epoch service"); @@ -61,7 +62,7 @@ pub async fn run_prover(app_args: AppArgs) -> Result<(), Box, proof_count: usize) { - let chain_id = GrpcU256 { // FIXME: LE or BE? value: U256::from(1).to_le_bytes::<32>().to_vec(), @@ -247,20 +241,18 @@ mod tests { }; let request = tonic::Request::new(request_0); - let response: Response = client.send_transaction(request).await.unwrap(); + let response: Response = + client.send_transaction(request).await.unwrap(); assert_eq!(response.into_inner().result, true); } async fn proof_collector(port: u16) -> Vec { - - let result= Arc::new(RwLock::new(vec![])); + let result = Arc::new(RwLock::new(vec![])); let url = format!("http://127.0.0.1:{}", port); let mut client = RlnProverClient::connect(url).await.unwrap(); - let request_0 = RlnProofFilter { - address: None, - }; + let request_0 = RlnProofFilter { address: None }; let request = tonic::Request::new(request_0); let stream_ = client.get_proofs(request).await.unwrap(); @@ -279,30 +271,27 @@ mod tests { } async fn register_users(port: u16, addresses: Vec
) { - let url = format!("http://127.0.0.1:{}", port); let mut client = RlnProverClient::connect(url).await.unwrap(); for address in addresses { - let addr = GrpcAddress { value: address.to_vec(), }; - let request_0 = RegisterUserRequest { - user: Some(addr), - }; + let request_0 = RegisterUserRequest { user: Some(addr) }; let request = tonic::Request::new(request_0); - let response: Response = client.register_user(request).await.unwrap(); + let response: Response = + client.register_user(request).await.unwrap(); assert_eq!( RegistrationStatus::try_from(response.into_inner().status).unwrap(), - RegistrationStatus::Success); + RegistrationStatus::Success + ); } } async fn query_user_info(port: u16, addresses: Vec
) -> Vec { - let url = format!("http://127.0.0.1:{}", port); let mut client = RlnProverClient::connect(url).await.unwrap(); @@ -311,11 +300,10 @@ mod tests { let addr = GrpcAddress { value: address.to_vec(), }; - let request_0 = GetUserTierInfoRequest { - user: Some(addr), - }; + let request_0 = GetUserTierInfoRequest { user: Some(addr) }; let request = tonic::Request::new(request_0); - let resp: Response = client.get_user_tier_info(request).await.unwrap(); + let resp: Response = + client.get_user_tier_info(request).await.unwrap(); result.push(resp.into_inner()); } @@ -326,7 +314,6 @@ mod tests { #[tokio::test] #[traced_test] async fn test_grpc_register_users() { - let addresses = vec![ Address::from_str("0xd8da6bf26964af9d7eed9e03e53415d37aa96045").unwrap(), Address::from_str("0xb20a608c624Ca5003905aA834De7156C68b2E1d0").unwrap(), @@ -373,7 +360,6 @@ mod tests { #[tokio::test] #[traced_test] async fn test_grpc_gen_proof() { - let addresses = vec![ Address::from_str("0xd8da6bf26964af9d7eed9e03e53415d37aa96045").unwrap(), Address::from_str("0xb20a608c624Ca5003905aA834De7156C68b2E1d0").unwrap(), @@ -414,8 +400,7 @@ mod tests { let proof_count = 1; let mut set = JoinSet::new(); set.spawn( - proof_sender(port, addresses.clone(), proof_count) - .map(|_| vec![]) // JoinSet require having the same return type + proof_sender(port, addresses.clone(), proof_count).map(|_| vec![]), // JoinSet require having the same return type ); set.spawn(proof_collector(port)); let res = set.join_all().await; diff --git a/prover_cli/src/main.rs b/prover_cli/src/main.rs index e5a88ea..d98a125 100644 --- a/prover_cli/src/main.rs +++ b/prover_cli/src/main.rs @@ -10,11 +10,7 @@ use tracing::{ }; use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt}; // internal -use prover::{ - run_prover, - AppArgs, - AppArgsConfig, -}; +use prover::{AppArgs, AppArgsConfig, run_prover}; #[tokio::main] async fn main() -> Result<(), Box> {