diff --git a/src/service/gateway.rs b/src/service/gateway.rs index 49a8af843..1249d3415 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -1,9 +1,10 @@ use async_std::sync::{Arc, Mutex}; -use std::convert::TryInto; use std::net::SocketAddr; +use std::path::Path; +use std::convert::From; use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber}; -use crate::{Error, Result}; +use crate::{Error, Result, slabstore::SlabStore, serial::serialize, serial::deserialize}; use async_executor::Executor; use log::*; @@ -18,20 +19,23 @@ enum GatewayCommand { } pub struct GatewayService { - slabs: Mutex, + slabstore: SlabStore, addr: SocketAddr, publisher: Mutex, } impl GatewayService { - pub fn new(addr: SocketAddr, pub_addr: SocketAddr) -> Arc { - let slabs = Mutex::new(vec![]); + pub fn new(addr: SocketAddr, pub_addr: SocketAddr) -> Result> { + let publisher = Mutex::new(Publisher::new(pub_addr)); - Arc::new(GatewayService { - slabs, + + let slabstore = SlabStore::new(Path::new("../slabstore.db"))?; + + Ok(Arc::new(GatewayService { + slabstore, addr, publisher, - }) + })) } pub async fn start(self: Arc, executor: Arc>) -> Result<()> { @@ -56,15 +60,22 @@ impl GatewayService { send_queue: async_channel::Sender, recv_queue: async_channel::Receiver, ) -> Result<()> { - let data = vec![]; loop { match recv_queue.recv().await { Ok(request) => { + // TODO spawn new task when receive new msg match request.get_command() { 0 => { // PUTSLAB + let slab = request.get_payload(); - self.slabs.lock().await.push(slab.clone()); + + // add to slabstore + self.slabstore.put(slab.clone())?; + + // send reply + let reply = Reply::from(&request, 0, vec![]); + send_queue.send(reply).await?; // publish to all subscribes self.publisher.lock().await.publish(slab).await?; @@ -72,10 +83,27 @@ impl GatewayService { info!("received putslab msg"); } 1 => { + + let index = request.get_payload(); + let slab = self.slabstore.get(index)?; + + let mut payload = vec![]; + + if let Some(sb) = slab { + payload = sb; + } + + let reply = Reply::from(&request, 0, payload); + send_queue.send(reply).await?; + // GETSLAB info!("received getslab msg"); } 2 => { + let index = self.slabstore.get_last_index_as_bytes()?; + let reply = Reply::from(&request, 0, index); + send_queue.send(reply).await?; + // GETLASTINDEX info!("received getlastindex msg"); } @@ -83,8 +111,6 @@ impl GatewayService { return Err(Error::ServicesError("received wrong command")); } } - let rep = Reply::from(&request, 0, data.clone()); - send_queue.send(rep.into()).await?; } Err(_) => { break; @@ -115,9 +141,9 @@ impl GatewayClient { Ok(Arc::new(Mutex::new(subscriber))) } - pub async fn get_slab(&mut self, index: u32) -> Result> { + pub async fn get_slab(&mut self, index: u64) -> Result> { self.protocol - .request(GatewayCommand::GetSlab as u8, index.to_be_bytes().to_vec()) + .request(GatewayCommand::GetSlab as u8, serialize(&index)) .await } @@ -127,16 +153,16 @@ impl GatewayClient { .await?; Ok(()) } - pub async fn get_last_index(&mut self) -> Result { + pub async fn get_last_index(&mut self) -> Result { let rep = self .protocol .request(GatewayCommand::GetLastIndex as u8, vec![]) .await?; - let rep: [u8; 4] = rep.try_into().map_err(|_| crate::Error::TryIntoError)?; - Ok(u32::from_be_bytes(rep)) + Ok(deserialize(&rep)?) } } + pub async fn fetch_slabs_loop( subscriber: Arc>, slabs: Arc>, @@ -151,3 +177,4 @@ pub async fn fetch_slabs_loop( slabs.lock().await.push(slab); } } +