cargo fmt

This commit is contained in:
ghassmo
2021-05-29 13:49:56 +03:00
parent ce100374d2
commit 6305c5151f
3 changed files with 23 additions and 33 deletions

View File

@@ -53,8 +53,6 @@ fn main() -> Result<()> {
LevelFilter::Off
};
CombinedLogger::init(vec![
TermLogger::new(debug_level, logger_config, TerminalMode::Mixed).unwrap(),
WriteLogger::new(
@@ -63,9 +61,7 @@ fn main() -> Result<()> {
std::fs::File::create(options.log_path.as_path()).unwrap(),
),
])
.unwrap();
.unwrap();
let ex2 = ex.clone();
@@ -90,17 +86,15 @@ fn main() -> Result<()> {
mod test {
#[test]
fn test_darkfid_client(){
fn test_darkfid_client() {
use std::path::Path;
use drk::service::GatewayClient;
use drk::slab::Slab;
use log::*;
use rand::Rng;
use simplelog::*;
use log::*;
let logger_config = ConfigBuilder::new().set_time_format_str("%T%.6f").build();
@@ -112,29 +106,27 @@ mod test {
std::fs::File::create(Path::new("/tmp/dar.log")).unwrap(),
),
])
.unwrap();
.unwrap();
let mut thread_pools: Vec<std::thread::JoinHandle<()>> = vec![];
for _ in 1..11 {
let thread = std::thread::spawn(|| {
smol::future::block_on(async move {
let mut rng = rand::thread_rng();
let rnd: u32 = rng.gen();
let mut client = GatewayClient::new("127.0.0.1:3333".parse().unwrap(), Path::new(&format!("slabstore_{}.db", rnd))).unwrap();
let mut client = GatewayClient::new(
"127.0.0.1:3333".parse().unwrap(),
Path::new(&format!("slabstore_{}.db", rnd)),
)
.unwrap();
client.start().await.unwrap();
let _slab = Slab::new("testcoin".to_string(), rnd.to_le_bytes().to_vec());
client.put_slab(_slab).await.unwrap();
std::thread::sleep(std::time::Duration::from_secs(3));
let last_index = client.slabstore.get_last_index().unwrap();
info!("last index: {}", last_index);

View File

@@ -46,9 +46,9 @@ impl GatewayService {
let (publish_queue, publish_recv_queue) = async_channel::unbounded::<Vec<u8>>();
let publisher_task = executor.spawn(Self::start_publisher(
self.pub_addr,
service_name,
publish_recv_queue.clone(),
self.pub_addr,
service_name,
publish_recv_queue.clone(),
));
let handle_request_task =
@@ -73,7 +73,7 @@ impl GatewayService {
async fn handle_request(
self: Arc<Self>,
send_queue: async_channel::Sender<(Vec<u8>,Reply)>,
send_queue: async_channel::Sender<(Vec<u8>, Reply)>,
recv_queue: async_channel::Receiver<(Vec<u8>, Request)>,
publish_queue: async_channel::Sender<Vec<u8>>,
) -> Result<()> {
@@ -94,7 +94,7 @@ impl GatewayService {
// send reply
let reply = Reply::from(&request, 0, vec![]);
send_queue.send((peer,reply)).await?;
send_queue.send((peer, reply)).await?;
// publish to all subscribes
publish_queue.send(slab).await?;
@@ -206,7 +206,6 @@ impl GatewayClient {
self.slabstore.clone()
}
pub async fn start_subscriber(sub_addr: SocketAddr) -> Result<Subscriber> {
let mut subscriber = Subscriber::new(sub_addr, String::from("GATEWAY CLIENT"));
subscriber.start().await?;

View File

@@ -1,7 +1,7 @@
use async_std::sync::Arc;
use std::convert::TryFrom;
use std::io;
use std::net::SocketAddr;
use std::convert::TryFrom;
use crate::serial::{deserialize, serialize};
use crate::{Decodable, Encodable, Result};
@@ -27,11 +27,11 @@ pub fn addr_to_string(addr: SocketAddr) -> String {
pub struct RepProtocol {
addr: SocketAddr,
socket: zeromq::RouterSocket,
recv_queue: async_channel::Receiver<(Vec<u8>,Reply)>,
recv_queue: async_channel::Receiver<(Vec<u8>, Reply)>,
send_queue: async_channel::Sender<(Vec<u8>, Request)>,
channels: (
async_channel::Sender<(Vec<u8>, Reply)>,
async_channel::Receiver<(Vec<u8>,Request)>,
async_channel::Receiver<(Vec<u8>, Request)>,
),
service_name: String,
}
@@ -57,8 +57,8 @@ impl RepProtocol {
pub async fn start(
&mut self,
) -> Result<(
async_channel::Sender<(Vec<u8>, Reply)>,
async_channel::Receiver<(Vec<u8>,Request)>,
async_channel::Sender<(Vec<u8>, Reply)>,
async_channel::Receiver<(Vec<u8>, Request)>,
)> {
let addr = addr_to_string(self.addr);
self.socket.bind(addr.as_str()).await?;
@@ -93,14 +93,14 @@ impl RepProtocol {
if let Some(request) = msg.get(1) {
let request: Vec<u8> = request.to_vec();
let request: Request = deserialize(&request)?;
if let Some(peer) = msg.get(0){
if let Some(peer) = msg.get(0) {
self.send_queue.send((peer.to_vec(), request)).await?;
}
}
}
NetEvent::Send((peer, reply)) => {
let peer = Bytes::from(peer);
let mut msg:Vec<Bytes> = vec![peer];
let mut msg: Vec<Bytes> = vec![peer];
let reply: Vec<u8> = serialize(&reply);
let reply = Bytes::from(reply);
msg.push(reply);
@@ -172,11 +172,10 @@ impl ReqProtocol {
assert!(reply.get_id() == request.get_id());
Ok(reply.get_payload())
} else {
Err(crate::Error::ZMQError(
"Couldn't parse ZmqMessage".to_string(),
"Couldn't parse ZmqMessage".to_string(),
))
}
}
@@ -254,7 +253,7 @@ impl Subscriber {
Ok(data)
}
None => Err(crate::Error::ZMQError(
"Couldn't parse ZmqMessage".to_string(),
"Couldn't parse ZmqMessage".to_string(),
)),
}
}