diff --git a/src/bin/darkfid.rs b/src/bin/darkfid.rs index f039181fa..4dd26efd5 100644 --- a/src/bin/darkfid.rs +++ b/src/bin/darkfid.rs @@ -53,8 +53,6 @@ fn main() -> Result<()> { LevelFilter::Off }; - - CombinedLogger::init(vec![ TermLogger::new(debug_level, logger_config, TerminalMode::Mixed).unwrap(), WriteLogger::new( @@ -63,9 +61,7 @@ fn main() -> Result<()> { std::fs::File::create(options.log_path.as_path()).unwrap(), ), ]) - .unwrap(); - - + .unwrap(); let ex2 = ex.clone(); @@ -90,17 +86,15 @@ fn main() -> Result<()> { mod test { #[test] - fn test_darkfid_client(){ - + fn test_darkfid_client() { use std::path::Path; use drk::service::GatewayClient; use drk::slab::Slab; + use log::*; use rand::Rng; use simplelog::*; - use log::*; - let logger_config = ConfigBuilder::new().set_time_format_str("%T%.6f").build(); @@ -112,29 +106,27 @@ mod test { std::fs::File::create(Path::new("/tmp/dar.log")).unwrap(), ), ]) - .unwrap(); + .unwrap(); let mut thread_pools: Vec> = vec![]; - - for _ in 1..11 { let thread = std::thread::spawn(|| { smol::future::block_on(async move { let mut rng = rand::thread_rng(); let rnd: u32 = rng.gen(); - - let mut client = GatewayClient::new("127.0.0.1:3333".parse().unwrap(), Path::new(&format!("slabstore_{}.db", rnd))).unwrap(); + let mut client = GatewayClient::new( + "127.0.0.1:3333".parse().unwrap(), + Path::new(&format!("slabstore_{}.db", rnd)), + ) + .unwrap(); client.start().await.unwrap(); - - let _slab = Slab::new("testcoin".to_string(), rnd.to_le_bytes().to_vec()); client.put_slab(_slab).await.unwrap(); - std::thread::sleep(std::time::Duration::from_secs(3)); let last_index = client.slabstore.get_last_index().unwrap(); info!("last index: {}", last_index); diff --git a/src/service/gateway.rs b/src/service/gateway.rs index e1146f3fa..6d3d2f742 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -46,9 +46,9 @@ impl GatewayService { let (publish_queue, publish_recv_queue) = async_channel::unbounded::>(); let publisher_task = executor.spawn(Self::start_publisher( - self.pub_addr, - service_name, - publish_recv_queue.clone(), + self.pub_addr, + service_name, + publish_recv_queue.clone(), )); let handle_request_task = @@ -73,7 +73,7 @@ impl GatewayService { async fn handle_request( self: Arc, - send_queue: async_channel::Sender<(Vec,Reply)>, + send_queue: async_channel::Sender<(Vec, Reply)>, recv_queue: async_channel::Receiver<(Vec, Request)>, publish_queue: async_channel::Sender>, ) -> Result<()> { @@ -94,7 +94,7 @@ impl GatewayService { // send reply let reply = Reply::from(&request, 0, vec![]); - send_queue.send((peer,reply)).await?; + send_queue.send((peer, reply)).await?; // publish to all subscribes publish_queue.send(slab).await?; @@ -206,7 +206,6 @@ impl GatewayClient { self.slabstore.clone() } - pub async fn start_subscriber(sub_addr: SocketAddr) -> Result { let mut subscriber = Subscriber::new(sub_addr, String::from("GATEWAY CLIENT")); subscriber.start().await?; diff --git a/src/service/reqrep.rs b/src/service/reqrep.rs index e96bae4c9..5ea97870c 100644 --- a/src/service/reqrep.rs +++ b/src/service/reqrep.rs @@ -1,7 +1,7 @@ use async_std::sync::Arc; +use std::convert::TryFrom; use std::io; use std::net::SocketAddr; -use std::convert::TryFrom; use crate::serial::{deserialize, serialize}; use crate::{Decodable, Encodable, Result}; @@ -27,11 +27,11 @@ pub fn addr_to_string(addr: SocketAddr) -> String { pub struct RepProtocol { addr: SocketAddr, socket: zeromq::RouterSocket, - recv_queue: async_channel::Receiver<(Vec,Reply)>, + recv_queue: async_channel::Receiver<(Vec, Reply)>, send_queue: async_channel::Sender<(Vec, Request)>, channels: ( async_channel::Sender<(Vec, Reply)>, - async_channel::Receiver<(Vec,Request)>, + async_channel::Receiver<(Vec, Request)>, ), service_name: String, } @@ -57,8 +57,8 @@ impl RepProtocol { pub async fn start( &mut self, ) -> Result<( - async_channel::Sender<(Vec, Reply)>, - async_channel::Receiver<(Vec,Request)>, + async_channel::Sender<(Vec, Reply)>, + async_channel::Receiver<(Vec, Request)>, )> { let addr = addr_to_string(self.addr); self.socket.bind(addr.as_str()).await?; @@ -93,14 +93,14 @@ impl RepProtocol { if let Some(request) = msg.get(1) { let request: Vec = request.to_vec(); let request: Request = deserialize(&request)?; - if let Some(peer) = msg.get(0){ + if let Some(peer) = msg.get(0) { self.send_queue.send((peer.to_vec(), request)).await?; } } } NetEvent::Send((peer, reply)) => { let peer = Bytes::from(peer); - let mut msg:Vec = vec![peer]; + let mut msg: Vec = vec![peer]; let reply: Vec = serialize(&reply); let reply = Bytes::from(reply); msg.push(reply); @@ -172,11 +172,10 @@ impl ReqProtocol { assert!(reply.get_id() == request.get_id()); - Ok(reply.get_payload()) } else { Err(crate::Error::ZMQError( - "Couldn't parse ZmqMessage".to_string(), + "Couldn't parse ZmqMessage".to_string(), )) } } @@ -254,7 +253,7 @@ impl Subscriber { Ok(data) } None => Err(crate::Error::ZMQError( - "Couldn't parse ZmqMessage".to_string(), + "Couldn't parse ZmqMessage".to_string(), )), } }