From c5724f3d678603b20ace5410c615fa33f2e2e71d Mon Sep 17 00:00:00 2001 From: ghassmo Date: Sat, 29 May 2021 16:04:46 +0300 Subject: [PATCH] gateway daemon spawn new task once receive a message --- src/service/gateway.rs | 128 ++++++++++++++++++++++++----------------- 1 file changed, 75 insertions(+), 53 deletions(-) diff --git a/src/service/gateway.rs b/src/service/gateway.rs index d4870176f..ffa446b38 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -3,7 +3,7 @@ use std::convert::From; use std::net::SocketAddr; use std::path::Path; -use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber, PeerId}; +use super::reqrep::{PeerId, Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber}; use crate::{ serial::deserialize, serial::serialize, slab::Slab, slabstore::SlabStore, Error, Result, }; @@ -51,8 +51,12 @@ impl GatewayService { publish_recv_queue.clone(), )); - let handle_request_task = - executor.spawn(self.handle_request(send.clone(), recv.clone(), publish_queue.clone())); + let handle_request_task = executor.spawn(self.handle_request_loop( + send.clone(), + recv.clone(), + publish_queue.clone(), + executor.clone(), + )); protocol.run(executor.clone()).await?; @@ -71,64 +75,25 @@ impl GatewayService { Ok(()) } - async fn handle_request( + async fn handle_request_loop( self: Arc, send_queue: async_channel::Sender<(PeerId, Reply)>, recv_queue: async_channel::Receiver<(PeerId, Request)>, publish_queue: async_channel::Sender>, + executor: Arc>, ) -> Result<()> { loop { match recv_queue.recv().await { Ok(msg) => { - // TODO spawn new task when receive new msg - let request = msg.1; - let peer = msg.0; - match request.get_command() { - 0 => { - // PUTSLAB - - let slab = request.get_payload(); - - // add to slabstore - self.slabstore.put(slab.clone())?; - - // send reply - let reply = Reply::from(&request, 0, vec![]); - send_queue.send((peer, reply)).await?; - - // publish to all subscribes - publish_queue.send(slab).await?; - - info!("Received putslab msg"); - } - 1 => { - let index = request.get_payload(); - let slab = self.slabstore.get(index)?; - - let mut payload = vec![]; - - if let Some(sb) = slab { - payload = sb; - } - - let reply = Reply::from(&request, 0, payload); - send_queue.send((peer, reply)).await?; - - // GETSLAB - info!("Received getslab msg"); - } - 2 => { - let index = self.slabstore.get_last_index_as_bytes()?; - let reply = Reply::from(&request, 0, index); - send_queue.send((peer, reply)).await?; - - // GETLASTINDEX - info!("Received getlastindex msg"); - } - _ => { - return Err(Error::ServicesError("received wrong command")); - } - } + let slabstore = self.slabstore.clone(); + let _ = executor + .spawn(Self::handle_request( + msg, + slabstore, + send_queue.clone(), + publish_queue.clone(), + )) + .detach(); } Err(_) => { break; @@ -137,6 +102,63 @@ impl GatewayService { } Ok(()) } + + async fn handle_request( + msg: (PeerId, Request), + slabstore: Arc, + send_queue: async_channel::Sender<(PeerId, Reply)>, + publish_queue: async_channel::Sender>, + ) -> Result<()> { + let request = msg.1; + let peer = msg.0; + match request.get_command() { + 0 => { + // PUTSLAB + + let slab = request.get_payload(); + + // add to slabstore + slabstore.put(slab.clone())?; + + // send reply + let reply = Reply::from(&request, 0, vec![]); + send_queue.send((peer, reply)).await?; + + // publish to all subscribes + publish_queue.send(slab).await?; + + info!("Received putslab msg"); + } + 1 => { + let index = request.get_payload(); + let slab = slabstore.get(index)?; + + let mut payload = vec![]; + + if let Some(sb) = slab { + payload = sb; + } + + let reply = Reply::from(&request, 0, payload); + send_queue.send((peer, reply)).await?; + + // GETSLAB + info!("Received getslab msg"); + } + 2 => { + let index = slabstore.get_last_index_as_bytes()?; + let reply = Reply::from(&request, 0, index); + send_queue.send((peer, reply)).await?; + + // GETLASTINDEX + info!("Received getlastindex msg"); + } + _ => { + return Err(Error::ServicesError("received wrong command")); + } + } + Ok(()) + } } pub struct GatewayClient {