diff --git a/src/bin/darkfid.rs b/src/bin/darkfid.rs index 3a945e7be..74005b867 100644 --- a/src/bin/darkfid.rs +++ b/src/bin/darkfid.rs @@ -273,305 +273,3 @@ fn main() -> Result<()> { result } -//// $ cargo test test_ten_clients_simultaneously --bin darkfid -//this will run 10 clients simultaneously - -//// $ cargo test test_subscriber --bin darkfid -// Run Client A and send 10 slabs -// Client B should receive 10 slabs from subscriber - -//// $ cargo test test_deposit --bin darkfid -// Run Client A and send 10 slabs -// Client B should receive 10 slabs from subscriber -#[cfg(test)] -mod test { - - use std::net::SocketAddr; - use std::path::Path; - use std::path::PathBuf; - use std::sync::Arc; - - use drk::blockchain::{rocks::columns, Rocks, RocksColumn, Slab}; - use drk::service::{GatewayClient, GatewaySlabsSubscriber}; - use drk::util::join_config_path; - - use async_executor::Executor; - use easy_parallel::Parallel; - use log::*; - use rand::Rng; - use simplelog::*; - - pub async fn subscribe(gateway_slabs_sub: GatewaySlabsSubscriber, id: String) { - loop { - gateway_slabs_sub.recv().await.unwrap(); - info!("Client {}: update state", id); - } - } - - fn setup_log() { - let logger_config = ConfigBuilder::new().set_time_format_str("%T%.6f").build(); - - CombinedLogger::init(vec![ - TermLogger::new(LevelFilter::Debug, logger_config, TerminalMode::Mixed).unwrap(), - WriteLogger::new( - LevelFilter::Debug, - Config::default(), - std::fs::File::create(Path::new("/tmp/dar.log")).unwrap(), - ), - ]) - .unwrap(); - } - - #[test] - fn test_ten_clients_simultaneously() { - setup_log(); - - let mut thread_pools: Vec> = vec![]; - - for _ in 0..10 { - let thread = std::thread::spawn(|| { - let ex = Arc::new(Executor::new()); - let (signal, shutdown) = async_channel::unbounded::<()>(); - - let ex2 = ex.clone(); - - let (_, _) = Parallel::new() - // Run four executor threads. - .each(0..3, |_| smol::future::block_on(ex2.run(shutdown.recv()))) - // Run the main future on the current thread. - .finish(|| { - smol::future::block_on(async move { - let connect_addr: SocketAddr = "127.0.0.1:3333".parse().unwrap(); - let sub_addr: SocketAddr = "127.0.0.1:4444".parse().unwrap(); - - let mut rng = rand::thread_rng(); - let rnd: u32 = rng.gen(); - let path_str = format!("database_{}.db", rnd); - - let database_path = PathBuf::from(path_str.as_str()); - let database_path = join_config_path(&database_path).unwrap(); - let rocks = Rocks::new(&database_path).unwrap(); - - let slabstore = RocksColumn::::new(rocks.clone()); - - // create gateway client - let mut client = GatewayClient::new(connect_addr, slabstore).unwrap(); - - // start subscribing - let gateway_slabs_sub: GatewaySlabsSubscriber = - client.start_subscriber(sub_addr, ex.clone()).await.unwrap(); - ex.clone() - .spawn(subscribe(gateway_slabs_sub, rnd.clone().to_string())) - .detach(); - - // start gateway client - client.start().await.unwrap(); - - let slab = Slab::new(rnd.to_le_bytes().to_vec()); - client.put_slab(slab).await.unwrap(); - }); - drop(signal); - Ok::<(), drk::Error>(()) - }); - }); - thread_pools.push(thread); - } - for t in thread_pools { - t.join().unwrap(); - } - } - - #[test] - fn test_subscriber() { - setup_log(); - - let mut thread_pools: Vec> = vec![]; - - // Client A - let thread = std::thread::spawn(|| { - smol::future::block_on(async move { - let connect_addr: SocketAddr = "127.0.0.1:3333".parse().unwrap(); - - let mut rng = rand::thread_rng(); - let rnd: u32 = rng.gen(); - let path_str = format!("database_{}.db", rnd); - - let database_path = PathBuf::from(path_str.as_str()); - let database_path = join_config_path(&database_path).unwrap(); - let rocks = Rocks::new(&database_path).unwrap(); - - let slabstore = RocksColumn::::new(rocks.clone()); - - // create gateway client - let mut client = GatewayClient::new(connect_addr, slabstore).unwrap(); - - // start gateway client - client.start().await.unwrap(); - - let slab = Slab::new(rnd.to_le_bytes().to_vec()); - - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - }); - }); - // Client B - let thread2 = std::thread::spawn(|| { - let ex = Arc::new(Executor::new()); - let (signal, shutdown) = async_channel::unbounded::<()>(); - - let ex2 = ex.clone(); - - let (_, _) = Parallel::new() - // Run four executor threads. - .each(0..3, |_| smol::future::block_on(ex2.run(shutdown.recv()))) - // Run the main future on the current thread. - .finish(|| { - smol::future::block_on(async move { - let connect_addr: SocketAddr = "127.0.0.1:3333".parse().unwrap(); - let sub_addr: SocketAddr = "127.0.0.1:4444".parse().unwrap(); - - let mut rng = rand::thread_rng(); - let rnd: u32 = rng.gen(); - let path_str = format!("database_{}.db", rnd); - - let database_path = PathBuf::from(path_str.as_str()); - let database_path = join_config_path(&database_path).unwrap(); - let rocks = Rocks::new(&database_path).unwrap(); - - let slabstore = RocksColumn::::new(rocks.clone()); - - // create gateway client - let mut client = GatewayClient::new(connect_addr, slabstore).unwrap(); - - // start subscribing - let gateway_slabs_sub: GatewaySlabsSubscriber = - client.start_subscriber(sub_addr, ex.clone()).await.unwrap(); - - ex.clone() - .spawn(subscribe(gateway_slabs_sub, "B".to_string())) - .detach(); - - // start gateway client - client.start().await.unwrap(); - - // sleep for 2 seconds - std::thread::sleep(std::time::Duration::from_secs(2)); - }); - drop(signal); - Ok::<(), drk::Error>(()) - }); - }); - - thread_pools.push(thread); - thread_pools.push(thread2); - - for t in thread_pools { - t.join().unwrap(); - } - } - - #[test] - fn test_deposit() { - setup_log(); - - let mut thread_pools: Vec> = vec![]; - - // Client A: User - let thread = std::thread::spawn(|| { - smol::future::block_on(async move { - let connect_addr: SocketAddr = "127.0.0.1:3333".parse().unwrap(); - - let mut rng = rand::thread_rng(); - let rnd: u32 = rng.gen(); - let path_str = format!("database_{}.db", rnd); - - let database_path = PathBuf::from(path_str.as_str()); - let database_path = join_config_path(&database_path).unwrap(); - let rocks = Rocks::new(&database_path).unwrap(); - - let slabstore = RocksColumn::::new(rocks.clone()); - - // create gateway client - let mut client = GatewayClient::new(connect_addr, slabstore).unwrap(); - - // start gateway client - client.start().await.unwrap(); - - let slab = Slab::new(rnd.to_le_bytes().to_vec()); - - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - client.put_slab(slab.clone()).await.unwrap(); - }); - }); - // Client B: Cashier - let thread2 = std::thread::spawn(|| { - let ex = Arc::new(Executor::new()); - let (signal, shutdown) = async_channel::unbounded::<()>(); - - let ex2 = ex.clone(); - - let (_, _) = Parallel::new() - // Run four executor threads. - .each(0..3, |_| smol::future::block_on(ex2.run(shutdown.recv()))) - // Run the main future on the current thread. - .finish(|| { - smol::future::block_on(async move { - let connect_addr: SocketAddr = "127.0.0.1:3333".parse().unwrap(); - let sub_addr: SocketAddr = "127.0.0.1:4444".parse().unwrap(); - - let mut rng = rand::thread_rng(); - let rnd: u32 = rng.gen(); - let path_str = format!("database_{}.db", rnd); - - let database_path = PathBuf::from(path_str.as_str()); - let database_path = join_config_path(&database_path).unwrap(); - let rocks = Rocks::new(&database_path).unwrap(); - - let slabstore = RocksColumn::::new(rocks.clone()); - - // create gateway client - let mut client = GatewayClient::new(connect_addr, slabstore).unwrap(); - - // start subscribing - let gateway_slabs_sub: GatewaySlabsSubscriber = - client.start_subscriber(sub_addr, ex.clone()).await.unwrap(); - - ex.clone() - .spawn(subscribe(gateway_slabs_sub, "B".to_string())) - .detach(); - - // start gateway client - client.start().await.unwrap(); - - // sleep for 2 seconds - std::thread::sleep(std::time::Duration::from_secs(2)); - }); - drop(signal); - Ok::<(), drk::Error>(()) - }); - }); - - thread_pools.push(thread); - thread_pools.push(thread2); - - for t in thread_pools { - t.join().unwrap(); - } - } -}