gateway daemon spawn new task once receive a message

This commit is contained in:
ghassmo
2021-05-29 16:04:46 +03:00
parent eb05266e96
commit c5724f3d67

View File

@@ -3,7 +3,7 @@ use std::convert::From;
use std::net::SocketAddr;
use std::path::Path;
use super::reqrep::{Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber, PeerId};
use super::reqrep::{PeerId, Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber};
use crate::{
serial::deserialize, serial::serialize, slab::Slab, slabstore::SlabStore, Error, Result,
};
@@ -51,8 +51,12 @@ impl GatewayService {
publish_recv_queue.clone(),
));
let handle_request_task =
executor.spawn(self.handle_request(send.clone(), recv.clone(), publish_queue.clone()));
let handle_request_task = executor.spawn(self.handle_request_loop(
send.clone(),
recv.clone(),
publish_queue.clone(),
executor.clone(),
));
protocol.run(executor.clone()).await?;
@@ -71,64 +75,25 @@ impl GatewayService {
Ok(())
}
async fn handle_request(
async fn handle_request_loop(
self: Arc<Self>,
send_queue: async_channel::Sender<(PeerId, Reply)>,
recv_queue: async_channel::Receiver<(PeerId, Request)>,
publish_queue: async_channel::Sender<Vec<u8>>,
executor: Arc<Executor<'_>>,
) -> Result<()> {
loop {
match recv_queue.recv().await {
Ok(msg) => {
// TODO spawn new task when receive new msg
let request = msg.1;
let peer = msg.0;
match request.get_command() {
0 => {
// PUTSLAB
let slab = request.get_payload();
// add to slabstore
self.slabstore.put(slab.clone())?;
// send reply
let reply = Reply::from(&request, 0, vec![]);
send_queue.send((peer, reply)).await?;
// publish to all subscribes
publish_queue.send(slab).await?;
info!("Received putslab msg");
}
1 => {
let index = request.get_payload();
let slab = self.slabstore.get(index)?;
let mut payload = vec![];
if let Some(sb) = slab {
payload = sb;
}
let reply = Reply::from(&request, 0, payload);
send_queue.send((peer, reply)).await?;
// GETSLAB
info!("Received getslab msg");
}
2 => {
let index = self.slabstore.get_last_index_as_bytes()?;
let reply = Reply::from(&request, 0, index);
send_queue.send((peer, reply)).await?;
// GETLASTINDEX
info!("Received getlastindex msg");
}
_ => {
return Err(Error::ServicesError("received wrong command"));
}
}
let slabstore = self.slabstore.clone();
let _ = executor
.spawn(Self::handle_request(
msg,
slabstore,
send_queue.clone(),
publish_queue.clone(),
))
.detach();
}
Err(_) => {
break;
@@ -137,6 +102,63 @@ impl GatewayService {
}
Ok(())
}
async fn handle_request(
msg: (PeerId, Request),
slabstore: Arc<SlabStore>,
send_queue: async_channel::Sender<(PeerId, Reply)>,
publish_queue: async_channel::Sender<Vec<u8>>,
) -> Result<()> {
let request = msg.1;
let peer = msg.0;
match request.get_command() {
0 => {
// PUTSLAB
let slab = request.get_payload();
// add to slabstore
slabstore.put(slab.clone())?;
// send reply
let reply = Reply::from(&request, 0, vec![]);
send_queue.send((peer, reply)).await?;
// publish to all subscribes
publish_queue.send(slab).await?;
info!("Received putslab msg");
}
1 => {
let index = request.get_payload();
let slab = slabstore.get(index)?;
let mut payload = vec![];
if let Some(sb) = slab {
payload = sb;
}
let reply = Reply::from(&request, 0, payload);
send_queue.send((peer, reply)).await?;
// GETSLAB
info!("Received getslab msg");
}
2 => {
let index = slabstore.get_last_index_as_bytes()?;
let reply = Reply::from(&request, 0, index);
send_queue.send((peer, reply)).await?;
// GETLASTINDEX
info!("Received getlastindex msg");
}
_ => {
return Err(Error::ServicesError("received wrong command"));
}
}
Ok(())
}
}
pub struct GatewayClient {