Merge branch 'net_hostlist'

This commit is contained in:
lunar-mining
2024-01-20 13:43:41 +01:00
20 changed files with 2151 additions and 911 deletions

View File

@@ -1,6 +1,6 @@
## lilith configuration file
## lilith configuration file
##
## Please make sure you go through all the settings so you can configure
## Please make sure you go through all the settings so you can configure
## your daemon properly.
##
## The default values are left commented. They can be overridden either by
@@ -9,9 +9,6 @@
# JSON-RPC listen URL
#rpc_listen = "tcp://127.0.0.1:18927"
# Hosts .tsv file to use
#hosts_file="~/.config/darkfi/lilith_hosts.tsv"
## Per-network settings
#[network."darkfid_sync_v4"]
#accept_addrs = ["tcp+tls://0.0.0.0:33022"]
@@ -19,6 +16,8 @@
#peers = []
#version = "0.4.1"
#localnet = false
#hostlist ="~/.local/darkfi/lilith/darkfid_sync_hostlist.tsv"
#[network."darkfid_consensus_v4"]
#accept_addrs = ["tcp+tls://0.0.0.0:33023"]
@@ -26,6 +25,7 @@
#peers = []
#version = "0.4.1"
#localnet = false
#hostlist ="~/.local/darkfi/lilith/darkfid_consensus_hostlist.tsv"
#[network."darkirc_v4"]
#accept_addrs = ["tcp+tls://0.0.0.0:25551"]
@@ -33,6 +33,7 @@
#peers = []
#version = "0.4.1"
#localnet = false
#hostlist ="~/.local/darkfi/lilith/darkirc_hostlist.tsv"
#[network."taud_v4"]
#accept_addrs = ["tcp+tls://0.0.0.0:23331"]
@@ -40,3 +41,4 @@
#peers = []
#version = "0.4.1"
#localnet = false
#hostlist ="~/.local/darkfi/lilith/taud_hostlist.tsv"

View File

@@ -17,18 +17,17 @@
*/
use std::{
collections::{HashMap, HashSet, VecDeque},
path::Path,
collections::{HashMap, HashSet},
process::exit,
sync::Arc,
time::UNIX_EPOCH,
};
use async_trait::async_trait;
use futures::future::join_all;
use log::{debug, error, info, warn};
use semver::Version;
use smol::{
lock::{Mutex, MutexGuard, RwLock},
lock::{Mutex, MutexGuard},
stream::StreamExt,
Executor,
};
@@ -40,26 +39,21 @@ use url::Url;
use darkfi::{
async_daemonize, cli_desc,
net::{self, connector::Connector, protocol::ProtocolVersion, session::Session, P2p, P2pPtr},
net::{self, hosts::refinery::ping_node, P2p, P2pPtr},
rpc::{
jsonrpc::*,
server::{listen_and_serve, RequestHandler},
},
system::{sleep, StoppableTask, StoppableTaskPtr},
util::{
file::{load_file, save_file},
path::{expand_path, get_config_path},
},
util::path::get_config_path,
Error, Result,
};
const CONFIG_FILE: &str = "lilith_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml");
/// Period in which the peer purge happens (in seconds)
const PURGE_PERIOD: u64 = 60;
/// Amount of hosts to try each purge iteration
const PROBE_HOSTS_N: u32 = 10;
/// Interval after which the refinery happens (in seconds)
const REFINERY_INTERVAL: u64 = 60;
#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
@@ -73,10 +67,6 @@ struct Args {
/// Configuration file to use
pub config: Option<String>,
#[structopt(long, default_value = "~/.config/darkfi/lilith_hosts.tsv")]
/// Hosts .tsv file to use
pub hosts_file: String,
#[structopt(short, long)]
/// Set log file to ouput into
log: Option<String>,
@@ -98,10 +88,10 @@ impl Spawn {
async fn addresses(&self) -> Vec<JsonValue> {
self.p2p
.hosts()
.fetch_all()
.whitelist_fetch_all()
.await
.iter()
.map(|addr| JsonValue::String(addr.to_string()))
.map(|(addr, _url)| JsonValue::String(addr.to_string()))
.collect()
}
@@ -132,6 +122,8 @@ struct NetInfo {
pub version: Version,
/// Enable localnet hosts
pub localnet: bool,
/// Path to hostlist
pub hostlist: String,
}
/// Struct representing the daemon
@@ -143,99 +135,39 @@ struct Lilith {
}
impl Lilith {
/// Internal task to run a periodic purge of unreachable hosts
/// for a specific P2P network.
async fn periodic_purge(name: String, p2p: P2pPtr, ex: Arc<Executor<'_>>) -> Result<()> {
info!(target: "lilith", "Starting periodic host purge task for \"{}\"", name);
/// Periodically ping nodes on the whitelist. If they are still reachable, update their last
/// seen field. Otherwise, downgrade them to the greylist (i.e. do not broadcast them to other
/// peers).
async fn whitelist_refinery(name: String, p2p: P2pPtr) -> Result<()> {
info!(target: "lilith", "Starting whitelist refinery for \"{}\"", name);
let hosts = p2p.hosts();
// Initialize a growable ring buffer(VecDeque) to store known hosts
let ring_buffer = Arc::new(RwLock::new(VecDeque::<Url>::new()));
loop {
// Wait for next purge period
sleep(PURGE_PERIOD).await;
debug!(target: "lilith", "[{}] The Purge has started...", name);
sleep(REFINERY_INTERVAL).await;
// Check if new hosts exist and add them to the end of the ring buffer
let mut lock = ring_buffer.write().await;
let hosts = p2p.clone().hosts().fetch_all().await;
if hosts.len() != lock.len() {
// Since hosts are stored in a HashSet we have to check all of them
for host in hosts {
if !lock.contains(&host) {
lock.push_back(host);
}
}
if hosts.is_empty_whitelist().await {
warn!(target: "lilith", "Whitelist is empty! Cannot start refinery process");
continue
}
// Pick first up to PROBE_HOSTS_N hosts from the ring buffer
let mut purgers = vec![];
let mut index = 0;
while index <= PROBE_HOSTS_N {
match lock.pop_front() {
Some(host) => purgers.push(host),
None => break,
};
index += 1;
let (entry, position) = hosts.whitelist_fetch_random().await;
let url = &entry.0;
if !ping_node(url, p2p.clone()).await {
let (_addr, last_seen) = hosts.get_whitelist_entry_at_addr(url).await.unwrap();
hosts.greylist_store_or_update(&[(url.clone(), last_seen)]).await;
hosts.whitelist_remove(url, position).await;
debug!(target: "lilith", "Host {} is not responsive. Downgraded from whitelist", url);
continue
}
// Try to connect to them. If we can't reach them, remove them from our set.
let purgers_str: Vec<&str> = purgers.iter().map(|x| x.as_str()).collect();
debug!(target: "lilith", "[{}] Got: {:?}", name, purgers_str);
let mut tasks = vec![];
for host in &purgers {
let p2p_ = p2p.clone();
let ex_ = ex.clone();
let ring_buffer_ = ring_buffer.clone();
tasks.push(async move {
let session_out = p2p_.session_outbound();
let session_weak = Arc::downgrade(&session_out);
let connector = Connector::new(p2p_.settings(), session_weak);
debug!(target: "lilith", "Connecting to {}", host);
match connector.connect(host).await {
Ok((_url, channel)) => {
debug!(target: "lilith", "Connected successfully!");
let proto_ver = ProtocolVersion::new(
channel.clone(),
p2p_.settings().clone(),
p2p_.hosts().clone(),
)
.await;
let handshake_task = session_out.perform_handshake_protocols(
proto_ver,
channel.clone(),
ex_.clone(),
);
channel.clone().start(ex_.clone());
match handshake_task.await {
Ok(()) => {
debug!(target: "lilith", "Handshake success! Stopping channel.");
channel.stop().await;
// Push host back to the ring buffer
ring_buffer_.write().await.push_back(host.clone());
}
Err(e) => {
debug!(target: "lilith", "Handshake failure! {}", e);
p2p_.hosts().remove(host).await;
}
}
}
Err(e) => {
debug!(target: "lilith", "Failed to connect to {}, removing from set ({})", host, e);
// Remove from hosts set
p2p_.hosts().remove(host).await;
}
}
});
}
join_all(tasks).await;
// This node is active. Update the last seen field.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
hosts.whitelist_update_last_seen(url, last_seen, position).await;
}
}
@@ -271,57 +203,6 @@ impl RequestHandler for Lilith {
}
}
/// Attempt to read existing hosts tsv
fn load_hosts(path: &Path, networks: &[&str]) -> HashMap<String, HashSet<Url>> {
let mut saved_hosts = HashMap::new();
let contents = load_file(path);
if let Err(e) = contents {
warn!(target: "lilith", "Failed retrieving saved hosts: {}", e);
return saved_hosts
}
for line in contents.unwrap().lines() {
let data: Vec<&str> = line.split('\t').collect();
if networks.contains(&data[0]) {
let mut hosts = match saved_hosts.get(data[0]) {
Some(hosts) => hosts.clone(),
None => HashSet::new(),
};
let url = match Url::parse(data[1]) {
Ok(u) => u,
Err(e) => {
warn!(target: "lilith", "Skipping malformed url: {} ({})", data[1], e);
continue
}
};
hosts.insert(url);
saved_hosts.insert(data[0].to_string(), hosts);
}
}
saved_hosts
}
/// Persist the current hosts of all spawned networks to the `.tsv` file at `path`.
///
/// One `network\thost_url` line is written per known host. If no network has
/// any hosts, the file is left untouched. Write failures are logged, not
/// propagated, since this runs during shutdown cleanup.
async fn save_hosts(path: &Path, networks: &[Spawn]) {
    let mut tsv = String::new();

    for spawn in networks {
        for host in spawn.p2p.hosts().fetch_all().await {
            tsv.push_str(&format!("{}\t{}\n", spawn.name, host.as_str()));
        }
    }

    // Only touch the filesystem if there is something to write.
    if !tsv.is_empty() {
        info!(target: "lilith", "Saving current hosts of spawned networks to: {:?}", path);
        if let Err(e) = save_file(path, &tsv) {
            error!(target: "lilith", "Failed saving hosts: {}", e);
        }
    }
}
/// Parse a TOML string for any configured network and return a map containing
/// said configurations.
fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> {
@@ -337,6 +218,11 @@ fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> {
continue
}
if !table.contains_key("hostlist") {
error!(target: "lilith", "Hostlist path is mandatory! Configure and try again.");
exit(1)
}
let name = net.0.to_string();
let accept_addrs: Vec<Url> = table["accept_addrs"]
.as_array()
@@ -383,7 +269,9 @@ fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> {
semver::Version::parse(option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0"))?
};
let net_info = NetInfo { accept_addrs, seeds, peers, version, localnet };
let hostlist: String = table["hostlist"].as_str().unwrap().to_string();
let net_info = NetInfo { accept_addrs, seeds, peers, version, localnet, hostlist };
ret.insert(name, net_info);
}
}
@@ -392,12 +280,7 @@ fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> {
Ok(ret)
}
async fn spawn_net(
name: String,
info: &NetInfo,
saved_hosts: &HashSet<Url>,
ex: Arc<Executor<'static>>,
) -> Result<Spawn> {
async fn spawn_net(name: String, info: &NetInfo, ex: Arc<Executor<'static>>) -> Result<Spawn> {
let mut listen_urls = vec![];
// Configure listen addrs for this network
@@ -415,6 +298,7 @@ async fn spawn_net(
inbound_connections: 512,
app_version: info.version.clone(),
localnet: info.localnet,
hostlist: info.hostlist.clone(),
allowed_transports: vec![
"tcp".to_string(),
"tcp+tls".to_string(),
@@ -429,10 +313,6 @@ async fn spawn_net(
// Create P2P instance
let p2p = P2p::new(settings, ex.clone()).await;
// Fill db with cached hosts
let hosts: Vec<Url> = saved_hosts.iter().cloned().collect();
p2p.hosts().store(&hosts).await;
let addrs_str: Vec<&str> = listen_urls.iter().map(|x| x.as_str()).collect();
info!(target: "lilith", "Starting seed network node for \"{}\" on {:?}", name, addrs_str);
p2p.clone().start().await?;
@@ -453,10 +333,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
exit(1);
}
// Retrieve any saved hosts for configured networks
let net_names: Vec<&str> = configured_nets.keys().map(|x| x.as_str()).collect();
let saved_hosts = load_hosts(&expand_path(&args.hosts_file)?, &net_names);
// Spawn configured networks
let mut networks = vec![];
for (name, info) in &configured_nets {
@@ -464,14 +340,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
// e.g. p2p_v3, p2p_v4, etc. Therefore we can spawn multiple networks
// and they would all be version-checked, so we avoid mismatches when
// seeding peers.
match spawn_net(
name.to_string(),
info,
saved_hosts.get(name).unwrap_or(&HashSet::new()),
ex.clone(),
)
.await
{
match spawn_net(name.to_string(), info, ex.clone()).await {
Ok(spawn) => networks.push(spawn),
Err(e) => {
error!(target: "lilith", "Failed to start P2P network seed for \"{}\": {}", name, e);
@@ -480,24 +349,24 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
}
}
// Set up main daemon and background tasks
// Set up main daemon and background refinery_tasks
let lilith = Arc::new(Lilith { networks, rpc_connections: Mutex::new(HashSet::new()) });
let mut periodic_tasks = HashMap::new();
let mut refinery_tasks = HashMap::new();
for network in &lilith.networks {
let name = network.name.clone();
let task = StoppableTask::new();
task.clone().start(
Lilith::periodic_purge(name.clone(), network.p2p.clone(), ex.clone()),
Lilith::whitelist_refinery(name.clone(), network.p2p.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "lilith", "Failed starting periodic task for \"{}\": {}", name, e),
Err(e) => error!(target: "lilith", "Failed starting refinery task for \"{}\": {}", name, e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
periodic_tasks.insert(network.name.clone(), task);
refinery_tasks.insert(network.name.clone(), task);
}
// JSON-RPC server
@@ -521,16 +390,13 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
signals_handler.wait_termination(signals_task).await?;
info!(target: "lilith", "Caught termination signal, cleaning up and exiting...");
// Save in-memory hosts to tsv file
save_hosts(&expand_path(&args.hosts_file)?, &lilith.networks).await;
info!(target: "lilith", "Stopping JSON-RPC server...");
rpc_task.stop().await;
// Cleanly stop p2p networks
for spawn in &lilith.networks {
info!(target: "lilith", "Stopping \"{}\" periodic task", spawn.name);
periodic_tasks.get(&spawn.name).unwrap().stop().await;
info!(target: "lilith", "Stopping \"{}\" task", spawn.name);
refinery_tasks.get(&spawn.name).unwrap().stop().await;
info!(target: "lilith", "Stopping \"{}\" P2P", spawn.name);
spawn.p2p.stop().await;
}

View File

@@ -190,6 +190,9 @@ pub enum Error {
#[error("P2P network stopped")]
P2PNetworkStopped,
#[error("No matching hostlist entry")]
HostDoesNotExist,
// =============
// Crypto errors
// =============

View File

@@ -1,537 +0,0 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2024 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use log::debug;
use rand::{prelude::IteratorRandom, rngs::OsRng};
use smol::lock::RwLock;
use url::Url;
use super::settings::SettingsPtr;
use crate::{
system::{Subscriber, SubscriberPtr, Subscription},
Result,
};
/// Atomic pointer to hosts object
pub type HostsPtr = Arc<Hosts>;
// An array containing all possible local host strings
// TODO: This could perhaps be more exhaustive?
pub const LOCAL_HOST_STRS: [&str; 2] = ["localhost", "localhost.localdomain"];
/// Manages a store of network addresses.
/// All sets are wrapped in async `RwLock`s; methods take read or write
/// guards as needed, so `Hosts` is shared behind an `Arc` (see `HostsPtr`).
pub struct Hosts {
    /// Set of stored addresses
    addrs: RwLock<HashSet<Url>>,
    /// Set of stored addresses that are quarantined.
    /// We quarantine peers we've been unable to connect to, but we keep them
    /// around so we can potentially try them again, up to n tries. This should
    /// be helpful in order to self-heal the p2p connections in case we have an
    /// Internet interrupt (goblins unplugging cables)
    quarantine: RwLock<HashMap<Url, usize>>,
    /// Peers we reject from connecting (stored by hostname string,
    /// not full URL — see `mark_rejected`)
    rejected: RwLock<HashSet<String>>,
    /// Subscriber listening for store updates; notified with the number of
    /// addresses accepted by each `store()` call
    store_subscriber: SubscriberPtr<usize>,
    /// Pointer to configured P2P settings
    settings: SettingsPtr,
}
impl Hosts {
    /// Create a new hosts list with all sets empty.
    pub fn new(settings: SettingsPtr) -> HostsPtr {
        Arc::new(Self {
            addrs: RwLock::new(HashSet::new()),
            quarantine: RwLock::new(HashMap::new()),
            rejected: RwLock::new(HashSet::new()),
            store_subscriber: Subscriber::new(),
            settings,
        })
    }

    /// Append given addrs to the known set.
    /// Addresses are first run through `filter_addresses`; subscribers are
    /// notified with the number of addresses that passed the filter (which
    /// may be 0).
    pub async fn store(&self, addrs: &[Url]) {
        debug!(target: "net::hosts::store()", "hosts::store() [START]");

        let filtered_addrs = self.filter_addresses(addrs).await;
        let filtered_addrs_len = filtered_addrs.len();

        if !filtered_addrs.is_empty() {
            let mut addrs_map = self.addrs.write().await;
            for addr in filtered_addrs {
                debug!(target: "net::hosts::store()", "Inserting {}", addr);
                addrs_map.insert(addr);
            }
        }

        self.store_subscriber.notify(filtered_addrs_len).await;
        debug!(target: "net::hosts::store()", "hosts::store() [END]");
    }

    /// Subscribe to store updates. The subscription yields the number of
    /// addresses accepted by each subsequent `store()` call.
    pub async fn subscribe_store(&self) -> Result<Subscription<usize>> {
        let sub = self.store_subscriber.clone().subscribe().await;
        Ok(sub)
    }

    // Verify whether a URL is local.
    // Returns true for non-global IPv4/IPv6 addresses and for hostnames in
    // LOCAL_HOST_STRS; false for URLs without a host string.
    // NOTE: This function is stateless and not specific to
    // `Hosts`. For this reason, it might make more sense
    // to move this function to a more appropriate location
    // in the codebase.
    pub async fn is_local_host(&self, url: Url) -> bool {
        // Reject Urls without host strings.
        if url.host_str().is_none() {
            return false
        }

        // We do this hack in order to parse IPs properly.
        // https://github.com/whatwg/url/issues/749
        let addr = Url::parse(&url.as_str().replace(url.scheme(), "http")).unwrap();
        // Filter private IP ranges
        match addr.host().unwrap() {
            url::Host::Ipv4(ip) => {
                if !ip.is_global() {
                    return true
                }
            }
            url::Host::Ipv6(ip) => {
                if !ip.is_global() {
                    return true
                }
            }
            url::Host::Domain(d) => {
                if LOCAL_HOST_STRS.contains(&d) {
                    return true
                }
            }
        }
        false
    }

    /// Filter given addresses based on certain rulesets and validity.
    /// Drops: URLs without host/port or with path segments, rejected peers,
    /// our own external addresses (unless localnet), local hosts (unless
    /// localnet), and unsupported/invalid transport schemes.
    async fn filter_addresses(&self, addrs: &[Url]) -> Vec<Url> {
        debug!(target: "net::hosts::filter_addresses()", "Filtering addrs: {:?}", addrs);
        let mut ret = vec![];
        let localnet = self.settings.localnet;

        'addr_loop: for addr_ in addrs {
            // Validate that the format is `scheme://host_str:port`
            if addr_.host_str().is_none() ||
                addr_.port().is_none() ||
                addr_.cannot_be_a_base() ||
                addr_.path_segments().is_some()
            {
                continue
            }

            if self.is_rejected(addr_).await {
                debug!(target: "net::hosts::filter_addresses()", "Peer {} is rejected", addr_);
                continue
            }

            let host_str = addr_.host_str().unwrap();

            if !localnet {
                // Our own external addresses should never enter the hosts set.
                for ext in &self.settings.external_addrs {
                    if host_str == ext.host_str().unwrap() {
                        continue 'addr_loop
                    }
                }
            }

            // We do this hack in order to parse IPs properly.
            // https://github.com/whatwg/url/issues/749
            let addr = Url::parse(&addr_.as_str().replace(addr_.scheme(), "http")).unwrap();

            // Filter non-global ranges if we're not allowing localnet.
            // Should never be allowed in production, so we don't really care
            // about some of them (e.g. 0.0.0.0, or broadcast, etc.).
            if !localnet && self.is_local_host(addr).await {
                continue
            }

            match addr_.scheme() {
                // Validate that the address is an actual onion.
                #[cfg(feature = "p2p-tor")]
                "tor" | "tor+tls" => {
                    use std::str::FromStr;
                    if tor_hscrypto::pk::HsId::from_str(host_str).is_err() {
                        continue
                    }
                    debug!(target: "net::hosts::filter_addresses()", "[Tor] Valid: {}", host_str);
                }

                #[cfg(feature = "p2p-nym")]
                "nym" | "nym+tls" => continue, // <-- Temp skip

                #[cfg(feature = "p2p-tcp")]
                "tcp" | "tcp+tls" => {
                    debug!(target: "net::hosts::filter_addresses()", "[TCP] Valid: {}", host_str);
                }

                _ => continue,
            }

            ret.push(addr_.clone());
        }

        ret
    }

    /// Remove a peer from both the main hosts set and the quarantine map.
    pub async fn remove(&self, url: &Url) {
        debug!(target: "net::hosts::remove()", "Removing peer {}", url);
        self.addrs.write().await.remove(url);
        self.quarantine.write().await.remove(url);
    }

    /// Quarantine a peer.
    /// If they've been quarantined for more than a configured limit, forget them.
    // NOTE(review): some debug! calls below use the "net::hosts::remove()"
    // target although this is quarantine() — looks like a copy-paste slip in
    // the log target string; confirm before relying on log filtering.
    pub async fn quarantine(&self, url: &Url) {
        debug!(target: "net::hosts::remove()", "Quarantining peer {}", url);

        // Remove from main hosts set
        self.addrs.write().await.remove(url);

        let mut q = self.quarantine.write().await;
        if let Some(retries) = q.get_mut(url) {
            *retries += 1;
            debug!(target: "net::hosts::quarantine()", "Peer {} quarantined {} times", url, retries);
            if *retries == self.settings.hosts_quarantine_limit {
                debug!(target: "net::hosts::quarantine()", "Banning peer {}", url);
                q.remove(url);
                self.mark_rejected(url).await;
            }
        } else {
            debug!(target: "net::hosts::remove()", "Added peer {} to quarantine", url);
            q.insert(url.clone(), 0);
        }
    }

    /// Check if a given peer (URL) is in the set of rejected hosts
    pub async fn is_rejected(&self, peer: &Url) -> bool {
        // Skip lookup for UNIX sockets and localhost connections
        // as they should never belong to the list of rejected URLs.
        let Some(hostname) = peer.host_str() else { return false };

        if self.is_local_host(peer.clone()).await {
            return false
        }

        self.rejected.read().await.contains(hostname)
    }

    /// Mark a peer as rejected by adding it to the set of rejected URLs.
    /// Only the hostname is stored, so every port on that host is rejected.
    pub async fn mark_rejected(&self, peer: &Url) {
        // We ignore UNIX sockets here so we will just work
        // with stuff that has host_str().
        if let Some(hostname) = peer.host_str() {
            // Localhost connections should not be rejected
            // This however allows any Tor and Nym connections.
            if self.is_local_host(peer.clone()).await {
                return
            }

            self.rejected.write().await.insert(hostname.to_string());
        }
    }

    /// Unmark a rejected peer
    pub async fn unmark_rejected(&self, peer: &Url) {
        if let Some(hostname) = peer.host_str() {
            self.rejected.write().await.remove(hostname);
        }
    }

    /// Check if the host list is empty.
    /// Only consults the main addrs set, not the quarantine map.
    pub async fn is_empty(&self) -> bool {
        self.addrs.read().await.is_empty()
    }

    /// Check if host is already in the set
    pub async fn contains(&self, addr: &Url) -> bool {
        self.addrs.read().await.contains(addr)
    }

    /// Return all known hosts
    pub async fn fetch_all(&self) -> Vec<Url> {
        self.addrs.read().await.iter().cloned().collect()
    }

    /// Get up to n random peers from the hosts set.
    pub async fn fetch_n_random(&self, n: u32) -> Vec<Url> {
        let n = n as usize;
        if n == 0 {
            return vec![]
        }
        let addrs = self.addrs.read().await;
        let urls = addrs.iter().choose_multiple(&mut OsRng, n.min(addrs.len()));
        urls.iter().map(|&url| url.clone()).collect()
    }

    /// Get up to n random peers that match the given transport schemes from the hosts set.
    pub async fn fetch_n_random_with_schemes(&self, schemes: &[String], n: u32) -> Vec<Url> {
        let n = n as usize;
        if n == 0 {
            return vec![]
        }
        // Retrieve all peers corresponding to that transport schemes
        let hosts = self.fetch_with_schemes(schemes, None).await;
        if hosts.is_empty() {
            return hosts
        }

        // Grab random ones
        let urls = hosts.iter().choose_multiple(&mut OsRng, n.min(hosts.len()));
        urls.iter().map(|&url| url.clone()).collect()
    }

    /// Get up to n random peers that don't match the given transport schemes from the hosts set.
    pub async fn fetch_n_random_excluding_schemes(&self, schemes: &[String], n: u32) -> Vec<Url> {
        let n = n as usize;
        if n == 0 {
            return vec![]
        }
        // Retrieve all peers not corresponding to that transport schemes
        let hosts = self.fetch_exluding_schemes(schemes, None).await;
        if hosts.is_empty() {
            return hosts
        }

        // Grab random ones
        let urls = hosts.iter().choose_multiple(&mut OsRng, n.min(hosts.len()));
        urls.iter().map(|&url| url.clone()).collect()
    }

    /// Get up to limit peers that match the given transport schemes from the hosts set.
    /// If limit was not provided, return all matching peers.
    /// Falls back to the quarantine map when no match is found in the main set.
    pub async fn fetch_with_schemes(&self, schemes: &[String], limit: Option<usize>) -> Vec<Url> {
        let addrs = self.addrs.read().await;

        let mut limit = match limit {
            Some(l) => l.min(addrs.len()),
            None => addrs.len(),
        };
        let mut ret = vec![];

        if limit == 0 {
            return ret
        }

        for addr in addrs.iter() {
            if schemes.contains(&addr.scheme().to_string()) {
                ret.push(addr.clone());
                limit -= 1;
                if limit == 0 {
                    return ret
                }
            }
        }

        // If we didn't find any, pick some from the quarantine zone
        if ret.is_empty() {
            for addr in self.quarantine.read().await.keys() {
                if schemes.contains(&addr.scheme().to_string()) {
                    ret.push(addr.clone());
                    limit -= 1;
                    if limit == 0 {
                        break
                    }
                }
            }
        }

        ret
    }

    /// Get up to limit peers that don't match the given transport schemes from the hosts set.
    /// If limit was not provided, return all matching peers.
    /// Falls back to the quarantine map when no match is found in the main set.
    // NOTE(review): method name is missing a 'c' ("exluding"); it is public
    // API so renaming would break callers — flagging only.
    pub async fn fetch_exluding_schemes(
        &self,
        schemes: &[String],
        limit: Option<usize>,
    ) -> Vec<Url> {
        let addrs = self.addrs.read().await;

        let mut limit = match limit {
            Some(l) => l.min(addrs.len()),
            None => addrs.len(),
        };
        let mut ret = vec![];

        if limit == 0 {
            return ret
        }

        for addr in addrs.iter() {
            if !schemes.contains(&addr.scheme().to_string()) {
                ret.push(addr.clone());
                limit -= 1;
                if limit == 0 {
                    return ret
                }
            }
        }

        // If we didn't find any, pick some from the quarantine zone
        if ret.is_empty() {
            for addr in self.quarantine.read().await.keys() {
                if !schemes.contains(&addr.scheme().to_string()) {
                    ret.push(addr.clone());
                    limit -= 1;
                    if limit == 0 {
                        break
                    }
                }
            }
        }

        ret
    }
}
#[cfg(test)]
mod tests {
    use super::{super::settings::Settings, *};

    // With localnet enabled, store() should accept everything:
    // our own external addrs, local hosts, and remote hosts alike.
    #[test]
    fn test_store_localnet() {
        smol::block_on(async {
            let settings = Settings {
                localnet: true,
                external_addrs: vec![
                    Url::parse("tcp://foo.bar:123").unwrap(),
                    Url::parse("tcp://lol.cat:321").unwrap(),
                ],
                ..Default::default()
            };

            let hosts = Hosts::new(Arc::new(settings.clone()));
            hosts.store(&settings.external_addrs).await;
            for i in settings.external_addrs {
                assert!(hosts.contains(&i).await);
            }

            let local_hosts = vec![
                Url::parse("tcp://localhost:3921").unwrap(),
                Url::parse("tcp://127.0.0.1:23957").unwrap(),
                Url::parse("tcp://[::1]:21481").unwrap(),
                Url::parse("tcp://192.168.10.65:311").unwrap(),
                Url::parse("tcp://0.0.0.0:2312").unwrap(),
                Url::parse("tcp://255.255.255.255:2131").unwrap(),
            ];
            hosts.store(&local_hosts).await;
            for i in local_hosts {
                assert!(hosts.contains(&i).await);
            }

            let remote_hosts = vec![
                Url::parse("tcp://dark.fi:80").unwrap(),
                Url::parse("tcp://top.kek:111").unwrap(),
                Url::parse("tcp://http.cat:401").unwrap(),
            ];
            hosts.store(&remote_hosts).await;
            for i in remote_hosts {
                assert!(hosts.contains(&i).await);
            }
        });
    }

    // With localnet disabled, store() must filter out our own external
    // addrs and all local hosts, keeping only valid remote peers.
    #[test]
    fn test_store() {
        smol::block_on(async {
            let settings = Settings {
                localnet: false,
                external_addrs: vec![
                    Url::parse("tcp://foo.bar:123").unwrap(),
                    Url::parse("tcp://lol.cat:321").unwrap(),
                ],
                ..Default::default()
            };

            let hosts = Hosts::new(Arc::new(settings.clone()));
            hosts.store(&settings.external_addrs).await;
            assert!(hosts.is_empty().await);

            let local_hosts = vec![
                Url::parse("tcp://localhost:3921").unwrap(),
                Url::parse("tor://[::1]:21481").unwrap(),
                Url::parse("tcp://192.168.10.65:311").unwrap(),
                Url::parse("tcp+tls://0.0.0.0:2312").unwrap(),
                Url::parse("tcp://255.255.255.255:2131").unwrap(),
            ];
            hosts.store(&local_hosts).await;
            assert!(hosts.is_empty().await);

            let remote_hosts = vec![
                Url::parse("tcp://dark.fi:80").unwrap(),
                Url::parse("tcp://http.cat:401").unwrap(),
                Url::parse("tcp://foo.bar:111").unwrap(),
            ];
            hosts.store(&remote_hosts).await;

            // foo.bar is one of our external addr hostnames, so the third
            // entry must be rejected even though the port differs.
            assert!(hosts.contains(&remote_hosts[0]).await);
            assert!(hosts.contains(&remote_hosts[1]).await);
            assert!(!hosts.contains(&remote_hosts[2]).await);
        });
    }

    // is_local_host() should classify loopback/private ranges and the
    // LOCAL_HOST_STRS domains as local, and everything else as remote.
    #[test]
    fn test_is_local_host() {
        smol::block_on(async {
            let settings = Settings {
                localnet: false,
                external_addrs: vec![
                    Url::parse("tcp://foo.bar:123").unwrap(),
                    Url::parse("tcp://lol.cat:321").unwrap(),
                ],
                ..Default::default()
            };
            let hosts = Hosts::new(Arc::new(settings.clone()));

            let local_hosts: Vec<Url> = vec![
                Url::parse("tcp://localhost").unwrap(),
                Url::parse("tcp://127.0.0.1").unwrap(),
                Url::parse("tcp+tls://[::1]").unwrap(),
                Url::parse("tcp://localhost.localdomain").unwrap(),
                Url::parse("tcp://192.168.10.65").unwrap(),
            ];
            for host in local_hosts {
                eprintln!("{}", host);
                assert!(hosts.is_local_host(host).await);
            }
            let remote_hosts: Vec<Url> = vec![
                Url::parse("https://dyne.org").unwrap(),
                Url::parse("tcp://77.168.10.65:2222").unwrap(),
                Url::parse("tcp://[2345:0425:2CA1:0000:0000:0567:5673:23b5]").unwrap(),
                Url::parse("http://eweiibe6tdjsdprb4px6rqrzzcsi22m4koia44kc5pcjr7nec2rlxyad.onion")
                    .unwrap(),
            ];
            for host in remote_hosts {
                assert!(!(hosts.is_local_host(host).await))
            }
        });
    }
}

39
src/net/hosts/mod.rs Normal file
View File

@@ -0,0 +1,39 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
/// Periodically probe entries in the greylist. Randomly selects a greylist entry and tries to
/// establish a local connection to it using the method probe_node(), which creates a channel and
/// does a version exchange using perform_handshake_protocols().
///
/// If successful, the entry is removed from the greylist and added to the whitelist with an
/// updated last_seen timestamp. If non-successful, the entry is removed from the greylist.
///
/// The method probe_node() is also used by ProtocolSeed and ProtocolAddress. We try to establish
/// local connections to our own external addresses using probe_node() to ensure the address is valid
/// before propagating in ProtocolSeed and ProtocolAddress.
pub mod refinery;
/// The main interface for interacting with the hostlist, which is stored in three sections: white,
/// grey and anchorlists. The whitelist contains hosts that have been seen recently, the anchorlist
/// contains hosts that we have been able to establish a connection to, and the greylist is an
/// intermediary host list of recently received hosts that is periodically refreshed using the
/// greylist refinery.
///
/// Store contains various methods for reading from, querying and writing to the hostlists. It is
/// also responsible for filtering addresses and ensuring channel transport validity.
pub(super) mod store;

161
src/net/hosts/refinery.rs Normal file
View File

@@ -0,0 +1,161 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{sync::Arc, time::UNIX_EPOCH};
use log::{debug, warn};
use url::Url;
use super::super::p2p::{P2p, P2pPtr};
use crate::{
net::{connector::Connector, protocol::ProtocolVersion, session::Session},
system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr},
Error,
};
pub type GreylistRefineryPtr = Arc<GreylistRefinery>;
/// Probe random peers on the greylist. If a peer is responsive, update the last_seen field and
/// add it to the whitelist. If a node does not respond, remove it from the greylist.
/// Called periodically.
pub struct GreylistRefinery {
    /// Weak pointer to parent p2p object
    pub(in crate::net) p2p: LazyWeak<P2p>,
    /// Background task running the refinery loop (started in `start()`,
    /// stopped in `stop()`)
    process: StoppableTaskPtr,
}
impl GreylistRefinery {
    /// Construct a new refinery. The parent p2p pointer is initialized
    /// lazily via `LazyWeak`; the refinery does nothing until `start()`.
    pub fn new() -> Arc<Self> {
        Arc::new(Self { p2p: LazyWeak::new(), process: StoppableTask::new() })
    }

    /// Load the stored hostlists from disk (best-effort, failures are only
    /// logged) and start the refinery loop as a detached background task.
    pub async fn start(self: Arc<Self>) {
        match self.p2p().hosts().load_hosts().await {
            Ok(()) => {
                debug!(target: "net::refinery::start()", "Load hosts successful!");
            }
            Err(e) => {
                warn!(target: "net::refinery::start()", "Error loading hosts {}", e);
            }
        }
        let ex = self.p2p().executor();
        self.process.clone().start(
            async move {
                self.run().await;
                unreachable!();
            },
            // Ignore stop handler
            |_| async {},
            Error::NetworkServiceStopped,
            ex,
        );
    }

    /// Stop the refinery loop, then persist the hostlists to disk
    /// (best-effort, failures are only logged).
    pub async fn stop(self: Arc<Self>) {
        self.process.stop().await;

        match self.p2p().hosts().save_hosts().await {
            Ok(()) => {
                debug!(target: "net::refinery::stop()", "Save hosts successful!");
            }
            Err(e) => {
                warn!(target: "net::refinery::stop()", "Error saving hosts {}", e);
            }
        }
    }

    // Randomly select a peer on the greylist and probe it. A responsive peer
    // is promoted to the whitelist with a fresh last_seen timestamp; an
    // unresponsive one is dropped from the greylist. Loops forever.
    async fn run(self: Arc<Self>) {
        loop {
            sleep(self.p2p().settings().greylist_refinery_interval).await;

            let hosts = self.p2p().hosts();

            if hosts.is_empty_greylist().await {
                warn!(target: "net::refinery::run()",
                "Greylist is empty! Cannot start refinery process");

                continue
            }

            let (entry, position) = hosts.greylist_fetch_random().await;
            let url = &entry.0;

            if !ping_node(url, self.p2p().clone()).await {
                // NOTE(review): `position` was sampled before ping_node()
                // awaited; if the greylist changed meanwhile this index may
                // be stale and remove the wrong entry — confirm the list
                // cannot mutate concurrently, or remove by URL instead.
                let mut greylist = hosts.greylist.write().await;
                greylist.remove(position);

                debug!(target: "net::refinery::run()", "Peer {} is not responsive. Removed from greylist", url);

                continue
            }

            let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();

            // Append to the whitelist.
            hosts.whitelist_store_or_update(&[(url.clone(), last_seen)]).await;

            // Remove whitelisted peer from the greylist.
            hosts.greylist_remove(url, position).await;
        }
    }

    /// Upgrade the weak parent pointer. Panics if the p2p object is gone,
    /// which would be a lifecycle bug (refinery outliving its p2p).
    fn p2p(&self) -> P2pPtr {
        self.p2p.upgrade()
    }
}
// Ping a node to check it's online.
pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
    // The connector is parented to the outbound session so the probe
    // connection is attributed to it.
    let outbound = p2p.session_outbound();
    let connector = Connector::new(p2p.settings(), Arc::downgrade(&outbound));

    debug!(target: "net::refinery::ping_node()", "Attempting to connect to {}", addr);
    let channel = match connector.connect(addr).await {
        Ok((_url, channel)) => channel,
        Err(e) => {
            debug!(target: "net::refinery::ping_node()", "Failed to connect to {}, ({})", addr, e);
            return false
        }
    };

    debug!(target: "net::refinery::ping_node()", "Connected successfully!");

    // Run the version handshake over the freshly started channel; the
    // handshake outcome decides whether the peer counts as online.
    let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;
    let handshake = outbound.perform_handshake_protocols(proto_ver, channel.clone(), p2p.executor());
    channel.clone().start(p2p.executor());

    let online = match handshake.await {
        Ok(()) => {
            debug!(target: "net::refinery::ping_node()", "Handshake success! Stopping channel.");
            true
        }
        Err(e) => {
            debug!(target: "net::refinery::ping_node()", "Handshake failure! {}", e);
            false
        }
    };

    // Either way this was only a probe, so tear the channel down.
    channel.stop().await;
    online
}

1384
src/net/hosts/store.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -70,11 +70,11 @@ pub struct GetAddrsMessage {
impl_p2p_message!(GetAddrsMessage, "getaddr");
/// Sends address information to inbound connection.
/// Response to `GetAddrsMessage`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct AddrsMessage {
pub addrs: Vec<Url>,
pub addrs: Vec<(Url, u64)>,
}
impl_p2p_message!(AddrsMessage, "addr");
/// Requests version information of outbound connection.

View File

@@ -16,6 +16,9 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#[cfg(test)]
mod tests;
/// Defines how to decode generic messages as well as implementing the
/// common network messages that are sent between nodes as described
/// by the [`protocol`] submodule.

View File

@@ -30,7 +30,10 @@ use url::Url;
use super::{
channel::ChannelPtr,
dnet::DnetEvent,
hosts::{Hosts, HostsPtr},
hosts::{
refinery::{GreylistRefinery, GreylistRefineryPtr},
store::{Hosts, HostsPtr},
},
message::Message,
protocol::{protocol_registry::ProtocolRegistry, register_default_protocols},
session::{
@@ -81,6 +84,9 @@ pub struct P2p {
pub dnet_enabled: Mutex<bool>,
/// The subscriber for which we can give dnet info over
dnet_subscriber: SubscriberPtr<DnetEvent>,
// Greylist refinery process
greylist_refinery: Arc<GreylistRefinery>,
}
impl P2p {
@@ -111,12 +117,15 @@ impl P2p {
dnet_enabled: Mutex::new(false),
dnet_subscriber: Subscriber::new(),
greylist_refinery: GreylistRefinery::new(),
});
self_.session_manual.p2p.init(self_.clone());
self_.session_inbound.p2p.init(self_.clone());
self_.session_outbound.p2p.init(self_.clone());
self_.greylist_refinery.p2p.init(self_.clone());
register_default_protocols(self_.clone()).await;
self_
@@ -139,6 +148,9 @@ impl P2p {
return Err(err)
}
info!(target: "net::p2p::start()", "Starting greylist refinery process");
self.greylist_refinery.clone().start().await;
// Start the outbound session
self.session_outbound().start().await;
@@ -166,6 +178,9 @@ impl P2p {
self.session_manual().stop().await;
self.session_inbound().stop().await;
self.session_outbound().stop().await;
// Stop greylist refinery process
self.greylist_refinery().stop().await;
}
/// Broadcasts a message concurrently across all active channels.
@@ -218,9 +233,8 @@ impl P2p {
/// Add a channel to the set of connected channels
pub(super) async fn store(&self, channel: ChannelPtr) {
// TODO: Check the code path for this, and potentially also insert the remote
// into the hosts list?
self.channels.lock().await.insert(channel.address().clone(), channel.clone());
self.channel_subscriber.notify(Ok(channel)).await;
}
@@ -289,6 +303,11 @@ impl P2p {
self.session_outbound.clone()
}
/// Get pointer to greylist refinery
pub fn greylist_refinery(&self) -> GreylistRefineryPtr {
self.greylist_refinery.clone()
}
/// Enable network debugging
pub async fn dnet_enable(&self) {
*self.dnet_enabled.lock().await = true;

View File

@@ -57,7 +57,7 @@ pub use protocol_ping::ProtocolPing;
pub mod protocol_address;
pub use protocol_address::ProtocolAddress;
/// Seed servere protocol. Seed server is used when connecting to the network
/// Seed server protocol. Seed server is used when connecting to the network
/// for the first time. Returns a list of peers that nodes can connect to.
///
/// To start the seed protocol, we create a subscription to the address

View File

@@ -16,16 +16,16 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use std::{sync::Arc, time::UNIX_EPOCH};
use async_trait::async_trait;
use log::debug;
use log::{debug, warn};
use smol::Executor;
use super::{
super::{
channel::ChannelPtr,
hosts::HostsPtr,
hosts::{refinery::ping_node, store::HostsPtr},
message::{AddrsMessage, GetAddrsMessage},
message_subscriber::MessageSubscription,
p2p::P2pPtr,
@@ -35,9 +35,11 @@ use super::{
protocol_base::{ProtocolBase, ProtocolBasePtr},
protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr},
};
use crate::{system::sleep, Result};
use crate::Result;
/// Defines address and get-address messages
/// Defines address and get-address messages.
/// On receiving GetAddr, nodes send an AddrMessage containing whitelisted nodes.
/// On receiving an AddrMessage, nodes enter the info into their greylists.
pub struct ProtocolAddress {
channel: ChannelPtr,
addrs_sub: MessageSubscription<AddrsMessage>,
@@ -45,6 +47,7 @@ pub struct ProtocolAddress {
hosts: HostsPtr,
settings: SettingsPtr,
jobsman: ProtocolJobsManagerPtr,
p2p: P2pPtr,
}
const PROTO_NAME: &str = "ProtocolAddress";
@@ -72,12 +75,13 @@ impl ProtocolAddress {
hosts,
jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
settings,
p2p,
})
}
/// Handles receiving the address message. Loops to continually receive
/// address messages on the address subscription. Validates and adds the
/// received addresses to the hosts set.
/// received addresses to the greylist.
async fn handle_receive_addrs(self: Arc<Self>) -> Result<()> {
debug!(
target: "net::protocol_address::handle_receive_addrs()",
@@ -91,9 +95,12 @@ impl ProtocolAddress {
"Received {} addrs from {}", addrs_msg.addrs.len(), self.channel.address(),
);
// TODO: We might want to close the channel here if we're getting
// corrupted addresses.
self.hosts.store(&addrs_msg.addrs).await;
debug!(
target: "net::protocol_address::handle_receive_addrs()",
"Appending to greylist...",
);
self.hosts.greylist_store_or_update(&addrs_msg.addrs).await;
}
}
@@ -117,6 +124,9 @@ impl ProtocolAddress {
// TODO: Verify this limit. It should be the max number of all our allowed transports,
// plus their mixing.
if get_addrs_msg.transports.len() > 20 {
warn!(target: "net::protocol_address::handle_receive_get_addrs()",
"Sending empty Addrs message");
// TODO: Should this error out, effectively ending the connection?
let addrs_msg = AddrsMessage { addrs: vec![] };
self.channel.send(&addrs_msg).await?;
@@ -124,18 +134,22 @@ impl ProtocolAddress {
}
// First we grab address with the requested transports
debug!(target: "net::protocol_address::handle_receive_get_addrs()",
"Fetching whitelist entries with schemes");
let mut addrs = self
.hosts
.fetch_n_random_with_schemes(&get_addrs_msg.transports, get_addrs_msg.max)
.whitelist_fetch_n_random_with_schemes(&get_addrs_msg.transports, get_addrs_msg.max)
.await;
// Then we grab addresses without the requested transports
// to fill a 2 * max length vector.
debug!(target: "net::protocol_address::handle_receive_get_addrs()",
"Fetching whitelist entries without schemes");
let remain = 2 * get_addrs_msg.max - addrs.len() as u32;
addrs.append(
&mut self
.hosts
.fetch_n_random_excluding_schemes(&get_addrs_msg.transports, remain)
.whitelist_fetch_n_random_excluding_schemes(&get_addrs_msg.transports, remain)
.await,
);
@@ -149,39 +163,71 @@ impl ProtocolAddress {
}
}
/// Periodically send our external addresses through the channel.
// If it's an outbound session, we have an extern_addr, and address advertising
// is enabled, send our address.
async fn send_my_addrs(self: Arc<Self>) -> Result<()> {
debug!(target: "net::protocol_address::send_my_addrs()", "[START]");
let type_id = self.channel.session_type_id();
if type_id != SESSION_OUTBOUND {
debug!(target: "net::protocol_address::send_my_addrs()", "Not an outbound session. Stopping");
return Ok(())
}
if self.settings.external_addrs.is_empty() {
debug!(target: "net::protocol_address::send_my_addrs()", "External addr not configured. Stopping");
return Ok(())
}
// Do nothing if advertise is set to false
if !self.settings.advertise {
debug!(target: "net::protocol_address::send_my_addrs()", "Advertise is false. Stopping");
return Ok(())
}
debug!(
target: "net::protocol_address::send_my_addrs()",
"[START] address={}", self.channel.address(),
);
// FIXME: Revisit this. Why do we keep sending it?
loop {
let ext_addr_msg = AddrsMessage { addrs: self.settings.external_addrs.clone() };
self.channel.send(&ext_addr_msg).await?;
sleep(900).await;
let mut addrs = vec![];
for addr in self.settings.external_addrs.clone() {
debug!(target: "net::protocol_address::send_my_addrs()", "Attempting to ping self");
// See if we can do a version exchange with ourself.
if ping_node(&addr, self.p2p.clone()).await {
// We're online. Update last_seen and broadcast our address.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
addrs.push((addr, last_seen));
} else {
debug!(target: "net::protocol_address::send_my_addrs()", "Ping self failed");
return Ok(())
}
}
debug!(target: "net::protocol_address::send_my_addrs()", "Broadcasting address");
let ext_addr_msg = AddrsMessage { addrs };
self.channel.send(&ext_addr_msg).await?;
debug!(target: "net::protocol_address::send_my_addrs()", "[END]");
Ok(())
}
}
#[async_trait]
impl ProtocolBase for ProtocolAddress {
/// Starts the address protocol. Runs receive address and get address
/// protocols on the protocol task manager. Then sends get-address msg.
/// Starts the address protocol. If it's an outbound session, has an external address
/// is set to advertise, pings our external address and sends it if everything is fine.
/// Runs receive address and get address protocols on the protocol task manager.
/// Then sends get-address msg.
async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net::protocol_address::start()", "START => address={}", self.channel.address());
let type_id = self.channel.session_type_id();
self.jobsman.clone().start(ex.clone());
// If it's an outbound session + has an extern_addr, send our address.
if type_id == SESSION_OUTBOUND && !self.settings.external_addrs.is_empty() {
self.jobsman.clone().spawn(self.clone().send_my_addrs(), ex.clone()).await;
}
self.jobsman.clone().spawn(self.clone().send_my_addrs(), ex.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), ex.clone()).await;
self.jobsman.spawn(self.clone().handle_receive_get_addrs(), ex).await;
// Send get_address message.
@@ -194,7 +240,6 @@ impl ProtocolBase for ProtocolAddress {
debug!(target: "net::protocol_address::start()", "END => address={}", self.channel.address());
Ok(())
}
fn name(&self) -> &'static str {
PROTO_NAME
}

View File

@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use std::{sync::Arc, time::UNIX_EPOCH};
use async_trait::async_trait;
use log::debug;
@@ -25,7 +25,7 @@ use smol::Executor;
use super::{
super::{
channel::ChannelPtr,
hosts::HostsPtr,
hosts::store::HostsPtr,
message::{AddrsMessage, GetAddrsMessage},
message_subscriber::MessageSubscription,
p2p::P2pPtr,
@@ -33,7 +33,7 @@ use super::{
},
protocol_base::{ProtocolBase, ProtocolBasePtr},
};
use crate::Result;
use crate::{net::hosts::refinery::ping_node, Result};
/// Implements the seed protocol
pub struct ProtocolSeed {
@@ -41,6 +41,7 @@ pub struct ProtocolSeed {
hosts: HostsPtr,
settings: SettingsPtr,
addr_sub: MessageSubscription<AddrsMessage>,
p2p: P2pPtr,
}
const PROTO_NAME: &str = "ProtocolSeed";
@@ -55,28 +56,47 @@ impl ProtocolSeed {
let addr_sub =
channel.subscribe_msg::<AddrsMessage>().await.expect("Missing addr dispatcher!");
Arc::new(Self { channel, hosts, settings, addr_sub })
Arc::new(Self { channel, hosts, settings, addr_sub, p2p })
}
/// Sends own external addresses over a channel. Imports own external addresses
/// from settings, then adds those addresses to an addrs message and sends it
/// out over the channel.
pub async fn send_self_address(&self) -> Result<()> {
debug!(target: "net::protocol_seed::send_self_address()", "[START]");
pub async fn send_my_addrs(&self) -> Result<()> {
debug!(target: "net::protocol_seed::send_my_addrs()", "[START]");
// Do nothing if external addresses are not configured
if self.settings.external_addrs.is_empty() {
debug!(target: "net::protocol_seed::send_my_addrs()",
"External address is not configured. Stopping");
return Ok(())
}
let addrs = self.settings.external_addrs.clone();
debug!(
target: "net::protocol_seed::send_self_address()",
"ext_addrs={:?}, dest={}", addrs, self.channel.address(),
);
// Do nothing if advertise is set to false
if !self.settings.advertise {
debug!(target: "net::protocol_seed::send_my_addrs()",
"Advertise is set to false. Stopping");
return Ok(())
}
let mut addrs = vec![];
for addr in self.settings.external_addrs.clone() {
debug!(target: "net::protocol_seed::send_my_addrs()", "Attempting to ping self");
// See if we can do a version exchange with ourself.
if ping_node(&addr, self.p2p.clone()).await {
// We're online. Update last_seen and broadcast our address.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
addrs.push((addr, last_seen));
} else {
debug!(target: "net::protocol_seed::send_my_addrs()", "Ping self failed");
return Ok(())
}
}
debug!(target: "net::protocol_seed::send_my_addrs()", "Broadcasting address");
let ext_addr_msg = AddrsMessage { addrs };
self.channel.send(&ext_addr_msg).await?;
debug!(target: "net::protocol_seed::send_self_address()", "[END]");
debug!(target: "net::protocol_seed::send_my_addrs()", "[END]");
Ok(())
}
}
@@ -90,7 +110,7 @@ impl ProtocolBase for ProtocolSeed {
debug!(target: "net::protocol_seed::start()", "START => address={}", self.channel.address());
// Send own address to the seed server
self.send_self_address().await?;
self.send_my_addrs().await?;
// Send get address message
let get_addr = GetAddrsMessage {
@@ -105,7 +125,11 @@ impl ProtocolBase for ProtocolSeed {
target: "net::protocol_seed::start()",
"Received {} addrs from {}", addrs_msg.addrs.len(), self.channel.address(),
);
self.hosts.store(&addrs_msg.addrs).await;
debug!(
target: "net::protocol_seed::start()",
"Appending to greylist...",
);
self.hosts.greylist_store_or_update(&addrs_msg.addrs).await;
debug!(target: "net::protocol_seed::start()", "END => address={}", self.channel.address());
Ok(())

View File

@@ -24,7 +24,6 @@ use smol::Executor;
use super::super::{
channel::ChannelPtr,
hosts::HostsPtr,
message::{VerackMessage, VersionMessage},
message_subscriber::MessageSubscription,
settings::SettingsPtr,
@@ -38,13 +37,12 @@ pub struct ProtocolVersion {
version_sub: MessageSubscription<VersionMessage>,
verack_sub: MessageSubscription<VerackMessage>,
settings: SettingsPtr,
hosts: HostsPtr,
}
impl ProtocolVersion {
/// Create a new version protocol. Makes a version and version ack
/// subscription, then adds them to a version protocol instance.
pub async fn new(channel: ChannelPtr, settings: SettingsPtr, hosts: HostsPtr) -> Arc<Self> {
pub async fn new(channel: ChannelPtr, settings: SettingsPtr) -> Arc<Self> {
// Creates a version subscription
let version_sub =
channel.subscribe_msg::<VersionMessage>().await.expect("Missing version dispatcher!");
@@ -53,7 +51,7 @@ impl ProtocolVersion {
let verack_sub =
channel.subscribe_msg::<VerackMessage>().await.expect("Missing verack dispatcher!");
Arc::new(Self { channel, version_sub, verack_sub, settings, hosts })
Arc::new(Self { channel, version_sub, verack_sub, settings })
}
/// Start version information exchange. Start the timer. Send version
@@ -79,7 +77,7 @@ impl ProtocolVersion {
);
// Remove from hosts
self.hosts.remove(self.channel.address()).await;
//self.hosts.remove(self.channel.address()).await;
self.channel.stop().await;
return Err(Error::ChannelTimeout)
}
@@ -153,7 +151,7 @@ impl ProtocolVersion {
self.channel.address(),
);
self.hosts.remove(self.channel.address()).await;
//self.hosts.remove(self.channel.address()).await;
self.channel.stop().await;
return Err(Error::ChannelStopped)
}

View File

@@ -126,13 +126,17 @@ impl ManualSession {
let stop_sub =
channel.subscribe_stop().await.expect("Channel should not be stopped");
// Channel is now connected but not yet setup
// Register the new channel
self.register_channel(channel.clone(), ex.clone()).await?;
// Channel is now connected but not yet setup
// Remove pending lock since register_channel will add the channel to p2p
self.p2p().remove_pending(&addr).await;
// Add this connection to the anchorlist, remove it from the [otherlist]
self.p2p().hosts().upgrade_host(&addr).await;
// Notify that channel processing has finished
self.channel_subscriber.notify(Ok(channel)).await;
@@ -142,6 +146,9 @@ impl ManualSession {
target: "net::manual_session",
"[P2P] Manual outbound disconnected [{}]", url,
);
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
self.p2p().hosts().downgrade_host(&addr).await;
// DEV NOTE: Here we can choose to attempt reconnection again
return Ok(())
}
@@ -151,6 +158,9 @@ impl ManualSession {
"[P2P] Unable to connect to manual outbound [{}]: {}",
addr, e,
);
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
//self.downgrade_host(&addr).await;
}
}

View File

@@ -95,9 +95,8 @@ pub trait Session: Sync {
p2p.protocol_registry().attach(self.type_id(), channel.clone(), p2p.clone()).await;
// Perform the handshake protocol
let protocol_version =
ProtocolVersion::new(channel.clone(), p2p.settings().clone(), p2p.hosts().clone())
.await;
let protocol_version = ProtocolVersion::new(channel.clone(), p2p.settings().clone()).await;
debug!(target: "net::session::register_channel()", "register_channel {}", channel.clone().address());
let handshake_task =
self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
@@ -134,8 +133,6 @@ pub trait Session: Sync {
// Perform handshake
protocol_version.run(executor.clone()).await?;
// Channel is now initialized
// Add channel to p2p
self.p2p().store(channel.clone()).await;

View File

@@ -35,8 +35,7 @@ use std::{
};
use async_trait::async_trait;
use log::{debug, error, info, trace, warn};
use rand::{prelude::SliceRandom, rngs::OsRng};
use log::{debug, error, info, warn};
use smol::lock::Mutex;
use url::Url;
@@ -127,6 +126,7 @@ impl OutboundSession {
fn wakeup_peer_discovery(&self) {
self.peer_discovery.notify()
}
async fn wakeup_slots(&self) {
let slots = &*self.slots.lock().await;
for slot in slots {
@@ -185,18 +185,58 @@ impl Slot {
self.process.stop().await
}
async fn fetch_address(&self, slot_count: usize, transports: &[String]) -> Option<(Url, u64)> {
let hosts = self.p2p().hosts();
let connects = self.p2p().settings().outbound_connections;
let white_count = connects * self.p2p().settings().white_connection_percent / 100;
let addrs = {
// Up to anchor_connection_count connections:
//
// Select from the anchorlist
// If the anchorlist is empty, select from the whitelist
// If the whitelist is empty, select from the greylist
// If the greylist is empty, do peer discovery
if slot_count < self.p2p().settings().anchor_connection_count {
debug!(target: "net::outbound_session::fetch_address()",
"First two connections- prefer anchor connections");
hosts.anchorlist_fetch_address(transports).await
}
// Up to white_connection_percent connections:
//
// Select from the whitelist
// If the whitelist is empty, select from the greylist
// If the greylist is empty, do peer discovery
else if slot_count < white_count {
debug!(target: "net::outbound_session::fetch_address()",
"Next N connections- prefer white connections");
hosts.whitelist_fetch_address(transports).await
}
// All other connections:
//
// Select from the greylist
// If the greylist is empty, do peer discovery
else {
debug!(target: "net::outbound_session::fetch_address()",
"All other connections- get grey connections");
hosts.greylist_fetch_address(transports).await
}
};
// Check whether:
// * we already have this connection established
// * we already have this configured as a manual peer
// * address is already pending a connection
hosts.check_address_with_lock(self.p2p(), addrs).await
}
// We first try to make connections to the addresses on our anchor list. We then find some
// whitelist connections according to the whitelist percent default. Finally, any remaining
// connections we make from the greylist.
async fn run(self: Arc<Self>) {
// This is the main outbound connection loop where we try to establish
// a connection in the slot. The `try_connect` function will block in
// case the connection was sucessfully established. If it fails, then
// we will wait for a defined number of seconds and try to fill the
// slot again. This function should never exit during the lifetime of
// the P2P network, as it is supposed to represent an outbound slot we
// want to fill.
// The actual connection logic and peer selection is in `try_connect`.
// If the connection is successful, `try_connect` will wait for a stop
// signal and then exit. Once it exits, we'll run `try_connect` again
// and attempt to fill the slot with another peer.
let hosts = self.p2p().hosts();
let slot_count = self.p2p().settings().outbound_connections;
loop {
// Activate the slot
debug!(
@@ -205,13 +245,12 @@ impl Slot {
self.slot,
);
// Retrieve whitelisted outbound transports
// Retrieve outbound transports
let transports = &self.p2p().settings().allowed_transports;
// Find an address to connect to. We also do peer discovery here if needed.
let addr = if let Some(addr) = self.fetch_address_with_lock(transports).await {
addr
} else {
// Do peer discovery if we don't have a hostlist (first time connecting
// to the network).
if hosts.is_empty_hostlist().await {
dnetev!(self, OutboundSlotSleeping, {
slot: self.slot,
});
@@ -221,34 +260,59 @@ impl Slot {
self.session().wakeup_peer_discovery();
// Wait to be woken up by peer discovery
self.wakeup_self.wait().await;
continue
}
let addr = if let Some(addr) = self.fetch_address(slot_count, transports).await {
debug!(target: "net::outbound_session::run()", "Fetched address: {:?}", addr);
addr
} else {
debug!(target: "net::outbound_session::run()", "No address found! Activating peer discovery...");
dnetev!(self, OutboundSlotSleeping, {
slot: self.slot,
});
self.wakeup_self.reset();
// Peer discovery
self.session().wakeup_peer_discovery();
// Wait to be woken up by peer discovery
self.wakeup_self.wait().await;
continue
};
let host = addr.0;
let slot = self.slot;
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Connecting outbound slot #{} [{}]",
self.slot, addr,
slot, host,
);
dnetev!(self, OutboundSlotConnecting, {
slot: self.slot,
addr: addr.clone(),
slot,
addr: host.clone(),
});
let (addr_final, channel) = match self.try_connect(addr.clone()).await {
let (addr, channel) = match self.try_connect(host.clone()).await {
Ok(connect_info) => connect_info,
Err(err) => {
error!(
target: "net::outbound_session",
"[P2P] Outbound slot #{} connection failed: {}",
self.slot, err,
debug!(
target: "net::outbound_session::try_connect()",
"[P2P] Outbound slot #{} connection failed: {}, node {}",
slot, err, self.p2p().settings().node_id
);
dnetev!(self, OutboundSlotDisconnected, {
slot: self.slot,
slot,
err: err.to_string()
});
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
//self.session().downgrade_host(&host).await;
self.channel_id.store(0, Ordering::Relaxed);
continue
}
@@ -257,22 +321,24 @@ impl Slot {
info!(
target: "net::outbound_session::try_connect()",
"[P2P] Outbound slot #{} connected [{}]",
self.slot, addr_final
slot, addr
);
dnetev!(self, OutboundSlotConnected, {
slot: self.slot,
addr: addr_final.clone(),
addr: addr.clone(),
channel_id: channel.info.id
});
// At this point we've managed to connect.
let stop_sub = channel.subscribe_stop().await.expect("Channel should not be stopped");
// Setup new channel
if let Err(err) = self.setup_channel(addr, channel.clone()).await {
if let Err(err) = self.setup_channel(host.clone(), channel.clone()).await {
info!(
target: "net::outbound_session",
"[P2P] Outbound slot #{} disconnected: {}",
self.slot, err
slot, err
);
dnetev!(self, OutboundSlotDisconnected, {
@@ -286,9 +352,15 @@ impl Slot {
self.channel_id.store(channel.info.id, Ordering::Relaxed);
// Add this connection to the anchorlist, remove it from the [otherlist]
hosts.upgrade_host(&addr).await;
// Wait for channel to close
stop_sub.receive().await;
self.channel_id.store(0, Ordering::Relaxed);
// Downgrade this host to greylist if it's on the whitelist or anchorlist.
hosts.downgrade_host(&addr).await;
}
}
@@ -307,15 +379,12 @@ impl Slot {
Ok((addr_final, channel)) => Ok((addr_final, channel)),
Err(e) => {
error!(
debug!(
target: "net::outbound_session::try_connect()",
"[P2P] Unable to connect outbound slot #{} [{}]: {}",
self.slot, addr, e
);
// At this point we failed to connect. We'll quarantine this peer now.
self.p2p().hosts().quarantine(&addr).await;
// Remove connection from pending
self.p2p().remove_pending(&addr).await;
@@ -329,11 +398,14 @@ impl Slot {
async fn setup_channel(&self, addr: Url, channel: ChannelPtr) -> Result<()> {
// Register the new channel
debug!(target: "net::outbound_session::setup_channel", "register_channel {}", channel.clone().address());
self.session().register_channel(channel.clone(), self.p2p().executor()).await?;
// Channel is now connected but not yet setup
// Remove pending lock since register_channel will add the channel to p2p
debug!(target: "net::outbound_session::setup_channel", "removing channel...");
self.p2p().remove_pending(&addr).await;
debug!(target: "net::outbound_session::setup_channel", "channel removed!");
// Notify that channel processing has been finished
self.session().channel_subscriber.notify(Ok(channel)).await;
@@ -341,90 +413,6 @@ impl Slot {
Ok(())
}
/// Loops through host addresses to find an outbound address that we can
/// connect to. Check whether the address is valid by making sure it isn't
/// our own inbound address, then checks whether it is already connected
/// (exists) or connecting (pending).
/// Lastly adds matching address to the pending list.
/// TODO: this method should go in hosts
async fn fetch_address_with_lock(&self, transports: &[String]) -> Option<Url> {
let p2p = self.p2p();
// Collect hosts
let mut hosts = vec![];
// If transport mixing is enabled, then for example we're allowed to
// use tor:// to connect to tcp:// and tor+tls:// to connect to tcp+tls://.
// However, **do not** mix tor:// and tcp+tls://, nor tor+tls:// and tcp://.
let transport_mixing = p2p.settings().transport_mixing;
macro_rules! mix_transport {
($a:expr, $b:expr) => {
if transports.contains(&$a.to_string()) && transport_mixing {
let mut a_to_b = p2p.hosts().fetch_with_schemes(&[$b.to_string()], None).await;
for addr in a_to_b.iter_mut() {
addr.set_scheme($a).unwrap();
hosts.push(addr.clone());
}
}
};
}
mix_transport!("tor", "tcp");
mix_transport!("tor+tls", "tcp+tls");
mix_transport!("nym", "tcp");
mix_transport!("nym+tls", "tcp+tls");
// And now the actual requested transports
for addr in p2p.hosts().fetch_with_schemes(transports, None).await {
hosts.push(addr);
}
// Randomize hosts list. Do not try to connect in a deterministic order.
// This is healthier for multiple slots to not compete for the same addrs.
hosts.shuffle(&mut OsRng);
// Try to find an unused host in the set.
for host in hosts.iter() {
// Check if we already have this connection established
if p2p.exists(host).await {
trace!(
target: "net::outbound_session::fetch_address_with_lock()",
"Host '{}' exists so skipping",
host
);
continue
}
// Check if we already have this configured as a manual peer
if p2p.settings().peers.contains(host) {
trace!(
target: "net::outbound_session::fetch_address_with_lock()",
"Host '{}' configured as manual peer so skipping",
host
);
continue
}
// Obtain a lock on this address to prevent duplicate connection
if !p2p.add_pending(host).await {
trace!(
target: "net::outbound_session::fetch_address_with_lock()",
"Host '{}' pending so skipping",
host
);
continue
}
trace!(
target: "net::outbound_session::fetch_address_with_lock()",
"Found valid host '{}",
host
);
return Some(host.clone())
}
None
}
fn notify(&self) {
self.wakeup_self.notify()
}
@@ -498,6 +486,7 @@ impl PeerDiscovery {
}
if current_attempt >= 4 {
debug!("current attempt: {}", current_attempt);
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Sleeping and trying again..."

View File

@@ -23,7 +23,7 @@
//! with an error, or times out.
//!
//! If a seed node connects successfully, it runs a version exchange protocol,
//! stores the channel in the p2p list of channels, and discoonnects, removing
//! stores the channel in the p2p list of channels, and disconnects, removing
//! the channel from the channel list.
//!
//! The channel is registered using the [`Session::register_channel()`] trait
@@ -115,8 +115,8 @@ impl SeedSyncSession {
}
// Seed process complete
if self.p2p().hosts().is_empty().await {
warn!(target: "net::session::seedsync_session", "[P2P] Hosts pool empty after seeding");
if self.p2p().hosts().is_empty_greylist().await {
warn!(target: "net::session::seedsync_session", "[P2P] Greylist empty after seeding");
}
debug!(target: "net::session::seedsync_session", "SeedSyncSession::start() [END]");

View File

@@ -68,12 +68,25 @@ pub struct Settings {
pub outbound_peer_discovery_cooloff_time: u64,
/// Time between peer discovery attempts
pub outbound_peer_discovery_attempt_time: u64,
/// Advertise our external address
pub advertise: bool,
/// Hostlist storage path
pub hostlist: String,
/// Pause interval within greylist refinery process
pub greylist_refinery_interval: u64,
/// Percent of connections to come from the whitelist
pub white_connection_percent: usize,
/// Number of anchorlist connections
pub anchor_connection_count: usize,
}
impl Default for Settings {
fn default() -> Self {
let version = option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0");
let app_version = semver::Version::parse(version).unwrap();
// TODO: We don't have a cross-platform method for the app directory (.local/darkfi)
// in util/path.rs currently.
let hostlist = format!("~/.local/darkfi/{}/hostlist.tsv", env!("CARGO_PKG_NAME"));
Self {
node_id: String::new(),
@@ -94,6 +107,11 @@ impl Default for Settings {
hosts_quarantine_limit: 50,
outbound_peer_discovery_cooloff_time: 30,
outbound_peer_discovery_attempt_time: 5,
advertise: true,
hostlist,
greylist_refinery_interval: 5,
white_connection_percent: 90,
anchor_connection_count: 2,
}
}
}
@@ -182,6 +200,28 @@ pub struct SettingsOpt {
/// Time between peer discovery attempts
#[structopt(skip)]
pub outbound_peer_discovery_attempt_time: Option<u64>,
/// Advertise our external address
#[serde(default)]
#[structopt(long)]
pub advertise: bool,
/// Hosts .tsv file to use
#[serde(default)]
#[structopt(long)]
pub hostlist: String,
/// Pause interval within greylist refinery process
#[structopt(skip)]
pub greylist_refinery_interval: Option<u64>,
/// Percent of connections to come from the whitelist
#[structopt(skip)]
pub white_connection_percent: Option<usize>,
/// Number of anchorlist connections
#[structopt(skip)]
pub anchor_connection_count: Option<usize>,
}
impl From<SettingsOpt> for Settings {
@@ -219,6 +259,17 @@ impl From<SettingsOpt> for Settings {
outbound_peer_discovery_attempt_time: opt
.outbound_peer_discovery_attempt_time
.unwrap_or(def.outbound_peer_discovery_attempt_time),
advertise: opt.advertise,
hostlist: opt.hostlist,
greylist_refinery_interval: opt
.greylist_refinery_interval
.unwrap_or(def.greylist_refinery_interval),
white_connection_percent: opt
.white_connection_percent
.unwrap_or(def.white_connection_percent),
anchor_connection_count: opt
.anchor_connection_count
.unwrap_or(def.anchor_connection_count),
}
}
}

186
src/net/tests.rs Normal file
View File

@@ -0,0 +1,186 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
// cargo +nightly test --release --features=net --lib p2p -- --include-ignored
use std::sync::Arc;
use log::{debug, info, warn};
use rand::Rng;
use smol::{channel, future, Executor};
use url::Url;
use crate::{
net::{P2p, Settings},
system::sleep,
};
// Number of nodes to spawn and number of peers each node connects to
const N_NODES: usize = 10;
const N_CONNS: usize = 5;
#[test]
fn p2p_test() {
let mut cfg = simplelog::ConfigBuilder::new();
cfg.add_filter_ignore("sled".to_string());
cfg.add_filter_ignore("net::protocol_ping".to_string());
cfg.add_filter_ignore("net::channel::subscribe_stop()".to_string());
cfg.add_filter_ignore("net::hosts".to_string());
cfg.add_filter_ignore("net::inbound_session".to_string());
cfg.add_filter_ignore("net::outbound_session".to_string());
cfg.add_filter_ignore("net::session".to_string());
cfg.add_filter_ignore("net::refinery".to_string());
cfg.add_filter_ignore("net::message_subscriber".to_string());
cfg.add_filter_ignore("net::protocol_address".to_string());
cfg.add_filter_ignore("net::protocol_jobs_manager".to_string());
cfg.add_filter_ignore("net::protocol_version".to_string());
cfg.add_filter_ignore("net::protocol_registry".to_string());
cfg.add_filter_ignore("net::protocol_seed".to_string());
cfg.add_filter_ignore("net::channel".to_string());
cfg.add_filter_ignore("net::p2p::seed".to_string());
cfg.add_filter_ignore("net::p2p::start".to_string());
cfg.add_filter_ignore("store".to_string());
cfg.add_filter_ignore("net::store".to_string());
cfg.add_filter_ignore("net::channel::send()".to_string());
cfg.add_filter_ignore("net::channel::start()".to_string());
cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string());
cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string());
cfg.add_filter_ignore("net::tcp".to_string());
// We check this error so we can execute same file tests in parallel,
// otherwise second one fails to init logger here.
if simplelog::TermLogger::init(
simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace,
cfg.build(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
)
.is_err()
{
warn!(target: "net::test", "Logger already initialized");
}
let ex = Arc::new(Executor::new());
let ex_ = ex.clone();
let (signal, shutdown) = channel::unbounded::<()>();
// Run a thread for each node.
easy_parallel::Parallel::new()
.each(0..N_NODES, |_| future::block_on(ex.run(shutdown.recv())))
.finish(|| {
future::block_on(async {
hostlist_propagation(ex_).await;
drop(signal);
})
});
}
async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
let seed_addr = Url::parse(&format!("tcp://127.0.0.1:{}", 51505)).unwrap();
let mut p2p_instances = vec![];
let mut rng = rand::thread_rng();
let settings = Settings {
localnet: true,
inbound_addrs: vec![seed_addr.clone()],
external_addrs: vec![seed_addr.clone()],
outbound_connections: 0,
//outbound_connect_timeout: 10,
inbound_connections: usize::MAX,
seeds: vec![],
hostlist: String::from("~/.local/darkfi/p2p-test/seed.tsv"),
peers: vec![],
allowed_transports: vec!["tcp".to_string()],
node_id: "seed".to_string(),
..Default::default()
};
let p2p = P2p::new(settings, ex.clone()).await;
p2p_instances.push(p2p);
info!("Initializing outbound nodes");
for i in 0..N_NODES {
// Everyone will connect to N_CONNS random peers.
let mut peers = vec![];
for _ in 0..N_CONNS {
let mut port = 53200 + i;
while port == 53200 + i {
port = 53200 + rng.gen_range(0..N_NODES);
}
peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
}
let settings = Settings {
localnet: true,
inbound_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 53200 + i)).unwrap()],
external_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 53200 + i)).unwrap()],
outbound_connections: 8,
//outbound_connect_timeout: 10,
inbound_connections: usize::MAX,
seeds: vec![seed_addr.clone()],
hostlist: format!("~/.local/darkfi/p2p-test/hosts{}.tsv", i),
peers,
allowed_transports: vec!["tcp".to_string()],
node_id: i.to_string(),
anchor_connection_count: 2,
..Default::default()
};
let p2p = P2p::new(settings, ex.clone()).await;
p2p_instances.push(p2p);
}
// Start the P2P network
for p2p in p2p_instances.iter() {
p2p.clone().start().await.unwrap();
}
info!("Waiting until all peers connect");
sleep(10).await;
info!("Inspecting hostlists...");
for p2p in p2p_instances.iter() {
let hosts = p2p.hosts();
let greylist = hosts.greylist.read().await;
let whitelist = hosts.whitelist.read().await;
let anchorlist = hosts.anchorlist.read().await;
info!("Node {}", p2p.settings().node_id);
for (i, (url, last_seen)) in greylist.iter().enumerate() {
info!("Greylist entry {}: {}, {}", i, url, last_seen);
}
for (i, (url, last_seen)) in whitelist.iter().enumerate() {
info!("Whitelist entry {}: {}, {}", i, url, last_seen);
}
for (i, (url, last_seen)) in anchorlist.iter().enumerate() {
info!("Anchorlist entry {}: {}, {}", i, url, last_seen);
}
}
// Stop the P2P network
for p2p in p2p_instances.iter() {
debug!("Stopping P2P instances...");
p2p.clone().stop().await;
debug!("Node {} stopped!", p2p.settings().node_id);
}
}