switch to zmq.rs library

This commit is contained in:
ghassmo
2021-05-14 14:57:01 +03:00
parent d84a84fc65
commit 248e152423
6 changed files with 130 additions and 96 deletions

View File

@@ -80,7 +80,9 @@ tobj = "2.0.4"
fs_extra = "1.2"
glob = "0.3"
async_zmq = "0.3.2"
# zmq
zeromq = { git="https://github.com/zeromq/zmq.rs", default-features = false, features = ["async-std-runtime", "all-transport"] }
bytes = "1.0.1"
# wallet deps
rocksdb = "0.16.0"

33
src/bin/demoservices.rs Normal file
View File

@@ -0,0 +1,33 @@
use async_executor::Executor;
use easy_parallel::Parallel;
use std::sync::Arc;
use sapvi::Result;
use sapvi::service::{gateway, reqrep};
async fn start(executor: Arc<Executor<'_>>) -> Result<()> {
executor.clone().spawn(reqrep::ReqRepAPI::start()).detach();
gateway::GatewayService::start(executor.clone()).await?;
Ok(())
}
fn main() -> Result<()> {
let ex = Arc::new(Executor::new());
let (signal, shutdown) = async_channel::unbounded::<()>();
let ex2 = ex.clone();
let (_, result) = Parallel::new()
// Run four executor threads.
.each(0..3, |_| smol::future::block_on(ex.run(shutdown.recv())))
// Run the main future on the current thread.
.finish(|| {
smol::future::block_on(async move {
start(ex2).await?;
drop(signal);
Ok::<(), sapvi::Error>(())
})
});
result
}

View File

@@ -1,33 +1,30 @@
//! cargo run --example request --features="rt-tokio" --no-default-features
use async_zmq::zmq;
use sapvi::serial;
use sapvi::service::reqrep::{Reply, Request};
use sapvi::{serial, Result};
fn connect() {
let context = zmq::Context::new();
let requester = context.socket(zmq::REQ).unwrap();
requester
.connect("tcp://127.0.0.1:3333")
.expect("failed to connect requester");
use bytes::Bytes;
use zeromq::*;
async fn connect() -> Result<()> {
let mut requester = zeromq::ReqSocket::new();
requester.connect("tcp://127.0.0.1:3333").await?;
println!("connected") ;
for request_nbr in 0..10 {
println!("start sending");
let req = Request::new(0, "test".as_bytes().to_vec());
let req = serial::serialize(&req);
requester.send(req, 0).unwrap();
let message = requester.recv_msg(0).unwrap();
let req = bytes::Bytes::from(req);
requester.send(req.into()).await?;
let message: zeromq::ZmqMessage = requester.recv().await?;
let message: &Bytes = message.get(0).unwrap();
let message: Vec<u8> = message.to_vec();
let rep: Reply = serial::deserialize(&message).unwrap();
println!("Received reply {:?} {:?}", request_nbr, rep);
}
Ok(())
}
fn main() {
let mut thread_pools = vec![];
for _ in 0..20 {
let t = std::thread::spawn(connect);
thread_pools.push(t);
}
for t in thread_pools {
t.join().unwrap();
}
fn main() {
futures::executor::block_on(connect()).unwrap();
}

View File

@@ -4,7 +4,7 @@ use crate::net::error::NetError;
use crate::vm::ZKVMError;
use rusqlite;
use async_zmq::zmq;
use zeromq;
pub type Result<T> = std::result::Result<T, Error>;
@@ -36,7 +36,7 @@ pub enum Error {
VMError(ZKVMError),
BadContract,
Groth16Error(bellman::SynthesisError),
ZMQError(zmq::Error),
ZMQError(zeromq::ZmqError),
RusqliteError(rusqlite::Error),
OperationFailed,
ConnectFailed,
@@ -101,8 +101,8 @@ impl From<std::io::Error> for Error {
}
}
impl From<zmq::Error> for Error {
fn from(err: zmq::Error) -> Error {
impl From<zeromq::ZmqError> for Error {
fn from(err: zeromq::ZmqError) -> Error {
Error::ZMQError(err)
}
}

View File

@@ -1,35 +1,34 @@
use image::EncodableLayout;
use super::reqrep::{Reply, Request};
use crate::serial::{deserialize, serialize};
use crate::Result;
use async_executor::Executor;
use async_std::sync::Arc;
use async_zmq;
use bytes::Bytes;
use futures::FutureExt;
use zeromq::*;
pub type Slabs = Vec<Vec<u8>>;
pub struct GatewayService;
enum NetEvent {
RECEIVE(async_zmq::Multipart),
SEND(async_zmq::Multipart),
RECEIVE(zeromq::ZmqMessage),
SEND(zeromq::ZmqMessage),
}
impl GatewayService {
pub async fn start(executor: Arc<Executor<'_>>) {
let mut worker = async_zmq::reply("tcp://127.0.0.1:4444")
.unwrap()
.connect()
.unwrap();
pub async fn start(executor: Arc<Executor<'_>>) -> Result<()> {
let mut worker = zeromq::RepSocket::new();
worker.connect("tcp://127.0.0.1:4444").await?;
let (send_queue_s, send_queue_r) = async_channel::unbounded::<async_zmq::Multipart>();
let (send_queue_s, send_queue_r) = async_channel::unbounded::<zeromq::ZmqMessage>();
let ex2 = executor.clone();
loop {
let event = futures::select! {
request = worker.recv().fuse() => NetEvent::RECEIVE(request.unwrap()),
reply = send_queue_r.recv().fuse() => NetEvent::SEND(reply.unwrap())
request = worker.recv().fuse() => NetEvent::RECEIVE(request?),
reply = send_queue_r.recv().fuse() => NetEvent::SEND(reply?)
};
match event {
@@ -38,37 +37,51 @@ impl GatewayService {
.detach();
}
NetEvent::SEND(reply) => {
worker.send(reply).await.unwrap();
worker.send(reply).await?;
}
}
}
}
async fn handle_request(
send_queue: async_channel::Sender<async_zmq::Multipart>,
request: async_zmq::Multipart,
send_queue: async_channel::Sender<zeromq::ZmqMessage>,
request: zeromq::ZmqMessage,
) -> Result<()> {
let mut messages = vec![];
for req in request.iter() {
let req = req.as_bytes();
let req: Request = deserialize(req).unwrap();
let request: &Bytes = request.get(0).unwrap();
let request: Vec<u8> = request.to_vec();
let req: Request = deserialize(&request)?;
// TODO
// do things
// TODO
// do things
println!("Gateway service received a msg {:?}", req);
println!("Gateway service received a msg {:?}", req);
let rep = Reply::from(&req, 0, "text".as_bytes().to_vec());
let rep = serialize(&rep);
let msg = async_zmq::Message::from(rep);
messages.push(msg);
}
send_queue.send(messages).await?;
let rep = Reply::from(&req, 0, "text".as_bytes().to_vec());
let rep: Vec<u8> = serialize(&rep);
let rep = Bytes::from(rep);
send_queue.send(rep.into()).await?;
Ok(())
}
}
struct GatewayClient;
struct GatewayClient {
slabs: Slabs,
}
impl GatewayClient {
pub fn new() -> GatewayClient {
GatewayClient { slabs: vec![] }
}
pub async fn start() {}
pub async fn get_slab(index: u32) -> Vec<u8> {
vec![]
}
pub async fn put_slab(&mut self, data: Vec<u8>) {
self.slabs.push(data);
}
}
#[repr(u8)]
enum GatewayCommand {

View File

@@ -2,56 +2,45 @@ use std::io;
use crate::{Decodable, Encodable, Result};
use async_zmq::zmq;
use futures::FutureExt;
use rand::Rng;
use zeromq::*;
pub struct ReqRepAPI;
impl ReqRepAPI {
pub async fn start() {
let context = zmq::Context::new();
let frontend = context.socket(zmq::ROUTER).unwrap();
let backend = context.socket(zmq::DEALER).unwrap();
pub async fn start() -> Result<()> {
println!("start reqrep");
frontend
.bind("tcp://127.0.0.1:3333")
.expect("failed binding frontend");
backend
.bind("tcp://127.0.0.1:4444")
.expect("failed binding backend");
let mut frontend = zeromq::RouterSocket::new();
frontend.bind("tcp://127.0.0.1:3333").await?;
let mut backend = zeromq::DealerSocket::new();
backend.bind("tcp://127.0.0.1:4444").await?;
loop {
let mut items = [
frontend.as_poll_item(zmq::POLLIN),
backend.as_poll_item(zmq::POLLIN),
];
zmq::poll(&mut items, -1).unwrap();
if items[0].is_readable() {
loop {
let message = frontend.recv_msg(0).unwrap();
let more = message.get_more();
backend
.send(message, if more { zmq::SNDMORE } else { 0 })
.unwrap();
if !more {
break;
println!("start reqrep loop");
futures::select! {
frontend_mess = frontend.recv().fuse() => {
match frontend_mess {
Ok(message) => {
backend.send(message).await?;
}
Err(_) => {
// TODO
}
}
},
backend_mess = backend.recv().fuse() => {
match backend_mess {
Ok(message) => {
frontend.send(message).await?;
}
Err(_) => {
// TODO
}
}
}
}
if items[1].is_readable() {
loop {
let message = backend.recv_msg(0).unwrap();
let more = message.get_more();
frontend
.send(message, if more { zmq::SNDMORE } else { 0 })
.unwrap();
if !more {
break;
}
}
}
};
}
}
}