From a89c71f32437081e46c7a01e13580d255dca8842 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Wed, 9 Jun 2021 04:27:27 +0300 Subject: [PATCH] update the test function --- src/bin/darkfid.rs | 186 ++++++++++++++++++++++++++------------------- 1 file changed, 107 insertions(+), 79 deletions(-) diff --git a/src/bin/darkfid.rs b/src/bin/darkfid.rs index 06fc0e417..aa3b15d08 100644 --- a/src/bin/darkfid.rs +++ b/src/bin/darkfid.rs @@ -2,7 +2,7 @@ use async_std::sync::Arc; use rand::rngs::OsRng; use std::net::SocketAddr; -use drk::blockchain::{rocks::columns, Rocks, RocksColumn, Slab, SlabStore}; +use drk::blockchain::{rocks::columns, Rocks, RocksColumn}; use drk::crypto::{ coin::Coin, load_params, @@ -13,11 +13,11 @@ use drk::crypto::{ save_params, setup_mint_prover, setup_spend_prover, }; use drk::serial::Decodable; -use drk::service::{ClientProgramOptions, GatewayClient, Subscriber}; +use drk::service::{ClientProgramOptions, GatewayClient, GatewaySlabsSubscriber}; use drk::state::{state_transition, ProgramState, StateUpdate}; use drk::wallet::WalletDB; use drk::{tx, Result}; -use rusqlite::{named_params, Connection}; +use rusqlite::Connection; use async_executor::Executor; use bellman::groth16; @@ -132,19 +132,13 @@ fn setup_addr(address: Option, default: SocketAddr) -> SocketAddr { } } -pub async fn subscribe( - mut subscriber: Subscriber, - slabstore: Arc, - mut state: State, -) -> Result<()> { +pub async fn subscribe(gateway_slabs_sub: GatewaySlabsSubscriber, mut state: State) -> Result<()> { loop { - let slab = subscriber.fetch::().await?; + let slab = gateway_slabs_sub.recv().await?; let tx = tx::Transaction::decode(&slab.get_payload()[..])?; let update = state_transition(&state, tx)?; state.apply(update)?; - - slabstore.put(slab)?; } } @@ -156,11 +150,6 @@ async fn start(executor: Arc>, options: ClientProgramOptions) -> Re let rocks = Rocks::new(database_path)?; let slabstore = RocksColumn::::new(rocks.clone()); - // create gateway client - let mut client = GatewayClient::new(connect_addr, slabstore)?; - - // start gateway client - client.start().await?; // // Auto create trusted ceremony parameters if they don't exist @@ -199,10 +188,16 @@ async fn start(executor: Arc>, options: ClientProgramOptions) -> Re secrets: vec![secret.clone()], }; - // start subscribe to gateway publisher - let subscriber = GatewayClient::start_subscriber(sub_addr).await?; - let slabstore = client.get_slabstore(); - let subscribe_task = executor.spawn(subscribe(subscriber, slabstore, state)); + // create gateway client + let mut client = GatewayClient::new(connect_addr, slabstore)?; + + // start subscribing + let gateway_slabs_sub: GatewaySlabsSubscriber = + client.start_subscriber(sub_addr, executor.clone()).await?; + let subscribe_task = executor.spawn(subscribe(gateway_slabs_sub, state)); + + // start gateway client + client.start().await?; subscribe_task.cancel().await; Ok(()) @@ -251,63 +246,96 @@ fn main() -> Result<()> { result } -// //// $ cargo test --bin darkfid -//// run 10 clients simultaneously -//#[cfg(test)] -//mod test { -// -// #[test] -// fn test_darkfid_client() { -// use std::path::Path; -// -// use drk::blockchain::{Rocks, Slab}; -// use drk::service::GatewayClient; -// -// use log::*; -// use rand::Rng; -// use simplelog::*; -// -// let logger_config = ConfigBuilder::new().set_time_format_str("%T%.6f").build(); -// -// CombinedLogger::init(vec![ -// TermLogger::new(LevelFilter::Debug, logger_config, TerminalMode::Mixed).unwrap(), -// WriteLogger::new( -// LevelFilter::Debug, -// Config::default(), -// std::fs::File::create(Path::new("/tmp/dar.log")).unwrap(), -// ), -// ]) -// .unwrap(); -// -// let mut thread_pools: Vec> = vec![]; -// -// for _ in 0..10 { -// let thread = std::thread::spawn(|| { -// smol::future::block_on(async move { -// let mut rng = rand::thread_rng(); -// let rnd: u32 = rng.gen(); -// -// let path_str = format!("database_{}.db", rnd); -// let database_path = Path::new(path_str.as_str()); -// let rocks = Rocks::new(database_path.clone()).unwrap(); -// -// // create new client and use different slabstore -// let mut client = -// GatewayClient::new("127.0.0.1:3333".parse().unwrap(), rocks).unwrap(); -// -// // start client -// client.start().await.unwrap(); -// -// // sending slab -// let _slab = Slab::new("testcoin".to_string(), rnd.to_le_bytes().to_vec()); -// client.put_slab(_slab).await.unwrap(); -// }) -// }); -// thread_pools.push(thread); -// } -// for t in thread_pools { -// t.join().unwrap(); -// } -// } -//} +#[cfg(test)] +mod test { + + use std::net::SocketAddr; + use std::path::Path; + use std::sync::Arc; + + use drk::blockchain::{rocks::columns, Rocks, RocksColumn, Slab}; + use drk::service::{GatewayClient, GatewaySlabsSubscriber}; + + use async_executor::Executor; + use easy_parallel::Parallel; + use log::*; + use rand::Rng; + use simplelog::*; + + #[test] + fn test_ten_clients_simultaneously() { + let logger_config = ConfigBuilder::new().set_time_format_str("%T%.6f").build(); + + CombinedLogger::init(vec![ + TermLogger::new(LevelFilter::Debug, logger_config, TerminalMode::Mixed).unwrap(), + WriteLogger::new( + LevelFilter::Debug, + Config::default(), + std::fs::File::create(Path::new("/tmp/dar.log")).unwrap(), + ), + ]) + .unwrap(); + + let mut thread_pools: Vec> = vec![]; + + for _ in 0..10 { + let thread = std::thread::spawn(|| { + let ex = Arc::new(Executor::new()); + let (signal, shutdown) = async_channel::unbounded::<()>(); + + let (_, _) = Parallel::new() + // Run four executor threads. + .each(0..3, |_| smol::future::block_on(ex.run(shutdown.recv()))) + // Run the main future on the current thread. + .finish(|| { + smol::future::block_on(async move { + pub async fn subscribe(gateway_slabs_sub: GatewaySlabsSubscriber, id: u32) { + loop { + gateway_slabs_sub.recv().await.unwrap(); + info!("client {}: update state", id); + } + } + + let executor = Arc::new(Executor::new()); + let connect_addr: SocketAddr = "127.0.0.1:3333".parse().unwrap(); + let sub_addr: SocketAddr = "127.0.0.1:4444".parse().unwrap(); + + let mut rng = rand::thread_rng(); + let rnd: u32 = rng.gen(); + let path_str = format!("database_{}.db", rnd); + + let database_path = Path::new(path_str.as_str()); + let rocks = Rocks::new(database_path.clone()).unwrap(); + + let slabstore = RocksColumn::::new(rocks.clone()); + + // create gateway client + let mut client = GatewayClient::new(connect_addr, slabstore).unwrap(); + + // start subscribing + let gateway_slabs_sub: GatewaySlabsSubscriber = client + .start_subscriber(sub_addr, executor.clone()) + .await + .unwrap(); + executor.spawn(subscribe(gateway_slabs_sub, rnd.clone())).detach(); + + // start gateway client + client.start().await.unwrap(); + + let slab = Slab::new("btc".to_string(), rnd.to_le_bytes().to_vec()); + client.put_slab(slab).await.unwrap(); + + + }); + drop(signal); + Ok::<(), drk::Error>(()) + }); + }); + thread_pools.push(thread); + } + for t in thread_pools { + t.join().unwrap(); + } + } +}