From cd213f26898f4a51eaa2173137bbff57a03f14b7 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Fri, 14 May 2021 17:47:55 +0300 Subject: [PATCH] implement gateway client --- src/error.rs | 27 ++++++++++++++------- src/service/gateway.rs | 55 ++++++++++++++++++++++++++++++++++++------ src/service/mod.rs | 4 +++ src/service/reqrep.rs | 12 +++++++++ 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/src/error.rs b/src/error.rs index b4d2ecd46..aa1e0bfaa 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,10 +2,9 @@ use std::fmt; use crate::net::error::NetError; use crate::vm::ZKVMError; +use crate::service::ServicesError; use rusqlite; -use zeromq; - pub type Result = std::result::Result; #[derive(Debug)] @@ -36,7 +35,6 @@ pub enum Error { VMError(ZKVMError), BadContract, Groth16Error(bellman::SynthesisError), - ZMQError(zeromq::ZmqError), RusqliteError(rusqlite::Error), OperationFailed, ConnectFailed, @@ -46,6 +44,8 @@ pub enum Error { ServiceStopped, Utf8Error, NoteDecryptionFailed, + ServicesError(ServicesError), + ZMQError(zeromq::ZmqError) } impl std::error::Error for Error {} @@ -81,7 +81,6 @@ impl fmt::Display for Error { Error::VMError(_) => f.write_str("VM error"), Error::BadContract => f.write_str("Contract is poorly defined"), Error::Groth16Error(ref err) => write!(f, "groth16 error: {}", err), - Error::ZMQError(ref err) => write!(f, "ZMQ error: {}", err), Error::RusqliteError(ref err) => write!(f, "Rusqlite error: {}", err), Error::OperationFailed => f.write_str("Operation failed"), Error::ConnectFailed => f.write_str("Connection failed"), @@ -91,21 +90,31 @@ impl fmt::Display for Error { Error::ServiceStopped => f.write_str("Service stopped"), Error::Utf8Error => f.write_str("Malformed UTF8"), Error::NoteDecryptionFailed => f.write_str("Unable to decrypt mint note"), + Error::ServicesError(ref err) => write!(f, "Services error: {}", err), + Error::ZMQError(ref err) => write!(f, "zmq error: {}", err), } } } + +impl From for Error { + fn from(err: ServicesError) -> Error { + Error::ServicesError(err) + } +} + +impl From for Error { + fn from(err: zeromq::ZmqError) -> Error { + Error::ZMQError(err) + } +} + impl From for Error { fn from(err: std::io::Error) -> Error { Error::Io(err) } } -impl From for Error { - fn from(err: zeromq::ZmqError) -> Error { - Error::ZMQError(err) - } -} impl From for Error { fn from(err: rusqlite::Error) -> Error { diff --git a/src/service/gateway.rs b/src/service/gateway.rs index 0161fd858..c424a4d8c 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -1,4 +1,7 @@ +use std::convert::TryInto; + use super::reqrep::{Reply, Request}; +use super::ServicesError; use crate::serial::{deserialize, serialize}; use crate::Result; @@ -10,7 +13,9 @@ use zeromq::*; pub type Slabs = Vec>; -pub struct GatewayService; +pub struct GatewayService{ + slabs: Slabs, +} enum NetEvent { RECEIVE(zeromq::ZmqMessage), @@ -35,7 +40,7 @@ impl GatewayService { NetEvent::RECEIVE(request) => { ex2.spawn(Self::handle_request(send_queue_s.clone(), request)) .detach(); - } + } NetEvent::SEND(reply) => { worker.send(reply).await?; } @@ -66,20 +71,54 @@ impl GatewayService { struct GatewayClient { slabs: Slabs, + sender: zeromq::ReqSocket, } impl GatewayClient { pub fn new() -> GatewayClient { - GatewayClient { slabs: vec![] } + let sender = zeromq::ReqSocket::new(); + GatewayClient { slabs: vec![], sender} } - pub async fn start() {} + pub async fn start(&mut self) -> Result<()> { - pub async fn get_slab(index: u32) -> Vec { - vec![] + self.sender.connect("tcp://127.0.0.1:3333").await?; + Ok(()) + + } + async fn request(&mut self, command: GatewayCommand, data: Vec) -> Result> { + let request = Request::new(command as u8, data); + let req = serialize(&request); + let req = bytes::Bytes::from(req); + + self.sender.send(req.into()).await?; + + let rep: zeromq::ZmqMessage = self.sender.recv().await?; + let rep: &Bytes = rep.get(0).unwrap(); + let rep: Vec = rep.to_vec(); + + let reply: Reply = deserialize(&rep)?; + + if reply.has_error() { + return Err(ServicesError::ResonseError("response has an error").into()); + } + + assert!(reply.get_id() == request.get_id()); + + Ok(reply.get_payload()) } - pub async fn put_slab(&mut self, data: Vec) { - self.slabs.push(data); + pub async fn get_slab(&mut self, index: u32) -> Result> { + self.request(GatewayCommand::GETSLAB, index.to_be_bytes().to_vec()).await + } + + pub async fn put_slab(&mut self, data: Vec) -> Result<()>{ + self.request(GatewayCommand::GETSLAB, data).await?; + Ok(()) + } + pub async fn get_last_index(&mut self) -> Result{ + let rep = self.request(GatewayCommand::GETLASTINDEX, vec![]).await?; + let rep: [u8; 4] = rep.try_into().unwrap(); + Ok(u32::from_be_bytes(rep)) } } diff --git a/src/service/mod.rs b/src/service/mod.rs index 94f34055d..66ab79e0b 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,2 +1,6 @@ pub mod gateway; pub mod reqrep; +mod error; + + +pub use error::ServicesError; diff --git a/src/service/reqrep.rs b/src/service/reqrep.rs index d88e0709e..456949ab0 100644 --- a/src/service/reqrep.rs +++ b/src/service/reqrep.rs @@ -86,6 +86,18 @@ impl Reply { payload, } } + + pub fn has_error(&self) -> bool { + if self.error == 0 { false } else { true } + } + + pub fn get_payload(&self) -> Vec { + self.payload.clone() + } + + pub fn get_id(&self) -> u32 { + self.id + } } impl Encodable for Request {