mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-04-28 03:00:18 -04:00
node/service: Remove obsolete gateway code.
This commit is contained in:
@@ -1,361 +0,0 @@
|
||||
use std::{
|
||||
convert::From,
|
||||
net::{SocketAddr, ToSocketAddrs},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use async_executor::Executor;
|
||||
use log::debug;
|
||||
use url::Url;
|
||||
|
||||
use super::reqrep::{PeerId, Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber};
|
||||
use crate::{
|
||||
blockchain::{rocks::columns, RocksColumn, Slab, SlabStore},
|
||||
util::serial::{deserialize, serialize},
|
||||
Error, Result,
|
||||
};
|
||||
|
||||
pub type GatewaySlabsSubscriber = async_channel::Receiver<Slab>;
|
||||
|
||||
#[repr(u8)]
|
||||
enum GatewayError {
|
||||
NoError,
|
||||
UpdateIndex,
|
||||
IndexNotExist,
|
||||
}
|
||||
|
||||
#[repr(u8)]
|
||||
enum GatewayCommand {
|
||||
PutSlab,
|
||||
GetSlab,
|
||||
GetLastIndex,
|
||||
}
|
||||
|
||||
pub struct GatewayService {
|
||||
slabstore: Arc<SlabStore>,
|
||||
addr: SocketAddr,
|
||||
pub_addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl GatewayService {
|
||||
pub fn new(
|
||||
addr: SocketAddr,
|
||||
pub_addr: SocketAddr,
|
||||
rocks: RocksColumn<columns::Slabs>,
|
||||
) -> Result<Arc<GatewayService>> {
|
||||
let slabstore = SlabStore::new(rocks)?;
|
||||
|
||||
Ok(Arc::new(GatewayService { slabstore, addr, pub_addr }))
|
||||
}
|
||||
|
||||
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
let service_name = String::from("GATEWAY DAEMON");
|
||||
|
||||
let mut protocol = RepProtocol::new(self.addr, service_name.clone());
|
||||
|
||||
let (send, recv) = protocol.start().await?;
|
||||
|
||||
let (publish_queue, publish_recv_queue) = async_channel::unbounded::<Vec<u8>>();
|
||||
let publisher_task = executor.spawn(Self::start_publisher(
|
||||
self.pub_addr,
|
||||
service_name,
|
||||
publish_recv_queue.clone(),
|
||||
));
|
||||
|
||||
let handle_request_task = executor.spawn(self.handle_request_loop(
|
||||
send.clone(),
|
||||
recv.clone(),
|
||||
publish_queue.clone(),
|
||||
executor.clone(),
|
||||
));
|
||||
|
||||
protocol.run(executor.clone()).await?;
|
||||
|
||||
let _ = publisher_task.cancel().await;
|
||||
let _ = handle_request_task.cancel().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_publisher(
|
||||
pub_addr: SocketAddr,
|
||||
service_name: String,
|
||||
publish_recv_queue: async_channel::Receiver<Vec<u8>>,
|
||||
) -> Result<()> {
|
||||
let mut publisher = Publisher::new(pub_addr, service_name);
|
||||
publisher.start(publish_recv_queue).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_request_loop(
|
||||
self: Arc<Self>,
|
||||
send_queue: async_channel::Sender<(PeerId, Reply)>,
|
||||
recv_queue: async_channel::Receiver<(PeerId, Request)>,
|
||||
publish_queue: async_channel::Sender<Vec<u8>>,
|
||||
executor: Arc<Executor<'_>>,
|
||||
) -> Result<()> {
|
||||
while let Ok(msg) = recv_queue.recv().await {
|
||||
let slabstore = self.slabstore.clone();
|
||||
let _ = executor
|
||||
.spawn(Self::handle_request(
|
||||
msg,
|
||||
slabstore,
|
||||
send_queue.clone(),
|
||||
publish_queue.clone(),
|
||||
))
|
||||
.detach();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_request(
|
||||
msg: (PeerId, Request),
|
||||
slabstore: Arc<SlabStore>,
|
||||
send_queue: async_channel::Sender<(PeerId, Reply)>,
|
||||
publish_queue: async_channel::Sender<Vec<u8>>,
|
||||
) -> Result<()> {
|
||||
let request = msg.1;
|
||||
let peer = msg.0;
|
||||
match request.get_command() {
|
||||
0 => {
|
||||
debug!(target: "GATEWAY DAEMON", "Received putslab msg");
|
||||
// PUTSLAB
|
||||
let slab = request.get_payload();
|
||||
|
||||
// add to slabstore
|
||||
let error = slabstore.put(deserialize(&slab)?)?;
|
||||
|
||||
let mut reply = Reply::from(&request, GatewayError::NoError as u32, vec![]);
|
||||
|
||||
if error.is_none() {
|
||||
reply.set_error(GatewayError::UpdateIndex as u32);
|
||||
}
|
||||
|
||||
// send reply
|
||||
send_queue.send((peer, reply)).await?;
|
||||
|
||||
// publish to all subscribes
|
||||
publish_queue.send(slab).await?;
|
||||
}
|
||||
1 => {
|
||||
debug!(target: "GATEWAY DAEMON", "Received getslab msg");
|
||||
let index = request.get_payload();
|
||||
let slab = slabstore.get(index)?;
|
||||
|
||||
let mut reply = Reply::from(&request, GatewayError::NoError as u32, vec![]);
|
||||
|
||||
if let Some(payload) = slab {
|
||||
reply.set_payload(payload);
|
||||
} else {
|
||||
reply.set_error(GatewayError::IndexNotExist as u32);
|
||||
}
|
||||
|
||||
send_queue.send((peer, reply)).await?;
|
||||
|
||||
// GETSLAB
|
||||
}
|
||||
2 => {
|
||||
debug!(target: "GATEWAY DAEMON","Received getlastindex msg");
|
||||
let index = slabstore.get_last_index_as_bytes()?;
|
||||
|
||||
let reply = Reply::from(&request, GatewayError::NoError as u32, index);
|
||||
send_queue.send((peer, reply)).await?;
|
||||
|
||||
// GETLASTINDEX
|
||||
}
|
||||
_ => return Err(Error::ServicesError("received wrong command")),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GatewayClient {
|
||||
protocol: ReqProtocol,
|
||||
slabstore: Arc<SlabStore>,
|
||||
gateway_slabs_sub_s: async_channel::Sender<Slab>,
|
||||
gateway_slabs_sub_rv: GatewaySlabsSubscriber,
|
||||
is_running: bool,
|
||||
sub_addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl GatewayClient {
|
||||
pub fn new(addr: Url, sub_addr: Url, rocks: RocksColumn<columns::Slabs>) -> Result<Self> {
|
||||
// TODO: We'll want differentiation between TCP and TLS here.
|
||||
let addr_sock = (
|
||||
addr.host()
|
||||
.ok_or_else(|| Error::UrlParseError(format!("Missing host in {}", addr)))?
|
||||
.to_string(),
|
||||
addr.port().ok_or_else(|| Error::UrlParseError(format!("Missing port in {}", addr)))?,
|
||||
)
|
||||
.to_socket_addrs()?
|
||||
.next()
|
||||
.ok_or(Error::NoUrlFound)?;
|
||||
let protocol = ReqProtocol::new(addr_sock, String::from("GATEWAY CLIENT"));
|
||||
|
||||
let slabstore = SlabStore::new(rocks)?;
|
||||
|
||||
let (gateway_slabs_sub_s, gateway_slabs_sub_rv) = async_channel::unbounded::<Slab>();
|
||||
|
||||
let sub_addr_sock = (
|
||||
sub_addr
|
||||
.host()
|
||||
.ok_or_else(|| Error::UrlParseError(format!("Missing host in {}", sub_addr)))?
|
||||
.to_string(),
|
||||
sub_addr
|
||||
.port()
|
||||
.ok_or_else(|| Error::UrlParseError(format!("Missing port in {}", sub_addr)))?,
|
||||
)
|
||||
.to_socket_addrs()?
|
||||
.next()
|
||||
.ok_or(Error::NoUrlFound)?;
|
||||
|
||||
Ok(GatewayClient {
|
||||
protocol,
|
||||
slabstore,
|
||||
gateway_slabs_sub_s,
|
||||
gateway_slabs_sub_rv,
|
||||
is_running: false,
|
||||
sub_addr: sub_addr_sock,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn start(&mut self) -> Result<()> {
|
||||
self.protocol.start().await?;
|
||||
self.sync().await?;
|
||||
self.is_running = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn sync(&mut self) -> Result<u64> {
|
||||
debug!(target: "GATEWAY CLIENT", "Start Syncing");
|
||||
|
||||
let local_last_index = self.slabstore.get_last_index()?;
|
||||
|
||||
let last_index = self.get_last_index().await?;
|
||||
|
||||
if last_index < local_last_index {
|
||||
return Err(Error::SlabsStore(
|
||||
"Local slabstore has higher index than gateway's slabstore.
|
||||
Run \" darkfid -r \" to refresh the database."
|
||||
.into(),
|
||||
))
|
||||
}
|
||||
|
||||
if last_index > 0 {
|
||||
for index in (local_last_index + 1)..(last_index + 1) {
|
||||
if self.get_slab(index).await?.is_none() {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!(target: "GATEWAY CLIENT","End Syncing");
|
||||
Ok(last_index)
|
||||
}
|
||||
|
||||
pub async fn get_slab(&mut self, index: u64) -> Result<Option<Slab>> {
|
||||
debug!(target: "GATEWAY CLIENT","Get slab");
|
||||
|
||||
let handle_error = Arc::new(handle_error);
|
||||
let rep = self
|
||||
.protocol
|
||||
.request(GatewayCommand::GetSlab as u8, serialize(&index), handle_error)
|
||||
.await?;
|
||||
|
||||
if let Some(slab) = rep {
|
||||
let slab: Slab = deserialize(&slab)?;
|
||||
self.gateway_slabs_sub_s.send(slab.clone()).await?;
|
||||
self.slabstore.put(slab.clone())?;
|
||||
return Ok(Some(slab))
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub async fn put_slab(&mut self, mut slab: Slab) -> Result<()> {
|
||||
debug!(target: "GATEWAY CLIENT","Put slab");
|
||||
|
||||
loop {
|
||||
let last_index = self.sync().await?;
|
||||
slab.set_index(last_index + 1);
|
||||
let slab = serialize(&slab);
|
||||
|
||||
let handle_error = Arc::new(handle_error);
|
||||
|
||||
let rep = self
|
||||
.protocol
|
||||
.request(GatewayCommand::PutSlab as u8, slab.clone(), handle_error)
|
||||
.await?;
|
||||
|
||||
if rep.is_some() {
|
||||
break
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_last_index(&mut self) -> Result<u64> {
|
||||
debug!(target: "GATEWAY CLIENT","Get last index");
|
||||
|
||||
let handle_error = Arc::new(handle_error);
|
||||
|
||||
let rep =
|
||||
self.protocol.request(GatewayCommand::GetLastIndex as u8, vec![], handle_error).await?;
|
||||
if let Some(index) = rep {
|
||||
return deserialize(&index)
|
||||
}
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
pub fn get_slabstore(&self) -> Arc<SlabStore> {
|
||||
self.slabstore.clone()
|
||||
}
|
||||
|
||||
pub async fn start_subscriber(
|
||||
&self,
|
||||
executor: Arc<Executor<'_>>,
|
||||
) -> Result<GatewaySlabsSubscriber> {
|
||||
debug!(target: "GATEWAY CLIENT", "Start subscriber");
|
||||
|
||||
let mut subscriber = Subscriber::new(self.sub_addr, String::from("GATEWAY CLIENT"));
|
||||
subscriber.start().await?;
|
||||
executor
|
||||
.spawn(Self::subscribe_loop(
|
||||
subscriber,
|
||||
self.slabstore.clone(),
|
||||
self.gateway_slabs_sub_s.clone(),
|
||||
))
|
||||
.detach();
|
||||
Ok(self.gateway_slabs_sub_rv.clone())
|
||||
}
|
||||
|
||||
async fn subscribe_loop(
|
||||
mut subscriber: Subscriber,
|
||||
slabstore: Arc<SlabStore>,
|
||||
gateway_slabs_sub_s: async_channel::Sender<Slab>,
|
||||
) -> Result<()> {
|
||||
debug!(target: "GATEWAY CLIENT", "Start subscribe loop");
|
||||
|
||||
loop {
|
||||
let slab = subscriber.fetch::<Slab>().await?;
|
||||
debug!(target: "GATEWAY CLIENT", "Received new slab");
|
||||
gateway_slabs_sub_s.send(slab.clone()).await?;
|
||||
slabstore.put(slab)?;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_running(&self) -> bool {
|
||||
self.is_running
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_error(status_code: u32) {
|
||||
match status_code {
|
||||
1 => {
|
||||
debug!(target: "GATEWAY SERVICE", "Reply has an Error: Index is not updated");
|
||||
}
|
||||
2 => {
|
||||
debug!(target: "GATEWAY SERVICE", "Reply has an Error: Index Not Exist");
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -1,209 +0,0 @@
|
||||
use async_std::sync::{Arc, Mutex};
|
||||
use std::io;
|
||||
|
||||
use async_executor::Executor;
|
||||
use log::debug;
|
||||
|
||||
use crate::{
|
||||
blockchain::{rocks::columns, RocksColumn, Slab, SlabStore},
|
||||
net,
|
||||
net::{P2p, P2pPtr, Settings},
|
||||
util::{
|
||||
serial::{deserialize, serialize, Decodable, Encodable},
|
||||
sleep,
|
||||
},
|
||||
Error, Result,
|
||||
};
|
||||
|
||||
pub struct Gateway {
|
||||
p2p: P2pPtr,
|
||||
slabstore: Arc<SlabStore>,
|
||||
_last_indexes: Arc<Mutex<Vec<u64>>>,
|
||||
}
|
||||
|
||||
impl Gateway {
|
||||
pub async fn new(_settings: Settings, rocks: RocksColumn<columns::Slabs>) -> Result<Self> {
|
||||
let slabstore = SlabStore::new(rocks)?;
|
||||
let settings = Settings::default();
|
||||
|
||||
let p2p = P2p::new(settings).await;
|
||||
let last_indexes = Arc::new(Mutex::new(vec![0; 10]));
|
||||
Ok(Self { p2p, slabstore, _last_indexes: last_indexes })
|
||||
}
|
||||
|
||||
pub async fn start(&self, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
self.p2p.clone().start(executor.clone()).await?;
|
||||
|
||||
self.p2p.clone().run(executor.clone()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn publish(&self, msg: GatewayMessage) -> Result<()> {
|
||||
self.p2p.broadcast(msg).await
|
||||
}
|
||||
|
||||
async fn _subscribe_loop(&self, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
let new_channel_sub = self.p2p.subscribe_channel().await;
|
||||
|
||||
loop {
|
||||
let channel = new_channel_sub.receive().await?;
|
||||
|
||||
let message_subsytem = channel.get_message_subsystem();
|
||||
|
||||
message_subsytem.add_dispatch::<GatewayMessage>().await;
|
||||
|
||||
let msg_sub = channel.subscribe_msg::<GatewayMessage>().await?;
|
||||
|
||||
let jobsman = net::ProtocolJobsManager::new("GatewayMessage", channel);
|
||||
|
||||
jobsman.clone().start(executor.clone());
|
||||
|
||||
jobsman
|
||||
.spawn(Self::handle_msg(self.slabstore.clone(), msg_sub), executor.clone())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_msg(
|
||||
slabstore: Arc<SlabStore>,
|
||||
msg_sub: net::MessageSubscription<GatewayMessage>,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
let msg = msg_sub.receive().await?;
|
||||
|
||||
match msg.get_command() {
|
||||
GatewayCommand::PutSlab => {
|
||||
debug!(target: "GATEWAY", "Received putslab msg");
|
||||
|
||||
let slab = msg.get_payload();
|
||||
|
||||
slabstore.put(deserialize(&slab)?)?;
|
||||
|
||||
// TODO publish the new received slab
|
||||
}
|
||||
GatewayCommand::GetSlab => {
|
||||
debug!(target: "GATEWAY", "Received getslab msg");
|
||||
|
||||
let index = msg.get_payload();
|
||||
let _slab = slabstore.get(index)?;
|
||||
|
||||
// TODO publish the slab
|
||||
}
|
||||
GatewayCommand::GetLastIndex => {
|
||||
debug!(target: "GATEWAY","Received getlastindex msg");
|
||||
|
||||
let _index = slabstore.get_last_index_as_bytes()?;
|
||||
|
||||
// TODO publish the inex
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn sync(&self) -> Result<()> {
|
||||
debug!(target: "GATEWAY", "Start Syncing");
|
||||
|
||||
loop {
|
||||
let local_last_index = self.slabstore.get_last_index()?;
|
||||
|
||||
// start syncing every 4 seconds
|
||||
sleep(4).await;
|
||||
|
||||
self.get_last_index().await?;
|
||||
let last_index = 0;
|
||||
|
||||
if last_index < local_last_index {
|
||||
return Err(Error::SlabsStore(
|
||||
"Local slabstore has higher index than gateway's slabstore.
|
||||
Run \" darkfid -r \" to refresh the database."
|
||||
.into(),
|
||||
))
|
||||
}
|
||||
|
||||
if last_index > 0 {
|
||||
for index in (local_last_index + 1)..(last_index + 1) {
|
||||
self.get_slab(index).await?
|
||||
}
|
||||
}
|
||||
|
||||
debug!(target: "GATEWAY","End Syncing");
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_slab(&self, index: u64) -> Result<()> {
|
||||
debug!(target: "GATEWAY","Send get slab msg");
|
||||
let msg = GatewayMessage::new(GatewayCommand::GetSlab, serialize(&index));
|
||||
self.publish(msg).await
|
||||
}
|
||||
|
||||
pub async fn put_slab(&self, slab: Slab) -> Result<()> {
|
||||
debug!(target: "GATEWAY","Send put slab msg");
|
||||
let msg = GatewayMessage::new(GatewayCommand::PutSlab, serialize(&slab));
|
||||
self.publish(msg).await
|
||||
}
|
||||
|
||||
pub async fn get_last_index(&self) -> Result<()> {
|
||||
debug!(target: "GATEWAY","Send get last index msg");
|
||||
let msg = GatewayMessage::new(GatewayCommand::PutSlab, vec![]);
|
||||
self.publish(msg).await
|
||||
}
|
||||
|
||||
pub fn get_slabstore(&self) -> Arc<SlabStore> {
|
||||
self.slabstore.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub enum GatewayCommand {
|
||||
PutSlab,
|
||||
GetSlab,
|
||||
GetLastIndex,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub struct GatewayMessage {
|
||||
command: GatewayCommand,
|
||||
payload: Vec<u8>,
|
||||
}
|
||||
|
||||
impl GatewayMessage {
|
||||
pub fn new(command: GatewayCommand, payload: Vec<u8>) -> Self {
|
||||
Self { command, payload }
|
||||
}
|
||||
pub fn get_command(&self) -> GatewayCommand {
|
||||
self.command.clone()
|
||||
}
|
||||
|
||||
pub fn get_payload(&self) -> Vec<u8> {
|
||||
self.payload.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Encodable for GatewayMessage {
|
||||
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
|
||||
let mut len = 0;
|
||||
len += (self.command.clone() as u8).encode(&mut s)?;
|
||||
len += self.payload.encode(&mut s)?;
|
||||
Ok(len)
|
||||
}
|
||||
}
|
||||
|
||||
impl Decodable for GatewayMessage {
|
||||
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
|
||||
let command_code: u8 = Decodable::decode(&mut d)?;
|
||||
let command = match command_code {
|
||||
0 => GatewayCommand::PutSlab,
|
||||
1 => GatewayCommand::GetSlab,
|
||||
_ => GatewayCommand::GetLastIndex,
|
||||
};
|
||||
|
||||
Ok(Self { command, payload: Decodable::decode(&mut d)? })
|
||||
}
|
||||
}
|
||||
|
||||
impl net::Message for GatewayMessage {
|
||||
fn name() -> &'static str {
|
||||
"reply"
|
||||
}
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
pub mod gateway;
|
||||
pub mod gateway_p2p;
|
||||
pub mod reqrep;
|
||||
|
||||
pub use gateway::{GatewayClient, GatewayService, GatewaySlabsSubscriber};
|
||||
@@ -1,394 +0,0 @@
|
||||
use std::{io, net::SocketAddr, sync::Arc};
|
||||
|
||||
use async_executor::Executor;
|
||||
use async_std::prelude::*;
|
||||
use bytes::Bytes;
|
||||
use futures::FutureExt;
|
||||
use log::*;
|
||||
use rand::Rng;
|
||||
use signal_hook::consts::SIGINT;
|
||||
use signal_hook_async_std::Signals;
|
||||
use zeromq::*;
|
||||
|
||||
use crate::{
|
||||
util::serial::{deserialize, serialize, Decodable, Encodable},
|
||||
Result,
|
||||
};
|
||||
|
||||
pub type PeerId = Vec<u8>;
|
||||
|
||||
pub type Channels =
|
||||
(async_channel::Sender<(PeerId, Reply)>, async_channel::Receiver<(PeerId, Request)>);
|
||||
|
||||
enum NetEvent {
|
||||
Receive(zeromq::ZmqMessage),
|
||||
Send((PeerId, Reply)),
|
||||
Stop,
|
||||
}
|
||||
|
||||
pub fn addr_to_string(addr: SocketAddr) -> String {
|
||||
format!("tcp://{}", addr)
|
||||
}
|
||||
|
||||
pub struct RepProtocol {
|
||||
addr: SocketAddr,
|
||||
socket: zeromq::RouterSocket,
|
||||
recv_queue: async_channel::Receiver<(PeerId, Reply)>,
|
||||
send_queue: async_channel::Sender<(PeerId, Request)>,
|
||||
channels: Channels,
|
||||
service_name: String,
|
||||
}
|
||||
|
||||
impl RepProtocol {
|
||||
pub fn new(addr: SocketAddr, service_name: String) -> RepProtocol {
|
||||
let socket = zeromq::RouterSocket::new();
|
||||
let (send_queue, recv_channel) = async_channel::unbounded::<(PeerId, Request)>();
|
||||
let (send_channel, recv_queue) = async_channel::unbounded::<(PeerId, Reply)>();
|
||||
|
||||
let channels = (send_channel, recv_channel);
|
||||
|
||||
RepProtocol { addr, socket, recv_queue, send_queue, channels, service_name }
|
||||
}
|
||||
|
||||
pub async fn start(
|
||||
&mut self,
|
||||
) -> Result<(async_channel::Sender<(PeerId, Reply)>, async_channel::Receiver<(PeerId, Request)>)>
|
||||
{
|
||||
let addr = addr_to_string(self.addr);
|
||||
self.socket.bind(addr.as_str()).await?;
|
||||
debug!(target: "REP PROTOCOL API", "{} SERVICE: Bound To {}", self.service_name, addr);
|
||||
Ok(self.channels.clone())
|
||||
}
|
||||
|
||||
pub async fn run(&mut self, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
debug!(target: "REP PROTOCOL API", "{} SERVICE: Running", self.service_name);
|
||||
|
||||
let (stop_s, stop_r) = async_channel::unbounded::<()>();
|
||||
|
||||
let signals = Signals::new(&[SIGINT])?;
|
||||
let handle = signals.handle();
|
||||
|
||||
let signals_task = executor.spawn(async move {
|
||||
let mut signals = signals.fuse();
|
||||
while let Some(signal) = signals.next().await {
|
||||
match signal {
|
||||
SIGINT => {
|
||||
stop_s.send(()).await?;
|
||||
break
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
Ok::<(), crate::Error>(())
|
||||
});
|
||||
|
||||
loop {
|
||||
let event = futures::select! {
|
||||
msg = self.socket.recv().fuse() => NetEvent::Receive(msg?),
|
||||
msg = self.recv_queue.recv().fuse() => NetEvent::Send(msg?),
|
||||
_ = stop_r.recv().fuse() => NetEvent::Stop
|
||||
};
|
||||
|
||||
match event {
|
||||
NetEvent::Receive(msg) => {
|
||||
if let Some(peer) = msg.get(0) {
|
||||
if let Some(request) = msg.get(1) {
|
||||
let request: Vec<u8> = request.to_vec();
|
||||
let request: Request = deserialize(&request)?;
|
||||
self.send_queue.send((peer.to_vec(), request)).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
NetEvent::Send((peer, reply)) => {
|
||||
let peer = Bytes::from(peer);
|
||||
let mut msg: Vec<Bytes> = vec![peer];
|
||||
let reply: Vec<u8> = serialize(&reply);
|
||||
let reply = Bytes::from(reply);
|
||||
msg.push(reply);
|
||||
|
||||
let reply = zeromq::ZmqMessage::try_from(msg)
|
||||
.map_err(|_| crate::Error::TryFromError)?;
|
||||
|
||||
self.socket.send(reply).await?;
|
||||
}
|
||||
NetEvent::Stop => break,
|
||||
}
|
||||
}
|
||||
|
||||
handle.close();
|
||||
signals_task.await?;
|
||||
|
||||
debug!(target: "REP PROTOCOL API","{} SERVICE: Stopped", self.service_name);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReqProtocol {
|
||||
addr: SocketAddr,
|
||||
socket: zeromq::DealerSocket,
|
||||
service_name: String,
|
||||
}
|
||||
|
||||
impl ReqProtocol {
|
||||
pub fn new(addr: SocketAddr, service_name: String) -> ReqProtocol {
|
||||
let socket = zeromq::DealerSocket::new();
|
||||
ReqProtocol { addr, socket, service_name }
|
||||
}
|
||||
|
||||
pub async fn start(&mut self) -> Result<()> {
|
||||
let addr = addr_to_string(self.addr);
|
||||
self.socket.connect(addr.as_str()).await?;
|
||||
debug!(target: "REQ PROTOCOL API","{} SERVICE: Connected To {}", self.service_name, self.addr);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn request(
|
||||
&mut self,
|
||||
command: u8,
|
||||
data: Vec<u8>,
|
||||
handle_error: Arc<dyn Fn(u32) + Send + Sync>,
|
||||
) -> Result<Option<Vec<u8>>> {
|
||||
let request = Request::new(command, data);
|
||||
let req = serialize(&request);
|
||||
let req = bytes::Bytes::from(req);
|
||||
let req: zeromq::ZmqMessage = req.into();
|
||||
|
||||
self.socket.send(req).await?;
|
||||
debug!(
|
||||
target: "REQ PROTOCOL API",
|
||||
"{} SERVICE: Sent Request {{ command: {} }}",
|
||||
self.service_name, command
|
||||
);
|
||||
|
||||
let rep: zeromq::ZmqMessage = self.socket.recv().await?;
|
||||
if let Some(reply) = rep.get(0) {
|
||||
let reply: Vec<u8> = reply.to_vec();
|
||||
|
||||
let reply: Reply = deserialize(&reply)?;
|
||||
|
||||
debug!(
|
||||
target: "REQ PROTOCOL API",
|
||||
"{} SERVICE: Received Reply {{ error: {} }}",
|
||||
self.service_name,
|
||||
reply.has_error()
|
||||
);
|
||||
|
||||
if reply.has_error() {
|
||||
handle_error(reply.get_error());
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
if reply.get_id() != request.get_id() {
|
||||
warn!("Reply id is not equal to Request id");
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
Ok(Some(reply.get_payload()))
|
||||
} else {
|
||||
Err(crate::Error::ZmqError("Couldn't parse ZmqMessage".to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Publisher {
|
||||
addr: SocketAddr,
|
||||
socket: zeromq::PubSocket,
|
||||
service_name: String,
|
||||
}
|
||||
|
||||
impl Publisher {
|
||||
pub fn new(addr: SocketAddr, service_name: String) -> Publisher {
|
||||
let socket = zeromq::PubSocket::new();
|
||||
Publisher { addr, socket, service_name }
|
||||
}
|
||||
|
||||
pub async fn start(&mut self, recv_queue: async_channel::Receiver<Vec<u8>>) -> Result<()> {
|
||||
let addr = addr_to_string(self.addr);
|
||||
self.socket.bind(addr.as_str()).await?;
|
||||
debug!(
|
||||
target: "PUBLISHER API",
|
||||
"{} SERVICE : Bound To {}",
|
||||
self.service_name, addr
|
||||
);
|
||||
loop {
|
||||
let msg = recv_queue.recv().await?;
|
||||
self.publish(msg).await?;
|
||||
}
|
||||
}
|
||||
|
||||
async fn publish(&mut self, data: Vec<u8>) -> Result<()> {
|
||||
let data = Bytes::from(data);
|
||||
self.socket.send(data.into()).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Subscriber {
|
||||
addr: SocketAddr,
|
||||
socket: zeromq::SubSocket,
|
||||
service_name: String,
|
||||
}
|
||||
|
||||
impl Subscriber {
|
||||
pub fn new(addr: SocketAddr, service_name: String) -> Subscriber {
|
||||
let socket = zeromq::SubSocket::new();
|
||||
Subscriber { addr, socket, service_name }
|
||||
}
|
||||
|
||||
pub async fn start(&mut self) -> Result<()> {
|
||||
let addr = addr_to_string(self.addr);
|
||||
self.socket.connect(addr.as_str()).await?;
|
||||
|
||||
self.socket.subscribe("").await?;
|
||||
debug!(
|
||||
target: "SUBSCRIBER API",
|
||||
"{} SERVICE : Connected To {}",
|
||||
self.service_name, addr
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn fetch<T: Decodable>(&mut self) -> Result<T> {
|
||||
let data = self.socket.recv().await?;
|
||||
match data.get(0) {
|
||||
Some(d) => {
|
||||
let data = d.to_vec();
|
||||
let data: T = deserialize(&data)?;
|
||||
Ok(data)
|
||||
}
|
||||
None => Err(crate::Error::ZmqError("Couldn't parse ZmqMessage".to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct Request {
|
||||
command: u8,
|
||||
id: u32,
|
||||
payload: Vec<u8>,
|
||||
}
|
||||
|
||||
impl Request {
|
||||
pub fn new(command: u8, payload: Vec<u8>) -> Request {
|
||||
let id = Self::gen_id();
|
||||
Request { command, id, payload }
|
||||
}
|
||||
fn gen_id() -> u32 {
|
||||
let mut rng = rand::thread_rng();
|
||||
rng.gen()
|
||||
}
|
||||
|
||||
pub fn get_id(&self) -> u32 {
|
||||
self.id
|
||||
}
|
||||
|
||||
pub fn get_command(&self) -> u8 {
|
||||
self.command
|
||||
}
|
||||
|
||||
pub fn get_payload(&self) -> Vec<u8> {
|
||||
self.payload.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct Reply {
|
||||
id: u32,
|
||||
error: u32,
|
||||
payload: Vec<u8>,
|
||||
}
|
||||
|
||||
impl Reply {
|
||||
pub fn from(request: &Request, error: u32, payload: Vec<u8>) -> Reply {
|
||||
Reply { id: request.get_id(), error, payload }
|
||||
}
|
||||
|
||||
pub fn has_error(&self) -> bool {
|
||||
self.error != 0
|
||||
}
|
||||
|
||||
pub fn get_error(&self) -> u32 {
|
||||
self.error
|
||||
}
|
||||
|
||||
pub fn get_payload(&self) -> Vec<u8> {
|
||||
self.payload.clone()
|
||||
}
|
||||
|
||||
pub fn set_payload(&mut self, payload: Vec<u8>) {
|
||||
self.payload = payload;
|
||||
}
|
||||
|
||||
pub fn set_error(&mut self, error: u32) {
|
||||
self.error = error;
|
||||
}
|
||||
|
||||
pub fn get_id(&self) -> u32 {
|
||||
self.id
|
||||
}
|
||||
}
|
||||
|
||||
impl Encodable for Request {
|
||||
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
|
||||
let mut len = 0;
|
||||
len += self.command.encode(&mut s)?;
|
||||
len += self.id.encode(&mut s)?;
|
||||
len += self.payload.encode(&mut s)?;
|
||||
Ok(len)
|
||||
}
|
||||
}
|
||||
|
||||
impl Encodable for Reply {
|
||||
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
|
||||
let mut len = 0;
|
||||
len += self.id.encode(&mut s)?;
|
||||
len += self.error.encode(&mut s)?;
|
||||
len += self.payload.encode(&mut s)?;
|
||||
Ok(len)
|
||||
}
|
||||
}
|
||||
|
||||
impl Decodable for Request {
|
||||
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
|
||||
Ok(Self {
|
||||
command: Decodable::decode(&mut d)?,
|
||||
id: Decodable::decode(&mut d)?,
|
||||
payload: Decodable::decode(&mut d)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Decodable for Reply {
|
||||
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
|
||||
Ok(Self {
|
||||
id: Decodable::decode(&mut d)?,
|
||||
error: Decodable::decode(&mut d)?,
|
||||
payload: Decodable::decode(&mut d)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{Reply, Request, Result};
|
||||
use crate::util::serial::{deserialize, serialize};
|
||||
|
||||
#[test]
|
||||
fn serialize_and_deserialize_request_test() {
|
||||
let request = Request::new(2, vec![2, 3, 4, 6, 4]);
|
||||
let serialized_request = serialize(&request);
|
||||
assert!((deserialize(&serialized_request) as Result<bool>).is_err());
|
||||
let deserialized_request = deserialize(&serialized_request).ok();
|
||||
assert_eq!(deserialized_request, Some(request));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_and_deserialize_reply_test() {
|
||||
let request = Request::new(2, vec![2, 3, 4, 6, 4]);
|
||||
let reply = Reply::from(&request, 0, vec![2, 3, 4, 6, 4]);
|
||||
let serialized_reply = serialize(&reply);
|
||||
assert!((deserialize(&serialized_reply) as Result<bool>).is_err());
|
||||
let deserialized_reply = deserialize(&serialized_reply).ok();
|
||||
assert_eq!(deserialized_reply, Some(reply));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user