net3: WIP using Url & preparing for migrate to main net dir

This commit is contained in:
ghassmo
2022-04-26 02:24:58 +03:00
parent 7ef37c9e9a
commit bcb1b26b12
13 changed files with 109 additions and 106 deletions

View File

@@ -239,6 +239,7 @@ net3 = [
"rcgen",
"regex",
"rustls-pemfile",
"structopt",
"util",
"system",

View File

@@ -1,5 +1,4 @@
use async_std::stream::StreamExt;
use std::{net::SocketAddr, sync::Arc};
use async_std::{stream::StreamExt, sync::Arc};
use futures_rustls::TlsStream;
use smol::Executor;
@@ -10,10 +9,7 @@ use crate::{
Error, Result,
};
use super::{
TcpTransport, TlsTransport,
Channel, ChannelPtr, Transport,
};
use super::{Channel, ChannelPtr, TcpTransport, TlsTransport, Transport};
/// Atomic pointer to Acceptor class.
pub type AcceptorPtr = Arc<Acceptor>;
@@ -34,7 +30,7 @@ impl Acceptor {
/// thread, erroring if a connection problem occurs.
pub async fn start(
self: Arc<Self>,
accept_addr: SocketAddr,
accept_addr: Url,
executor: Arc<Executor<'_>>,
) -> Result<()> {
self.accept(accept_addr, executor);
@@ -54,7 +50,7 @@ impl Acceptor {
/// Run the accept loop in a new thread and error if a connection problem
/// occurs.
fn accept(self: Arc<Self>, accept_addr: SocketAddr, executor: Arc<Executor<'_>>) {
fn accept(self: Arc<Self>, accept_addr: Url, executor: Arc<Executor<'_>>) {
self.task.clone().start(
self.clone().run_accept_loop(accept_addr),
|result| self.handle_stop(result),
@@ -64,31 +60,31 @@ impl Acceptor {
}
/// Run the accept loop.
async fn run_accept_loop(self: Arc<Self>, accept_addr: SocketAddr) -> Result<()> {
let mut url = Url::parse(&accept_addr.to_string())?;
url.set_host(Some("tcp"))?;
match url.scheme() {
async fn run_accept_loop(self: Arc<Self>, accept_url: Url) -> Result<()> {
match accept_url.scheme() {
"tcp" => {
let transport = TcpTransport::new(None, 1024);
let listener = transport.listen_on(url)?.await?;
let listener = transport.listen_on(accept_url)?.await?;
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
let peer_addr = stream.peer_addr()?;
let mut peer_addr = Url::parse(&stream.peer_addr()?.to_string())?;
peer_addr.set_scheme("tcp")?;
let channel = Channel::new(Box::new(stream), peer_addr).await;
self.channel_subscriber.notify(Ok(channel)).await;
}
}
"tls" => {
let transport = TlsTransport::new(None, 1024);
let (acceptor, listener) = transport.listen_on(url)?.await?;
let (acceptor, listener) = transport.listen_on(accept_url)?.await?;
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
let peer_addr = stream.peer_addr()?;
let mut peer_addr = Url::parse(&stream.peer_addr()?.to_string())?;
peer_addr.set_scheme("tls")?;
let stream = acceptor.accept(stream).await?;
let channel = Channel::new(Box::new(TlsStream::Server(stream)), peer_addr).await;
let channel =
Channel::new(Box::new(TlsStream::Server(stream)), peer_addr).await;
self.channel_subscriber.notify(Ok(channel)).await;
}
}

View File

@@ -1,11 +1,8 @@
use async_std::{net::TcpStream, sync::Mutex};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
use async_std::{
net::TcpStream,
sync::{Arc, Mutex},
};
use std::sync::atomic::{AtomicBool, Ordering};
use futures::{
io::{ReadHalf, WriteHalf},
@@ -16,6 +13,7 @@ use log::{debug, error, info};
use rand::Rng;
use serde_json::json;
use smol::Executor;
use url::Url;
use crate::{
system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription},
@@ -69,7 +67,7 @@ impl<T: Stream> Stream for TlsStream<T> {}
pub struct Channel {
reader: Mutex<ReadHalf<Box<dyn Stream>>>,
writer: Mutex<WriteHalf<Box<dyn Stream>>>,
address: SocketAddr,
address: Url,
message_subsystem: MessageSubsystem,
stop_subscriber: SubscriberPtr<Error>,
receive_task: StoppableTaskPtr,
@@ -81,7 +79,7 @@ impl Channel {
/// Sets up a new channel. Creates a reader and writer TCP stream and
/// summons the message subscriber subsystem. Performs a network
/// handshake on the subsystem dispatchers.
pub async fn new(stream: Box<dyn Stream>, address: SocketAddr) -> Arc<Self> {
pub async fn new(stream: Box<dyn Stream>, address: Url) -> Arc<Self> {
let (reader, writer) = stream.split();
let reader = Mutex::new(reader);
let writer = Mutex::new(writer);
@@ -222,8 +220,8 @@ impl Channel {
}
/// Return the local socket address.
pub fn address(&self) -> SocketAddr {
self.address
pub fn address(&self) -> Url {
self.address.clone()
}
/// End of file error. Triggered when unexpected end of file occurs.

View File

@@ -1,5 +1,5 @@
use async_std::future::timeout;
use std::{net::SocketAddr, time::Duration};
use std::time::Duration;
use url::Url;
use crate::Result;
@@ -18,21 +18,19 @@ impl Connector {
}
/// Establish an outbound connection.
pub async fn connect(&self, hosturl: SocketAddr) -> Result<ChannelPtr> {
let mut url = Url::parse(&hosturl.to_string())?;
url.set_host(Some("tcp"))?;
pub async fn connect(&self, connect_url: Url) -> Result<ChannelPtr> {
let result =
timeout(Duration::from_secs(self.settings.connect_timeout_seconds.into()), async {
match url.scheme() {
match connect_url.scheme() {
"tcp" => {
let transport = TcpTransport::new(None, 1024);
let stream = transport.dial(url)?.await?;
Ok(Channel::new(Box::new(stream), hosturl).await)
let stream = transport.dial(connect_url.clone())?.await?;
Ok(Channel::new(Box::new(stream), connect_url).await)
}
"tls" => {
let transport = TlsTransport::new(None, 1024);
let stream = transport.dial(url)?.await?;
Ok(Channel::new(Box::new(stream), hosturl).await)
let stream = transport.dial(connect_url.clone())?.await?;
Ok(Channel::new(Box::new(stream), connect_url).await)
}
"tor" => todo!(),
_ => unimplemented!(),

View File

@@ -1,14 +1,14 @@
use async_std::sync::Mutex;
use std::{net::SocketAddr, sync::Arc};
use async_std::sync::{Arc, Mutex};
use fxhash::FxHashSet;
use url::Url;
/// Pointer to hosts class.
pub type HostsPtr = Arc<Hosts>;
/// Manages a store of network addresses.
pub struct Hosts {
addrs: Mutex<Vec<SocketAddr>>,
addrs: Mutex<Vec<Url>>,
}
impl Hosts {
@@ -18,20 +18,20 @@ impl Hosts {
}
/// Checks if a host address is in the host list.
async fn contains(&self, addrs: &[SocketAddr]) -> bool {
let a_set: FxHashSet<_> = addrs.iter().copied().collect();
async fn contains(&self, addrs: &[Url]) -> bool {
let a_set: FxHashSet<_> = addrs.iter().cloned().collect();
self.addrs.lock().await.iter().any(|item| a_set.contains(item))
}
/// Add a new host to the host list.
pub async fn store(&self, addrs: Vec<SocketAddr>) {
pub async fn store(&self, addrs: Vec<Url>) {
if !self.contains(&addrs).await {
self.addrs.lock().await.extend(addrs)
}
}
/// Return the list of hosts.
pub async fn load_all(&self) -> Vec<SocketAddr> {
pub async fn load_all(&self) -> Vec<Url> {
self.addrs.lock().await.clone()
}

View File

@@ -1,7 +1,8 @@
use std::{io, net::SocketAddr};
use std::io;
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use log::debug;
use url::Url;
use crate::{
util::serial::{Decodable, Encodable, VarInt},
@@ -31,7 +32,7 @@ pub struct GetAddrsMessage {}
/// Sends address information to inbound connection. Response to GetAddrs
/// message.
pub struct AddrsMessage {
pub addrs: Vec<SocketAddr>,
pub addrs: Vec<Url>,
}
/// Requests version information of outbound connection.

View File

@@ -1,10 +1,11 @@
use async_std::sync::Mutex;
use std::{fmt, net::SocketAddr, sync::Arc};
use async_std::sync::{Arc, Mutex};
use std::fmt;
use async_executor::Executor;
use fxhash::{FxHashMap, FxHashSet};
use log::debug;
use serde_json::json;
use url::Url;
use crate::{
system::{Subscriber, SubscriberPtr, Subscription},
@@ -19,9 +20,9 @@ use super::{
};
/// List of channels that are awaiting connection.
pub type PendingChannels = Mutex<FxHashSet<SocketAddr>>;
pub type PendingChannels = Mutex<FxHashSet<Url>>;
/// List of connected channels.
pub type ConnectedChannels = Mutex<fxhash::FxHashMap<SocketAddr, Arc<Channel>>>;
pub type ConnectedChannels = Mutex<fxhash::FxHashMap<Url, Arc<Channel>>>;
/// Atomic pointer to p2p interface.
pub type P2pPtr = Arc<P2p>;
@@ -105,6 +106,7 @@ impl P2p {
let external_addr = self
.settings
.external_addr
.as_ref()
.map(|addr| serde_json::Value::from(addr.to_string()))
.unwrap_or(serde_json::Value::Null);
@@ -195,17 +197,17 @@ impl P2p {
}
/// Check whether a channel is stored in the list of connected channels.
pub async fn exists(&self, addr: &SocketAddr) -> bool {
pub async fn exists(&self, addr: &Url) -> bool {
self.channels.lock().await.contains_key(addr)
}
/// Add a channel to the list of pending channels.
pub async fn add_pending(&self, addr: SocketAddr) -> bool {
pub async fn add_pending(&self, addr: Url) -> bool {
self.pending.lock().await.insert(addr)
}
/// Remove a channel from the list of pending channels.
pub async fn remove_pending(&self, addr: &SocketAddr) {
pub async fn remove_pending(&self, addr: &Url) {
self.pending.lock().await.remove(addr);
}

View File

@@ -31,7 +31,7 @@ impl ProtocolSeed {
/// from settings, then adds that address to an address message and
/// sends it out over the channel.
pub async fn send_self_address(&self) -> Result<()> {
match self.settings.external_addr {
match self.settings.external_addr.clone() {
Some(addr) => {
debug!(target: "net", "ProtocolSeed::send_own_address() addr={}", addr);
let addr = message::AddrsMessage { addrs: vec![addr] };

View File

@@ -1,14 +1,11 @@
use async_std::sync::Mutex;
use std::{
net::SocketAddr,
sync::{Arc, Weak},
};
use async_std::sync::{Arc, Mutex, Weak};
use async_executor::Executor;
use async_trait::async_trait;
use fxhash::FxHashMap;
use log::{error, info};
use serde_json::json;
use url::Url;
use crate::{
system::{StoppableTask, StoppableTaskPtr},
@@ -35,7 +32,7 @@ pub struct InboundSession {
p2p: Weak<P2p>,
acceptor: AcceptorPtr,
accept_task: StoppableTaskPtr,
connect_infos: Mutex<FxHashMap<SocketAddr, InboundInfo>>,
connect_infos: Mutex<FxHashMap<Url, InboundInfo>>,
}
impl InboundSession {
@@ -55,9 +52,9 @@ impl InboundSession {
/// the address is not configured. Then runs the channel subscription
/// loop.
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
match self.p2p().settings().inbound {
match self.p2p().settings().inbound.as_ref() {
Some(accept_addr) => {
self.clone().start_accept_session(accept_addr, executor.clone()).await?;
self.clone().start_accept_session(accept_addr.clone(), executor.clone()).await?;
}
None => {
info!(target: "net", "Not configured for accepting incoming connections.");
@@ -83,7 +80,7 @@ impl InboundSession {
/// Start accepting connections for inbound session.
async fn start_accept_session(
self: Arc<Self>,
accept_addr: SocketAddr,
accept_addr: Url,
executor: Arc<Executor<'_>>,
) -> Result<()> {
info!(target: "net", "Starting inbound session on {}", accept_addr);
@@ -125,7 +122,10 @@ impl InboundSession {
async fn manage_channel_for_get_info(&self, channel: ChannelPtr) {
let key = channel.address();
self.connect_infos.lock().await.insert(key, InboundInfo { channel: channel.clone() });
self.connect_infos
.lock()
.await
.insert(key.clone(), InboundInfo { channel: channel.clone() });
let stop_sub = channel.subscribe_stop().await;
stop_sub.receive().await;

View File

@@ -1,13 +1,10 @@
use async_std::sync::Mutex;
use std::{
net::SocketAddr,
sync::{Arc, Weak},
};
use async_std::sync::{Arc, Mutex, Weak};
use async_executor::Executor;
use async_trait::async_trait;
use log::*;
use serde_json::json;
use url::Url;
use crate::{
system::{StoppableTask, StoppableTaskPtr},
@@ -40,11 +37,11 @@ impl ManualSession {
}
}
pub async fn connect(self: Arc<Self>, addr: &SocketAddr, executor: Arc<Executor<'_>>) {
pub async fn connect(self: Arc<Self>, addr: &Url, executor: Arc<Executor<'_>>) {
let task = StoppableTask::new();
task.clone().start(
self.clone().channel_connect_loop(*addr, executor.clone()),
self.clone().channel_connect_loop(addr.clone(), executor.clone()),
// Ignore stop handler
|_| async {},
Error::ServiceStopped,
@@ -56,7 +53,7 @@ impl ManualSession {
pub async fn channel_connect_loop(
self: Arc<Self>,
addr: SocketAddr,
addr: Url,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let connector = Connector::new(self.p2p().settings());
@@ -73,11 +70,11 @@ impl ManualSession {
break
}
self.p2p().add_pending(addr).await;
self.p2p().add_pending(addr.clone()).await;
info!(target: "net", "Connecting to manual outbound [{}]", addr);
match connector.connect(addr).await {
match connector.connect(addr.clone()).await {
Ok(channel) => {
// Blacklist goes here
@@ -108,7 +105,7 @@ impl ManualSession {
warn!(
target: "net",
"Suspending manual connection to [{}] after {} failed attempts.",
addr,
&addr,
attempts
);

View File

@@ -1,15 +1,12 @@
use async_std::sync::Mutex;
use std::{
fmt,
net::SocketAddr,
sync::{Arc, Weak},
};
use async_std::sync::{Arc, Mutex, Weak};
use std::fmt;
use async_executor::Executor;
use async_trait::async_trait;
use log::{error, info};
use rand::seq::SliceRandom;
use serde_json::json;
use url::Url;
use crate::{
system::{StoppableTask, StoppableTaskPtr},
@@ -44,14 +41,14 @@ impl fmt::Display for OutboundState {
#[derive(Clone)]
struct OutboundInfo {
addr: Option<SocketAddr>,
addr: Option<Url>,
channel: Option<ChannelPtr>,
state: OutboundState,
}
impl OutboundInfo {
async fn get_info(&self) -> serde_json::Value {
let addr = match self.addr {
let addr = match self.addr.as_ref() {
Some(addr) => serde_json::Value::String(addr.to_string()),
None => serde_json::Value::Null,
};
@@ -144,11 +141,11 @@ impl OutboundSession {
info!(target: "net", "#{} connecting to outbound [{}]", slot_number, addr);
{
let info = &mut self.slot_info.lock().await[slot_number as usize];
info.addr = Some(addr);
info.addr = Some(addr.clone());
info.state = OutboundState::Pending;
}
match connector.connect(addr).await {
match connector.connect(addr.clone()).await {
Ok(channel) => {
// Blacklist goes here
@@ -172,7 +169,7 @@ impl OutboundSession {
stop_sub.receive().await;
}
Err(err) => {
info!(target: "net", "Unable to connect to outbound [{}]: {}", addr, err);
info!(target: "net", "Unable to connect to outbound [{}]: {}", &addr, err);
{
let info = &mut self.slot_info.lock().await[slot_number as usize];
info.addr = None;
@@ -189,9 +186,9 @@ impl OutboundSession {
/// our own inbound address, then checks whether it is already connected
/// (exists) or connecting (pending). Keeps looping until address is
/// found that passes all checks.
async fn load_address(&self, slot_number: u32) -> Result<SocketAddr> {
async fn load_address(&self, slot_number: u32) -> Result<Url> {
let p2p = self.p2p();
let self_inbound_addr = p2p.settings().external_addr;
let self_inbound_addr = p2p.settings().external_addr.clone();
let mut addrs;
@@ -208,7 +205,7 @@ impl OutboundSession {
}
// Obtain a lock on this address to prevent duplicate connections
if !p2p.add_pending(addr).await {
if !p2p.add_pending(addr.clone()).await {
continue
}
@@ -225,7 +222,7 @@ impl OutboundSession {
/// Checks whether an address is our own inbound address to avoid connecting
/// to ourselves.
fn is_self_inbound(addr: &SocketAddr, inbound_addr: &Option<SocketAddr>) -> bool {
fn is_self_inbound(addr: &Url, inbound_addr: &Option<Url>) -> bool {
match inbound_addr {
Some(inbound_addr) => inbound_addr == addr,
// No inbound listening address configured

View File

@@ -1,14 +1,14 @@
use async_std::future::timeout;
use std::{
net::SocketAddr,
use async_std::{
future::timeout,
sync::{Arc, Weak},
time::Duration,
};
use std::time::Duration;
use async_executor::Executor;
use async_trait::async_trait;
use log::*;
use serde_json::json;
use url::Url;
use crate::{Error, Result};
@@ -44,7 +44,7 @@ impl SeedSession {
let mut tasks = Vec::new();
for (i, seed) in settings.seeds.iter().enumerate() {
tasks.push(executor.spawn(self.clone().start_seed(i, *seed, executor.clone())));
tasks.push(executor.spawn(self.clone().start_seed(i, seed.clone(), executor.clone())));
}
// This line loops through all the tasks and waits for them to finish.
@@ -83,7 +83,7 @@ impl SeedSession {
async fn start_seed(
self: Arc<Self>,
seed_index: usize,
seed: SocketAddr,
seed: Url,
executor: Arc<Executor<'_>>,
) -> Result<()> {
debug!(target: "net", "SeedSession::start_seed(i={}) [START]", seed_index);
@@ -93,7 +93,7 @@ impl SeedSession {
};
let connector = Connector::new(settings.clone());
match connector.connect(seed).await {
match connector.connect(seed.clone()).await {
Ok(channel) => {
// Blacklist goes here

View File

@@ -1,23 +1,36 @@
use std::{net::SocketAddr, sync::Arc};
use std::sync::Arc;
use serde::Deserialize;
use structopt::StructOpt;
use url::Url;
/// Atomic pointer to network settings.
pub type SettingsPtr = Arc<Settings>;
/// Defines the network settings.
#[derive(Clone)]
#[derive(Clone, Debug, Deserialize, StructOpt)]
#[structopt()]
pub struct Settings {
pub inbound: Option<SocketAddr>,
#[structopt(short, long)]
pub inbound: Option<Url>,
#[structopt(long, default_value = "0")]
pub outbound_connections: u32,
#[structopt(long, default_value = "0")]
pub manual_attempt_limit: u32,
#[structopt(long, default_value = "8")]
pub seed_query_timeout_seconds: u32,
#[structopt(long, default_value = "10")]
pub connect_timeout_seconds: u32,
#[structopt(long, default_value = "4")]
pub channel_handshake_seconds: u32,
#[structopt(long, default_value = "10")]
pub channel_heartbeat_seconds: u32,
pub external_addr: Option<SocketAddr>,
pub peers: Vec<SocketAddr>,
pub seeds: Vec<SocketAddr>,
#[structopt(short, long)]
pub external_addr: Option<Url>,
#[structopt(short, long)]
pub peers: Vec<Url>,
#[structopt(short, long)]
pub seeds: Vec<Url>,
}
impl Default for Settings {