diff --git a/Cargo.toml b/Cargo.toml index 2cc9c25b0..a331a11d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,26 +122,6 @@ libsqlite3-sys = {version = "0.24.2", features = ["bundled-sqlcipher"], optiona # Blockchain store sled = {version = "0.34.7", optional = true} -# Node utilities -signal-hook = {version = "0.3.13", optional = true} -signal-hook-async-std = {version = "0.2.2", optional = true} - -# Node protocol -[dependencies.zeromq] -version = "0.3.3" -default-features = false -features = ["async-std-runtime", "all-transport"] -optional = true - -[dependencies.rocksdb] -# TODO: Revert to upstream after bd966750ec861d687913d59a9939a1408ac53131 is merged. -git = "https://github.com/parazyd/rust-rocksdb" -rev = "bd966750ec861d687913d59a9939a1408ac53131" -default-features = false -features = ["zstd"] -optional = true - - [dev-dependencies] clap = {version = "3.1.8", features = ["derive"]} diff --git a/src/node/service/gateway.rs b/src/node/service/gateway.rs deleted file mode 100644 index 44ec28a2c..000000000 --- a/src/node/service/gateway.rs +++ /dev/null @@ -1,361 +0,0 @@ -use std::{ - convert::From, - net::{SocketAddr, ToSocketAddrs}, - sync::Arc, -}; - -use async_executor::Executor; -use log::debug; -use url::Url; - -use super::reqrep::{PeerId, Publisher, RepProtocol, Reply, ReqProtocol, Request, Subscriber}; -use crate::{ - blockchain::{rocks::columns, RocksColumn, Slab, SlabStore}, - util::serial::{deserialize, serialize}, - Error, Result, -}; - -pub type GatewaySlabsSubscriber = async_channel::Receiver; - -#[repr(u8)] -enum GatewayError { - NoError, - UpdateIndex, - IndexNotExist, -} - -#[repr(u8)] -enum GatewayCommand { - PutSlab, - GetSlab, - GetLastIndex, -} - -pub struct GatewayService { - slabstore: Arc, - addr: SocketAddr, - pub_addr: SocketAddr, -} - -impl GatewayService { - pub fn new( - addr: SocketAddr, - pub_addr: SocketAddr, - rocks: RocksColumn, - ) -> Result> { - let slabstore = SlabStore::new(rocks)?; - - Ok(Arc::new(GatewayService { slabstore, addr, pub_addr })) - } - - pub async fn start(self: Arc, executor: Arc>) -> Result<()> { - let service_name = String::from("GATEWAY DAEMON"); - - let mut protocol = RepProtocol::new(self.addr, service_name.clone()); - - let (send, recv) = protocol.start().await?; - - let (publish_queue, publish_recv_queue) = async_channel::unbounded::>(); - let publisher_task = executor.spawn(Self::start_publisher( - self.pub_addr, - service_name, - publish_recv_queue.clone(), - )); - - let handle_request_task = executor.spawn(self.handle_request_loop( - send.clone(), - recv.clone(), - publish_queue.clone(), - executor.clone(), - )); - - protocol.run(executor.clone()).await?; - - let _ = publisher_task.cancel().await; - let _ = handle_request_task.cancel().await; - Ok(()) - } - - async fn start_publisher( - pub_addr: SocketAddr, - service_name: String, - publish_recv_queue: async_channel::Receiver>, - ) -> Result<()> { - let mut publisher = Publisher::new(pub_addr, service_name); - publisher.start(publish_recv_queue).await?; - Ok(()) - } - - async fn handle_request_loop( - self: Arc, - send_queue: async_channel::Sender<(PeerId, Reply)>, - recv_queue: async_channel::Receiver<(PeerId, Request)>, - publish_queue: async_channel::Sender>, - executor: Arc>, - ) -> Result<()> { - while let Ok(msg) = recv_queue.recv().await { - let slabstore = self.slabstore.clone(); - let _ = executor - .spawn(Self::handle_request( - msg, - slabstore, - send_queue.clone(), - publish_queue.clone(), - )) - .detach(); - } - Ok(()) - } - - async fn handle_request( - msg: (PeerId, Request), - slabstore: Arc, - send_queue: async_channel::Sender<(PeerId, Reply)>, - publish_queue: async_channel::Sender>, - ) -> Result<()> { - let request = msg.1; - let peer = msg.0; - match request.get_command() { - 0 => { - debug!(target: "GATEWAY DAEMON", "Received putslab msg"); - // PUTSLAB - let slab = request.get_payload(); - - // add to slabstore - let error = slabstore.put(deserialize(&slab)?)?; - - let mut reply = Reply::from(&request, GatewayError::NoError as u32, vec![]); - - if error.is_none() { - reply.set_error(GatewayError::UpdateIndex as u32); - } - - // send reply - send_queue.send((peer, reply)).await?; - - // publish to all subscribes - publish_queue.send(slab).await?; - } - 1 => { - debug!(target: "GATEWAY DAEMON", "Received getslab msg"); - let index = request.get_payload(); - let slab = slabstore.get(index)?; - - let mut reply = Reply::from(&request, GatewayError::NoError as u32, vec![]); - - if let Some(payload) = slab { - reply.set_payload(payload); - } else { - reply.set_error(GatewayError::IndexNotExist as u32); - } - - send_queue.send((peer, reply)).await?; - - // GETSLAB - } - 2 => { - debug!(target: "GATEWAY DAEMON","Received getlastindex msg"); - let index = slabstore.get_last_index_as_bytes()?; - - let reply = Reply::from(&request, GatewayError::NoError as u32, index); - send_queue.send((peer, reply)).await?; - - // GETLASTINDEX - } - _ => return Err(Error::ServicesError("received wrong command")), - } - Ok(()) - } -} - -pub struct GatewayClient { - protocol: ReqProtocol, - slabstore: Arc, - gateway_slabs_sub_s: async_channel::Sender, - gateway_slabs_sub_rv: GatewaySlabsSubscriber, - is_running: bool, - sub_addr: SocketAddr, -} - -impl GatewayClient { - pub fn new(addr: Url, sub_addr: Url, rocks: RocksColumn) -> Result { - // TODO: We'll want differentiation between TCP and TLS here. - let addr_sock = ( - addr.host() - .ok_or_else(|| Error::UrlParseError(format!("Missing host in {}", addr)))? - .to_string(), - addr.port().ok_or_else(|| Error::UrlParseError(format!("Missing port in {}", addr)))?, - ) - .to_socket_addrs()? - .next() - .ok_or(Error::NoUrlFound)?; - let protocol = ReqProtocol::new(addr_sock, String::from("GATEWAY CLIENT")); - - let slabstore = SlabStore::new(rocks)?; - - let (gateway_slabs_sub_s, gateway_slabs_sub_rv) = async_channel::unbounded::(); - - let sub_addr_sock = ( - sub_addr - .host() - .ok_or_else(|| Error::UrlParseError(format!("Missing host in {}", sub_addr)))? - .to_string(), - sub_addr - .port() - .ok_or_else(|| Error::UrlParseError(format!("Missing port in {}", sub_addr)))?, - ) - .to_socket_addrs()? - .next() - .ok_or(Error::NoUrlFound)?; - - Ok(GatewayClient { - protocol, - slabstore, - gateway_slabs_sub_s, - gateway_slabs_sub_rv, - is_running: false, - sub_addr: sub_addr_sock, - }) - } - - pub async fn start(&mut self) -> Result<()> { - self.protocol.start().await?; - self.sync().await?; - self.is_running = true; - Ok(()) - } - - pub async fn sync(&mut self) -> Result { - debug!(target: "GATEWAY CLIENT", "Start Syncing"); - - let local_last_index = self.slabstore.get_last_index()?; - - let last_index = self.get_last_index().await?; - - if last_index < local_last_index { - return Err(Error::SlabsStore( - "Local slabstore has higher index than gateway's slabstore. - Run \" darkfid -r \" to refresh the database." - .into(), - )) - } - - if last_index > 0 { - for index in (local_last_index + 1)..(last_index + 1) { - if self.get_slab(index).await?.is_none() { - break - } - } - } - - debug!(target: "GATEWAY CLIENT","End Syncing"); - Ok(last_index) - } - - pub async fn get_slab(&mut self, index: u64) -> Result> { - debug!(target: "GATEWAY CLIENT","Get slab"); - - let handle_error = Arc::new(handle_error); - let rep = self - .protocol - .request(GatewayCommand::GetSlab as u8, serialize(&index), handle_error) - .await?; - - if let Some(slab) = rep { - let slab: Slab = deserialize(&slab)?; - self.gateway_slabs_sub_s.send(slab.clone()).await?; - self.slabstore.put(slab.clone())?; - return Ok(Some(slab)) - } - - Ok(None) - } - - pub async fn put_slab(&mut self, mut slab: Slab) -> Result<()> { - debug!(target: "GATEWAY CLIENT","Put slab"); - - loop { - let last_index = self.sync().await?; - slab.set_index(last_index + 1); - let slab = serialize(&slab); - - let handle_error = Arc::new(handle_error); - - let rep = self - .protocol - .request(GatewayCommand::PutSlab as u8, slab.clone(), handle_error) - .await?; - - if rep.is_some() { - break - } - } - Ok(()) - } - - pub async fn get_last_index(&mut self) -> Result { - debug!(target: "GATEWAY CLIENT","Get last index"); - - let handle_error = Arc::new(handle_error); - - let rep = - self.protocol.request(GatewayCommand::GetLastIndex as u8, vec![], handle_error).await?; - if let Some(index) = rep { - return deserialize(&index) - } - Ok(0) - } - - pub fn get_slabstore(&self) -> Arc { - self.slabstore.clone() - } - - pub async fn start_subscriber( - &self, - executor: Arc>, - ) -> Result { - debug!(target: "GATEWAY CLIENT", "Start subscriber"); - - let mut subscriber = Subscriber::new(self.sub_addr, String::from("GATEWAY CLIENT")); - subscriber.start().await?; - executor - .spawn(Self::subscribe_loop( - subscriber, - self.slabstore.clone(), - self.gateway_slabs_sub_s.clone(), - )) - .detach(); - Ok(self.gateway_slabs_sub_rv.clone()) - } - - async fn subscribe_loop( - mut subscriber: Subscriber, - slabstore: Arc, - gateway_slabs_sub_s: async_channel::Sender, - ) -> Result<()> { - debug!(target: "GATEWAY CLIENT", "Start subscribe loop"); - - loop { - let slab = subscriber.fetch::().await?; - debug!(target: "GATEWAY CLIENT", "Received new slab"); - gateway_slabs_sub_s.send(slab.clone()).await?; - slabstore.put(slab)?; - } - } - - pub fn is_running(&self) -> bool { - self.is_running - } -} - -fn handle_error(status_code: u32) { - match status_code { - 1 => { - debug!(target: "GATEWAY SERVICE", "Reply has an Error: Index is not updated"); - } - 2 => { - debug!(target: "GATEWAY SERVICE", "Reply has an Error: Index Not Exist"); - } - _ => {} - } -} diff --git a/src/node/service/gateway_p2p.rs b/src/node/service/gateway_p2p.rs deleted file mode 100644 index b54f862ed..000000000 --- a/src/node/service/gateway_p2p.rs +++ /dev/null @@ -1,209 +0,0 @@ -use async_std::sync::{Arc, Mutex}; -use std::io; - -use async_executor::Executor; -use log::debug; - -use crate::{ - blockchain::{rocks::columns, RocksColumn, Slab, SlabStore}, - net, - net::{P2p, P2pPtr, Settings}, - util::{ - serial::{deserialize, serialize, Decodable, Encodable}, - sleep, - }, - Error, Result, -}; - -pub struct Gateway { - p2p: P2pPtr, - slabstore: Arc, - _last_indexes: Arc>>, -} - -impl Gateway { - pub async fn new(_settings: Settings, rocks: RocksColumn) -> Result { - let slabstore = SlabStore::new(rocks)?; - let settings = Settings::default(); - - let p2p = P2p::new(settings).await; - let last_indexes = Arc::new(Mutex::new(vec![0; 10])); - Ok(Self { p2p, slabstore, _last_indexes: last_indexes }) - } - - pub async fn start(&self, executor: Arc>) -> Result<()> { - self.p2p.clone().start(executor.clone()).await?; - - self.p2p.clone().run(executor.clone()).await?; - - Ok(()) - } - - async fn publish(&self, msg: GatewayMessage) -> Result<()> { - self.p2p.broadcast(msg).await - } - - async fn _subscribe_loop(&self, executor: Arc>) -> Result<()> { - let new_channel_sub = self.p2p.subscribe_channel().await; - - loop { - let channel = new_channel_sub.receive().await?; - - let message_subsytem = channel.get_message_subsystem(); - - message_subsytem.add_dispatch::().await; - - let msg_sub = channel.subscribe_msg::().await?; - - let jobsman = net::ProtocolJobsManager::new("GatewayMessage", channel); - - jobsman.clone().start(executor.clone()); - - jobsman - .spawn(Self::handle_msg(self.slabstore.clone(), msg_sub), executor.clone()) - .await; - } - } - - pub async fn handle_msg( - slabstore: Arc, - msg_sub: net::MessageSubscription, - ) -> Result<()> { - loop { - let msg = msg_sub.receive().await?; - - match msg.get_command() { - GatewayCommand::PutSlab => { - debug!(target: "GATEWAY", "Received putslab msg"); - - let slab = msg.get_payload(); - - slabstore.put(deserialize(&slab)?)?; - - // TODO publish the new received slab - } - GatewayCommand::GetSlab => { - debug!(target: "GATEWAY", "Received getslab msg"); - - let index = msg.get_payload(); - let _slab = slabstore.get(index)?; - - // TODO publish the slab - } - GatewayCommand::GetLastIndex => { - debug!(target: "GATEWAY","Received getlastindex msg"); - - let _index = slabstore.get_last_index_as_bytes()?; - - // TODO publish the inex - } - } - } - } - - pub async fn sync(&self) -> Result<()> { - debug!(target: "GATEWAY", "Start Syncing"); - - loop { - let local_last_index = self.slabstore.get_last_index()?; - - // start syncing every 4 seconds - sleep(4).await; - - self.get_last_index().await?; - let last_index = 0; - - if last_index < local_last_index { - return Err(Error::SlabsStore( - "Local slabstore has higher index than gateway's slabstore. - Run \" darkfid -r \" to refresh the database." - .into(), - )) - } - - if last_index > 0 { - for index in (local_last_index + 1)..(last_index + 1) { - self.get_slab(index).await? - } - } - - debug!(target: "GATEWAY","End Syncing"); - } - } - - pub async fn get_slab(&self, index: u64) -> Result<()> { - debug!(target: "GATEWAY","Send get slab msg"); - let msg = GatewayMessage::new(GatewayCommand::GetSlab, serialize(&index)); - self.publish(msg).await - } - - pub async fn put_slab(&self, slab: Slab) -> Result<()> { - debug!(target: "GATEWAY","Send put slab msg"); - let msg = GatewayMessage::new(GatewayCommand::PutSlab, serialize(&slab)); - self.publish(msg).await - } - - pub async fn get_last_index(&self) -> Result<()> { - debug!(target: "GATEWAY","Send get last index msg"); - let msg = GatewayMessage::new(GatewayCommand::PutSlab, vec![]); - self.publish(msg).await - } - - pub fn get_slabstore(&self) -> Arc { - self.slabstore.clone() - } -} - -#[derive(Debug, PartialEq, Clone)] -pub enum GatewayCommand { - PutSlab, - GetSlab, - GetLastIndex, -} - -#[derive(Debug, PartialEq, Clone)] -pub struct GatewayMessage { - command: GatewayCommand, - payload: Vec, -} - -impl GatewayMessage { - pub fn new(command: GatewayCommand, payload: Vec) -> Self { - Self { command, payload } - } - pub fn get_command(&self) -> GatewayCommand { - self.command.clone() - } - - pub fn get_payload(&self) -> Vec { - self.payload.clone() - } -} - -impl Encodable for GatewayMessage { - fn encode(&self, mut s: S) -> Result { - let mut len = 0; - len += (self.command.clone() as u8).encode(&mut s)?; - len += self.payload.encode(&mut s)?; - Ok(len) - } -} - -impl Decodable for GatewayMessage { - fn decode(mut d: D) -> Result { - let command_code: u8 = Decodable::decode(&mut d)?; - let command = match command_code { - 0 => GatewayCommand::PutSlab, - 1 => GatewayCommand::GetSlab, - _ => GatewayCommand::GetLastIndex, - }; - - Ok(Self { command, payload: Decodable::decode(&mut d)? }) - } -} - -impl net::Message for GatewayMessage { - fn name() -> &'static str { - "reply" - } -} diff --git a/src/node/service/mod.rs b/src/node/service/mod.rs deleted file mode 100644 index 9505c99e0..000000000 --- a/src/node/service/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod gateway; -pub mod gateway_p2p; -pub mod reqrep; - -pub use gateway::{GatewayClient, GatewayService, GatewaySlabsSubscriber}; diff --git a/src/node/service/reqrep.rs b/src/node/service/reqrep.rs deleted file mode 100644 index 382666225..000000000 --- a/src/node/service/reqrep.rs +++ /dev/null @@ -1,394 +0,0 @@ -use std::{io, net::SocketAddr, sync::Arc}; - -use async_executor::Executor; -use async_std::prelude::*; -use bytes::Bytes; -use futures::FutureExt; -use log::*; -use rand::Rng; -use signal_hook::consts::SIGINT; -use signal_hook_async_std::Signals; -use zeromq::*; - -use crate::{ - util::serial::{deserialize, serialize, Decodable, Encodable}, - Result, -}; - -pub type PeerId = Vec; - -pub type Channels = - (async_channel::Sender<(PeerId, Reply)>, async_channel::Receiver<(PeerId, Request)>); - -enum NetEvent { - Receive(zeromq::ZmqMessage), - Send((PeerId, Reply)), - Stop, -} - -pub fn addr_to_string(addr: SocketAddr) -> String { - format!("tcp://{}", addr) -} - -pub struct RepProtocol { - addr: SocketAddr, - socket: zeromq::RouterSocket, - recv_queue: async_channel::Receiver<(PeerId, Reply)>, - send_queue: async_channel::Sender<(PeerId, Request)>, - channels: Channels, - service_name: String, -} - -impl RepProtocol { - pub fn new(addr: SocketAddr, service_name: String) -> RepProtocol { - let socket = zeromq::RouterSocket::new(); - let (send_queue, recv_channel) = async_channel::unbounded::<(PeerId, Request)>(); - let (send_channel, recv_queue) = async_channel::unbounded::<(PeerId, Reply)>(); - - let channels = (send_channel, recv_channel); - - RepProtocol { addr, socket, recv_queue, send_queue, channels, service_name } - } - - pub async fn start( - &mut self, - ) -> Result<(async_channel::Sender<(PeerId, Reply)>, async_channel::Receiver<(PeerId, Request)>)> - { - let addr = addr_to_string(self.addr); - self.socket.bind(addr.as_str()).await?; - debug!(target: "REP PROTOCOL API", "{} SERVICE: Bound To {}", self.service_name, addr); - Ok(self.channels.clone()) - } - - pub async fn run(&mut self, executor: Arc>) -> Result<()> { - debug!(target: "REP PROTOCOL API", "{} SERVICE: Running", self.service_name); - - let (stop_s, stop_r) = async_channel::unbounded::<()>(); - - let signals = Signals::new(&[SIGINT])?; - let handle = signals.handle(); - - let signals_task = executor.spawn(async move { - let mut signals = signals.fuse(); - while let Some(signal) = signals.next().await { - match signal { - SIGINT => { - stop_s.send(()).await?; - break - } - _ => unreachable!(), - } - } - Ok::<(), crate::Error>(()) - }); - - loop { - let event = futures::select! { - msg = self.socket.recv().fuse() => NetEvent::Receive(msg?), - msg = self.recv_queue.recv().fuse() => NetEvent::Send(msg?), - _ = stop_r.recv().fuse() => NetEvent::Stop - }; - - match event { - NetEvent::Receive(msg) => { - if let Some(peer) = msg.get(0) { - if let Some(request) = msg.get(1) { - let request: Vec = request.to_vec(); - let request: Request = deserialize(&request)?; - self.send_queue.send((peer.to_vec(), request)).await?; - } - } - } - NetEvent::Send((peer, reply)) => { - let peer = Bytes::from(peer); - let mut msg: Vec = vec![peer]; - let reply: Vec = serialize(&reply); - let reply = Bytes::from(reply); - msg.push(reply); - - let reply = zeromq::ZmqMessage::try_from(msg) - .map_err(|_| crate::Error::TryFromError)?; - - self.socket.send(reply).await?; - } - NetEvent::Stop => break, - } - } - - handle.close(); - signals_task.await?; - - debug!(target: "REP PROTOCOL API","{} SERVICE: Stopped", self.service_name); - Ok(()) - } -} - -pub struct ReqProtocol { - addr: SocketAddr, - socket: zeromq::DealerSocket, - service_name: String, -} - -impl ReqProtocol { - pub fn new(addr: SocketAddr, service_name: String) -> ReqProtocol { - let socket = zeromq::DealerSocket::new(); - ReqProtocol { addr, socket, service_name } - } - - pub async fn start(&mut self) -> Result<()> { - let addr = addr_to_string(self.addr); - self.socket.connect(addr.as_str()).await?; - debug!(target: "REQ PROTOCOL API","{} SERVICE: Connected To {}", self.service_name, self.addr); - Ok(()) - } - - pub async fn request( - &mut self, - command: u8, - data: Vec, - handle_error: Arc, - ) -> Result>> { - let request = Request::new(command, data); - let req = serialize(&request); - let req = bytes::Bytes::from(req); - let req: zeromq::ZmqMessage = req.into(); - - self.socket.send(req).await?; - debug!( - target: "REQ PROTOCOL API", - "{} SERVICE: Sent Request {{ command: {} }}", - self.service_name, command - ); - - let rep: zeromq::ZmqMessage = self.socket.recv().await?; - if let Some(reply) = rep.get(0) { - let reply: Vec = reply.to_vec(); - - let reply: Reply = deserialize(&reply)?; - - debug!( - target: "REQ PROTOCOL API", - "{} SERVICE: Received Reply {{ error: {} }}", - self.service_name, - reply.has_error() - ); - - if reply.has_error() { - handle_error(reply.get_error()); - return Ok(None) - } - - if reply.get_id() != request.get_id() { - warn!("Reply id is not equal to Request id"); - return Ok(None) - } - - Ok(Some(reply.get_payload())) - } else { - Err(crate::Error::ZmqError("Couldn't parse ZmqMessage".to_string())) - } - } -} - -pub struct Publisher { - addr: SocketAddr, - socket: zeromq::PubSocket, - service_name: String, -} - -impl Publisher { - pub fn new(addr: SocketAddr, service_name: String) -> Publisher { - let socket = zeromq::PubSocket::new(); - Publisher { addr, socket, service_name } - } - - pub async fn start(&mut self, recv_queue: async_channel::Receiver>) -> Result<()> { - let addr = addr_to_string(self.addr); - self.socket.bind(addr.as_str()).await?; - debug!( - target: "PUBLISHER API", - "{} SERVICE : Bound To {}", - self.service_name, addr - ); - loop { - let msg = recv_queue.recv().await?; - self.publish(msg).await?; - } - } - - async fn publish(&mut self, data: Vec) -> Result<()> { - let data = Bytes::from(data); - self.socket.send(data.into()).await?; - Ok(()) - } -} - -pub struct Subscriber { - addr: SocketAddr, - socket: zeromq::SubSocket, - service_name: String, -} - -impl Subscriber { - pub fn new(addr: SocketAddr, service_name: String) -> Subscriber { - let socket = zeromq::SubSocket::new(); - Subscriber { addr, socket, service_name } - } - - pub async fn start(&mut self) -> Result<()> { - let addr = addr_to_string(self.addr); - self.socket.connect(addr.as_str()).await?; - - self.socket.subscribe("").await?; - debug!( - target: "SUBSCRIBER API", - "{} SERVICE : Connected To {}", - self.service_name, addr - ); - Ok(()) - } - - pub async fn fetch(&mut self) -> Result { - let data = self.socket.recv().await?; - match data.get(0) { - Some(d) => { - let data = d.to_vec(); - let data: T = deserialize(&data)?; - Ok(data) - } - None => Err(crate::Error::ZmqError("Couldn't parse ZmqMessage".to_string())), - } - } -} - -#[derive(Debug, PartialEq)] -pub struct Request { - command: u8, - id: u32, - payload: Vec, -} - -impl Request { - pub fn new(command: u8, payload: Vec) -> Request { - let id = Self::gen_id(); - Request { command, id, payload } - } - fn gen_id() -> u32 { - let mut rng = rand::thread_rng(); - rng.gen() - } - - pub fn get_id(&self) -> u32 { - self.id - } - - pub fn get_command(&self) -> u8 { - self.command - } - - pub fn get_payload(&self) -> Vec { - self.payload.clone() - } -} - -#[derive(Debug, PartialEq)] -pub struct Reply { - id: u32, - error: u32, - payload: Vec, -} - -impl Reply { - pub fn from(request: &Request, error: u32, payload: Vec) -> Reply { - Reply { id: request.get_id(), error, payload } - } - - pub fn has_error(&self) -> bool { - self.error != 0 - } - - pub fn get_error(&self) -> u32 { - self.error - } - - pub fn get_payload(&self) -> Vec { - self.payload.clone() - } - - pub fn set_payload(&mut self, payload: Vec) { - self.payload = payload; - } - - pub fn set_error(&mut self, error: u32) { - self.error = error; - } - - pub fn get_id(&self) -> u32 { - self.id - } -} - -impl Encodable for Request { - fn encode(&self, mut s: S) -> Result { - let mut len = 0; - len += self.command.encode(&mut s)?; - len += self.id.encode(&mut s)?; - len += self.payload.encode(&mut s)?; - Ok(len) - } -} - -impl Encodable for Reply { - fn encode(&self, mut s: S) -> Result { - let mut len = 0; - len += self.id.encode(&mut s)?; - len += self.error.encode(&mut s)?; - len += self.payload.encode(&mut s)?; - Ok(len) - } -} - -impl Decodable for Request { - fn decode(mut d: D) -> Result { - Ok(Self { - command: Decodable::decode(&mut d)?, - id: Decodable::decode(&mut d)?, - payload: Decodable::decode(&mut d)?, - }) - } -} - -impl Decodable for Reply { - fn decode(mut d: D) -> Result { - Ok(Self { - id: Decodable::decode(&mut d)?, - error: Decodable::decode(&mut d)?, - payload: Decodable::decode(&mut d)?, - }) - } -} - -#[cfg(test)] -mod tests { - use super::{Reply, Request, Result}; - use crate::util::serial::{deserialize, serialize}; - - #[test] - fn serialize_and_deserialize_request_test() { - let request = Request::new(2, vec![2, 3, 4, 6, 4]); - let serialized_request = serialize(&request); - assert!((deserialize(&serialized_request) as Result).is_err()); - let deserialized_request = deserialize(&serialized_request).ok(); - assert_eq!(deserialized_request, Some(request)); - } - - #[test] - fn serialize_and_deserialize_reply_test() { - let request = Request::new(2, vec![2, 3, 4, 6, 4]); - let reply = Reply::from(&request, 0, vec![2, 3, 4, 6, 4]); - let serialized_reply = serialize(&reply); - assert!((deserialize(&serialized_reply) as Result).is_err()); - let deserialized_reply = deserialize(&serialized_reply).ok(); - assert_eq!(deserialized_reply, Some(reply)); - } -}