remove Mutex form publisher and use async_channel to publish

This commit is contained in:
ghassmo
2021-05-29 07:56:55 +03:00
parent fe1ba95797
commit 4fc8214876

View File

@@ -1,4 +1,4 @@
use async_std::sync::{Arc, Mutex};
use async_std::sync::Arc;
use std::convert::From;
use std::net::SocketAddr;
use std::path::Path;
@@ -23,41 +23,59 @@ enum GatewayCommand {
pub struct GatewayService {
slabstore: Arc<SlabStore>,
addr: SocketAddr,
publisher: Mutex<Publisher>,
pub_addr: SocketAddr,
}
impl GatewayService {
pub fn new(addr: SocketAddr, pub_addr: SocketAddr) -> Result<Arc<GatewayService>> {
let publisher = Mutex::new(Publisher::new(pub_addr));
let slabstore = SlabStore::new(Path::new("slabstore.db"))?;
Ok(Arc::new(GatewayService {
slabstore,
addr,
publisher,
pub_addr,
}))
}
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
let mut protocol = RepProtocol::new(String::from("GATEWAY"), self.addr.clone());
let service_name = String::from("GATEWAY");
let mut protocol = RepProtocol::new(service_name.clone(), self.addr.clone());
let (send, recv) = protocol.start().await?;
self.publisher.lock().await.start().await?;
let (publish_queue, publish_recv_queue) = async_channel::unbounded::<Vec<u8>>();
let publisher_task = executor.spawn(Self::start_publisher(
self.pub_addr,
service_name,
publish_recv_queue.clone(),
));
let handle_request_task = executor.spawn(self.handle_request(send.clone(), recv.clone()));
let handle_request_task =
executor.spawn(self.handle_request(send.clone(), recv.clone(), publish_queue.clone()));
protocol.run(executor.clone()).await?;
let _ = publisher_task.cancel().await;
let _ = handle_request_task.cancel().await;
Ok(())
}
async fn start_publisher(
pub_addr: SocketAddr,
service_name: String,
publish_recv_queue: async_channel::Receiver<Vec<u8>>,
) -> Result<()> {
let mut publisher = Publisher::new(pub_addr, service_name);
publisher.start(publish_recv_queue).await?;
Ok(())
}
async fn handle_request(
self: Arc<Self>,
send_queue: async_channel::Sender<Reply>,
recv_queue: async_channel::Receiver<Request>,
publish_queue: async_channel::Sender<Vec<u8>>,
) -> Result<()> {
loop {
match recv_queue.recv().await {
@@ -77,7 +95,7 @@ impl GatewayService {
send_queue.send(reply).await?;
// publish to all subscribes
self.publisher.lock().await.publish(slab).await?;
publish_queue.send(slab).await?;
info!("received putslab msg");
}
@@ -193,8 +211,4 @@ impl GatewayClient {
slabstore.put(slab)?;
}
}
}