From fe1ba9579700e5fde35da97def8ba44922ea2f2c Mon Sep 17 00:00:00 2001 From: ghassmo Date: Sat, 29 May 2021 07:55:47 +0300 Subject: [PATCH] start publish loop when once publisher get started --- src/service/reqrep.rs | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/service/reqrep.rs b/src/service/reqrep.rs index 501dbaed1..fbc9e15ab 100644 --- a/src/service/reqrep.rs +++ b/src/service/reqrep.rs @@ -56,8 +56,8 @@ impl RepProtocol { pub async fn start( &mut self, ) -> Result<( - async_channel::Sender, - async_channel::Receiver, + async_channel::Sender, + async_channel::Receiver, )> { let addr = addr_to_string(self.addr); self.socket.bind(addr.as_str()).await?; @@ -150,7 +150,7 @@ impl ReqProtocol { Ok(reply.get_payload()) } else { Err(crate::Error::ZMQError( - "Couldn't parse ZmqMessage".to_string(), + "Couldn't parse ZmqMessage".to_string(), )) } } @@ -159,20 +159,26 @@ impl ReqProtocol { pub struct Publisher { addr: SocketAddr, socket: zeromq::PubSocket, + service_name: String, } impl Publisher { - pub fn new(addr: SocketAddr) -> Publisher { + pub fn new(addr: SocketAddr, service_name: String) -> Publisher { let socket = zeromq::PubSocket::new(); - Publisher { addr, socket } - } - pub async fn start(&mut self) -> Result<()> { - let addr = addr_to_string(self.addr); - self.socket.bind(addr.as_str()).await?; - Ok(()) + Publisher { addr, socket, service_name} } - pub async fn publish(&mut self, data: Vec) -> Result<()> { + pub async fn start(&mut self, recv_queue: async_channel::Receiver>) -> Result<()> { + let addr = addr_to_string(self.addr); + self.socket.bind(addr.as_str()).await?; + info!("{} SERVICE PUBLISHER: started - bind to {}", self.service_name, addr); + loop { + let x = recv_queue.recv().await?; + self.publish(x).await?; + } + } + + async fn publish(&mut self, data: Vec) -> Result<()> { let data = Bytes::from(data); self.socket.send(data.into()).await?; Ok(()) @@ -207,7 +213,7 @@ impl Subscriber { Ok(data) } None => Err(crate::Error::ZMQError( - "Couldn't parse ZmqMessage".to_string(), + "Couldn't parse ZmqMessage".to_string(), )), } }