From 69b29420d736d064c53d2ea040dc2f11cafb486d Mon Sep 17 00:00:00 2001 From: ghassmo Date: Wed, 9 Jun 2021 05:04:09 +0300 Subject: [PATCH] create test function for client subscriber --- src/bin/darkfid.rs | 142 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 123 insertions(+), 19 deletions(-) diff --git a/src/bin/darkfid.rs b/src/bin/darkfid.rs index aa3b15d08..518daa5e6 100644 --- a/src/bin/darkfid.rs +++ b/src/bin/darkfid.rs @@ -246,7 +246,12 @@ fn main() -> Result<()> { result } -//// $ cargo test --bin darkfid +//// $ cargo test test_ten_clients_simultaneously --bin darkfid + //this will run 10 clients simultaneously + +//// $ cargo test test_subscriber --bin darkfid + // Run Client A and send 10 slabs + // Client B should receive 10 slabs from subscriber #[cfg(test)] mod test { @@ -263,8 +268,14 @@ mod test { use rand::Rng; use simplelog::*; - #[test] - fn test_ten_clients_simultaneously() { + pub async fn subscribe(gateway_slabs_sub: GatewaySlabsSubscriber, id: String) { + loop { + gateway_slabs_sub.recv().await.unwrap(); + info!("Client {}: update state", id); + } + } + + fn setup_log() { let logger_config = ConfigBuilder::new().set_time_format_str("%T%.6f").build(); CombinedLogger::init(vec![ @@ -276,6 +287,11 @@ mod test { ), ]) .unwrap(); + } + + #[test] + fn test_ten_clients_simultaneously() { + setup_log(); let mut thread_pools: Vec> = vec![]; @@ -284,20 +300,14 @@ mod test { let ex = Arc::new(Executor::new()); let (signal, shutdown) = async_channel::unbounded::<()>(); + let ex2 = ex.clone(); + let (_, _) = Parallel::new() // Run four executor threads. - .each(0..3, |_| smol::future::block_on(ex.run(shutdown.recv()))) + .each(0..3, |_| smol::future::block_on(ex2.run(shutdown.recv()))) // Run the main future on the current thread. .finish(|| { smol::future::block_on(async move { - pub async fn subscribe(gateway_slabs_sub: GatewaySlabsSubscriber, id: u32) { - loop { - gateway_slabs_sub.recv().await.unwrap(); - info!("client {}: update state", id); - } - } - - let executor = Arc::new(Executor::new()); let connect_addr: SocketAddr = "127.0.0.1:3333".parse().unwrap(); let sub_addr: SocketAddr = "127.0.0.1:4444".parse().unwrap(); @@ -314,19 +324,17 @@ mod test { let mut client = GatewayClient::new(connect_addr, slabstore).unwrap(); // start subscribing - let gateway_slabs_sub: GatewaySlabsSubscriber = client - .start_subscriber(sub_addr, executor.clone()) - .await - .unwrap(); - executor.spawn(subscribe(gateway_slabs_sub, rnd.clone())).detach(); + let gateway_slabs_sub: GatewaySlabsSubscriber = + client.start_subscriber(sub_addr, ex.clone()).await.unwrap(); + ex.clone() + .spawn(subscribe(gateway_slabs_sub, rnd.clone().to_string())) + .detach(); // start gateway client client.start().await.unwrap(); let slab = Slab::new("btc".to_string(), rnd.to_le_bytes().to_vec()); client.put_slab(slab).await.unwrap(); - - }); drop(signal); Ok::<(), drk::Error>(()) @@ -338,4 +346,100 @@ mod test { t.join().unwrap(); } } + + + #[test] + fn test_subscriber() { + setup_log(); + + let mut thread_pools: Vec> = vec![]; + + // Client A + let thread = std::thread::spawn(|| { + smol::future::block_on(async move { + let connect_addr: SocketAddr = "127.0.0.1:3333".parse().unwrap(); + + let mut rng = rand::thread_rng(); + let rnd: u32 = rng.gen(); + let path_str = format!("database_{}.db", rnd); + + let database_path = Path::new(path_str.as_str()); + let rocks = Rocks::new(database_path.clone()).unwrap(); + + let slabstore = RocksColumn::::new(rocks.clone()); + + // create gateway client + let mut client = GatewayClient::new(connect_addr, slabstore).unwrap(); + + // start gateway client + client.start().await.unwrap(); + + let slab = Slab::new("btc".to_string(), rnd.to_le_bytes().to_vec()); + + client.put_slab(slab.clone()).await.unwrap(); + client.put_slab(slab.clone()).await.unwrap(); + client.put_slab(slab.clone()).await.unwrap(); + client.put_slab(slab.clone()).await.unwrap(); + client.put_slab(slab.clone()).await.unwrap(); + client.put_slab(slab.clone()).await.unwrap(); + client.put_slab(slab.clone()).await.unwrap(); + client.put_slab(slab.clone()).await.unwrap(); + client.put_slab(slab.clone()).await.unwrap(); + client.put_slab(slab.clone()).await.unwrap(); + + }); + }); + // Client B + let thread2 = std::thread::spawn(|| { + let ex = Arc::new(Executor::new()); + let (signal, shutdown) = async_channel::unbounded::<()>(); + + let ex2 = ex.clone(); + + let (_, _) = Parallel::new() + // Run four executor threads. + .each(0..3, |_| smol::future::block_on(ex2.run(shutdown.recv()))) + // Run the main future on the current thread. + .finish(|| { + smol::future::block_on(async move { + let connect_addr: SocketAddr = "127.0.0.1:3333".parse().unwrap(); + let sub_addr: SocketAddr = "127.0.0.1:4444".parse().unwrap(); + + let mut rng = rand::thread_rng(); + let rnd: u32 = rng.gen(); + let path_str = format!("database_{}.db", rnd); + + let database_path = Path::new(path_str.as_str()); + let rocks = Rocks::new(database_path.clone()).unwrap(); + + let slabstore = RocksColumn::::new(rocks.clone()); + + // create gateway client + let mut client = GatewayClient::new(connect_addr, slabstore).unwrap(); + + // start subscribing + let gateway_slabs_sub: GatewaySlabsSubscriber = + client.start_subscriber(sub_addr, ex.clone()).await.unwrap(); + + ex.clone() + .spawn(subscribe(gateway_slabs_sub, "B".to_string())).detach(); + + // start gateway client + client.start().await.unwrap(); + + // sleep for 2 seconds + std::thread::sleep(std::time::Duration::from_secs(2)); + }); + drop(signal); + Ok::<(), drk::Error>(()) + }); + }); + + thread_pools.push(thread); + thread_pools.push(thread2); + + for t in thread_pools { + t.join().unwrap(); + } + } }