diff --git a/src/net/protocol/protocol_ping.rs b/src/net/protocol/protocol_ping.rs index e43681793..a5152321d 100644 --- a/src/net/protocol/protocol_ping.rs +++ b/src/net/protocol/protocol_ping.rs @@ -20,7 +20,7 @@ use std::time::{Duration, Instant}; use async_std::{future::timeout, sync::Arc}; use async_trait::async_trait; -use log::{debug, error}; +use log::{debug, error, warn}; use rand::{rngs::OsRng, Rng}; use smol::Executor; @@ -106,6 +106,10 @@ impl ProtocolPing { Err(_e) => { // Pong timeout. We didn't receive any message back // so close the connection. + warn!( + target: "net::protocol_ping::run_ping_pong()", + "Ping-Pong protocol timed out for {}", self.channel.address(), + ); self.channel.stop().await; return Err(Error::ChannelStopped) } diff --git a/src/net/protocol/protocol_version.rs b/src/net/protocol/protocol_version.rs index e6bf6eb6e..5395a15f7 100644 --- a/src/net/protocol/protocol_version.rs +++ b/src/net/protocol/protocol_version.rs @@ -19,6 +19,7 @@ use std::time::Duration; use async_std::{future::timeout, sync::Arc}; +use futures::future::join_all; use log::{debug, error}; use smol::Executor; @@ -98,8 +99,22 @@ impl ProtocolVersion { let send = executor.spawn(self.clone().send_version()); let recv = executor.spawn(self.clone().recv_version()); - send.await?; - recv.await?; + let rets = join_all(vec![send, recv]).await; + if let Err(e) = &rets[0] { + error!( + target: "net::protocol_version::exchange_versions()", + "send_version() failed: {}", e, + ); + return Err(e.clone()) + } + + if let Err(e) = &rets[1] { + error!( + target: "net::protocol_version::exchange_versions()", + "recv_version() failed: {}", e, + ); + return Err(e.clone()) + } debug!( target: "net::protocol_version::exchange_versions()", @@ -123,13 +138,6 @@ impl ProtocolVersion { let verack_msg = self.verack_sub.receive().await?; // Validate peer received version against our version. - // Seeds get ignored - if self.settings.seeds.contains(self.channel.address()) { - debug!(target: "net::protocol_version::send_version()", "Peer is a seed, skipping version"); - debug!(target: "net::protocol_version::send_version()", "END => address={}", self.channel.address()); - return Ok(()) - } - debug!( target: "net::protocol_version::send_version()", "App version: {}, Recv version: {}", @@ -152,6 +160,10 @@ impl ProtocolVersion { } // Versions are compatible + debug!( + target: "net::protocol_version::send_version()", + "END => address={}", self.channel.address(), + ); Ok(()) } @@ -165,7 +177,7 @@ impl ProtocolVersion { // Receive version message let _version = self.version_sub.receive().await?; - //self.channel.set_remote_node_id(version.node_id.clone()).await; + // TODO: self.channel.set_remote_node_id(version.node_id.clone()).await; // Send verack let verack = VerackMessage { app_version: self.settings.app_version.clone() }; diff --git a/src/net/session/seedsync_session.rs b/src/net/session/seedsync_session.rs index b562935b1..8150cb774 100644 --- a/src/net/session/seedsync_session.rs +++ b/src/net/session/seedsync_session.rs @@ -37,15 +37,10 @@ //! function. This runs the version exchange protocol, stores the channel in the //! p2p list of channels, and subscribes to a stop signal. -use std::time::Duration; - -use async_std::{ - future::timeout, - sync::{Arc, Weak}, -}; +use async_std::sync::{Arc, Weak}; use async_trait::async_trait; use futures::future::join_all; -use log::{debug, error, info, warn}; +use log::{debug, info, warn}; use smol::Executor; use url::Url; @@ -88,39 +83,20 @@ impl SeedSyncSession { // Gather tasks so we can execute concurrently let mut tasks = Vec::with_capacity(settings.seeds.len()); - let conn_timeout = Duration::from_secs(settings.seed_query_timeout); for (i, seed) in settings.seeds.iter().enumerate() { let ex_ = executor.clone(); let self_ = self.clone(); tasks.push(async move { - let task = self_.clone().start_seed(i, seed.clone(), ex_.clone()); - let result = timeout(conn_timeout, task).await; - - match result { - Ok(t) => match t { - Ok(()) => { - info!( - target: "net::session::seedsync_session", - "[P2P] Seed #{} connected successfully", i, - ); - } - Err(err) => { - warn!( - target: "net::session::seedsync_session", - "[P2P] Seed #{} connection failed: {}", i, err, - ); - } - }, - Err(_) => { - error!( - target: "net::session::seedsync_session", - "[P2P] Seed #{} timed out", i - ); - } + if let Err(e) = self_.clone().start_seed(i, seed.clone(), ex_.clone()).await { + warn!( + target: "net::session::seedsync_session", + "[P2P] Seed #{} connection failed: {}", i, e, + ); } }); } + // Poll concurrently join_all(tasks).await; diff --git a/src/net/settings.rs b/src/net/settings.rs index 8c43cf69e..143fd142f 100644 --- a/src/net/settings.rs +++ b/src/net/settings.rs @@ -50,8 +50,6 @@ pub struct Settings { pub outbound_connections: usize, /// Manual connections retry limit, 0 for forever looping pub manual_attempt_limit: usize, - /// Seed connection establishment timeout (in seconds) - pub seed_query_timeout: u64, /// Outbound connection timeout (in seconds) pub outbound_connect_timeout: u64, /// Exchange versions (handshake) timeout (in seconds) @@ -78,9 +76,8 @@ impl Default for Settings { transport_mixing: true, outbound_connections: 0, manual_attempt_limit: 0, - seed_query_timeout: 30, outbound_connect_timeout: 15, - channel_handshake_timeout: 4, + channel_handshake_timeout: 10, channel_heartbeat_interval: 10, localnet: false, } @@ -125,10 +122,6 @@ pub struct SettingsOpt { #[structopt(skip)] pub manual_attempt_limit: Option, - /// Seed connection establishment timeout in seconds - #[structopt(skip)] - pub seed_query_timeout: Option, - /// Connection establishment timeout in seconds #[structopt(skip)] pub outbound_connect_timeout: Option, @@ -176,9 +169,8 @@ impl From for Settings { transport_mixing: opt.transport_mixing.unwrap_or(false), outbound_connections: opt.outbound_connections.unwrap_or(0), manual_attempt_limit: opt.manual_attempt_limit.unwrap_or(0), - seed_query_timeout: opt.seed_query_timeout.unwrap_or(30), outbound_connect_timeout: opt.outbound_connect_timeout.unwrap_or(15), - channel_handshake_timeout: opt.channel_handshake_timeout.unwrap_or(4), + channel_handshake_timeout: opt.channel_handshake_timeout.unwrap_or(10), channel_heartbeat_interval: opt.channel_heartbeat_interval.unwrap_or(10), localnet: opt.localnet, } diff --git a/src/net/transport/tcp.rs b/src/net/transport/tcp.rs index ba9101ca3..9afaea8b4 100644 --- a/src/net/transport/tcp.rs +++ b/src/net/transport/tcp.rs @@ -21,6 +21,7 @@ use std::{io, time::Duration}; use async_rustls::{TlsAcceptor, TlsStream}; use async_std::net::{SocketAddr, TcpListener as AsyncStdTcpListener, TcpStream}; use async_trait::async_trait; +use log::debug; use socket2::{Domain, Socket, TcpKeepalive, Type}; use url::Url; @@ -67,6 +68,7 @@ impl TcpDialer { socket_addr: SocketAddr, timeout: Option, ) -> Result { + debug!(target: "net::tcp::do_dial", "Dialing {} with TCP...", socket_addr); let socket = self.create_socket(socket_addr).await?; let connection = if timeout.is_some() { diff --git a/src/net/transport/tor.rs b/src/net/transport/tor.rs index 0c20677e6..d951ae057 100644 --- a/src/net/transport/tor.rs +++ b/src/net/transport/tor.rs @@ -18,8 +18,9 @@ use std::time::Duration; -use arti_client::{BootstrapBehavior, DataStream, TorClient}; +use arti_client::{config::BoolOrAuto, DataStream, StreamPrefs, TorClient}; use async_std::future; +use log::debug; use crate::Result; @@ -40,15 +41,21 @@ impl TorDialer { port: u16, timeout: Option, ) -> Result { - let client = TorClient::builder() - .bootstrap_behavior(BootstrapBehavior::OnDemand) - .create_unbootstrapped()?; + debug!(target: "net::tor::do_dial", "Dialing {}:{} with Tor...", host, port); + debug!(target: "net::tor::do_dial", "Bootstrapping..."); + let client = TorClient::builder().create_bootstrapped().await?; + let mut stream_prefs = StreamPrefs::new(); + stream_prefs.connect_to_onion_services(BoolOrAuto::Explicit(true)); if timeout.is_some() { - let res = future::timeout(timeout.unwrap(), client.connect((host, port))).await?; + let res = future::timeout( + timeout.unwrap(), + client.connect_with_prefs((host, port), &stream_prefs), + ) + .await?; return Ok(res?) } - Ok(client.connect((host, port)).await?) + Ok(client.connect_with_prefs((host, port), &stream_prefs).await?) } }