net: Minor behaviour improvements

This commit is contained in:
parazyd
2023-07-08 23:41:55 +02:00
parent 968550a680
commit c1100588d3
6 changed files with 52 additions and 59 deletions

View File

@@ -20,7 +20,7 @@ use std::time::{Duration, Instant};
use async_std::{future::timeout, sync::Arc};
use async_trait::async_trait;
use log::{debug, error};
use log::{debug, error, warn};
use rand::{rngs::OsRng, Rng};
use smol::Executor;
@@ -106,6 +106,10 @@ impl ProtocolPing {
Err(_e) => {
// Pong timeout. We didn't receive any message back
// so close the connection.
warn!(
target: "net::protocol_ping::run_ping_pong()",
"Ping-Pong protocol timed out for {}", self.channel.address(),
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
}

View File

@@ -19,6 +19,7 @@
use std::time::Duration;
use async_std::{future::timeout, sync::Arc};
use futures::future::join_all;
use log::{debug, error};
use smol::Executor;
@@ -98,8 +99,22 @@ impl ProtocolVersion {
let send = executor.spawn(self.clone().send_version());
let recv = executor.spawn(self.clone().recv_version());
send.await?;
recv.await?;
let rets = join_all(vec![send, recv]).await;
if let Err(e) = &rets[0] {
error!(
target: "net::protocol_version::exchange_versions()",
"send_version() failed: {}", e,
);
return Err(e.clone())
}
if let Err(e) = &rets[1] {
error!(
target: "net::protocol_version::exchange_versions()",
"recv_version() failed: {}", e,
);
return Err(e.clone())
}
debug!(
target: "net::protocol_version::exchange_versions()",
@@ -123,13 +138,6 @@ impl ProtocolVersion {
let verack_msg = self.verack_sub.receive().await?;
// Validate peer received version against our version.
// Seeds get ignored
if self.settings.seeds.contains(self.channel.address()) {
debug!(target: "net::protocol_version::send_version()", "Peer is a seed, skipping version");
debug!(target: "net::protocol_version::send_version()", "END => address={}", self.channel.address());
return Ok(())
}
debug!(
target: "net::protocol_version::send_version()",
"App version: {}, Recv version: {}",
@@ -152,6 +160,10 @@ impl ProtocolVersion {
}
// Versions are compatible
debug!(
target: "net::protocol_version::send_version()",
"END => address={}", self.channel.address(),
);
Ok(())
}
@@ -165,7 +177,7 @@ impl ProtocolVersion {
// Receive version message
let _version = self.version_sub.receive().await?;
//self.channel.set_remote_node_id(version.node_id.clone()).await;
// TODO: self.channel.set_remote_node_id(version.node_id.clone()).await;
// Send verack
let verack = VerackMessage { app_version: self.settings.app_version.clone() };

View File

@@ -37,15 +37,10 @@
//! function. This runs the version exchange protocol, stores the channel in the
//! p2p list of channels, and subscribes to a stop signal.
use std::time::Duration;
use async_std::{
future::timeout,
sync::{Arc, Weak},
};
use async_std::sync::{Arc, Weak};
use async_trait::async_trait;
use futures::future::join_all;
use log::{debug, error, info, warn};
use log::{debug, info, warn};
use smol::Executor;
use url::Url;
@@ -88,39 +83,20 @@ impl SeedSyncSession {
// Gather tasks so we can execute concurrently
let mut tasks = Vec::with_capacity(settings.seeds.len());
let conn_timeout = Duration::from_secs(settings.seed_query_timeout);
for (i, seed) in settings.seeds.iter().enumerate() {
let ex_ = executor.clone();
let self_ = self.clone();
tasks.push(async move {
let task = self_.clone().start_seed(i, seed.clone(), ex_.clone());
let result = timeout(conn_timeout, task).await;
match result {
Ok(t) => match t {
Ok(()) => {
info!(
target: "net::session::seedsync_session",
"[P2P] Seed #{} connected successfully", i,
);
}
Err(err) => {
warn!(
target: "net::session::seedsync_session",
"[P2P] Seed #{} connection failed: {}", i, err,
);
}
},
Err(_) => {
error!(
target: "net::session::seedsync_session",
"[P2P] Seed #{} timed out", i
);
}
if let Err(e) = self_.clone().start_seed(i, seed.clone(), ex_.clone()).await {
warn!(
target: "net::session::seedsync_session",
"[P2P] Seed #{} connection failed: {}", i, e,
);
}
});
}
// Poll concurrently
join_all(tasks).await;

View File

@@ -50,8 +50,6 @@ pub struct Settings {
pub outbound_connections: usize,
/// Manual connections retry limit, 0 for forever looping
pub manual_attempt_limit: usize,
/// Seed connection establishment timeout (in seconds)
pub seed_query_timeout: u64,
/// Outbound connection timeout (in seconds)
pub outbound_connect_timeout: u64,
/// Exchange versions (handshake) timeout (in seconds)
@@ -78,9 +76,8 @@ impl Default for Settings {
transport_mixing: true,
outbound_connections: 0,
manual_attempt_limit: 0,
seed_query_timeout: 30,
outbound_connect_timeout: 15,
channel_handshake_timeout: 4,
channel_handshake_timeout: 10,
channel_heartbeat_interval: 10,
localnet: false,
}
@@ -125,10 +122,6 @@ pub struct SettingsOpt {
#[structopt(skip)]
pub manual_attempt_limit: Option<usize>,
/// Seed connection establishment timeout in seconds
#[structopt(skip)]
pub seed_query_timeout: Option<u64>,
/// Connection establishment timeout in seconds
#[structopt(skip)]
pub outbound_connect_timeout: Option<u64>,
@@ -176,9 +169,8 @@ impl From<SettingsOpt> for Settings {
transport_mixing: opt.transport_mixing.unwrap_or(false),
outbound_connections: opt.outbound_connections.unwrap_or(0),
manual_attempt_limit: opt.manual_attempt_limit.unwrap_or(0),
seed_query_timeout: opt.seed_query_timeout.unwrap_or(30),
outbound_connect_timeout: opt.outbound_connect_timeout.unwrap_or(15),
channel_handshake_timeout: opt.channel_handshake_timeout.unwrap_or(4),
channel_handshake_timeout: opt.channel_handshake_timeout.unwrap_or(10),
channel_heartbeat_interval: opt.channel_heartbeat_interval.unwrap_or(10),
localnet: opt.localnet,
}

View File

@@ -21,6 +21,7 @@ use std::{io, time::Duration};
use async_rustls::{TlsAcceptor, TlsStream};
use async_std::net::{SocketAddr, TcpListener as AsyncStdTcpListener, TcpStream};
use async_trait::async_trait;
use log::debug;
use socket2::{Domain, Socket, TcpKeepalive, Type};
use url::Url;
@@ -67,6 +68,7 @@ impl TcpDialer {
socket_addr: SocketAddr,
timeout: Option<Duration>,
) -> Result<TcpStream> {
debug!(target: "net::tcp::do_dial", "Dialing {} with TCP...", socket_addr);
let socket = self.create_socket(socket_addr).await?;
let connection = if timeout.is_some() {

View File

@@ -18,8 +18,9 @@
use std::time::Duration;
use arti_client::{BootstrapBehavior, DataStream, TorClient};
use arti_client::{config::BoolOrAuto, DataStream, StreamPrefs, TorClient};
use async_std::future;
use log::debug;
use crate::Result;
@@ -40,15 +41,21 @@ impl TorDialer {
port: u16,
timeout: Option<Duration>,
) -> Result<DataStream> {
let client = TorClient::builder()
.bootstrap_behavior(BootstrapBehavior::OnDemand)
.create_unbootstrapped()?;
debug!(target: "net::tor::do_dial", "Dialing {}:{} with Tor...", host, port);
debug!(target: "net::tor::do_dial", "Bootstrapping...");
let client = TorClient::builder().create_bootstrapped().await?;
let mut stream_prefs = StreamPrefs::new();
stream_prefs.connect_to_onion_services(BoolOrAuto::Explicit(true));
if timeout.is_some() {
let res = future::timeout(timeout.unwrap(), client.connect((host, port))).await?;
let res = future::timeout(
timeout.unwrap(),
client.connect_with_prefs((host, port), &stream_prefs),
)
.await?;
return Ok(res?)
}
Ok(client.connect((host, port)).await?)
Ok(client.connect_with_prefs((host, port), &stream_prefs).await?)
}
}