From 248e15242374f2f8fcd477f7c7328ab7306eea8d Mon Sep 17 00:00:00 2001 From: ghassmo Date: Fri, 14 May 2021 14:57:01 +0300 Subject: [PATCH] switch to zmq.rs library --- Cargo.toml | 4 ++- src/bin/demoservices.rs | 33 +++++++++++++++++++ src/bin/demowallet.rs | 39 ++++++++++------------ src/error.rs | 8 ++--- src/service/gateway.rs | 73 ++++++++++++++++++++++++----------------- src/service/reqrep.rs | 69 ++++++++++++++++---------------------- 6 files changed, 130 insertions(+), 96 deletions(-) create mode 100644 src/bin/demoservices.rs diff --git a/Cargo.toml b/Cargo.toml index 111d4ca5e..722bfd91d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,9 @@ tobj = "2.0.4" fs_extra = "1.2" glob = "0.3" -async_zmq = "0.3.2" +# zmq +zeromq = { git="https://github.com/zeromq/zmq.rs", default-features = false, features = ["async-std-runtime", "all-transport"] } +bytes = "1.0.1" # wallet deps rocksdb = "0.16.0" diff --git a/src/bin/demoservices.rs b/src/bin/demoservices.rs new file mode 100644 index 000000000..4a8f86f6e --- /dev/null +++ b/src/bin/demoservices.rs @@ -0,0 +1,33 @@ +use async_executor::Executor; +use easy_parallel::Parallel; +use std::sync::Arc; + +use sapvi::Result; + +use sapvi::service::{gateway, reqrep}; + +async fn start(executor: Arc>) -> Result<()> { + executor.clone().spawn(reqrep::ReqRepAPI::start()).detach(); + gateway::GatewayService::start(executor.clone()).await?; + Ok(()) +} + +fn main() -> Result<()> { + let ex = Arc::new(Executor::new()); + let (signal, shutdown) = async_channel::unbounded::<()>(); + let ex2 = ex.clone(); + + let (_, result) = Parallel::new() + // Run four executor threads. + .each(0..3, |_| smol::future::block_on(ex.run(shutdown.recv()))) + // Run the main future on the current thread. + .finish(|| { + smol::future::block_on(async move { + start(ex2).await?; + drop(signal); + Ok::<(), sapvi::Error>(()) + }) + }); + + result +} diff --git a/src/bin/demowallet.rs b/src/bin/demowallet.rs index 9c510907c..94df02dc6 100644 --- a/src/bin/demowallet.rs +++ b/src/bin/demowallet.rs @@ -1,33 +1,30 @@ -//! cargo run --example request --features="rt-tokio" --no-default-features - -use async_zmq::zmq; -use sapvi::serial; use sapvi::service::reqrep::{Reply, Request}; +use sapvi::{serial, Result}; -fn connect() { - let context = zmq::Context::new(); - let requester = context.socket(zmq::REQ).unwrap(); - requester - .connect("tcp://127.0.0.1:3333") - .expect("failed to connect requester"); +use bytes::Bytes; +use zeromq::*; + +async fn connect() -> Result<()> { + let mut requester = zeromq::ReqSocket::new(); + requester.connect("tcp://127.0.0.1:3333").await?; + + println!("connected") ; for request_nbr in 0..10 { + println!("start sending"); let req = Request::new(0, "test".as_bytes().to_vec()); let req = serial::serialize(&req); - requester.send(req, 0).unwrap(); - let message = requester.recv_msg(0).unwrap(); + let req = bytes::Bytes::from(req); + requester.send(req.into()).await?; + let message: zeromq::ZmqMessage = requester.recv().await?; + let message: &Bytes = message.get(0).unwrap(); + let message: Vec = message.to_vec(); let rep: Reply = serial::deserialize(&message).unwrap(); println!("Received reply {:?} {:?}", request_nbr, rep); } + Ok(()) } -fn main() { - let mut thread_pools = vec![]; - for _ in 0..20 { - let t = std::thread::spawn(connect); - thread_pools.push(t); - } - for t in thread_pools { - t.join().unwrap(); - } +fn main() { + futures::executor::block_on(connect()).unwrap(); } diff --git a/src/error.rs b/src/error.rs index 7109983d9..b4d2ecd46 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,7 +4,7 @@ use crate::net::error::NetError; use crate::vm::ZKVMError; use rusqlite; -use async_zmq::zmq; +use zeromq; pub type Result = std::result::Result; @@ -36,7 +36,7 @@ pub enum Error { VMError(ZKVMError), BadContract, Groth16Error(bellman::SynthesisError), - ZMQError(zmq::Error), + ZMQError(zeromq::ZmqError), RusqliteError(rusqlite::Error), OperationFailed, ConnectFailed, @@ -101,8 +101,8 @@ impl From for Error { } } -impl From for Error { - fn from(err: zmq::Error) -> Error { +impl From for Error { + fn from(err: zeromq::ZmqError) -> Error { Error::ZMQError(err) } } diff --git a/src/service/gateway.rs b/src/service/gateway.rs index 03e5e6bd8..0161fd858 100644 --- a/src/service/gateway.rs +++ b/src/service/gateway.rs @@ -1,35 +1,34 @@ -use image::EncodableLayout; - use super::reqrep::{Reply, Request}; use crate::serial::{deserialize, serialize}; use crate::Result; use async_executor::Executor; use async_std::sync::Arc; -use async_zmq; +use bytes::Bytes; use futures::FutureExt; +use zeromq::*; + +pub type Slabs = Vec>; pub struct GatewayService; enum NetEvent { - RECEIVE(async_zmq::Multipart), - SEND(async_zmq::Multipart), + RECEIVE(zeromq::ZmqMessage), + SEND(zeromq::ZmqMessage), } impl GatewayService { - pub async fn start(executor: Arc>) { - let mut worker = async_zmq::reply("tcp://127.0.0.1:4444") - .unwrap() - .connect() - .unwrap(); + pub async fn start(executor: Arc>) -> Result<()> { + let mut worker = zeromq::RepSocket::new(); + worker.connect("tcp://127.0.0.1:4444").await?; - let (send_queue_s, send_queue_r) = async_channel::unbounded::(); + let (send_queue_s, send_queue_r) = async_channel::unbounded::(); let ex2 = executor.clone(); loop { let event = futures::select! { - request = worker.recv().fuse() => NetEvent::RECEIVE(request.unwrap()), - reply = send_queue_r.recv().fuse() => NetEvent::SEND(reply.unwrap()) + request = worker.recv().fuse() => NetEvent::RECEIVE(request?), + reply = send_queue_r.recv().fuse() => NetEvent::SEND(reply?) }; match event { @@ -38,37 +37,51 @@ impl GatewayService { .detach(); } NetEvent::SEND(reply) => { - worker.send(reply).await.unwrap(); + worker.send(reply).await?; } } } } async fn handle_request( - send_queue: async_channel::Sender, - request: async_zmq::Multipart, + send_queue: async_channel::Sender, + request: zeromq::ZmqMessage, ) -> Result<()> { - let mut messages = vec![]; - for req in request.iter() { - let req = req.as_bytes(); - let req: Request = deserialize(req).unwrap(); + let request: &Bytes = request.get(0).unwrap(); + let request: Vec = request.to_vec(); + let req: Request = deserialize(&request)?; - // TODO - // do things + // TODO + // do things - println!("Gateway service received a msg {:?}", req); + println!("Gateway service received a msg {:?}", req); - let rep = Reply::from(&req, 0, "text".as_bytes().to_vec()); - let rep = serialize(&rep); - let msg = async_zmq::Message::from(rep); - messages.push(msg); - } - send_queue.send(messages).await?; + let rep = Reply::from(&req, 0, "text".as_bytes().to_vec()); + let rep: Vec = serialize(&rep); + let rep = Bytes::from(rep); + send_queue.send(rep.into()).await?; Ok(()) } } -struct GatewayClient; +struct GatewayClient { + slabs: Slabs, +} + +impl GatewayClient { + pub fn new() -> GatewayClient { + GatewayClient { slabs: vec![] } + } + pub async fn start() {} + + pub async fn get_slab(index: u32) -> Vec { + vec![] + } + + pub async fn put_slab(&mut self, data: Vec) { + self.slabs.push(data); + } +} #[repr(u8)] enum GatewayCommand { diff --git a/src/service/reqrep.rs b/src/service/reqrep.rs index 002f0ea2f..d88e0709e 100644 --- a/src/service/reqrep.rs +++ b/src/service/reqrep.rs @@ -2,56 +2,45 @@ use std::io; use crate::{Decodable, Encodable, Result}; -use async_zmq::zmq; +use futures::FutureExt; use rand::Rng; +use zeromq::*; pub struct ReqRepAPI; impl ReqRepAPI { - pub async fn start() { - let context = zmq::Context::new(); - let frontend = context.socket(zmq::ROUTER).unwrap(); - let backend = context.socket(zmq::DEALER).unwrap(); + pub async fn start() -> Result<()> { + println!("start reqrep"); - frontend - .bind("tcp://127.0.0.1:3333") - .expect("failed binding frontend"); - backend - .bind("tcp://127.0.0.1:4444") - .expect("failed binding backend"); + let mut frontend = zeromq::RouterSocket::new(); + frontend.bind("tcp://127.0.0.1:3333").await?; + let mut backend = zeromq::DealerSocket::new(); + backend.bind("tcp://127.0.0.1:4444").await?; loop { - let mut items = [ - frontend.as_poll_item(zmq::POLLIN), - backend.as_poll_item(zmq::POLLIN), - ]; - - zmq::poll(&mut items, -1).unwrap(); - - if items[0].is_readable() { - loop { - let message = frontend.recv_msg(0).unwrap(); - let more = message.get_more(); - backend - .send(message, if more { zmq::SNDMORE } else { 0 }) - .unwrap(); - if !more { - break; + println!("start reqrep loop"); + futures::select! { + frontend_mess = frontend.recv().fuse() => { + match frontend_mess { + Ok(message) => { + backend.send(message).await?; + } + Err(_) => { + // TODO + } + } + }, + backend_mess = backend.recv().fuse() => { + match backend_mess { + Ok(message) => { + frontend.send(message).await?; + } + Err(_) => { + // TODO + } } } - } - if items[1].is_readable() { - loop { - let message = backend.recv_msg(0).unwrap(); - let more = message.get_more(); - frontend - .send(message, if more { zmq::SNDMORE } else { 0 }) - .unwrap(); - if !more { - break; - } - } - } + }; } } }