From 4fc8214876f46d49f9e00ff5325d8d1eaa137d24 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Sat, 29 May 2021 07:56:55 +0300 Subject: [PATCH] remove Mutex form publisher and use async_channel to publish --- src/service/gateway.rs | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/src/service/gateway.rs b/src/service/gateway.rs index 09a1f6173..ac4ce26e0 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -1,4 +1,4 @@ -use async_std::sync::{Arc, Mutex}; +use async_std::sync::Arc; use std::convert::From; use std::net::SocketAddr; use std::path::Path; @@ -23,41 +23,59 @@ enum GatewayCommand { pub struct GatewayService { slabstore: Arc, addr: SocketAddr, - publisher: Mutex, + pub_addr: SocketAddr, } impl GatewayService { pub fn new(addr: SocketAddr, pub_addr: SocketAddr) -> Result> { - let publisher = Mutex::new(Publisher::new(pub_addr)); - let slabstore = SlabStore::new(Path::new("slabstore.db"))?; Ok(Arc::new(GatewayService { slabstore, addr, - publisher, + pub_addr, })) } pub async fn start(self: Arc, executor: Arc>) -> Result<()> { - let mut protocol = RepProtocol::new(String::from("GATEWAY"), self.addr.clone()); + let service_name = String::from("GATEWAY"); + + let mut protocol = RepProtocol::new(service_name.clone(), self.addr.clone()); let (send, recv) = protocol.start().await?; - self.publisher.lock().await.start().await?; + let (publish_queue, publish_recv_queue) = async_channel::unbounded::>(); + let publisher_task = executor.spawn(Self::start_publisher( + self.pub_addr, + service_name, + publish_recv_queue.clone(), + )); - let handle_request_task = executor.spawn(self.handle_request(send.clone(), recv.clone())); + let handle_request_task = + executor.spawn(self.handle_request(send.clone(), recv.clone(), publish_queue.clone())); protocol.run(executor.clone()).await?; + let _ = publisher_task.cancel().await; let _ = handle_request_task.cancel().await; Ok(()) } + async fn start_publisher( + pub_addr: SocketAddr, + service_name: String, + publish_recv_queue: async_channel::Receiver>, + ) -> Result<()> { + let mut publisher = Publisher::new(pub_addr, service_name); + publisher.start(publish_recv_queue).await?; + Ok(()) + } + async fn handle_request( self: Arc, send_queue: async_channel::Sender, recv_queue: async_channel::Receiver, + publish_queue: async_channel::Sender>, ) -> Result<()> { loop { match recv_queue.recv().await { @@ -77,7 +95,7 @@ impl GatewayService { send_queue.send(reply).await?; // publish to all subscribes - self.publisher.lock().await.publish(slab).await?; + publish_queue.send(slab).await?; info!("received putslab msg"); } @@ -193,8 +211,4 @@ impl GatewayClient { slabstore.put(slab)?; } } - } - - -