From 939bc520722eeae4abbdb59a0a7dc827a265e11b Mon Sep 17 00:00:00 2001 From: narodnik Date: Sun, 20 Dec 2020 15:17:48 +0100 Subject: [PATCH] removed obsolete targets in cargo.toml and ran cargo fmt --- Cargo.toml | 68 ------------------------ src/bin/dfi.rs | 43 ++++++++++----- src/lib.rs | 2 +- src/net/protocol/client_protocol.rs | 82 +++++++++++++++++------------ src/net/protocol/mod.rs | 2 +- src/net/protocol/protocol_base.rs | 10 ++-- src/net/protocol/seed_protocol.rs | 23 ++++---- src/net/protocol/server_protocol.rs | 32 +++++------ 8 files changed, 115 insertions(+), 147 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e2ef03db3..8eeb72fca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,78 +46,10 @@ async-dup = "1.1.0" async-std = "1.6.2" easy-parallel = "3.1.0" -[[bin]] -name = "sha256" -path = "src/sha256.rs" - -[[bin]] -name = "pedersen_hash" -path = "src/pedersen_hash.rs" - -[[bin]] -name = "oldmimc" -path = "src/mimc.rs" - -[[bin]] -name = "blake" -path = "src/blake.rs" - -[[bin]] -name = "zec" -path = "src/zec.rs" - -[[bin]] -name = "simple" -path = "src/simple.rs" - -[[bin]] -name = "mint-old" -path = "src/mint.rs" - -[[bin]] -name = "spend" -path = "src/spend.rs" - -[[bin]] -name = "eq" -path = "src/eq.rs" - -[[bin]] -name = "basic" -path = "src/basic_minimal.rs" - -[[bin]] -name = "vmtest" -path = "src/vmtest.rs" - -[[bin]] -name = "jubjub-old" -path = "src/jubjub.rs" - -[[bin]] -name = "bits" -path = "src/bits.rs" - -[[bin]] -name = "mimc" -path = "src/bin/mimc.rs" - -[[bin]] -name = "mint2" -path = "src/mint2.rs" - [[bin]] name = "zkvm" path = "src/bin/zkvm.rs" -[[bin]] -name = "mint" -path = "src/bin/mint.rs" - -[[bin]] -name = "jubjub" -path = "src/bin/jubjub.rs" - [[bin]] name = "dfi" path = "src/bin/dfi.rs" diff --git a/src/bin/dfi.rs b/src/bin/dfi.rs index 3ff1dcadd..f7148d3b6 100644 --- a/src/bin/dfi.rs +++ b/src/bin/dfi.rs @@ -2,14 +2,14 @@ extern crate clap; use async_channel::unbounded; use async_dup::Arc; -use log::*; -use async_std::sync::Mutex; use async_executor::Executor; +use async_std::sync::Mutex; use easy_parallel::Parallel; +use log::*; use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use sapvi::{Result, SeedProtocol, ServerProtocol, ClientProtocol}; +use sapvi::{ClientProtocol, Result, SeedProtocol, ServerProtocol}; async fn start(executor: Arc>, options: ProgramOptions) -> Result<()> { let connections = Arc::new(Mutex::new(HashMap::new())); @@ -25,7 +25,9 @@ async fn start(executor: Arc>, options: ProgramOptions) -> Result<( let mut protocol = ServerProtocol::new(connections.clone()); server_task = Some(executor.spawn(async move { - protocol.start(accept_addr, stored_addrs2, executor2).await?; + protocol + .start(accept_addr, stored_addrs2, executor2) + .await?; Ok::<(), sapvi::Error>(()) })); } @@ -38,7 +40,12 @@ async fn start(executor: Arc>, options: ProgramOptions) -> Result<( for seed_addr in options.seed_addrs.iter() { let mut protocol = SeedProtocol::new(); protocol - .start(seed_addr.clone(), local_addr, stored_addrs.clone(), executor.clone()) + .start( + seed_addr.clone(), + local_addr, + stored_addrs.clone(), + executor.clone(), + ) .await; seed_protocols.push(protocol); } @@ -58,7 +65,9 @@ async fn start(executor: Arc>, options: ProgramOptions) -> Result<( debug!("Starting connection slot {}", i); let mut client = ClientProtocol::new(connections.clone()); - client.start(accept_addr.clone(), stored_addrs.clone(), executor.clone()).await; + client + .start(accept_addr.clone(), stored_addrs.clone(), executor.clone()) + .await; client_slots.push(client); } @@ -66,7 +75,14 @@ async fn start(executor: Arc>, options: ProgramOptions) -> Result<( debug!("Starting connection (manual) to {}", remote_addr); let mut client = ClientProtocol::new(connections.clone()); - client.start_manual(remote_addr, accept_addr.clone(), stored_addrs.clone(), executor.clone()).await; + client + .start_manual( + remote_addr, + accept_addr.clone(), + stored_addrs.clone(), + executor.clone(), + ) + .await; client_slots.push(client); } @@ -82,7 +98,7 @@ struct ProgramOptions { accept_addr: Option, seed_addrs: Vec, manual_connects: Vec, - connection_slots: u32 + connection_slots: u32, } impl ProgramOptions { @@ -128,16 +144,19 @@ impl ProgramOptions { accept_addr, seed_addrs, manual_connects, - connection_slots + connection_slots, }) } } fn main() -> Result<()> { use simplelog::*; - CombinedLogger::init(vec![ - TermLogger::new(LevelFilter::Debug, Config::default(), TerminalMode::Mixed).unwrap(), - ]) + CombinedLogger::init(vec![TermLogger::new( + LevelFilter::Debug, + Config::default(), + TerminalMode::Mixed, + ) + .unwrap()]) .unwrap(); let options = ProgramOptions::load()?; diff --git a/src/lib.rs b/src/lib.rs index 78e17604a..db8b426c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,9 +15,9 @@ pub mod vm_serial; pub use crate::bls_extensions::BlsStringConversion; pub use crate::error::{Error, Result}; pub use crate::net::net::{select_event, send_message, sleep}; +pub use crate::net::protocol::client_protocol::ClientProtocol; pub use crate::net::protocol::seed_protocol::SeedProtocol; pub use crate::net::protocol::server_protocol::ServerProtocol; -pub use crate::net::protocol::client_protocol::ClientProtocol; pub use crate::serial::{Decodable, Encodable}; pub use crate::vm::{ AllocType, ConstraintInstruction, CryptoOperation, VariableIndex, VariableRef, ZKVMCircuit, diff --git a/src/net/protocol/client_protocol.rs b/src/net/protocol/client_protocol.rs index 6b93c75ed..435604ec1 100644 --- a/src/net/protocol/client_protocol.rs +++ b/src/net/protocol/client_protocol.rs @@ -1,9 +1,9 @@ +use async_dup::Arc; +use log::*; +use rand::seq::SliceRandom; +use smol::{Async, Executor}; use std::net::{SocketAddr, TcpStream}; use std::sync::atomic::AtomicU64; -use log::*; -use smol::{Async, Executor}; -use async_dup::Arc; -use rand::seq::SliceRandom; use crate::error::Result; use crate::net::net; @@ -24,7 +24,7 @@ impl ClientProtocol { send_sx, send_rx, connections, - main_process: None + main_process: None, } } @@ -38,29 +38,30 @@ impl ClientProtocol { connections: &ConnectionsMap, ) { loop { - let addr = match stored_addrs.lock().await.choose(&mut rand_core::OsRng) { - Some(addr) => addr.clone(), - None => { - debug!("No addresses in store. Sleeping for 2 secs before retrying..."); - net::sleep(2).await; - continue; - } - }; - if connections.lock().await.contains_key(&addr) { + let addr = match stored_addrs.lock().await.choose(&mut rand_core::OsRng) { + Some(addr) => addr.clone(), + None => { + debug!("No addresses in store. Sleeping for 2 secs before retrying..."); + net::sleep(2).await; continue; } - if let Some(accept_addr) = accept_addr { - if addr == *accept_addr { continue; } + }; + if connections.lock().await.contains_key(&addr) { + continue; + } + if let Some(accept_addr) = accept_addr { + if addr == *accept_addr { + continue; } + } } - } pub async fn start( &mut self, accept_addr: Option, stored_addrs: AddrsStorage, - executor: Arc> + executor: Arc>, ) { let connections = self.connections.clone(); let (send_sx, send_rx) = (self.send_sx.clone(), self.send_rx.clone()); @@ -81,12 +82,22 @@ impl ClientProtocol { continue; } if let Some(accept_addr) = accept_addr { - if addr == accept_addr { continue; } + if addr == accept_addr { + continue; + } } debug!("Attempting connect to {}", addr); - Self::try_connect_process(addr, connections.clone(), accept_addr.clone(), stored_addrs.clone(), (send_sx.clone(), send_rx.clone()), executor2.clone()).await; + Self::try_connect_process( + addr, + connections.clone(), + accept_addr.clone(), + stored_addrs.clone(), + (send_sx.clone(), send_rx.clone()), + executor2.clone(), + ) + .await; // TODO: Fix this net::sleep(2).await; @@ -99,8 +110,8 @@ impl ClientProtocol { remote_addr: SocketAddr, accept_addr: Option, stored_addrs: AddrsStorage, - executor: Arc> - ) { + executor: Arc>, + ) { let connections = self.connections.clone(); let (send_sx, send_rx) = (self.send_sx.clone(), self.send_rx.clone()); @@ -111,7 +122,15 @@ impl ClientProtocol { for _ in 0..4 { debug!("Attempting connect to {}", remote_addr); - Self::try_connect_process(remote_addr, connections.clone(), accept_addr.clone(), stored_addrs.clone(), (send_sx.clone(), send_rx.clone()), executor2.clone()).await; + Self::try_connect_process( + remote_addr, + connections.clone(), + accept_addr.clone(), + stored_addrs.clone(), + (send_sx.clone(), send_rx.clone()), + executor2.clone(), + ) + .await; } net::sleep(2).await; } @@ -127,7 +146,7 @@ impl ClientProtocol { async_channel::Sender, async_channel::Receiver, ), - executor: Arc> + executor: Arc>, ) { match Async::::connect(address.clone()).await { Ok(stream) => { @@ -138,14 +157,13 @@ impl ClientProtocol { address, (send_sx.clone(), send_rx.clone()), accept_addr, - executor + executor, ) .await; } - Err(_err) => { warn!( - "Unable to connect to addr {:?}: {}", - address, _err - ); } + Err(_err) => { + warn!("Unable to connect to addr {:?}: {}", address, _err); + } } } @@ -159,7 +177,7 @@ impl ClientProtocol { async_channel::Receiver, ), accept_addr: Option, - executor: Arc> + executor: Arc>, ) -> Result<()> { debug!("Connected to {}", address); @@ -222,9 +240,7 @@ impl ClientProtocol { let clock = Arc::new(AtomicU64::new(0)); let send_sx2 = send_sx.clone(); let clock2 = clock.clone(); - let ping_task = executor.spawn( - protocol_base::repeat_ping(send_sx2, clock2) - ); + let ping_task = executor.spawn(protocol_base::repeat_ping(send_sx2, clock2)); let mut send_addr_task = None; if let Some(accept_addr) = accept_addr { diff --git a/src/net/protocol/mod.rs b/src/net/protocol/mod.rs index 735956691..bff223721 100644 --- a/src/net/protocol/mod.rs +++ b/src/net/protocol/mod.rs @@ -1,4 +1,4 @@ -pub mod server_protocol; pub mod client_protocol; pub mod protocol_base; pub mod seed_protocol; +pub mod server_protocol; diff --git a/src/net/protocol/protocol_base.rs b/src/net/protocol/protocol_base.rs index 1ed0ea9f2..4f0e30e2f 100644 --- a/src/net/protocol/protocol_base.rs +++ b/src/net/protocol/protocol_base.rs @@ -1,5 +1,5 @@ -use std::sync::atomic::Ordering; use log::*; +use std::sync::atomic::Ordering; use crate::net::net; use crate::utility::{get_current_time, AddrsStorage, Clock, ConnectionsMap}; @@ -63,7 +63,7 @@ pub async fn protocol( })) .await?;*/ } - net::Message::Inv(message) => { + net::Message::Inv(_message) => { info!("received inv message"); /* let mut list_of_hash: Vec = vec![]; @@ -82,7 +82,7 @@ pub async fn protocol( net::Message::GetSlabs(message) => { info!("received GetSlabs message."); - for slab_hash in message.slabs_hash { + for _slab_hash in message.slabs_hash { /*let slab = slabman.get_slab(&slab_hash); if let Some(slab) = slab { send_sx.send(net::Message::Slab(slab.clone())).await?; @@ -90,13 +90,13 @@ pub async fn protocol( } } net::Message::Slab(message) => { - let slab = net::SlabMessage { + let _slab = net::SlabMessage { nonce: message.nonce, ciphertext: message.ciphertext.clone(), }; // TODO: it doesn't have to send inv message to the connection which sent the slab. - for (a, send) in connections.lock().await.iter() { + for (a, _send) in connections.lock().await.iter() { println!("send to {:?}", a); /*send.send(net::Message::Inv(net::InvMessage { slabs_hash: vec![slab.cipher_hash()], diff --git a/src/net/protocol/seed_protocol.rs b/src/net/protocol/seed_protocol.rs index f09f2d678..075f90ecb 100644 --- a/src/net/protocol/seed_protocol.rs +++ b/src/net/protocol/seed_protocol.rs @@ -21,7 +21,7 @@ pub struct SeedProtocol { enum ProtocolSignal { Waiting, Finished, - Timeout + Timeout, } impl SeedProtocol { @@ -56,7 +56,9 @@ impl SeedProtocol { ) .await; } - Err(err) => { warn!("Unable to connect to seed {}: {}", seed_addr, err) }, + Err(err) => { + warn!("Unable to connect to seed {}: {}", seed_addr, err) + } } })); } @@ -80,7 +82,9 @@ impl SeedProtocol { ) -> Result<()> { if let Some(local_addr) = local_addr { send_sx - .send(net::Message::Addrs(net::AddrsMessage { addrs: vec![local_addr] })) + .send(net::Message::Addrs(net::AddrsMessage { + addrs: vec![local_addr], + })) .await?; } @@ -91,13 +95,8 @@ impl SeedProtocol { let stream = Arc::new(stream); // Run event loop - match Self::event_loop_process( - stream, - stored_addrs.clone(), - (send_sx, send_rx), - executor, - ) - .await + match Self::event_loop_process(stream, stored_addrs.clone(), (send_sx, send_rx), executor) + .await { Ok(ProtocolSignal::Finished) => { info!("Seed node queried successfully: {}", seed_addr); @@ -105,7 +104,9 @@ impl SeedProtocol { Ok(ProtocolSignal::Timeout) => { warn!("Seed node timeout: {}", seed_addr); } - Ok(_) => { unreachable!(); } + Ok(_) => { + unreachable!(); + } Err(err) => { warn!("Seed disconnected: {} {}", seed_addr, err); } diff --git a/src/net/protocol/server_protocol.rs b/src/net/protocol/server_protocol.rs index 230b14e22..e5f076a30 100644 --- a/src/net/protocol/server_protocol.rs +++ b/src/net/protocol/server_protocol.rs @@ -58,7 +58,7 @@ impl ServerProtocol { stored_addrs, (send_sx, send_rx), connections.clone(), - executor2 + executor2, ) .await { @@ -91,22 +91,22 @@ impl ServerProtocol { let event = net::select_event(&mut stream, &send_rx, &inactivity_timer).await?; match event { - net::Event::Send(message) => { - net::send_message(&mut stream, message).await?; - } - net::Event::Receive(message) => { - inactivity_timer.reset().await?; - protocol_base::protocol( - message, - &stored_addrs, - &send_sx, - None, - connections.clone(), - ) - .await?; - } - net::Event::Timeout => break, + net::Event::Send(message) => { + net::send_message(&mut stream, message).await?; } + net::Event::Receive(message) => { + inactivity_timer.reset().await?; + protocol_base::protocol( + message, + &stored_addrs, + &send_sx, + None, + connections.clone(), + ) + .await?; + } + net::Event::Timeout => break, + } } inactivity_timer.stop().await;