implement gateway client

This commit is contained in:
ghassmo
2021-05-14 17:47:55 +03:00
parent 182cf36bbc
commit cd213f2689
4 changed files with 81 additions and 17 deletions

View File

@@ -2,10 +2,9 @@ use std::fmt;
use crate::net::error::NetError;
use crate::vm::ZKVMError;
use crate::service::ServicesError;
use rusqlite;
use zeromq;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]
@@ -36,7 +35,6 @@ pub enum Error {
VMError(ZKVMError),
BadContract,
Groth16Error(bellman::SynthesisError),
ZMQError(zeromq::ZmqError),
RusqliteError(rusqlite::Error),
OperationFailed,
ConnectFailed,
@@ -46,6 +44,8 @@ pub enum Error {
ServiceStopped,
Utf8Error,
NoteDecryptionFailed,
ServicesError(ServicesError),
ZMQError(zeromq::ZmqError)
}
impl std::error::Error for Error {}
@@ -81,7 +81,6 @@ impl fmt::Display for Error {
Error::VMError(_) => f.write_str("VM error"),
Error::BadContract => f.write_str("Contract is poorly defined"),
Error::Groth16Error(ref err) => write!(f, "groth16 error: {}", err),
Error::ZMQError(ref err) => write!(f, "ZMQ error: {}", err),
Error::RusqliteError(ref err) => write!(f, "Rusqlite error: {}", err),
Error::OperationFailed => f.write_str("Operation failed"),
Error::ConnectFailed => f.write_str("Connection failed"),
@@ -91,21 +90,31 @@ impl fmt::Display for Error {
Error::ServiceStopped => f.write_str("Service stopped"),
Error::Utf8Error => f.write_str("Malformed UTF8"),
Error::NoteDecryptionFailed => f.write_str("Unable to decrypt mint note"),
Error::ServicesError(ref err) => write!(f, "Services error: {}", err),
Error::ZMQError(ref err) => write!(f, "zmq error: {}", err),
}
}
}
impl From<ServicesError> for Error {
fn from(err: ServicesError) -> Error {
Error::ServicesError(err)
}
}
impl From<zeromq::ZmqError> for Error {
fn from(err: zeromq::ZmqError) -> Error {
Error::ZMQError(err)
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Error {
Error::Io(err)
}
}
impl From<zeromq::ZmqError> for Error {
fn from(err: zeromq::ZmqError) -> Error {
Error::ZMQError(err)
}
}
impl From<rusqlite::Error> for Error {
fn from(err: rusqlite::Error) -> Error {

View File

@@ -1,4 +1,7 @@
use std::convert::TryInto;
use super::reqrep::{Reply, Request};
use super::ServicesError;
use crate::serial::{deserialize, serialize};
use crate::Result;
@@ -10,7 +13,9 @@ use zeromq::*;
pub type Slabs = Vec<Vec<u8>>;
pub struct GatewayService;
pub struct GatewayService{
slabs: Slabs,
}
enum NetEvent {
RECEIVE(zeromq::ZmqMessage),
@@ -35,7 +40,7 @@ impl GatewayService {
NetEvent::RECEIVE(request) => {
ex2.spawn(Self::handle_request(send_queue_s.clone(), request))
.detach();
}
}
NetEvent::SEND(reply) => {
worker.send(reply).await?;
}
@@ -66,20 +71,54 @@ impl GatewayService {
struct GatewayClient {
slabs: Slabs,
sender: zeromq::ReqSocket,
}
impl GatewayClient {
pub fn new() -> GatewayClient {
GatewayClient { slabs: vec![] }
let sender = zeromq::ReqSocket::new();
GatewayClient { slabs: vec![], sender}
}
pub async fn start() {}
pub async fn start(&mut self) -> Result<()> {
pub async fn get_slab(index: u32) -> Vec<u8> {
vec![]
self.sender.connect("tcp://127.0.0.1:3333").await?;
Ok(())
}
async fn request(&mut self, command: GatewayCommand, data: Vec<u8>) -> Result<Vec<u8>> {
let request = Request::new(command as u8, data);
let req = serialize(&request);
let req = bytes::Bytes::from(req);
self.sender.send(req.into()).await?;
let rep: zeromq::ZmqMessage = self.sender.recv().await?;
let rep: &Bytes = rep.get(0).unwrap();
let rep: Vec<u8> = rep.to_vec();
let reply: Reply = deserialize(&rep)?;
if reply.has_error() {
return Err(ServicesError::ResonseError("response has an error").into());
}
assert!(reply.get_id() == request.get_id());
Ok(reply.get_payload())
}
pub async fn put_slab(&mut self, data: Vec<u8>) {
self.slabs.push(data);
pub async fn get_slab(&mut self, index: u32) -> Result<Vec<u8>> {
self.request(GatewayCommand::GETSLAB, index.to_be_bytes().to_vec()).await
}
pub async fn put_slab(&mut self, data: Vec<u8>) -> Result<()>{
self.request(GatewayCommand::GETSLAB, data).await?;
Ok(())
}
pub async fn get_last_index(&mut self) -> Result<u32>{
let rep = self.request(GatewayCommand::GETLASTINDEX, vec![]).await?;
let rep: [u8; 4] = rep.try_into().unwrap();
Ok(u32::from_be_bytes(rep))
}
}

View File

@@ -1,2 +1,6 @@
pub mod gateway;
pub mod reqrep;
mod error;
pub use error::ServicesError;

View File

@@ -86,6 +86,18 @@ impl Reply {
payload,
}
}
pub fn has_error(&self) -> bool {
if self.error == 0 { false } else { true }
}
pub fn get_payload(&self) -> Vec<u8> {
self.payload.clone()
}
pub fn get_id(&self) -> u32 {
self.id
}
}
impl Encodable for Request {