cargo fmt

This commit is contained in:
ghassmo
2021-05-29 09:02:52 +03:00
parent b1c52b7ac2
commit b02bd11c18
2 changed files with 27 additions and 20 deletions

View File

@@ -1,9 +1,9 @@
use std::net::SocketAddr;
use async_executor::Executor;
use async_std::sync::Arc;
use easy_parallel::Parallel;
use std::net::SocketAddr;
use drk::service::{GatewayClient, ClientProgramOptions};
use drk::service::{ClientProgramOptions, GatewayClient};
use drk::{slab::Slab, Result};
fn setup_addr(address: Option<SocketAddr>, default: SocketAddr) -> SocketAddr {
@@ -13,13 +13,11 @@ fn setup_addr(address: Option<SocketAddr>, default: SocketAddr) -> SocketAddr {
}
}
async fn start(executor: Arc<Executor<'_>>, options: ClientProgramOptions) -> Result<()> {
let connect_addr: SocketAddr = setup_addr(options.connect_addr, "127.0.0.1:3333".parse()?);
let sub_addr: SocketAddr = setup_addr(options.sub_addr, "127.0.0.1:4444".parse()?);
let slabstore_path = options.slabstore_path.as_path();
// create gateway client
let mut client = GatewayClient::new(connect_addr, slabstore_path)?;
@@ -28,10 +26,7 @@ async fn start(executor: Arc<Executor<'_>>, options: ClientProgramOptions) -> Re
// start subscribe to gateway publisher
let slabstore = client.get_slabstore();
let subscriber_task = executor.spawn(GatewayClient::subscribe(
slabstore,
sub_addr,
));
let subscriber_task = executor.spawn(GatewayClient::subscribe(slabstore, sub_addr));
// TEST
let _slab = Slab::new("testcoin".to_string(), vec![0, 0, 0, 0]);
@@ -65,9 +60,7 @@ fn main() -> Result<()> {
std::fs::File::create(options.log_path.as_path()).unwrap(),
),
])
.unwrap();
.unwrap();
let ex2 = ex.clone();

View File

@@ -56,8 +56,8 @@ impl RepProtocol {
pub async fn start(
&mut self,
) -> Result<(
async_channel::Sender<Reply>,
async_channel::Receiver<Request>,
async_channel::Sender<Reply>,
async_channel::Receiver<Request>,
)> {
let addr = addr_to_string(self.addr);
self.socket.bind(addr.as_str()).await?;
@@ -119,7 +119,11 @@ pub struct ReqProtocol {
impl ReqProtocol {
pub fn new(addr: SocketAddr, service_name: String) -> ReqProtocol {
let socket = zeromq::ReqSocket::new();
ReqProtocol { addr, socket, service_name}
ReqProtocol {
addr,
socket,
service_name,
}
}
pub async fn start(&mut self) -> Result<()> {
@@ -136,16 +140,22 @@ impl ReqProtocol {
let req: zeromq::ZmqMessage = req.into();
self.socket.send(req).await?;
info!("{} SERVICE: Sent Request {{ command: {} }}", self.service_name, command);
info!(
"{} SERVICE: Sent Request {{ command: {} }}",
self.service_name, command
);
let rep: zeromq::ZmqMessage = self.socket.recv().await?;
if let Some(reply) = rep.get(0) {
let reply: Vec<u8> = reply.to_vec();
let reply: Reply = deserialize(&reply)?;
info!("{} SERVICE: Received Reply {{ error: {} }}", self.service_name, reply.has_error() );
info!(
"{} SERVICE: Received Reply {{ error: {} }}",
self.service_name,
reply.has_error()
);
if reply.has_error() {
return Err(crate::Error::ServicesError("response has an error"));
@@ -156,7 +166,7 @@ impl ReqProtocol {
Ok(reply.get_payload())
} else {
Err(crate::Error::ZMQError(
"Couldn't parse ZmqMessage".to_string(),
"Couldn't parse ZmqMessage".to_string(),
))
}
}
@@ -207,7 +217,11 @@ pub struct Subscriber {
impl Subscriber {
pub fn new(addr: SocketAddr, service_name: String) -> Subscriber {
let socket = zeromq::SubSocket::new();
Subscriber { addr, socket , service_name}
Subscriber {
addr,
socket,
service_name,
}
}
pub async fn start(&mut self) -> Result<()> {
@@ -230,7 +244,7 @@ impl Subscriber {
Ok(data)
}
None => Err(crate::Error::ZMQError(
"Couldn't parse ZmqMessage".to_string(),
"Couldn't parse ZmqMessage".to_string(),
)),
}
}