p2pnet: inbound and external addresses vectorized

This commit is contained in:
aggstam
2022-08-23 21:49:28 +03:00
parent 6a292a4d81
commit 6134dcdad7
40 changed files with 248 additions and 220 deletions

View File

@@ -24,11 +24,11 @@
# Participate in the consensus protocol
#consensus = false
# P2P accept address for the consensus protocol
#consensus_p2p_accept = "tls://127.0.0.1:8341"
# P2P accept addresses for the consensus protocol
#consensus_p2p_accept = ["tls://127.0.0.1:8341"]
# P2P external address for the consensus protocol
#consensus_p2p_external = "tls://127.0.0.1:8341"
# P2P external addresses for the consensus protocol
#consensus_p2p_external = ["tls://127.0.0.1:8341"]
# Connection slots for the consensus protocol
#consensus_slots = 8
@@ -45,11 +45,11 @@
# Peers JSON-RPC listen URL for clock synchronization
#consensus_peer_rpc = []
# P2P accept address for the syncing protocol
#sync_p2p_accept = "tls://127.0.0.1:8342"
# P2P accept addresses for the syncing protocol
#sync_p2p_accept = ["tls://127.0.0.1:8342"]
# P2P external address for the syncing protocol
#sync_p2p_external = "tls://127.0.0.1:8342"
# P2P external addresses for the syncing protocol
#sync_p2p_external = ["tls://127.0.0.1:8342"]
# Connection slots for the syncing protocol
#sync_slots = 8

View File

@@ -82,12 +82,12 @@ struct Args {
rpc_listen: Url,
#[structopt(long)]
/// P2P accept address for the consensus protocol
consensus_p2p_accept: Option<Url>,
/// P2P accept addresses for the consensus protocol (repeatable flag)
consensus_p2p_accept: Vec<Url>,
#[structopt(long)]
/// P2P external address for the consensus protocol
consensus_p2p_external: Option<Url>,
/// P2P external addresses for the consensus protocol (repeatable flag)
consensus_p2p_external: Vec<Url>,
#[structopt(long, default_value = "8")]
/// Connection slots for the consensus protocol
@@ -110,12 +110,12 @@ struct Args {
consensus_seed_rpc: Vec<Url>,
#[structopt(long)]
/// P2P accept address for the syncing protocol
sync_p2p_accept: Option<Url>,
/// P2P accept addresses for the syncing protocol (repeatable flag)
sync_p2p_accept: Vec<Url>,
#[structopt(long)]
/// P2P external address for the syncing protocol
sync_p2p_external: Option<Url>,
/// P2P external addresses for the syncing protocol (repeatable flag)
sync_p2p_external: Vec<Url>,
#[structopt(long, default_value = "8")]
/// Connection slots for the syncing protocol

View File

@@ -12,14 +12,14 @@ secret = "86MGNN31r3VxT4ULMmhQnMtV8pDnod339KwHwHCfabG2"
## Raft net settings
[net]
## P2P accept address
inbound="tls://127.0.0.1:13001"
## P2P accept addresses
inbound=["tls://127.0.0.1:13001"]
## Connection slots
outbound_connections=5
## P2P external address
external_addr="tls://127.0.0.1:13001"
## P2P external addresses
external_addr=["tls://127.0.0.1:13001"]
## Peers to connect to
#peers=["tls://127.0.0.1:13003"]

View File

@@ -21,11 +21,11 @@
# JSON-RPC listen URL
#rpc_listen = "tcp://127.0.0.1:8340"
# P2P accept address for the syncing protocol
#sync_p2p_accept = "tls://127.0.0.1:9342"
# P2P accept addresses for the syncing protocol
#sync_p2p_accept = ["tls://127.0.0.1:9342"]
# P2P external address for the syncing protocol
#sync_p2p_external = "tls://127.0.0.1:9342"
# P2P external addresses for the syncing protocol
#sync_p2p_external = ["tls://127.0.0.1:9342"]
# Connection slots for the syncing protocol
#sync_slots = 8

View File

@@ -77,12 +77,12 @@ struct Args {
rpc_listen: Url,
#[structopt(long)]
/// P2P accept address for the syncing protocol
sync_p2p_accept: Option<Url>,
/// P2P accept addresses for the syncing protocol
sync_p2p_accept: Vec<Url>,
#[structopt(long)]
/// P2P external address for the syncing protocol
sync_p2p_external: Option<Url>,
/// P2P external addresses for the syncing protocol
sync_p2p_external: Vec<Url>,
#[structopt(long, default_value = "8")]
/// Connection slots for the syncing protocol

View File

@@ -9,14 +9,14 @@ datastore="~/.config/ircd-inbound"
## Raft net settings
[net]
## P2P accept address
inbound="127.0.0.1:11002"
## P2P accept addresses
inbound=["127.0.0.1:11002"]
## Connection slots
#outbound_connections=5
## P2P external address
external_addr="127.0.0.1:11004"
## P2P external addresses
external_addr=["127.0.0.1:11004"]
## Peers to connect to
#peers=["127.0.0.1:11003"]

View File

@@ -9,14 +9,14 @@ datastore="~/.config/ircd-outbound"
## Raft net settings
[net]
## P2P accept address
# inbound="127.0.0.1:11002"
## P2P accept addresses
# inbound=["127.0.0.1:11002"]
## Connection slots
outbound_connections=5
## P2P external address
#external_addr="127.0.0.1:11002"
## P2P external addresses
#external_addr=["127.0.0.1:11002"]
## Peers to connect to
#peers=["127.0.0.1:11003"]

View File

@@ -9,14 +9,14 @@ datastore="~/.config/ircd-seed"
## Raft net settings
[net]
## P2P accept address
inbound="127.0.0.1:11001"
## P2P accept addresses
inbound=["127.0.0.1:11001"]
## Connection slots
# outbound_connections=5
## P2P external address
# external_addr="127.0.0.1:11001"
## P2P external addresses
# external_addr=["127.0.0.1:11001"]
## Peers to connect to
# peers=["127.0.0.1:11001"]

View File

@@ -12,14 +12,14 @@ autojoin = ["#dev"]
## Raft net settings
[net]
## P2P accept address
#inbound="tls://127.0.0.1:11002"
## P2P accept addresses
#inbound=["tls://127.0.0.1:11002"]
## Connection slots
outbound_connections=5
## P2P external address
#external_addr="tls://127.0.0.1:11002"
## P2P external addresses
#external_addr=["tls://127.0.0.1:11002"]
## Peers to connect to
#peers=["tls://127.0.0.1:11003"]

View File

@@ -9,14 +9,14 @@ datastore="~/.config/ircd-inbound"
## Raft net settings
[net]
## P2P accept address
inbound="127.0.0.1:11002"
## P2P accept addresses
inbound=["127.0.0.1:11002"]
## Connection slots
#outbound_connections=5
## P2P external address
external_addr="127.0.0.1:11004"
## P2P external addresses
external_addr=["127.0.0.1:11004"]
## Peers to connect to
#peers=["127.0.0.1:11003"]

View File

@@ -9,14 +9,14 @@ datastore="~/.config/ircd-outbound"
## Raft net settings
[net]
## P2P accept address
# inbound="127.0.0.1:11002"
## P2P accept addresses
# inbound=["127.0.0.1:11002"]
## Connection slots
outbound_connections=5
## P2P external address
#external_addr="127.0.0.1:11002"
## P2P external addresses
#external_addr=["127.0.0.1:11002"]
## Peers to connect to
#peers=["127.0.0.1:11003"]

View File

@@ -9,14 +9,14 @@ datastore="~/.config/ircd-seed"
## Raft net settings
[net]
## P2P accept address
inbound="127.0.0.1:11001"
## P2P accept addresses
inbound=["127.0.0.1:11001"]
## Connection slots
# outbound_connections=5
## P2P external address
# external_addr="127.0.0.1:11001"
## P2P external addresses
# external_addr=["127.0.0.1:11001"]
## Peers to connect to
# peers=["127.0.0.1:11001"]

View File

@@ -23,18 +23,18 @@ autojoin = ["#dev", "#memes", "#philosophy", "#markets", "#math", "#random"]
## Connection slots
outbound_connections=5
## P2P accept address
#inbound="tls://0.0.0.0:11002"
## P2P accept addresses
#inbound=["tls://0.0.0.0:11002"]
# ipv6 version:
#inbound="tls://[::]:11002"
#inbound=["tls://[::]:11002"]
## P2P external address
## Put your IP or hostname here
## P2P external addresses
## Put your IPs or hostnames here
## This is how people can reach you on the inbound port configured above
## You can also put an ipv6 address :)
#external_addr="tls://XXX.XXX.XXX.XXX:11002"
#external_addr=["tls://XXX.XXX.XXX.XXX:11002"]
# ipv6 version:
#external_addr="tls://[ipv6 address here]:11002"
#external_addr=["tls://[ipv6 address here]:11002"]
## Manually configured peers to connect to
#peers=["tls://127.0.0.1:11003"]

View File

@@ -6,8 +6,8 @@
## The default values are left commented. They can be overridden either by
## uncommenting, or by using the command-line.
# Daemon published url, common for all enabled networks
#url = "tcp://127.0.0.1"
# Daemon published urls, common for all enabled networks
#urls = ["tcp://127.0.0.1"]
## Per-network settings
#[network."darkfid_sync"]

View File

@@ -16,9 +16,9 @@ pub struct Args {
/// Configuration file to use
pub config: Option<String>,
#[structopt(long, default_value = "tcp://127.0.0.1")]
/// Daemon published url, common for all enabled networks
pub url: Url,
#[structopt(long)]
/// Daemon published urls, common for all enabled networks (repeatable flag)
pub urls: Vec<Url>,
#[structopt(short, parse(from_occurrences))]
/// Increase verbosity (-vvv supported)
@@ -40,8 +40,8 @@ pub struct NetInfo {
/// a map containing said configurations.
///
/// ```toml
/// [network."darkfid"]
/// port = 7650
/// [network."darkfid_sync"]
/// port = 33032
/// seeds = []
/// peers = []
/// ```

View File

@@ -23,12 +23,17 @@ const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml");
async fn spawn_network(
name: &str,
info: NetInfo,
mut url: Url,
urls: Vec<Url>,
ex: Arc<Executor<'_>>,
) -> Result<()> {
url.set_port(Some(info.port))?;
let mut full_urls = Vec::new();
for url in &urls {
let mut url = url.clone();
url.set_port(Some(info.port))?;
full_urls.push(url);
}
let network_settings = net::Settings {
inbound: Some(url.clone()),
inbound: full_urls.clone(),
seeds: info.seeds,
peers: info.peers,
outbound_connections: 0,
@@ -37,7 +42,12 @@ async fn spawn_network(
let p2p = net::P2p::new(network_settings).await;
info!("Starting seed network node for {} at: {}", name, url);
// Building ext_addr_vec string
let mut urls_vec = vec![];
for url in &full_urls {
urls_vec.push(url.as_ref().to_string());
}
info!("Starting seed network node for {} at: {:?}", name, urls_vec);
p2p.clone().start(ex.clone()).await?;
let _ex = ex.clone();
ex.spawn(async move {
@@ -72,9 +82,17 @@ async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
return Ok(())
}
// Setting urls
let mut urls = args.urls.clone();
if urls.is_empty() {
info!("Urls are not provided, will use: tcp://127.0.0.1");
let url = Url::parse("tcp://127.0.0.1")?;
urls.push(url);
}
// Spawn configured networks
for (name, info) in &configured_nets {
if let Err(e) = spawn_network(name, info.clone(), args.url.clone(), ex.clone()).await {
if let Err(e) = spawn_network(name, info.clone(), urls.clone(), ex.clone()).await {
error!("Failed starting {} P2P network seed: {}", name, e);
}
}

View File

@@ -9,14 +9,14 @@
## Raft net settings
[net]
## P2P accept address
#inbound="tcp://127.0.0.1:12002"
## P2P accept addresses
#inbound=["tcp://127.0.0.1:12002"]
## Connection slots
outbound_connections=5
## P2P external address
#external_addr="tls://127.0.0.1:12002"
## P2P external addresses
#external_addr=["tls://127.0.0.1:12002"]
## Peers to connect to
#peers=["tls://127.0.0.1:12003"]
@@ -37,4 +37,4 @@ seeds=["tls://lilith0.dark.fi:23331", "tls://lilith1.dark.fi:23331"]
secret = "7CkVuFgwTUpJn5Sv67Q3fyEDpa28yrSeL5Hg2GqQ4jfM"
[workspace."general"]
## Create with `taud --key-gen`
secret = "6ZkvojUcSRML7wSGc3AMmM5meyyEXLoykT23cyUUB6GM"
secret = "6ZkvojUcSRML7wSGc3AMmM5meyyEXLoykT23cyUUB6GM"

View File

@@ -24,11 +24,11 @@ rpc_listen = "tcp://127.0.0.1:8340"
# Participate in the consensus protocol
consensus = true
# P2P accept address for the consensus protocol
consensus_p2p_accept = "tls://127.0.0.1:8341"
# P2P accept addresses for the consensus protocol
consensus_p2p_accept = ["tls://127.0.0.1:8341"]
# P2P external address for the consensus protocol
consensus_p2p_external = "tls://127.0.0.1:8341"
# P2P external addresses for the consensus protocol
consensus_p2p_external = ["tls://127.0.0.1:8341"]
# Connection slots for the consensus protocol
#consensus_slots = 8
@@ -39,11 +39,11 @@ consensus_p2p_seed = ["tls://127.0.0.1:33033"]
# Peers to connect to for the consensus protocol
#consensus_p2p_peer = []
# P2P accept address for the syncing protocol
sync_p2p_accept = "tls://127.0.0.1:8342"
# P2P accept addresses for the syncing protocol
sync_p2p_accept = ["tls://127.0.0.1:8342"]
# P2P external address for the syncing protocol
sync_p2p_external = "tls://127.0.0.1:8342"
# P2P external addresses for the syncing protocol
sync_p2p_external = ["tls://127.0.0.1:8342"]
# Connection slots for the syncing protocol
#sync_slots = 8

View File

@@ -24,11 +24,11 @@ rpc_listen = "tcp://127.0.0.1:8440"
# Participate in the consensus protocol
consensus = true
# P2P accept address for the consensus protocol
consensus_p2p_accept = "tls://127.0.0.1:8441"
# P2P accept addresses for the consensus protocol
consensus_p2p_accept = ["tls://127.0.0.1:8441"]
# P2P external address for the consensus protocol
consensus_p2p_external = "tls://127.0.0.1:8441"
# P2P external addresses for the consensus protocol
consensus_p2p_external = ["tls://127.0.0.1:8441"]
# Connection slots for the consensus protocol
#consensus_slots = 8
@@ -45,11 +45,11 @@ consensus_seed_rpc = ["tcp://127.0.0.1:8340"]
# Peers JSON-RPC listen URL for clock synchronization
#consensus_peer_rpc = []
# P2P accept address for the syncing protocol
sync_p2p_accept = "tls://127.0.0.1:8442"
# P2P accept addresses for the syncing protocol
sync_p2p_accept = ["tls://127.0.0.1:8442"]
# P2P external address for the syncing protocol
sync_p2p_external = "tls://127.0.0.1:8442"
# P2P external addresses for the syncing protocol
sync_p2p_external = ["tls://127.0.0.1:8442"]
# Connection slots for the syncing protocol
#sync_slots = 8

View File

@@ -24,11 +24,11 @@ rpc_listen = "tcp://127.0.0.1:8540"
# Participate in the consensus protocol
consensus = false
# P2P accept address for the consensus protocol
#consensus_p2p_accept = "tls://127.0.0.1:8541"
# P2P accept addresses for the consensus protocol
#consensus_p2p_accept = ["tls://127.0.0.1:8541"]
# P2P external address for the consensus protocol
#consensus_p2p_external = "tls://127.0.0.1:8541"
# P2P external addressesfor the consensus protocol
#consensus_p2p_external = ["tls://127.0.0.1:8541"]
# Connection slots for the consensus protocol
#consensus_slots = 8
@@ -39,11 +39,11 @@ consensus = false
# Peers to connect to for the consensus protocol
#consensus_p2p_peer = []
# P2P accept address for the syncing protocol
sync_p2p_accept = "tls://127.0.0.1:8542"
# P2P accept addresses for the syncing protocol
sync_p2p_accept = ["tls://127.0.0.1:8542"]
# P2P external address for the syncing protocol
sync_p2p_external = "tls://127.0.0.1:8542"
# P2P external addresses for the syncing protocol
sync_p2p_external = ["tls://127.0.0.1:8542"]
# Connection slots for the syncing protocol
#sync_slots = 8

View File

@@ -21,11 +21,11 @@ database = "faucetd/blockchain"
# JSON-RPC listen URL
rpc_listen = "tls://127.0.0.1:8640"
# P2P accept address for the syncing protocol
sync_p2p_accept = "tls://127.0.0.1:8642"
# P2P accept addresses for the syncing protocol
sync_p2p_accept = ["tls://127.0.0.1:8642"]
# P2P external address for the syncing protocol
sync_p2p_external = "tls://127.0.0.1:8642"
# P2P external addresses for the syncing protocol
sync_p2p_external = ["tls://127.0.0.1:8642"]
# Connection slots for the syncing protocol
#sync_slots = 8

View File

@@ -6,8 +6,8 @@
## The default values are left commented. They can be overridden either by
## uncommenting, or by using the command-line.
# Daemon published url, common for all enabled networks
url = "tls://127.0.0.1"
# Daemon published urls, common for all enabled networks
urls = ["tls://127.0.0.1"]
## Per-network settings
[network."darkfid_sync"]

View File

@@ -1,14 +1,14 @@
# chat toml
[net]
## P2P accept address
#inbound="tls://127.0.0.1:11002"
## P2P accept addresses
#inbound=["tls://127.0.0.1:11002"]
## Connection slots
outbound_connections=5
## P2P external address
#external_addr="tls://127.0.0.1:11002"
## P2P external addresses
#external_addr=["tls://127.0.0.1:11002"]
## Peers to connect to
#peers=["tls://127.0.0.1:11003"]

View File

@@ -145,8 +145,8 @@ fn alice() -> Result<AppSettings> {
let ext_addr = Url::parse("tcp://127.0.0.1:51554").unwrap();
let net = Settings {
inbound: Some(inbound),
external_addr: Some(ext_addr),
inbound: vec![inbound],
external_addr: vec![ext_addr],
seeds: vec![seed],
..Default::default()
};
@@ -168,7 +168,7 @@ fn bob() -> Result<AppSettings> {
let seed = Url::parse("tcp://127.0.0.1:50515").unwrap();
let net = Settings {
inbound: None,
inbound: vec![],
outbound_connections: 5,
seeds: vec![seed],
..Default::default()

View File

@@ -48,9 +48,9 @@ impl ProgramOptions {
let programcli = DarkCli::parse();
let accept_addr = if let Some(accept_addr) = programcli.accept {
Some(accept_addr.parse()?)
vec![accept_addr.parse()?]
} else {
None
vec![]
};
let mut seed_addrs: Vec<url::Url> = vec![];

View File

@@ -9,11 +9,11 @@
# JSON-RPC listen URL
#rpc_listen = "tcp://127.0.0.1:9540"
# P2P accept address
#p2p_accept = "tls://127.0.0.1:9541"
# P2P accept addresses
#p2p_accept = ["tls://127.0.0.1:9541"]
# P2P external address
#p2p_external = "tls://127.0.0.1:9541"
# P2P external addresses
#p2p_external = ["tls://127.0.0.1:9541"]
# Connection slots
#slots = 8

View File

@@ -46,12 +46,12 @@ struct Args {
rpc_listen: Url,
#[structopt(long)]
/// P2P accept address
p2p_accept: Option<Url>,
/// P2P accept addresses (repeatable flag)
p2p_accept: Vec<Url>,
#[structopt(long)]
/// P2P external address
p2p_external: Option<Url>,
/// P2P external addresses (repeatable flag)
p2p_external: Vec<Url>,
#[structopt(long, default_value = "8")]
/// Connection slots

View File

@@ -12,11 +12,11 @@
# JSON-RPC listen URL
#rpc_listen = "tcp://127.0.0.1:13336"
# P2P accept address
#p2p_accept = "tls://127.0.0.1:13337"
# P2P accept addresses
#p2p_accept = ["tls://127.0.0.1:13337"]
# P2P external address
#p2p_external = "tls://127.0.0.1:13337"
# P2P external addresses
#p2p_external = ["tls://127.0.0.1:13337"]
# Connection slots
#slots = 8

View File

@@ -52,12 +52,12 @@ struct Args {
rpc_listen: Url,
#[structopt(long)]
/// P2P accept address
p2p_accept: Option<Url>,
/// P2P accept addresses (repeatable flag)
p2p_accept: Vec<Url>,
#[structopt(long)]
/// P2P external address
p2p_external: Option<Url>,
/// P2P external addresses (repeatable flag)
p2p_external: Vec<Url>,
#[structopt(long, default_value = "8")]
/// Connection slots

View File

@@ -13,10 +13,10 @@ folder = "fud0"
rpc_listen = "tls://127.0.0.1:13338"
# P2P accept address
p2p_accept = "tls://127.0.0.1:13339"
p2p_accept = ["tls://127.0.0.1:13339"]
# P2P external address
p2p_external = "tls://127.0.0.1:13339"
p2p_external = ["tls://127.0.0.1:13339"]
# Connection slots
#slots = 8

View File

@@ -13,10 +13,10 @@ folder = "fud1"
rpc_listen = "tls://127.0.0.1:13340"
# P2P accept address
p2p_accept = "tls://127.0.0.1:13341"
p2p_accept = ["tls://127.0.0.1:13341"]
# P2P external address
p2p_external = "tls://127.0.0.1:13341"
p2p_external = ["tls://127.0.0.1:13341"]
# Connection slots
#slots = 8

View File

@@ -3,7 +3,6 @@ set -e
# Start a tmux session with a lilith node and two fud nodes.
tmux new-session -d
tmux new-session -d
tmux send-keys "../../../../lilith -c lilith_config.toml" Enter
sleep 2

View File

@@ -28,7 +28,7 @@ pub struct Args {
pub rpc_listen: Url,
/// Inbound listen URL
#[structopt(long = "inbound")]
pub inbound_url: Option<Url>,
pub inbound_url: Vec<Url>,
/// Seed Urls
#[structopt(long = "seeds")]
pub seed_urls: Vec<Url>,

View File

@@ -109,15 +109,14 @@ impl P2p {
}
pub async fn get_info(&self) -> serde_json::Value {
let external_addr = self
.settings
.external_addr
.as_ref()
.map(|addr| serde_json::Value::from(addr.to_string()))
.unwrap_or(serde_json::Value::Null);
// Building ext_addr_vec string
let mut ext_addr_vec = vec![];
for ext_addr in &self.settings.external_addr {
ext_addr_vec.push(ext_addr.as_ref().to_string());
}
json!({
"external_addr": external_addr,
"external_addr": format!("{:?}", ext_addr_vec),
"session_manual": self.session_manual().await.get_info().await,
"session_inbound": self.session_inbound().await.get_info().await,
"session_outbound": self.session_outbound().await.get_info().await,

View File

@@ -3,7 +3,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use smol::Executor;
use url::Url;
use crate::{util::async_util, Result};
@@ -102,10 +101,10 @@ impl ProtocolAddress {
}
}
async fn send_addrs(self: Arc<Self>, addrs: Vec<Url>) -> Result<()> {
async fn send_my_addrs(self: Arc<Self>) -> Result<()> {
debug!(target: "net", "ProtocolAddress::send_addrs() [START]");
loop {
let addrs = addrs.clone();
let addrs = self.settings.external_addr.clone();
let addr_msg = message::AddrsMessage { addrs };
self.channel.clone().send(addr_msg).await?;
async_util::sleep(SEND_ADDR_SLEEP_SECONDS).await;
@@ -123,15 +122,9 @@ impl ProtocolBase for ProtocolAddress {
// if it's an outbound session + has an external address
// send our address
if type_id == SESSION_OUTBOUND && self.settings.external_addr.is_some() {
if type_id == SESSION_OUTBOUND && !self.settings.external_addr.is_empty() {
self.jobsman.clone().start(executor.clone());
self.jobsman
.clone()
.spawn(
self.clone().send_addrs(vec![self.settings.external_addr.clone().unwrap()]),
executor.clone(),
)
.await;
self.jobsman.clone().spawn(self.clone().send_my_addrs(), executor.clone()).await;
}
debug!(target: "net", "ProtocolAddress::start() [START]");

View File

@@ -37,19 +37,19 @@ impl ProtocolSeed {
Arc::new(Self { channel, hosts, settings, addr_sub })
}
/// Sends own external address over a channel. Imports own external address
/// from settings, then adds that address to an address message and
/// Sends own external addresses over a channel. Imports own external addresses
/// from settings, then adds that addresses to an address message and
/// sends it out over the channel.
pub async fn send_self_address(&self) -> Result<()> {
match self.settings.external_addr.clone() {
Some(addr) => {
debug!(target: "net", "ProtocolSeed::send_own_address() addr={}", addr);
let addr = message::AddrsMessage { addrs: vec![addr] };
Ok(self.channel.clone().send(addr).await?)
}
// Do nothing if external address is not configured
None => Ok(()),
// Do nothing if external address is not configured
if self.settings.external_addr.is_empty() {
return Ok(())
}
let addrs = self.settings.external_addr.clone();
debug!(target: "net", "ProtocolSeed::send_own_address() addrs={:?}", addrs);
let addrs = message::AddrsMessage { addrs };
Ok(self.channel.clone().send(addrs).await?)
}
}

View File

@@ -31,8 +31,8 @@ impl InboundInfo {
pub struct InboundSession {
p2p: Weak<P2p>,
acceptor: AcceptorPtr,
accept_task: StoppableTaskPtr,
connect_infos: Mutex<FxHashMap<Url, InboundInfo>>,
accept_tasks: Mutex<Vec<StoppableTaskPtr>>,
connect_infos: Mutex<Vec<FxHashMap<Url, InboundInfo>>>,
}
impl InboundSession {
@@ -43,8 +43,8 @@ impl InboundSession {
let self_ = Arc::new(Self {
p2p,
acceptor,
accept_task: StoppableTask::new(),
connect_infos: Mutex::new(FxHashMap::default()),
accept_tasks: Mutex::new(Vec::new()),
connect_infos: Mutex::new(Vec::new()),
});
let parent = Arc::downgrade(&self_);
@@ -55,57 +55,73 @@ impl InboundSession {
}
/// Starts the inbound session. Begins by accepting connections and fails if
/// the address is not configured. Then runs the channel subscription
/// the addresses are not configured. Then runs the channel subscription
/// loop.
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
match self.p2p().settings().inbound.as_ref() {
Some(accept_addr) => {
self.clone().start_accept_session(accept_addr.clone(), executor.clone()).await?;
}
None => {
info!(target: "net", "Not configured for accepting incoming connections.");
return Ok(())
}
if self.p2p().settings().inbound.is_empty() {
info!(target: "net", "Not configured for accepting incoming connections.");
return Ok(())
}
self.accept_task.clone().start(
self.clone().channel_sub_loop(executor.clone()),
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
executor,
);
// Activate mutex lock on connection slots.
let mut accept_tasks = self.accept_tasks.lock().await;
for (index, accept_addr) in self.p2p().settings().inbound.iter().enumerate() {
self.clone().start_accept_session(index, accept_addr.clone(), executor.clone()).await?;
let task = StoppableTask::new();
task.clone().start(
self.clone().channel_sub_loop(index, executor.clone()),
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
executor.clone(),
);
self.connect_infos.lock().await.push(FxHashMap::default());
accept_tasks.push(task);
}
Ok(())
}
/// Stops the inbound session.
pub async fn stop(&self) {
self.acceptor.stop().await;
self.accept_task.stop().await;
let accept_tasks = &*self.accept_tasks.lock().await;
for accept_task in accept_tasks {
accept_task.stop().await;
}
}
/// Start accepting connections for inbound session.
async fn start_accept_session(
self: Arc<Self>,
index: usize,
accept_addr: Url,
executor: Arc<Executor<'_>>,
) -> Result<()> {
info!(target: "net", "Starting inbound session on {}", accept_addr);
info!(target: "net", "#{} starting inbound session on {}", index, accept_addr);
let result = self.acceptor.clone().start(accept_addr, executor).await;
if let Err(err) = result.clone() {
error!(target: "net", "Error starting listener: {}", err);
error!(target: "net", "#{} error starting listener: {}", index, err);
}
result
}
/// Wait for all new channels created by the acceptor and call
/// setup_channel() on them.
async fn channel_sub_loop(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
async fn channel_sub_loop(
self: Arc<Self>,
index: usize,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let channel_sub = self.acceptor.clone().subscribe().await;
loop {
let channel = channel_sub.receive().await?;
// Spawn a detached task to process the channel
// This will just perform the channel setup then exit.
executor.spawn(self.clone().setup_channel(channel, executor.clone())).detach();
executor.spawn(self.clone().setup_channel(index, channel, executor.clone())).detach();
}
}
@@ -114,23 +130,22 @@ impl InboundSession {
/// channel.
async fn setup_channel(
self: Arc<Self>,
index: usize,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
info!(target: "net", "Connected inbound [{}]", channel.address());
info!(target: "net", "#{} connected inbound [{}]", index, channel.address());
self.clone().register_channel(channel.clone(), executor.clone()).await?;
self.manage_channel_for_get_info(channel).await;
self.manage_channel_for_get_info(index, channel).await;
Ok(())
}
async fn manage_channel_for_get_info(&self, channel: ChannelPtr) {
async fn manage_channel_for_get_info(&self, index: usize, channel: ChannelPtr) {
let key = channel.address();
self.connect_infos
.lock()
.await
self.connect_infos.lock().await[index]
.insert(key.clone(), InboundInfo { channel: channel.clone() });
let stop_sub = channel.subscribe_stop().await;
@@ -139,7 +154,7 @@ impl InboundSession {
stop_sub.unwrap().receive().await;
}
self.connect_infos.lock().await.remove(&key);
self.connect_infos.lock().await[index].remove(&key);
}
}
@@ -147,8 +162,9 @@ impl InboundSession {
impl Session for InboundSession {
async fn get_info(&self) -> serde_json::Value {
let mut infos = FxHashMap::default();
if let Some(accept_addr) = self.p2p().settings().inbound.as_ref() {
for (addr, info) in self.connect_infos.lock().await.iter() {
for (index, accept_addr) in self.p2p().settings().inbound.iter().enumerate() {
let connect_infos = &self.connect_infos.lock().await[index];
for (addr, info) in connect_infos {
let json_addr = json!({ "accept_addr": accept_addr });
let info = vec![json_addr, info.get_info().await];
infos.insert(addr.to_string(), info);

View File

@@ -3,7 +3,7 @@ use std::fmt;
use async_executor::Executor;
use async_trait::async_trait;
use log::{info, warn};
use log::{debug, info};
use rand::seq::SliceRandom;
use serde_json::json;
use url::Url;
@@ -224,20 +224,22 @@ impl OutboundSession {
return Ok(addr)
}
warn!(target: "net", "Hosts address pool is empty. Retrying connect slot #{}", slot_number);
debug!(target: "net", "Hosts address pool is empty. Retrying connect slot #{}", slot_number);
async_util::sleep(p2p.settings().outbound_retry_seconds).await;
}
}
/// Checks whether an address is our own inbound address to avoid connecting
/// Checks whether an address is in our own inbound addresses to avoid connecting
/// to ourselves.
fn is_self_inbound(addr: &Url, inbound_addr: &Option<Url>) -> bool {
match inbound_addr {
Some(inbound_addr) => inbound_addr == addr,
// No inbound listening address configured
None => false,
fn is_self_inbound(addr: &Url, inbound_addr: &Vec<Url>) -> bool {
for ext_addr in inbound_addr {
if ext_addr == addr {
return true
}
}
// No inbound listening address configured
false
}
}

View File

@@ -38,9 +38,8 @@ impl SeedSyncSession {
if settings.seeds.is_empty() {
warn!("Skipping seed sync process since no seeds are configured.");
// Store external address in hosts explicitly
match &settings.external_addr {
Some(addr) => self.p2p().hosts().store(vec![addr.clone()]).await,
None => (),
if !settings.external_addr.is_empty() {
self.p2p().hosts().store(settings.external_addr.clone()).await
}
return Ok(())

View File

@@ -11,7 +11,7 @@ pub type SettingsPtr = Arc<Settings>;
/// Default settings for the network. Can be manually configured.
#[derive(Clone, Debug)]
pub struct Settings {
pub inbound: Option<Url>,
pub inbound: Vec<Url>,
pub outbound_connections: u32,
pub manual_attempt_limit: u32,
pub seed_query_timeout_seconds: u32,
@@ -19,7 +19,7 @@ pub struct Settings {
pub channel_handshake_seconds: u32,
pub channel_heartbeat_seconds: u32,
pub outbound_retry_seconds: u64,
pub external_addr: Option<Url>,
pub external_addr: Vec<Url>,
pub peers: Vec<Url>,
pub seeds: Vec<Url>,
pub node_id: String,
@@ -28,7 +28,7 @@ pub struct Settings {
impl Default for Settings {
fn default() -> Self {
Self {
inbound: None,
inbound: Vec::new(),
outbound_connections: 0,
manual_attempt_limit: 0,
seed_query_timeout_seconds: 8,
@@ -36,7 +36,7 @@ impl Default for Settings {
channel_handshake_seconds: 4,
channel_heartbeat_seconds: 10,
outbound_retry_seconds: 20,
external_addr: None,
external_addr: Vec::new(),
peers: Vec::new(),
seeds: Vec::new(),
node_id: String::new(),
@@ -49,16 +49,18 @@ impl Default for Settings {
#[structopt()]
pub struct SettingsOpt {
/// P2P accept address
#[serde(default)]
#[structopt(long = "accept")]
pub inbound: Option<Url>,
pub inbound: Vec<Url>,
/// Connection slots
#[structopt(long = "slots")]
pub outbound_connections: Option<u32>,
/// P2P external address
#[serde(default)]
#[structopt(long)]
pub external_addr: Option<Url>,
pub external_addr: Vec<Url>,
/// Peer nodes to connect to
#[serde(default)]