From bcb1b26b122419cb32ef63c7811eaea7b4acb62e Mon Sep 17 00:00:00 2001 From: ghassmo Date: Tue, 26 Apr 2022 02:24:58 +0300 Subject: [PATCH] net3: WIP using Url & preparing for migrate to main net dir --- Cargo.toml | 1 + src/net3/acceptor.rs | 32 ++++++++++++---------------- src/net3/channel.rs | 20 ++++++++--------- src/net3/connector.rs | 16 ++++++-------- src/net3/hosts.rs | 14 ++++++------ src/net3/message.rs | 5 +++-- src/net3/p2p.rs | 16 ++++++++------ src/net3/protocol/protocol_seed.rs | 2 +- src/net3/session/inbound_session.rs | 20 ++++++++--------- src/net3/session/manual_session.rs | 19 +++++++---------- src/net3/session/outbound_session.rs | 27 +++++++++++------------ src/net3/session/seed_session.rs | 14 ++++++------ src/net3/settings.rs | 29 ++++++++++++++++++------- 13 files changed, 109 insertions(+), 106 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 665096be0..4e9dc4205 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -239,6 +239,7 @@ net3 = [ "rcgen", "regex", "rustls-pemfile", + "structopt", "util", "system", diff --git a/src/net3/acceptor.rs b/src/net3/acceptor.rs index 916ec6f27..1e820133d 100644 --- a/src/net3/acceptor.rs +++ b/src/net3/acceptor.rs @@ -1,5 +1,4 @@ -use async_std::stream::StreamExt; -use std::{net::SocketAddr, sync::Arc}; +use async_std::{stream::StreamExt, sync::Arc}; use futures_rustls::TlsStream; use smol::Executor; @@ -10,10 +9,7 @@ use crate::{ Error, Result, }; -use super::{ - TcpTransport, TlsTransport, - Channel, ChannelPtr, Transport, -}; +use super::{Channel, ChannelPtr, TcpTransport, TlsTransport, Transport}; /// Atomic pointer to Acceptor class. pub type AcceptorPtr = Arc; @@ -34,7 +30,7 @@ impl Acceptor { /// thread, erroring if a connection problem occurs. pub async fn start( self: Arc, - accept_addr: SocketAddr, + accept_addr: Url, executor: Arc>, ) -> Result<()> { self.accept(accept_addr, executor); @@ -54,7 +50,7 @@ impl Acceptor { /// Run the accept loop in a new thread and error if a connection problem /// occurs. - fn accept(self: Arc, accept_addr: SocketAddr, executor: Arc>) { + fn accept(self: Arc, accept_addr: Url, executor: Arc>) { self.task.clone().start( self.clone().run_accept_loop(accept_addr), |result| self.handle_stop(result), @@ -64,31 +60,31 @@ impl Acceptor { } /// Run the accept loop. - async fn run_accept_loop(self: Arc, accept_addr: SocketAddr) -> Result<()> { - let mut url = Url::parse(&accept_addr.to_string())?; - url.set_host(Some("tcp"))?; - - match url.scheme() { + async fn run_accept_loop(self: Arc, accept_url: Url) -> Result<()> { + match accept_url.scheme() { "tcp" => { let transport = TcpTransport::new(None, 1024); - let listener = transport.listen_on(url)?.await?; + let listener = transport.listen_on(accept_url)?.await?; let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { let stream = stream?; - let peer_addr = stream.peer_addr()?; + let mut peer_addr = Url::parse(&stream.peer_addr()?.to_string())?; + peer_addr.set_scheme("tcp")?; let channel = Channel::new(Box::new(stream), peer_addr).await; self.channel_subscriber.notify(Ok(channel)).await; } } "tls" => { let transport = TlsTransport::new(None, 1024); - let (acceptor, listener) = transport.listen_on(url)?.await?; + let (acceptor, listener) = transport.listen_on(accept_url)?.await?; let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { let stream = stream?; - let peer_addr = stream.peer_addr()?; + let mut peer_addr = Url::parse(&stream.peer_addr()?.to_string())?; + peer_addr.set_scheme("tls")?; let stream = acceptor.accept(stream).await?; - let channel = Channel::new(Box::new(TlsStream::Server(stream)), peer_addr).await; + let channel = + Channel::new(Box::new(TlsStream::Server(stream)), peer_addr).await; self.channel_subscriber.notify(Ok(channel)).await; } } diff --git a/src/net3/channel.rs b/src/net3/channel.rs index fa53d1686..0a45bd165 100644 --- a/src/net3/channel.rs +++ b/src/net3/channel.rs @@ -1,11 +1,8 @@ -use async_std::{net::TcpStream, sync::Mutex}; -use std::{ - net::SocketAddr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, +use async_std::{ + net::TcpStream, + sync::{Arc, Mutex}, }; +use std::sync::atomic::{AtomicBool, Ordering}; use futures::{ io::{ReadHalf, WriteHalf}, @@ -16,6 +13,7 @@ use log::{debug, error, info}; use rand::Rng; use serde_json::json; use smol::Executor; +use url::Url; use crate::{ system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription}, @@ -69,7 +67,7 @@ impl Stream for TlsStream {} pub struct Channel { reader: Mutex>>, writer: Mutex>>, - address: SocketAddr, + address: Url, message_subsystem: MessageSubsystem, stop_subscriber: SubscriberPtr, receive_task: StoppableTaskPtr, @@ -81,7 +79,7 @@ impl Channel { /// Sets up a new channel. Creates a reader and writer TCP stream and /// summons the message subscriber subsystem. Performs a network /// handshake on the subsystem dispatchers. - pub async fn new(stream: Box, address: SocketAddr) -> Arc { + pub async fn new(stream: Box, address: Url) -> Arc { let (reader, writer) = stream.split(); let reader = Mutex::new(reader); let writer = Mutex::new(writer); @@ -222,8 +220,8 @@ impl Channel { } /// Return the local socket address. - pub fn address(&self) -> SocketAddr { - self.address + pub fn address(&self) -> Url { + self.address.clone() } /// End of file error. Triggered when unexpected end of file occurs. diff --git a/src/net3/connector.rs b/src/net3/connector.rs index 7357fcbd5..96bead028 100644 --- a/src/net3/connector.rs +++ b/src/net3/connector.rs @@ -1,5 +1,5 @@ use async_std::future::timeout; -use std::{net::SocketAddr, time::Duration}; +use std::time::Duration; use url::Url; use crate::Result; @@ -18,21 +18,19 @@ impl Connector { } /// Establish an outbound connection. - pub async fn connect(&self, hosturl: SocketAddr) -> Result { - let mut url = Url::parse(&hosturl.to_string())?; - url.set_host(Some("tcp"))?; + pub async fn connect(&self, connect_url: Url) -> Result { let result = timeout(Duration::from_secs(self.settings.connect_timeout_seconds.into()), async { - match url.scheme() { + match connect_url.scheme() { "tcp" => { let transport = TcpTransport::new(None, 1024); - let stream = transport.dial(url)?.await?; - Ok(Channel::new(Box::new(stream), hosturl).await) + let stream = transport.dial(connect_url.clone())?.await?; + Ok(Channel::new(Box::new(stream), connect_url).await) } "tls" => { let transport = TlsTransport::new(None, 1024); - let stream = transport.dial(url)?.await?; - Ok(Channel::new(Box::new(stream), hosturl).await) + let stream = transport.dial(connect_url.clone())?.await?; + Ok(Channel::new(Box::new(stream), connect_url).await) } "tor" => todo!(), _ => unimplemented!(), diff --git a/src/net3/hosts.rs b/src/net3/hosts.rs index 35412f5f7..abab52244 100644 --- a/src/net3/hosts.rs +++ b/src/net3/hosts.rs @@ -1,14 +1,14 @@ -use async_std::sync::Mutex; -use std::{net::SocketAddr, sync::Arc}; +use async_std::sync::{Arc, Mutex}; use fxhash::FxHashSet; +use url::Url; /// Pointer to hosts class. pub type HostsPtr = Arc; /// Manages a store of network addresses. pub struct Hosts { - addrs: Mutex>, + addrs: Mutex>, } impl Hosts { @@ -18,20 +18,20 @@ impl Hosts { } /// Checks if a host address is in the host list. - async fn contains(&self, addrs: &[SocketAddr]) -> bool { - let a_set: FxHashSet<_> = addrs.iter().copied().collect(); + async fn contains(&self, addrs: &[Url]) -> bool { + let a_set: FxHashSet<_> = addrs.iter().cloned().collect(); self.addrs.lock().await.iter().any(|item| a_set.contains(item)) } /// Add a new host to the host list. - pub async fn store(&self, addrs: Vec) { + pub async fn store(&self, addrs: Vec) { if !self.contains(&addrs).await { self.addrs.lock().await.extend(addrs) } } /// Return the list of hosts. - pub async fn load_all(&self) -> Vec { + pub async fn load_all(&self) -> Vec { self.addrs.lock().await.clone() } diff --git a/src/net3/message.rs b/src/net3/message.rs index 69393d1ae..cf93963fc 100644 --- a/src/net3/message.rs +++ b/src/net3/message.rs @@ -1,7 +1,8 @@ -use std::{io, net::SocketAddr}; +use std::io; use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use log::debug; +use url::Url; use crate::{ util::serial::{Decodable, Encodable, VarInt}, @@ -31,7 +32,7 @@ pub struct GetAddrsMessage {} /// Sends address information to inbound connection. Response to GetAddrs /// message. pub struct AddrsMessage { - pub addrs: Vec, + pub addrs: Vec, } /// Requests version information of outbound connection. diff --git a/src/net3/p2p.rs b/src/net3/p2p.rs index 7f95c5b61..ba674459a 100644 --- a/src/net3/p2p.rs +++ b/src/net3/p2p.rs @@ -1,10 +1,11 @@ -use async_std::sync::Mutex; -use std::{fmt, net::SocketAddr, sync::Arc}; +use async_std::sync::{Arc, Mutex}; +use std::fmt; use async_executor::Executor; use fxhash::{FxHashMap, FxHashSet}; use log::debug; use serde_json::json; +use url::Url; use crate::{ system::{Subscriber, SubscriberPtr, Subscription}, @@ -19,9 +20,9 @@ use super::{ }; /// List of channels that are awaiting connection. -pub type PendingChannels = Mutex>; +pub type PendingChannels = Mutex>; /// List of connected channels. -pub type ConnectedChannels = Mutex>>; +pub type ConnectedChannels = Mutex>>; /// Atomic pointer to p2p interface. pub type P2pPtr = Arc; @@ -105,6 +106,7 @@ impl P2p { let external_addr = self .settings .external_addr + .as_ref() .map(|addr| serde_json::Value::from(addr.to_string())) .unwrap_or(serde_json::Value::Null); @@ -195,17 +197,17 @@ impl P2p { } /// Check whether a channel is stored in the list of connected channels. - pub async fn exists(&self, addr: &SocketAddr) -> bool { + pub async fn exists(&self, addr: &Url) -> bool { self.channels.lock().await.contains_key(addr) } /// Add a channel to the list of pending channels. - pub async fn add_pending(&self, addr: SocketAddr) -> bool { + pub async fn add_pending(&self, addr: Url) -> bool { self.pending.lock().await.insert(addr) } /// Remove a channel from the list of pending channels. - pub async fn remove_pending(&self, addr: &SocketAddr) { + pub async fn remove_pending(&self, addr: &Url) { self.pending.lock().await.remove(addr); } diff --git a/src/net3/protocol/protocol_seed.rs b/src/net3/protocol/protocol_seed.rs index 21942a680..4c1243787 100644 --- a/src/net3/protocol/protocol_seed.rs +++ b/src/net3/protocol/protocol_seed.rs @@ -31,7 +31,7 @@ impl ProtocolSeed { /// from settings, then adds that address to an address message and /// sends it out over the channel. pub async fn send_self_address(&self) -> Result<()> { - match self.settings.external_addr { + match self.settings.external_addr.clone() { Some(addr) => { debug!(target: "net", "ProtocolSeed::send_own_address() addr={}", addr); let addr = message::AddrsMessage { addrs: vec![addr] }; diff --git a/src/net3/session/inbound_session.rs b/src/net3/session/inbound_session.rs index 238a7e8dc..5f100c206 100644 --- a/src/net3/session/inbound_session.rs +++ b/src/net3/session/inbound_session.rs @@ -1,14 +1,11 @@ -use async_std::sync::Mutex; -use std::{ - net::SocketAddr, - sync::{Arc, Weak}, -}; +use async_std::sync::{Arc, Mutex, Weak}; use async_executor::Executor; use async_trait::async_trait; use fxhash::FxHashMap; use log::{error, info}; use serde_json::json; +use url::Url; use crate::{ system::{StoppableTask, StoppableTaskPtr}, @@ -35,7 +32,7 @@ pub struct InboundSession { p2p: Weak, acceptor: AcceptorPtr, accept_task: StoppableTaskPtr, - connect_infos: Mutex>, + connect_infos: Mutex>, } impl InboundSession { @@ -55,9 +52,9 @@ impl InboundSession { /// the address is not configured. Then runs the channel subscription /// loop. pub async fn start(self: Arc, executor: Arc>) -> Result<()> { - match self.p2p().settings().inbound { + match self.p2p().settings().inbound.as_ref() { Some(accept_addr) => { - self.clone().start_accept_session(accept_addr, executor.clone()).await?; + self.clone().start_accept_session(accept_addr.clone(), executor.clone()).await?; } None => { info!(target: "net", "Not configured for accepting incoming connections."); @@ -83,7 +80,7 @@ impl InboundSession { /// Start accepting connections for inbound session. async fn start_accept_session( self: Arc, - accept_addr: SocketAddr, + accept_addr: Url, executor: Arc>, ) -> Result<()> { info!(target: "net", "Starting inbound session on {}", accept_addr); @@ -125,7 +122,10 @@ impl InboundSession { async fn manage_channel_for_get_info(&self, channel: ChannelPtr) { let key = channel.address(); - self.connect_infos.lock().await.insert(key, InboundInfo { channel: channel.clone() }); + self.connect_infos + .lock() + .await + .insert(key.clone(), InboundInfo { channel: channel.clone() }); let stop_sub = channel.subscribe_stop().await; stop_sub.receive().await; diff --git a/src/net3/session/manual_session.rs b/src/net3/session/manual_session.rs index 04114fa23..9045eb46e 100644 --- a/src/net3/session/manual_session.rs +++ b/src/net3/session/manual_session.rs @@ -1,13 +1,10 @@ -use async_std::sync::Mutex; -use std::{ - net::SocketAddr, - sync::{Arc, Weak}, -}; +use async_std::sync::{Arc, Mutex, Weak}; use async_executor::Executor; use async_trait::async_trait; use log::*; use serde_json::json; +use url::Url; use crate::{ system::{StoppableTask, StoppableTaskPtr}, @@ -40,11 +37,11 @@ impl ManualSession { } } - pub async fn connect(self: Arc, addr: &SocketAddr, executor: Arc>) { + pub async fn connect(self: Arc, addr: &Url, executor: Arc>) { let task = StoppableTask::new(); task.clone().start( - self.clone().channel_connect_loop(*addr, executor.clone()), + self.clone().channel_connect_loop(addr.clone(), executor.clone()), // Ignore stop handler |_| async {}, Error::ServiceStopped, @@ -56,7 +53,7 @@ impl ManualSession { pub async fn channel_connect_loop( self: Arc, - addr: SocketAddr, + addr: Url, executor: Arc>, ) -> Result<()> { let connector = Connector::new(self.p2p().settings()); @@ -73,11 +70,11 @@ impl ManualSession { break } - self.p2p().add_pending(addr).await; + self.p2p().add_pending(addr.clone()).await; info!(target: "net", "Connecting to manual outbound [{}]", addr); - match connector.connect(addr).await { + match connector.connect(addr.clone()).await { Ok(channel) => { // Blacklist goes here @@ -108,7 +105,7 @@ impl ManualSession { warn!( target: "net", "Suspending manual connection to [{}] after {} failed attempts.", - addr, + &addr, attempts ); diff --git a/src/net3/session/outbound_session.rs b/src/net3/session/outbound_session.rs index 797430f3c..437c14729 100644 --- a/src/net3/session/outbound_session.rs +++ b/src/net3/session/outbound_session.rs @@ -1,15 +1,12 @@ -use async_std::sync::Mutex; -use std::{ - fmt, - net::SocketAddr, - sync::{Arc, Weak}, -}; +use async_std::sync::{Arc, Mutex, Weak}; +use std::fmt; use async_executor::Executor; use async_trait::async_trait; use log::{error, info}; use rand::seq::SliceRandom; use serde_json::json; +use url::Url; use crate::{ system::{StoppableTask, StoppableTaskPtr}, @@ -44,14 +41,14 @@ impl fmt::Display for OutboundState { #[derive(Clone)] struct OutboundInfo { - addr: Option, + addr: Option, channel: Option, state: OutboundState, } impl OutboundInfo { async fn get_info(&self) -> serde_json::Value { - let addr = match self.addr { + let addr = match self.addr.as_ref() { Some(addr) => serde_json::Value::String(addr.to_string()), None => serde_json::Value::Null, }; @@ -144,11 +141,11 @@ impl OutboundSession { info!(target: "net", "#{} connecting to outbound [{}]", slot_number, addr); { let info = &mut self.slot_info.lock().await[slot_number as usize]; - info.addr = Some(addr); + info.addr = Some(addr.clone()); info.state = OutboundState::Pending; } - match connector.connect(addr).await { + match connector.connect(addr.clone()).await { Ok(channel) => { // Blacklist goes here @@ -172,7 +169,7 @@ impl OutboundSession { stop_sub.receive().await; } Err(err) => { - info!(target: "net", "Unable to connect to outbound [{}]: {}", addr, err); + info!(target: "net", "Unable to connect to outbound [{}]: {}", &addr, err); { let info = &mut self.slot_info.lock().await[slot_number as usize]; info.addr = None; @@ -189,9 +186,9 @@ impl OutboundSession { /// our own inbound address, then checks whether it is already connected /// (exists) or connecting (pending). Keeps looping until address is /// found that passes all checks. - async fn load_address(&self, slot_number: u32) -> Result { + async fn load_address(&self, slot_number: u32) -> Result { let p2p = self.p2p(); - let self_inbound_addr = p2p.settings().external_addr; + let self_inbound_addr = p2p.settings().external_addr.clone(); let mut addrs; @@ -208,7 +205,7 @@ impl OutboundSession { } // Obtain a lock on this address to prevent duplicate connections - if !p2p.add_pending(addr).await { + if !p2p.add_pending(addr.clone()).await { continue } @@ -225,7 +222,7 @@ impl OutboundSession { /// Checks whether an address is our own inbound address to avoid connecting /// to ourselves. - fn is_self_inbound(addr: &SocketAddr, inbound_addr: &Option) -> bool { + fn is_self_inbound(addr: &Url, inbound_addr: &Option) -> bool { match inbound_addr { Some(inbound_addr) => inbound_addr == addr, // No inbound listening address configured diff --git a/src/net3/session/seed_session.rs b/src/net3/session/seed_session.rs index 70a7af232..40068dd58 100644 --- a/src/net3/session/seed_session.rs +++ b/src/net3/session/seed_session.rs @@ -1,14 +1,14 @@ -use async_std::future::timeout; -use std::{ - net::SocketAddr, +use async_std::{ + future::timeout, sync::{Arc, Weak}, - time::Duration, }; +use std::time::Duration; use async_executor::Executor; use async_trait::async_trait; use log::*; use serde_json::json; +use url::Url; use crate::{Error, Result}; @@ -44,7 +44,7 @@ impl SeedSession { let mut tasks = Vec::new(); for (i, seed) in settings.seeds.iter().enumerate() { - tasks.push(executor.spawn(self.clone().start_seed(i, *seed, executor.clone()))); + tasks.push(executor.spawn(self.clone().start_seed(i, seed.clone(), executor.clone()))); } // This line loops through all the tasks and waits for them to finish. @@ -83,7 +83,7 @@ impl SeedSession { async fn start_seed( self: Arc, seed_index: usize, - seed: SocketAddr, + seed: Url, executor: Arc>, ) -> Result<()> { debug!(target: "net", "SeedSession::start_seed(i={}) [START]", seed_index); @@ -93,7 +93,7 @@ impl SeedSession { }; let connector = Connector::new(settings.clone()); - match connector.connect(seed).await { + match connector.connect(seed.clone()).await { Ok(channel) => { // Blacklist goes here diff --git a/src/net3/settings.rs b/src/net3/settings.rs index ab40f87a7..592355705 100644 --- a/src/net3/settings.rs +++ b/src/net3/settings.rs @@ -1,23 +1,36 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::sync::Arc; + +use serde::Deserialize; +use structopt::StructOpt; +use url::Url; /// Atomic pointer to network settings. pub type SettingsPtr = Arc; /// Defines the network settings. -#[derive(Clone)] +#[derive(Clone, Debug, Deserialize, StructOpt)] +#[structopt()] pub struct Settings { - pub inbound: Option, + #[structopt(short, long)] + pub inbound: Option, + #[structopt(long, default_value = "0")] pub outbound_connections: u32, + #[structopt(long, default_value = "0")] pub manual_attempt_limit: u32, - + #[structopt(long, default_value = "8")] pub seed_query_timeout_seconds: u32, + #[structopt(long, default_value = "10")] pub connect_timeout_seconds: u32, + #[structopt(long, default_value = "4")] pub channel_handshake_seconds: u32, + #[structopt(long, default_value = "10")] pub channel_heartbeat_seconds: u32, - - pub external_addr: Option, - pub peers: Vec, - pub seeds: Vec, + #[structopt(short, long)] + pub external_addr: Option, + #[structopt(short, long)] + pub peers: Vec, + #[structopt(short, long)] + pub seeds: Vec, } impl Default for Settings {