diff --git a/src/service/gateway.rs b/src/service/gateway.rs index 1e014cee0..e1146f3fa 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -46,9 +46,9 @@ impl GatewayService { let (publish_queue, publish_recv_queue) = async_channel::unbounded::>(); let publisher_task = executor.spawn(Self::start_publisher( - self.pub_addr, - service_name, - publish_recv_queue.clone(), + self.pub_addr, + service_name, + publish_recv_queue.clone(), )); let handle_request_task = @@ -73,14 +73,16 @@ impl GatewayService { async fn handle_request( self: Arc, - send_queue: async_channel::Sender, - recv_queue: async_channel::Receiver, + send_queue: async_channel::Sender<(Vec,Reply)>, + recv_queue: async_channel::Receiver<(Vec, Request)>, publish_queue: async_channel::Sender>, ) -> Result<()> { loop { match recv_queue.recv().await { - Ok(request) => { + Ok(msg) => { // TODO spawn new task when receive new msg + let request = msg.1; + let peer = msg.0; match request.get_command() { 0 => { // PUTSLAB @@ -92,7 +94,7 @@ impl GatewayService { // send reply let reply = Reply::from(&request, 0, vec![]); - send_queue.send(reply).await?; + send_queue.send((peer,reply)).await?; // publish to all subscribes publish_queue.send(slab).await?; @@ -110,7 +112,7 @@ impl GatewayService { } let reply = Reply::from(&request, 0, payload); - send_queue.send(reply).await?; + send_queue.send((peer, reply)).await?; // GETSLAB info!("Received getslab msg"); @@ -118,7 +120,7 @@ impl GatewayService { 2 => { let index = self.slabstore.get_last_index_as_bytes()?; let reply = Reply::from(&request, 0, index); - send_queue.send(reply).await?; + send_queue.send((peer, reply)).await?; // GETLASTINDEX info!("Received getlastindex msg"); @@ -167,6 +169,8 @@ impl GatewayClient { } } + info!("End Syncing"); + Ok(()) } @@ -202,9 +206,14 @@ impl GatewayClient { self.slabstore.clone() } - pub async fn subscribe(slabstore: Arc, sub_addr: SocketAddr) -> Result<()> { + + pub async fn start_subscriber(sub_addr: SocketAddr) -> Result { let mut subscriber = Subscriber::new(sub_addr, String::from("GATEWAY CLIENT")); subscriber.start().await?; + Ok(subscriber) + } + + pub async fn subscribe(mut subscriber: Subscriber, slabstore: Arc) -> Result<()> { loop { let slab: Vec; slab = subscriber.fetch().await?; diff --git a/src/service/reqrep.rs b/src/service/reqrep.rs index a06e13e62..e96bae4c9 100644 --- a/src/service/reqrep.rs +++ b/src/service/reqrep.rs @@ -1,6 +1,7 @@ use async_std::sync::Arc; use std::io; use std::net::SocketAddr; +use std::convert::TryFrom; use crate::serial::{deserialize, serialize}; use crate::{Decodable, Encodable, Result}; @@ -15,7 +16,7 @@ use zeromq::*; enum NetEvent { Receive(zeromq::ZmqMessage), - Send(Reply), + Send((Vec, Reply)), Stop, } @@ -25,21 +26,21 @@ pub fn addr_to_string(addr: SocketAddr) -> String { pub struct RepProtocol { addr: SocketAddr, - socket: zeromq::RepSocket, - recv_queue: async_channel::Receiver, - send_queue: async_channel::Sender, + socket: zeromq::RouterSocket, + recv_queue: async_channel::Receiver<(Vec,Reply)>, + send_queue: async_channel::Sender<(Vec, Request)>, channels: ( - async_channel::Sender, - async_channel::Receiver, + async_channel::Sender<(Vec, Reply)>, + async_channel::Receiver<(Vec,Request)>, ), service_name: String, } impl RepProtocol { pub fn new(addr: SocketAddr, service_name: String) -> RepProtocol { - let socket = zeromq::RepSocket::new(); - let (send_queue, recv_channel) = async_channel::unbounded::(); - let (send_channel, recv_queue) = async_channel::unbounded::(); + let socket = zeromq::RouterSocket::new(); + let (send_queue, recv_channel) = async_channel::unbounded::<(Vec, Request)>(); + let (send_channel, recv_queue) = async_channel::unbounded::<(Vec, Reply)>(); let channels = (send_channel.clone(), recv_channel.clone()); @@ -56,8 +57,8 @@ impl RepProtocol { pub async fn start( &mut self, ) -> Result<( - async_channel::Sender, - async_channel::Receiver, + async_channel::Sender<(Vec, Reply)>, + async_channel::Receiver<(Vec,Request)>, )> { let addr = addr_to_string(self.addr); self.socket.bind(addr.as_str()).await?; @@ -82,23 +83,31 @@ impl RepProtocol { loop { let event = futures::select! { - request = self.socket.recv().fuse() => NetEvent::Receive(request?), - reply = self.recv_queue.recv().fuse() => NetEvent::Send(reply?), + msg = self.socket.recv().fuse() => NetEvent::Receive(msg?), + msg = self.recv_queue.recv().fuse() => NetEvent::Send(msg?), _ = stop_r.recv().fuse() => NetEvent::Stop }; match event { - NetEvent::Receive(request) => { - if let Some(req) = request.get(0) { - let req: Vec = req.to_vec(); - let req: Request = deserialize(&req)?; - self.send_queue.send(req).await?; + NetEvent::Receive(msg) => { + if let Some(request) = msg.get(1) { + let request: Vec = request.to_vec(); + let request: Request = deserialize(&request)?; + if let Some(peer) = msg.get(0){ + self.send_queue.send((peer.to_vec(), request)).await?; + } } } - NetEvent::Send(reply) => { + NetEvent::Send((peer, reply)) => { + let peer = Bytes::from(peer); + let mut msg:Vec = vec![peer]; let reply: Vec = serialize(&reply); let reply = Bytes::from(reply); - let reply: zeromq::ZmqMessage = reply.into(); + msg.push(reply); + + let reply = zeromq::ZmqMessage::try_from(msg) + .map_err(|_| crate::Error::TryFromError)?; + self.socket.send(reply).await?; } NetEvent::Stop => break, @@ -112,13 +121,13 @@ impl RepProtocol { pub struct ReqProtocol { addr: SocketAddr, - socket: zeromq::ReqSocket, + socket: zeromq::DealerSocket, service_name: String, } impl ReqProtocol { pub fn new(addr: SocketAddr, service_name: String) -> ReqProtocol { - let socket = zeromq::ReqSocket::new(); + let socket = zeromq::DealerSocket::new(); ReqProtocol { addr, socket, @@ -163,10 +172,11 @@ impl ReqProtocol { assert!(reply.get_id() == request.get_id()); + Ok(reply.get_payload()) } else { Err(crate::Error::ZMQError( - "Couldn't parse ZmqMessage".to_string(), + "Couldn't parse ZmqMessage".to_string(), )) } } @@ -244,7 +254,7 @@ impl Subscriber { Ok(data) } None => Err(crate::Error::ZMQError( - "Couldn't parse ZmqMessage".to_string(), + "Couldn't parse ZmqMessage".to_string(), )), } }