diff --git a/benches/add_users_benchmark.rs b/benches/add_users_benchmark.rs index 0635ac8..d0d109e 100644 --- a/benches/add_users_benchmark.rs +++ b/benches/add_users_benchmark.rs @@ -1,16 +1,16 @@ use alloy::signers::local::PrivateKeySigner; use criterion::{criterion_group, criterion_main, Criterion}; -use de_mls::user::User; use itertools::Itertools; use openmls::prelude::KeyPackage; use rand::Rng; -use rayon::prelude::*; -use tokio::runtime::Runtime; +use tokio; + +use de_mls::user::User; async fn setup_group(n: usize) -> (User, Vec, String) { let group_name = "group".to_string() + &rand::thread_rng().gen::().to_string(); - let mut alice = - User::new("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80").unwrap(); + let signer = PrivateKeySigner::random(); + let mut alice = User::new(&signer.to_bytes().to_string()).unwrap(); alice .create_group(group_name.clone(), true) @@ -20,7 +20,6 @@ async fn setup_group(n: usize) -> (User, Vec, String) { let mut users = Vec::with_capacity(n); let mut key_packages = Vec::with_capacity(n); for _ in 0..n { - let signer = PrivateKeySigner::random(); let mut user = User::new(&signer.to_bytes().to_string()).unwrap(); let key_package = user.new_key_package().unwrap(); user.create_group(group_name.clone(), false) @@ -40,13 +39,14 @@ async fn setup_group(n: usize) -> (User, Vec, String) { assert!(res.is_ok(), "Failed to build waku message"); let waku_welcome_message = res.unwrap(); - let results: Vec<_> = futures::future::join_all( + futures::future::join_all( users .iter_mut() .map(|user| user.process_waku_msg(waku_welcome_message.clone())), ) - .await; - assert!(!results.is_empty(), "No results collected"); + .await + .into_iter() + .all(|result| result.is_ok()); (alice, users, group_name) } @@ -66,103 +66,44 @@ async fn generate_users_chunk(n: usize, group_name: String) -> (Vec, Vec (Vec>, Vec>) { - let results: Vec<(Vec, Vec)> = (0..sample_size) - .into_par_iter() // Use parallel iterator for chunks - .map(|_| { - // Call the async function in a blocking manner - let rt = tokio::runtime::Runtime::new().unwrap(); - let (users, key_packages) = rt.block_on(generate_users_chunk(n, group_name.clone())); - (users, key_packages) // Return only the users for this chunk - }) - .collect(); // Collect all user chunks - - let (users_chunks, key_packages_samples): (Vec>, Vec>) = results - .into_iter() - .map(|(users, key_packages)| (users, key_packages)) - .unzip(); - - (users_chunks, key_packages_samples) -} - fn multi_user_apply_commit_benchmark(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); for (i, j) in [1, 10, 100] .into_iter() .cartesian_product([1, 10, 100, 1000, 5000, 10000]) { - let rt = Runtime::new().unwrap(); - println!("Start to setup group with {} users", i); let (mut alice, mut users, group_name) = rt.block_on(async { setup_group(i).await }); - println!("Start to generate {} users to join", j); - let (_, mut kp_to_join) = - rt.block_on(async { generate_users_to_join(j, 100, group_name.clone()).await }); - println!("Start to benchmark"); c.bench_function( - format!("multi_user_commit_benchmark_{}_{}", i, j).as_str(), + format!("multi_user_apply_commit_benchmark_{}_{}", i, j).as_str(), |b| { - b.iter(|| { + b.iter_custom(|iters| { rt.block_on(async { - if kp_to_join.len() < j { - return; - } - let msgs = alice - .invite_users(kp_to_join.pop().unwrap(), group_name.clone()) + let mut total_duration = std::time::Duration::ZERO; + + for _ in 0..iters { + // Setup phase: generate data + let (_, kp_to_join) = generate_users_chunk(j, group_name.clone()).await; + + let start = std::time::Instant::now(); + let msgs = alice + .invite_users(kp_to_join, group_name.clone()) + .await + .expect("Failed to invite users"); + + let commit_msg = msgs[0].build_waku_message().unwrap(); + futures::future::join_all( + users + .iter_mut() + .map(|user| user.process_waku_msg(commit_msg.clone())), + ) .await - .expect("Failed to invite users"); - - let commit_msg = msgs[0].build_waku_message().unwrap(); - let results: Vec<_> = futures::future::join_all( - users - .iter_mut() - .map(|user| user.process_waku_msg(commit_msg.clone())), - ) - .await; - assert!(!results.is_empty(), "No results collected"); - }); - }) - }, - ); - } -} - -fn multi_user_apply_commit_big_benchmark(c: &mut Criterion) { - for (i, j) in [1000] - .into_iter() - .cartesian_product([1, 10, 100, 1000, 5000, 10000]) - { - let rt = Runtime::new().unwrap(); - println!("Start to setup group with {} users", i); - let (mut alice, mut users, group_name) = rt.block_on(async { setup_group(i).await }); - println!("Start to generate {} users to join", j); - let (_, mut kp_to_join) = - rt.block_on(async { generate_users_to_join(j, 100, group_name.clone()).await }); - println!("Start to benchmark"); - c.bench_function( - format!("multi_user_commit_benchmark_{}_{}", i, j).as_str(), - |b| { - b.iter(|| { - rt.block_on(async { - if kp_to_join.len() < j { - return; + .into_iter() + .all(|result| result.is_ok()); + total_duration += start.elapsed(); } - let msgs = alice - .invite_users(kp_to_join.pop().unwrap(), group_name.clone()) - .await - .expect("Failed to invite users"); - let commit_msg = msgs[0].build_waku_message().unwrap(); - let results: Vec<_> = futures::future::join_all( - users - .iter_mut() - .map(|user| user.process_waku_msg(commit_msg.clone())), - ) - .await; - assert!(!results.is_empty(), "No results collected"); - }); + total_duration + }) }) }, ); @@ -170,81 +111,78 @@ fn multi_user_apply_commit_big_benchmark(c: &mut Criterion) { } fn multi_user_welcome_benchmark(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); for (i, j) in [1, 10, 100] .into_iter() .cartesian_product([1, 10, 100, 1000, 5000, 10000]) { - let rt = Runtime::new().unwrap(); - println!("Start to setup group with {} users", i); let (mut alice, _, group_name) = rt.block_on(async { setup_group(i).await }); - println!("Start to generate {} users to join", j); - let (mut users_to_join, mut kp_to_join) = - rt.block_on(async { generate_users_to_join(j, 100, group_name.clone()).await }); - println!("Start to benchmark"); c.bench_function( format!("multi_user_welcome_benchmark_{}_{}", i, j).as_str(), |b| { - b.iter(|| { + b.iter_custom(|iters| { rt.block_on(async { - if kp_to_join.len() < j { - return; - } - let msgs = alice - .invite_users(kp_to_join.pop().unwrap(), group_name.clone()) - .await - .expect("Failed to invite users"); + let mut total_duration = std::time::Duration::ZERO; - let mut users_to_join_sample = users_to_join.pop().unwrap(); - let welcome_msg = msgs[1].build_waku_message().unwrap(); - let results: Vec<_> = futures::future::join_all( - users_to_join_sample - .iter_mut() - .map(|user| user.process_waku_msg(welcome_msg.clone())), - ) - .await; - assert!(!results.is_empty(), "No results collected"); - }); + for _ in 0..iters { + // Setup phase: generate data + let (mut users_to_join, kp_to_join) = + generate_users_chunk(j, group_name.clone()).await; + + let start = std::time::Instant::now(); + let msgs = alice + .invite_users(kp_to_join, group_name.clone()) + .await + .expect("Failed to invite users"); + + let welcome_msg = msgs[1].build_waku_message().unwrap(); + futures::future::join_all( + users_to_join + .iter_mut() + .map(|user| user.process_waku_msg(welcome_msg.clone())), + ) + .await + .into_iter() + .all(|result| result.is_ok()); + total_duration += start.elapsed(); + } + + total_duration + }) }) }, ); } } -fn multi_user_welcome_big_benchmark(c: &mut Criterion) { - for (i, j) in [1000] +fn multi_user_generate_invite_benchmark(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + for (i, j) in [1, 10, 100, 1000] .into_iter() .cartesian_product([1, 10, 100, 1000, 5000, 10000]) { - let rt = Runtime::new().unwrap(); - println!("Start to setup group with {} users", i); let (mut alice, _, group_name) = rt.block_on(async { setup_group(i).await }); - println!("Start to generate {} users to join", j); - let (mut users_to_join, mut kp_to_join) = - rt.block_on(async { generate_users_to_join(j, 100, group_name.clone()).await }); - println!("Start to benchmark"); c.bench_function( - format!("multi_user_welcome_benchmark_{}_{}", i, j).as_str(), + format!("multi_user_generate_invite_benchmark_{}_{}", i, j).as_str(), |b| { - b.iter(|| { + b.iter_custom(|iters| { rt.block_on(async { - if kp_to_join.len() < j { - return; - } - let msgs = alice - .invite_users(kp_to_join.pop().unwrap(), group_name.clone()) - .await - .expect("Failed to invite users"); + let mut total_duration = std::time::Duration::ZERO; - let mut users_to_join_sample = users_to_join.pop().unwrap(); - let welcome_msg = msgs[1].build_waku_message().unwrap(); - let results: Vec<_> = futures::future::join_all( - users_to_join_sample - .iter_mut() - .map(|user| user.process_waku_msg(welcome_msg.clone())), - ) - .await; - assert!(!results.is_empty(), "No results collected"); - }); + for _ in 0..iters { + // Setup phase: generate data + let (_, kp_to_join) = generate_users_chunk(j, group_name.clone()).await; + + let start = std::time::Instant::now(); + let _ = alice + .invite_users(kp_to_join, group_name.clone()) + .await + .expect("Failed to invite users"); + total_duration += start.elapsed(); + } + + total_duration + }) }) }, ); @@ -255,9 +193,8 @@ criterion_group!( name = benches; config = Criterion::default().sample_size(10); targets = - multi_user_apply_commit_big_benchmark, - multi_user_welcome_big_benchmark, multi_user_apply_commit_benchmark, multi_user_welcome_benchmark, + multi_user_generate_invite_benchmark ); criterion_main!(benches);