diff --git a/src/bin/demoservices.rs b/src/bin/demoservices.rs index c469b7970..0717af19a 100644 --- a/src/bin/demoservices.rs +++ b/src/bin/demoservices.rs @@ -7,12 +7,10 @@ use sapvi::Result; use sapvi::service::gateway; async fn start(executor: Arc>) -> Result<()> { - - let gateway = - gateway::GatewayService::new( - String::from("tcp://127.0.0.1:3333"), - String::from("tcp://127.0.0.1:4444") - ); + let gateway = gateway::GatewayService::new( + String::from("tcp://127.0.0.1:3333"), + String::from("tcp://127.0.0.1:4444"), + ); gateway.start(executor.clone()).await?; Ok(()) diff --git a/src/bin/demowallet.rs b/src/bin/demowallet.rs index ea7053193..d549b98a5 100644 --- a/src/bin/demowallet.rs +++ b/src/bin/demowallet.rs @@ -1,42 +1,38 @@ use async_executor::Executor; -use easy_parallel::Parallel; use async_std::sync::{Arc, Mutex}; +use easy_parallel::Parallel; -use sapvi::Result; use sapvi::service::gateway::GatewayClient; +use sapvi::Result; async fn start(executor: Arc>) -> Result<()> { - - let mut client = - GatewayClient::new( - String::from("tcp://127.0.0.1:3333"), - ); + let mut client = GatewayClient::new(String::from("tcp://127.0.0.1:3333")); client.start().await?; println!("connected to a server"); let slabs = Arc::new(Mutex::new(vec![])); - let subscriber = client.subscribe( - String::from("tcp://127.0.0.1:4444") - ).await?; + let subscriber = client + .subscribe(String::from("tcp://127.0.0.1:4444")) + .await?; println!("subscription ready"); + let fetch_loop_task = executor.spawn(GatewayClient::fetch_slabs_loop( + subscriber.clone(), + slabs.clone(), + )); - let fetch_loop_task = executor.spawn(GatewayClient::fetch_slabs_loop(subscriber.clone(), slabs.clone())); - - client.put_slab(vec![0,0,0,0]).await?; - client.put_slab(vec![0,0,0,0]).await?; - client.put_slab(vec![0,0,0,0]).await?; + client.put_slab(vec![0, 0, 0, 0]).await?; + client.put_slab(vec![0, 0, 0, 0]).await?; + client.put_slab(vec![0, 0, 0, 0]).await?; fetch_loop_task.cancel().await; Ok(()) } - - fn main() -> Result<()> { let ex = Arc::new(Executor::new()); let (signal, shutdown) = async_channel::unbounded::<()>(); diff --git a/src/service/gateway.rs b/src/service/gateway.rs index e179bf44b..8c46f85b0 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -1,7 +1,7 @@ -use std::convert::TryInto; use async_std::sync::{Arc, Mutex}; +use std::convert::TryInto; -use super::reqrep::{Reply, Request, RepProtocol, ReqProtocol, Publisher, Subscriber}; +use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber}; use crate::{Error, Result}; use async_executor::Executor; @@ -11,28 +11,29 @@ pub type Slabs = Vec>; pub struct GatewayService { slabs: Mutex, addr: String, - publisher: Mutex + publisher: Mutex, } impl GatewayService { - pub fn new(addr: String, pub_addr: String) -> Arc{ + pub fn new(addr: String, pub_addr: String) -> Arc { let slabs = Mutex::new(vec![]); let publisher = Mutex::new(Publisher::new(pub_addr)); Arc::new(GatewayService { slabs, addr, - publisher + publisher, }) } pub async fn start(self: Arc, executor: Arc>) -> Result<()> { - let (send_queue_s, send_queue_r) = async_channel::unbounded::(); let (recv_queue_s, recv_queue_r) = async_channel::unbounded::(); - - let mut reqrep = RepProtocol::new(self.addr.clone(), send_queue_r.clone(), recv_queue_s.clone(),); - + let mut reqrep = RepProtocol::new( + self.addr.clone(), + send_queue_r.clone(), + recv_queue_s.clone(), + ); reqrep.start().await?; println!("server started"); @@ -41,7 +42,8 @@ impl GatewayService { println!("publisher started"); - let handle_request_task = executor.spawn(self.handle_request(send_queue_s.clone(), recv_queue_r.clone())); + let handle_request_task = + executor.spawn(self.handle_request(send_queue_s.clone(), recv_queue_r.clone())); reqrep.run().await?; @@ -54,11 +56,10 @@ impl GatewayService { send_queue: async_channel::Sender, recv_queue: async_channel::Receiver, ) -> Result<()> { - let data = vec![]; loop { - match recv_queue.recv().await{ + match recv_queue.recv().await { Ok(request) => { match request.get_command() { 0 => { @@ -85,7 +86,6 @@ impl GatewayService { } let rep = Reply::from(&request, 0, data.clone()); send_queue.send(rep.into()).await?; - } Err(_) => {} } @@ -100,9 +100,7 @@ pub struct GatewayClient { impl GatewayClient { pub fn new(addr: String) -> GatewayClient { let protocol = ReqProtocol::new(addr); - GatewayClient { - protocol, - } + GatewayClient { protocol } } pub async fn start(&mut self) -> Result<()> { self.protocol.start().await?; @@ -116,26 +114,35 @@ impl GatewayClient { } pub async fn get_slab(&mut self, index: u32) -> Result> { - self.protocol.request(GatewayCommand::GetSlab as u8, index.to_be_bytes().to_vec()) + self.protocol + .request(GatewayCommand::GetSlab as u8, index.to_be_bytes().to_vec()) .await } pub async fn put_slab(&mut self, data: Vec) -> Result<()> { - self.protocol.request(GatewayCommand::PutSlab as u8, data.clone()).await?; + self.protocol + .request(GatewayCommand::PutSlab as u8, data.clone()) + .await?; Ok(()) } pub async fn get_last_index(&mut self) -> Result { - let rep = self.protocol.request(GatewayCommand::GetLastIndex as u8, vec![]).await?; + let rep = self + .protocol + .request(GatewayCommand::GetLastIndex as u8, vec![]) + .await?; let rep: [u8; 4] = rep.try_into().unwrap(); Ok(u32::from_be_bytes(rep)) } - pub async fn fetch_slabs_loop(subscriber: Arc>, slabs: Arc>) -> Result<()>{ + pub async fn fetch_slabs_loop( + subscriber: Arc>, + slabs: Arc>, + ) -> Result<()> { loop { let mut subscriber = subscriber.lock().await; let slab = subscriber.fetch().await?; - println!("received new slab from subscriber"); + println!("received new slab from subscriber"); slabs.lock().await.push(slab); } } diff --git a/src/service/reqrep.rs b/src/service/reqrep.rs index bf0e081da..48af54ec5 100644 --- a/src/service/reqrep.rs +++ b/src/service/reqrep.rs @@ -1,7 +1,7 @@ use std::io; -use crate::{Decodable, Encodable, Result}; use crate::serial::{deserialize, serialize}; +use crate::{Decodable, Encodable, Result}; use bytes::Bytes; use futures::FutureExt; @@ -13,8 +13,7 @@ enum NetEvent { Send(Reply), } - -pub struct RepProtocol{ +pub struct RepProtocol { addr: String, socket: zeromq::RepSocket, recv_queue: async_channel::Receiver, @@ -22,18 +21,18 @@ pub struct RepProtocol{ } impl RepProtocol { - pub fn new(addr: String, + pub fn new( + addr: String, recv_queue: async_channel::Receiver, - send_queue: async_channel::Sender + send_queue: async_channel::Sender, ) -> RepProtocol { let socket = zeromq::RepSocket::new(); - RepProtocol{ + RepProtocol { addr, socket, recv_queue, send_queue, } - } pub async fn start(&mut self) -> Result<()> { self.socket.bind(self.addr.as_str()).await?; @@ -61,23 +60,18 @@ impl RepProtocol { } } } - } } - pub struct ReqProtocol { addr: String, socket: zeromq::ReqSocket, } impl ReqProtocol { - pub fn new(addr: String) -> ReqProtocol{ + pub fn new(addr: String) -> ReqProtocol { let socket = zeromq::ReqSocket::new(); - ReqProtocol { - addr, - socket - } + ReqProtocol { addr, socket } } pub async fn start(&mut self) -> Result<()> { @@ -106,8 +100,6 @@ impl ReqProtocol { Ok(reply.get_payload()) } - - } pub struct Publisher { @@ -115,53 +107,43 @@ pub struct Publisher { socket: zeromq::PubSocket, } - - impl Publisher { pub fn new(addr: String) -> Publisher { let socket = zeromq::PubSocket::new(); - Publisher{ - addr, - socket - } + Publisher { addr, socket } } pub async fn start(&mut self) -> Result<()> { self.socket.bind(self.addr.as_str()).await?; Ok(()) } - pub async fn publish(&mut self, data: Vec) -> Result<()>{ + pub async fn publish(&mut self, data: Vec) -> Result<()> { let data = Bytes::from(data); self.socket.send(data.into()).await?; Ok(()) } } -pub struct Subscriber{ +pub struct Subscriber { addr: String, - socket: zeromq::SubSocket + socket: zeromq::SubSocket, } impl Subscriber { pub fn new(addr: String) -> Subscriber { let socket = zeromq::SubSocket::new(); - Subscriber{ - addr, - socket - } + Subscriber { addr, socket } } pub async fn start(&mut self) -> Result<()> { - self.socket - .connect(self.addr.as_str()) - .await?; + self.socket.connect(self.addr.as_str()).await?; self.socket.subscribe("").await?; Ok(()) } - pub async fn fetch(&mut self) -> Result>{ + pub async fn fetch(&mut self) -> Result> { let data = self.socket.recv().await?; let data: &Bytes = data.get(0).unwrap(); let data = data.to_vec(); @@ -169,7 +151,6 @@ impl Subscriber { } } - #[derive(Debug, PartialEq)] pub struct Request { command: u8, @@ -301,6 +282,3 @@ mod tests { assert_eq!(deserialized_reply, Some(reply)); } } - - -