diff --git a/src/bin/darkfid.rs b/src/bin/darkfid.rs index 76c63114e..820922a17 100644 --- a/src/bin/darkfid.rs +++ b/src/bin/darkfid.rs @@ -1,9 +1,9 @@ -use std::net::SocketAddr; use async_executor::Executor; use async_std::sync::Arc; use easy_parallel::Parallel; +use std::net::SocketAddr; -use drk::service::{GatewayClient, ClientProgramOptions}; +use drk::service::{ClientProgramOptions, GatewayClient}; use drk::{slab::Slab, Result}; fn setup_addr(address: Option, default: SocketAddr) -> SocketAddr { @@ -13,13 +13,11 @@ fn setup_addr(address: Option, default: SocketAddr) -> SocketAddr { } } - async fn start(executor: Arc>, options: ClientProgramOptions) -> Result<()> { let connect_addr: SocketAddr = setup_addr(options.connect_addr, "127.0.0.1:3333".parse()?); let sub_addr: SocketAddr = setup_addr(options.sub_addr, "127.0.0.1:4444".parse()?); let slabstore_path = options.slabstore_path.as_path(); - // create gateway client let mut client = GatewayClient::new(connect_addr, slabstore_path)?; @@ -28,10 +26,7 @@ async fn start(executor: Arc>, options: ClientProgramOptions) -> Re // start subscribe to gateway publisher let slabstore = client.get_slabstore(); - let subscriber_task = executor.spawn(GatewayClient::subscribe( - slabstore, - sub_addr, - )); + let subscriber_task = executor.spawn(GatewayClient::subscribe(slabstore, sub_addr)); // TEST let _slab = Slab::new("testcoin".to_string(), vec![0, 0, 0, 0]); @@ -65,9 +60,7 @@ fn main() -> Result<()> { std::fs::File::create(options.log_path.as_path()).unwrap(), ), ]) - .unwrap(); - - + .unwrap(); let ex2 = ex.clone(); diff --git a/src/service/reqrep.rs b/src/service/reqrep.rs index d5b8e726e..a06e13e62 100644 --- a/src/service/reqrep.rs +++ b/src/service/reqrep.rs @@ -56,8 +56,8 @@ impl RepProtocol { pub async fn start( &mut self, ) -> Result<( - async_channel::Sender, - async_channel::Receiver, + async_channel::Sender, + async_channel::Receiver, )> { let addr = addr_to_string(self.addr); self.socket.bind(addr.as_str()).await?; @@ -119,7 +119,11 @@ pub struct ReqProtocol { impl ReqProtocol { pub fn new(addr: SocketAddr, service_name: String) -> ReqProtocol { let socket = zeromq::ReqSocket::new(); - ReqProtocol { addr, socket, service_name} + ReqProtocol { + addr, + socket, + service_name, + } } pub async fn start(&mut self) -> Result<()> { @@ -136,16 +140,22 @@ impl ReqProtocol { let req: zeromq::ZmqMessage = req.into(); self.socket.send(req).await?; - info!("{} SERVICE: Sent Request {{ command: {} }}", self.service_name, command); + info!( + "{} SERVICE: Sent Request {{ command: {} }}", + self.service_name, command + ); let rep: zeromq::ZmqMessage = self.socket.recv().await?; if let Some(reply) = rep.get(0) { let reply: Vec = reply.to_vec(); - let reply: Reply = deserialize(&reply)?; - info!("{} SERVICE: Received Reply {{ error: {} }}", self.service_name, reply.has_error() ); + info!( + "{} SERVICE: Received Reply {{ error: {} }}", + self.service_name, + reply.has_error() + ); if reply.has_error() { return Err(crate::Error::ServicesError("response has an error")); @@ -156,7 +166,7 @@ impl ReqProtocol { Ok(reply.get_payload()) } else { Err(crate::Error::ZMQError( - "Couldn't parse ZmqMessage".to_string(), + "Couldn't parse ZmqMessage".to_string(), )) } } @@ -207,7 +217,11 @@ pub struct Subscriber { impl Subscriber { pub fn new(addr: SocketAddr, service_name: String) -> Subscriber { let socket = zeromq::SubSocket::new(); - Subscriber { addr, socket , service_name} + Subscriber { + addr, + socket, + service_name, + } } pub async fn start(&mut self) -> Result<()> { @@ -230,7 +244,7 @@ impl Subscriber { Ok(data) } None => Err(crate::Error::ZMQError( - "Couldn't parse ZmqMessage".to_string(), + "Couldn't parse ZmqMessage".to_string(), )), } }