start publish loop when once publisher get started

This commit is contained in:
ghassmo
2021-05-29 07:55:47 +03:00
parent f7552adf8a
commit fe1ba95797

View File

@@ -56,8 +56,8 @@ impl RepProtocol {
pub async fn start(
&mut self,
) -> Result<(
async_channel::Sender<Reply>,
async_channel::Receiver<Request>,
async_channel::Sender<Reply>,
async_channel::Receiver<Request>,
)> {
let addr = addr_to_string(self.addr);
self.socket.bind(addr.as_str()).await?;
@@ -150,7 +150,7 @@ impl ReqProtocol {
Ok(reply.get_payload())
} else {
Err(crate::Error::ZMQError(
"Couldn't parse ZmqMessage".to_string(),
"Couldn't parse ZmqMessage".to_string(),
))
}
}
@@ -159,20 +159,26 @@ impl ReqProtocol {
pub struct Publisher {
addr: SocketAddr,
socket: zeromq::PubSocket,
service_name: String,
}
impl Publisher {
pub fn new(addr: SocketAddr) -> Publisher {
pub fn new(addr: SocketAddr, service_name: String) -> Publisher {
let socket = zeromq::PubSocket::new();
Publisher { addr, socket }
}
pub async fn start(&mut self) -> Result<()> {
let addr = addr_to_string(self.addr);
self.socket.bind(addr.as_str()).await?;
Ok(())
Publisher { addr, socket, service_name}
}
pub async fn publish(&mut self, data: Vec<u8>) -> Result<()> {
pub async fn start(&mut self, recv_queue: async_channel::Receiver<Vec<u8>>) -> Result<()> {
let addr = addr_to_string(self.addr);
self.socket.bind(addr.as_str()).await?;
info!("{} SERVICE PUBLISHER: started - bind to {}", self.service_name, addr);
loop {
let x = recv_queue.recv().await?;
self.publish(x).await?;
}
}
async fn publish(&mut self, data: Vec<u8>) -> Result<()> {
let data = Bytes::from(data);
self.socket.send(data.into()).await?;
Ok(())
@@ -207,7 +213,7 @@ impl Subscriber {
Ok(data)
}
None => Err(crate::Error::ZMQError(
"Couldn't parse ZmqMessage".to_string(),
"Couldn't parse ZmqMessage".to_string(),
)),
}
}