switch sockets type to XREQ and XREP

This commit is contained in:
ghassmo
2021-05-29 13:48:17 +03:00
parent d70f7c42f2
commit ce100374d2
2 changed files with 53 additions and 34 deletions

View File

@@ -46,9 +46,9 @@ impl GatewayService {
let (publish_queue, publish_recv_queue) = async_channel::unbounded::<Vec<u8>>();
let publisher_task = executor.spawn(Self::start_publisher(
self.pub_addr,
service_name,
publish_recv_queue.clone(),
self.pub_addr,
service_name,
publish_recv_queue.clone(),
));
let handle_request_task =
@@ -73,14 +73,16 @@ impl GatewayService {
async fn handle_request(
self: Arc<Self>,
send_queue: async_channel::Sender<Reply>,
recv_queue: async_channel::Receiver<Request>,
send_queue: async_channel::Sender<(Vec<u8>,Reply)>,
recv_queue: async_channel::Receiver<(Vec<u8>, Request)>,
publish_queue: async_channel::Sender<Vec<u8>>,
) -> Result<()> {
loop {
match recv_queue.recv().await {
Ok(request) => {
Ok(msg) => {
// TODO spawn new task when receive new msg
let request = msg.1;
let peer = msg.0;
match request.get_command() {
0 => {
// PUTSLAB
@@ -92,7 +94,7 @@ impl GatewayService {
// send reply
let reply = Reply::from(&request, 0, vec![]);
send_queue.send(reply).await?;
send_queue.send((peer,reply)).await?;
// publish to all subscribes
publish_queue.send(slab).await?;
@@ -110,7 +112,7 @@ impl GatewayService {
}
let reply = Reply::from(&request, 0, payload);
send_queue.send(reply).await?;
send_queue.send((peer, reply)).await?;
// GETSLAB
info!("Received getslab msg");
@@ -118,7 +120,7 @@ impl GatewayService {
2 => {
let index = self.slabstore.get_last_index_as_bytes()?;
let reply = Reply::from(&request, 0, index);
send_queue.send(reply).await?;
send_queue.send((peer, reply)).await?;
// GETLASTINDEX
info!("Received getlastindex msg");
@@ -167,6 +169,8 @@ impl GatewayClient {
}
}
info!("End Syncing");
Ok(())
}
@@ -202,9 +206,14 @@ impl GatewayClient {
self.slabstore.clone()
}
pub async fn subscribe(slabstore: Arc<SlabStore>, sub_addr: SocketAddr) -> Result<()> {
pub async fn start_subscriber(sub_addr: SocketAddr) -> Result<Subscriber> {
let mut subscriber = Subscriber::new(sub_addr, String::from("GATEWAY CLIENT"));
subscriber.start().await?;
Ok(subscriber)
}
pub async fn subscribe(mut subscriber: Subscriber, slabstore: Arc<SlabStore>) -> Result<()> {
loop {
let slab: Vec<u8>;
slab = subscriber.fetch().await?;

View File

@@ -1,6 +1,7 @@
use async_std::sync::Arc;
use std::io;
use std::net::SocketAddr;
use std::convert::TryFrom;
use crate::serial::{deserialize, serialize};
use crate::{Decodable, Encodable, Result};
@@ -15,7 +16,7 @@ use zeromq::*;
enum NetEvent {
Receive(zeromq::ZmqMessage),
Send(Reply),
Send((Vec<u8>, Reply)),
Stop,
}
@@ -25,21 +26,21 @@ pub fn addr_to_string(addr: SocketAddr) -> String {
pub struct RepProtocol {
addr: SocketAddr,
socket: zeromq::RepSocket,
recv_queue: async_channel::Receiver<Reply>,
send_queue: async_channel::Sender<Request>,
socket: zeromq::RouterSocket,
recv_queue: async_channel::Receiver<(Vec<u8>,Reply)>,
send_queue: async_channel::Sender<(Vec<u8>, Request)>,
channels: (
async_channel::Sender<Reply>,
async_channel::Receiver<Request>,
async_channel::Sender<(Vec<u8>, Reply)>,
async_channel::Receiver<(Vec<u8>,Request)>,
),
service_name: String,
}
impl RepProtocol {
pub fn new(addr: SocketAddr, service_name: String) -> RepProtocol {
let socket = zeromq::RepSocket::new();
let (send_queue, recv_channel) = async_channel::unbounded::<Request>();
let (send_channel, recv_queue) = async_channel::unbounded::<Reply>();
let socket = zeromq::RouterSocket::new();
let (send_queue, recv_channel) = async_channel::unbounded::<(Vec<u8>, Request)>();
let (send_channel, recv_queue) = async_channel::unbounded::<(Vec<u8>, Reply)>();
let channels = (send_channel.clone(), recv_channel.clone());
@@ -56,8 +57,8 @@ impl RepProtocol {
pub async fn start(
&mut self,
) -> Result<(
async_channel::Sender<Reply>,
async_channel::Receiver<Request>,
async_channel::Sender<(Vec<u8>, Reply)>,
async_channel::Receiver<(Vec<u8>,Request)>,
)> {
let addr = addr_to_string(self.addr);
self.socket.bind(addr.as_str()).await?;
@@ -82,23 +83,31 @@ impl RepProtocol {
loop {
let event = futures::select! {
request = self.socket.recv().fuse() => NetEvent::Receive(request?),
reply = self.recv_queue.recv().fuse() => NetEvent::Send(reply?),
msg = self.socket.recv().fuse() => NetEvent::Receive(msg?),
msg = self.recv_queue.recv().fuse() => NetEvent::Send(msg?),
_ = stop_r.recv().fuse() => NetEvent::Stop
};
match event {
NetEvent::Receive(request) => {
if let Some(req) = request.get(0) {
let req: Vec<u8> = req.to_vec();
let req: Request = deserialize(&req)?;
self.send_queue.send(req).await?;
NetEvent::Receive(msg) => {
if let Some(request) = msg.get(1) {
let request: Vec<u8> = request.to_vec();
let request: Request = deserialize(&request)?;
if let Some(peer) = msg.get(0){
self.send_queue.send((peer.to_vec(), request)).await?;
}
}
}
NetEvent::Send(reply) => {
NetEvent::Send((peer, reply)) => {
let peer = Bytes::from(peer);
let mut msg:Vec<Bytes> = vec![peer];
let reply: Vec<u8> = serialize(&reply);
let reply = Bytes::from(reply);
let reply: zeromq::ZmqMessage = reply.into();
msg.push(reply);
let reply = zeromq::ZmqMessage::try_from(msg)
.map_err(|_| crate::Error::TryFromError)?;
self.socket.send(reply).await?;
}
NetEvent::Stop => break,
@@ -112,13 +121,13 @@ impl RepProtocol {
pub struct ReqProtocol {
addr: SocketAddr,
socket: zeromq::ReqSocket,
socket: zeromq::DealerSocket,
service_name: String,
}
impl ReqProtocol {
pub fn new(addr: SocketAddr, service_name: String) -> ReqProtocol {
let socket = zeromq::ReqSocket::new();
let socket = zeromq::DealerSocket::new();
ReqProtocol {
addr,
socket,
@@ -163,10 +172,11 @@ impl ReqProtocol {
assert!(reply.get_id() == request.get_id());
Ok(reply.get_payload())
} else {
Err(crate::Error::ZMQError(
"Couldn't parse ZmqMessage".to_string(),
"Couldn't parse ZmqMessage".to_string(),
))
}
}
@@ -244,7 +254,7 @@ impl Subscriber {
Ok(data)
}
None => Err(crate::Error::ZMQError(
"Couldn't parse ZmqMessage".to_string(),
"Couldn't parse ZmqMessage".to_string(),
)),
}
}