net: 99.9999% of the time it works 100% of the time

Cleaned up the refinery code, which seems to have eliminated the deadlocks.
This commit is contained in:
lunar-mining
2024-01-08 12:20:10 +01:00
parent fcf5a87a28
commit 96cad54d81
9 changed files with 163 additions and 229 deletions

View File

@@ -69,48 +69,59 @@ impl GreylistRefinery {
pub async fn stop(self: Arc<Self>) {
match self.p2p().hosts().save_hosts().await {
Ok(()) => {
debug!(target: "deadlock", "saved hosts node {}", self.p2p().settings().node_id);
debug!(target: "net::refinery::stop()", "Save hosts successful!");
}
Err(e) => {
debug!(target: "deadlock", "ERROR saving hosts node {}", self.p2p().settings().node_id);
warn!(target: "net::refinery::stop()", "Error saving hosts {}", e);
}
}
self.process.stop().await
debug!(target: "deadlock", "Stopping refinery process node {}", self.p2p().settings().node_id);
self.process.stop().await;
debug!(target: "deadlock", "Refinery process stopped node {}", self.p2p().settings().node_id);
}
// Randomly select a peer on the greylist and probe it.
async fn run(self: Arc<Self>) {
debug!(target: "net::refinery::run()", "START");
//debug!(target: "net::refinery::run()", "START");
debug!(target: "deadlock", "refinery START node {}", self.p2p().settings().node_id);
loop {
sleep(self.p2p().settings().greylist_refinery_interval).await;
let hosts = self.p2p().hosts();
if hosts.is_empty_greylist().await {
debug!(target: "deadlock", "Greylist is empty! Cannot start refinery node {}", self.p2p().settings().node_id);
warn!(target: "net::refinery::run()",
"Greylist is empty! Cannot start refinery process");
} else {
debug!(target: "net::refinery::run()", "Starting refinery process");
// Randomly select an entry from the greylist.
let (entry, position) = hosts.greylist_fetch_random().await;
let url = &entry.0;
if ping_node(url, self.p2p().clone()).await {
// Peer is responsive. Update last_seen and add it to the whitelist.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
// Append to the whitelist.
hosts.whitelist_store_or_update(&[(url.clone(), last_seen)]).await.unwrap();
// Remove whitelisted peer from the greylist.
hosts.greylist_remove(url, position).await;
} else {
let mut greylist = hosts.greylist.write().await;
greylist.remove(position);
debug!(target: "net::refinery::run()", "Peer {} is not response. Removed from greylist", url);
}
continue
}
debug!(target: "net::greylist_refinery::run()", "Sleeping...");
sleep(self.p2p().settings().greylist_refinery_interval).await;
let (entry, position) = hosts.greylist_fetch_random().await;
let url = &entry.0;
if !ping_node(url, self.p2p().clone()).await {
let mut greylist = hosts.greylist.write().await;
greylist.remove(position);
//debug!(target: "net::refinery::run()", "Peer {} is not response. Removed from greylist", url);
debug!(target: "deadlock", "Peer {} is not response. Removed from greylist node {}", url, self.p2p().settings().node_id);
continue
}
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
debug!(target: "deadlock", "whitelist store or update node {}", self.p2p().settings().node_id);
// Append to the whitelist.
hosts.whitelist_store_or_update(&[(url.clone(), last_seen)]).await.unwrap();
debug!(target: "deadlock", "greylist remove node {}", self.p2p().settings().node_id);
// Remove whitelisted peer from the greylist.
hosts.greylist_remove(url, position).await;
debug!(target: "deadlock", "refinery STOP node {}", self.p2p().settings().node_id);
}
}
@@ -121,6 +132,7 @@ impl GreylistRefinery {
// Ping a node to check it's online.
pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
debug!(target: "deadlock", "ping node START node {}", p2p.settings().node_id);
let session_outbound = p2p.session_outbound();
let parent = Arc::downgrade(&session_outbound);
let connector = Connector::new(p2p.settings(), parent);
@@ -142,11 +154,13 @@ pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
match handshake_task.await {
Ok(()) => {
debug!(target: "net::refinery::ping_node()", "Handshake success! Stopping channel.");
debug!(target: "deadlock", "ping node STOP node {} -> true", p2p.settings().node_id);
channel.stop().await;
true
}
Err(e) => {
debug!(target: "net::refinery::ping_node()", "Handshake failure! {}", e);
debug!(target: "deadlock", "ping node STOP node {} -> false", p2p.settings().node_id);
channel.stop().await;
false
}
@@ -155,6 +169,7 @@ pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
Err(e) => {
debug!(target: "net::refinery::ping_node()", "Failed to connect to {}, ({})", addr, e);
debug!(target: "deadlock", "ping node STOP node {} -> false", p2p.settings().node_id);
false
}
}

View File

@@ -98,7 +98,7 @@ impl Hosts {
&self,
p2p: P2pPtr,
transports: &[String],
) -> Option<Vec<(Url, u64)>> {
) -> Vec<(Url, u64)> {
trace!(target: "store", "greylist_fetch_address() [START]");
// Collect hosts
let mut hosts = vec![];
@@ -133,48 +133,7 @@ impl Hosts {
// This is healthier for multiple slots to not compete for the same addrs.
hosts.shuffle(&mut OsRng);
Some(hosts)
//// Try to find an unused host in the set.
//for (host, last_seen) in hosts.iter() {
// // Check if we already have this connection established
// if p2p.exists(host).await {
// debug!(
// target: "store::greylist_fetch_address()",
// "Host '{}' exists so skipping",
// host
// );
// continue
// }
// // Check if we already have this configured as a manual peer
// if self.settings.peers.contains(host) {
// debug!(
// target: "store::greylist_fetch_address()",
// "Host '{}' configured as manual peer so skipping",
// host
// );
// continue
// }
// // Obtain a lock on this address to prevent duplicate connection
// if !p2p.add_pending(host).await {
// debug!(
// target: "store::greylist_fetch_address()",
// "Host '{}' pending so skipping",
// host
// );
// continue
// }
// debug!(
// target: "store::greylist_fetch_address()",
// "Found valid host '{}",
// host
// );
// return Some((host.clone(), last_seen.clone()))
//}
//None
hosts
}
/// Loops through whitelist addresses to find an outbound address that we can
@@ -186,7 +145,7 @@ impl Hosts {
&self,
p2p: P2pPtr,
transports: &[String],
) -> Option<Vec<(Url, u64)>> {
) -> Vec<(Url, u64)> {
trace!(target: "store", "whitelist_fetch_address() [START]");
// Collect hosts
let mut hosts = vec![];
@@ -224,61 +183,7 @@ impl Hosts {
// This is healthier for multiple slots to not compete for the same addrs.
hosts.shuffle(&mut OsRng);
Some(hosts)
//// Try to find an unused host in the set.
//for (host, last_seen) in hosts.iter() {
// debug!(target: "store::whitelist_fetch_address()",
// "Starting checks");
// // Check if we already have this connection established
// if p2p.exists(host).await {
// debug!(
// target: "store::whitelist_fetch_address()",
// "Host '{}' exists so skipping",
// host
// );
// continue
// }
// debug!(target: "store::whitelist_fetch_address()",
// "Connection is not already established");
// // Check if we already have this configured as a manual peer
// if self.settings.peers.contains(host) {
// debug!(
// target: "store::whitelist_fetch_address()",
// "Host '{}' configured as manual peer so skipping",
// host
// );
// continue
// }
// debug!(target: "store::whitelist_fetch_address()",
// "Connection not configured as manual peer");
// // Obtain a lock on this address to prevent duplicate connection
// if !p2p.add_pending(host).await {
// debug!(
// target: "store::whitelist_fetch_address()",
// "Host '{}' pending so skipping",
// host
// );
// continue
// }
// debug!(target: "store::whitelist_fetch_address()",
// "Connection not pending");
// debug!(
// target: "store::whitelist_fetch_address()",
// "Found valid host '{}",
// host
// );
// return Some((host.clone(), last_seen.clone()))
//}
//debug!(target: "store::whitelist_fetch_address()",
//"Exiting with NONE");
//None
hosts
}
/// Loops through anchorlist addresses to find an outbound address that we can
@@ -290,7 +195,7 @@ impl Hosts {
&self,
p2p: P2pPtr,
transports: &[String],
) -> Option<Vec<(Url, u64)>> {
) -> Vec<(Url, u64)> {
trace!(target: "store", "anchorlist_fetch_address() [START]");
// Collect hosts
let mut hosts = vec![];
@@ -328,64 +233,14 @@ impl Hosts {
// This is healthier for multiple slots to not compete for the same addrs.
hosts.shuffle(&mut OsRng);
Some(hosts)
//// Try to find an unused host in the set.
//for (host, last_seen) in hosts.iter() {
// debug!(target: "store::anchorlist_fetch_address()",
// "Starting checks");
// // Check if we already have this connection established
// if p2p.exists(host).await {
// debug!(
// target: "store::anchorlist_fetch_address()",
// "Host '{}' exists so skipping",
// host
// );
// continue
// }
// debug!(target: "store::anchorlist_fetch_address()",
// "Connection is not already established");
// // Check if we already have this configured as a manual peer
// if self.settings.peers.contains(host) {
// debug!(
// target: "store::anchorlist_fetch_address()",
// "Host '{}' configured as manual peer so skipping",
// host
// );
// continue
// }
// debug!(target: "store::anchorlist_fetch_address()",
// "Connection not configured as manual peer");
// // Obtain a lock on this address to prevent duplicate connection
// if !p2p.add_pending(host).await {
// debug!(
// target: "store::anchorlist_fetch_address()",
// "Host '{}' pending so skipping",
// host
// );
// continue
// }
// debug!(target: "store::anchorlist_fetch_address()",
// "Connection not pending");
// debug!(
// target: "store::anchorlist_fetch_address()",
// "Found valid host '{}",
// host
// );
// return Some((host.clone(), last_seen.clone()))
//}
//debug!(target: "store::anchorlist_fetch_address()",
//"Exiting with NONE");
//None
hosts
}
pub async fn lock_check(&self, p2p: P2pPtr, hosts: Vec<(Url, u64)>) -> Option<(Url, u64)> {
pub async fn check_address_with_lock(
&self,
p2p: P2pPtr,
hosts: Vec<(Url, u64)>,
) -> Option<(Url, u64)> {
// Try to find an unused host in the set.
for (host, last_seen) in hosts {
debug!(target: "store::anchorlist_fetch_address()",

View File

@@ -180,6 +180,7 @@ impl P2p {
self.session_outbound().stop().await;
// Stop greylist refinery process
debug!(target: "deadlock", "Killing greylist refinery node: {}", self.settings().node_id);
self.greylist_refinery().stop().await;
}

View File

@@ -58,6 +58,7 @@ impl ProtocolVersion {
/// info and wait for version ack. Wait for version info and send
/// version ack.
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
//debug!(target: "deadlock", "protocol_version::START => address={}", self.channel.address());
debug!(target: "net::protocol_version::run()", "START => address={}", self.channel.address());
// Start timer
// Send version, wait for verack
@@ -82,7 +83,8 @@ impl ProtocolVersion {
return Err(Error::ChannelTimeout)
}
debug!(target: "net::protocol_version::run()", "END => address={}", self.channel.address());
//debug!(target: "deadlock", "protocol_version::END => address={}", self.channel.address());
//debug!(target: "net::protocol_version::run()", "END => address={}", self.channel.address());
Ok(())
}

View File

@@ -147,6 +147,8 @@ pub trait Session: Sync {
protocol_version.run(executor.clone()).await?;
if self.type_id() != SESSION_INBOUND {
//debug!(target: "deadlock", "perform_handshake_protocols adding to anchorlist channel {}, node {}",
// channel.clone().address(), self.p2p().settings().node_id);
// Channel is now initialized. Timestamp this.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
@@ -157,16 +159,33 @@ pub trait Session: Sync {
}
// Add channel to p2p
debug!(target: "net::session::register_channel()", "perform_handshake_protocol {}", channel.clone().address());
//debug!(target: "deadlock", "Storing channel in p2p, channel {} node {}",
//channel.clone().address(), self.p2p().settings().node_id);
//debug!(target: "net::session::register_channel()", "perform_handshake_protocol {}", channel.clone().address());
self.p2p().store(channel.clone()).await;
// Subscribe to stop, so we can remove from p2p
//debug!(target: "deadlock", "Waiting for a stop signal channel {} node {}",
//channel.clone().address(), self.p2p().settings().node_id);
executor.spawn(remove_sub_on_stop(self.p2p(), channel, self.type_id())).detach();
// Channel is ready for use
Ok(())
}
/// Runs the version handshake and nothing else.
///
/// Awaits `protocol_version.run()` and propagates any error it returns.
/// This default implementation does not store the channel in p2p and does
/// not spawn any stop subscription.
///
/// # Parameters
/// * `protocol_version` - the version protocol instance to run.
/// * `channel` - accepted as part of the signature but not used by this
///   default implementation.
/// * `executor` - executor handed to `ProtocolVersion::run()`.
///
/// # Errors
/// Returns whatever error `ProtocolVersion::run()` returns.
async fn perform_local_handshake(
    &self,
    protocol_version: Arc<ProtocolVersion>,
    channel: ChannelPtr,
    executor: Arc<Executor<'_>>,
) -> Result<()> {
    // Perform handshake. `executor` is owned and not needed afterwards, so
    // move the Arc into the call instead of cloning it.
    protocol_version.run(executor).await?;

    // Channel is ready for use
    Ok(())
}
/// Returns a pointer to the p2p network interface
fn p2p(&self) -> P2pPtr;

View File

@@ -108,12 +108,14 @@ impl OutboundSession {
let slots = &*self.slots.lock().await;
for slot in slots {
debug!(target: "deadlock", "Killing channel {:?}, slot: {:?}, node: {:?}",
debug!(target: "deadlock", "Killing channel {:?}, slot: {:?}, node {}",
slot.channel_id, slot.slot, self.p2p().settings().node_id);
slot.clone().stop().await;
}
debug!(target: "deadlock", "Killing peer discovery node {}", self.p2p().settings().node_id);
self.peer_discovery.clone().stop().await;
debug!(target: "deadlock", "Killed all outbound processes node {}", self.p2p().settings().node_id);
}
pub async fn slot_info(&self) -> Vec<u32> {
@@ -187,14 +189,10 @@ impl Slot {
self.process.stop().await
}
async fn fetch_address(
&self,
slot_count: usize,
transports: &[String],
) -> Option<(Url, u64)> {
async fn fetch_address(&self, slot_count: usize, transports: &[String]) -> Option<(Url, u64)> {
let hosts = self.p2p().hosts();
//let slot_count = self.p2p().settings().outbound_connections;
let white_count = slot_count * self.p2p().settings().white_connection_percent / 100;
let connects = self.p2p().settings().outbound_connections;
let white_count = connects * self.p2p().settings().white_connection_percent / 100;
// Up to anchor_connection_count connections:
//
@@ -227,18 +225,15 @@ impl Slot {
debug!(target: "outbound_session::fetch_address()",
"All other connections- get grey connections");
hosts.greylist_fetch_address(self.p2p(), transports).await
}
}
};
// Check whether:
// * we already have this connection established
// * we already have this configured as a manual peer
// * address is already pending a connection
if addrs.is_some() {
let addr = hosts.lock_check(self.p2p(), addrs.unwrap()).await;
return addr
}
None
let addr = hosts.check_address_with_lock(self.p2p(), addrs).await;
return addr
}
// We first try to make connections to the addresses on our anchor list. We then find some
@@ -414,20 +409,30 @@ impl Slot {
Ok((addr_final, channel)) => Ok((addr_final, channel)),
Err(e) => {
//debug!(
// target: "TODO",
// "[P2P] Unable to connect outbound slot #{} [{}]: {}",
// self.slot, addr, e
//);
debug!(
target: "deadlock",
"[P2P] Unable to connect outbound slot #{} [{}]: {}",
self.slot, addr, e
"[P2P] Unable to connect outbound slot #{} [{}]: {} node {}",
self.slot, addr, e, self.p2p().settings().node_id
);
// At this point we've failed to connect.
// If the host is in the anchorlist or whitelist, downgrade it to greylist.
self.p2p().hosts().downgrade_host(&addr).await?;
debug!(target: "deadlock", "removing channel... slot {} node {}",
self.slot, self.p2p().settings().node_id);
debug!(target: "net::outbound_session::try_connect", "removing channel...");
// Remove connection from pending
self.p2p().remove_pending(&addr).await;
debug!(target: "net::outbound_session::try_connect", "channel removed!");
debug!(target: "deadlock", "channel removed! slot {} node {}",
self.slot, self.p2p().settings().node_id);
// Notify that channel processing failed
self.session().channel_subscriber.notify(Err(Error::ConnectFailed)).await;
@@ -508,6 +513,8 @@ impl PeerDiscovery {
async fn run(self: Arc<Self>) {
let mut current_attempt = 0;
loop {
debug!(target: "deadlock", "peer discovery START node {} current attempt {}",
self.p2p().settings().node_id, current_attempt);
//dnetev!(self, OutboundPeerDiscovery, {
// attempt: current_attempt,
// state: "wait",

View File

@@ -108,7 +108,7 @@ impl Default for Settings {
advertise: true,
hostlist,
greylist_refinery_interval: 30,
white_connection_percent: 50,
white_connection_percent: 90,
anchor_connection_count: 2,
}
}

View File

@@ -18,9 +18,10 @@
// cargo +nightly test --release --features=net --lib p2p -- --include-ignored
use simplelog::ThreadLogMode;
use std::sync::Arc;
use log::info;
use log::{debug, info};
use rand::Rng;
use smol::{channel, future, Executor};
use url::Url;
@@ -31,16 +32,45 @@ use crate::{
};
// Number of nodes to spawn and number of peers each node connects to
const N_NODES: usize = 6;
const N_CONNS: usize = 1;
const N_NODES: usize = 20;
//const N_CONNS: usize = 3;
#[test]
fn p2p_test() {
let mut cfg = simplelog::ConfigBuilder::new();
//cfg.set_thread_mode(ThreadLogMode::Both);
cfg.add_filter_ignore("sled".to_string());
//cfg.add_filter_ignore("deadlock".to_string());
cfg.add_filter_ignore("net::protocol_ping".to_string());
//cfg.add_filter_ignore("net::channel::subscribe_stop()".to_string());
cfg.add_filter_ignore("net::hosts".to_string());
cfg.add_filter_ignore("net::inbound_session".to_string());
cfg.add_filter_ignore("outbound_session".to_string());
cfg.add_filter_ignore("net::outbound_session".to_string());
cfg.add_filter_ignore("net::session::outbound_session".to_string());
cfg.add_filter_ignore("net::session".to_string());
cfg.add_filter_ignore("net::refinery".to_string());
cfg.add_filter_ignore("net::message_subscriber".to_string());
cfg.add_filter_ignore("net::protocol_address".to_string());
cfg.add_filter_ignore("net::protocol_jobs_manager".to_string());
cfg.add_filter_ignore("net::protocol_version".to_string());
cfg.add_filter_ignore("net::protocol_registry".to_string());
cfg.add_filter_ignore("net::protocol_seed".to_string());
cfg.add_filter_ignore("net::channel".to_string());
cfg.add_filter_ignore("net::p2p::seed".to_string());
cfg.add_filter_ignore("net::p2p::start".to_string());
cfg.add_filter_ignore("store".to_string());
cfg.add_filter_ignore("net::store".to_string());
//cfg.add_filter_ignore("net::channel::send()".to_string());
//cfg.add_filter_ignore("net::channel::start()".to_string());
//cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string());
//cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string());
cfg.add_filter_ignore("net::tcp".to_string());
simplelog::TermLogger::init(
simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Info,
simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace,
cfg.build(),
simplelog::TerminalMode::Mixed,
@@ -73,7 +103,7 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
localnet: true,
inbound_addrs: vec![seed_addr.clone()],
external_addrs: vec![seed_addr.clone()],
outbound_connections: 2,
outbound_connections: 0,
//outbound_connect_timeout: 10,
inbound_connections: usize::MAX,
seeds: vec![],
@@ -102,7 +132,7 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
localnet: true,
inbound_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + i)).unwrap()],
external_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + i)).unwrap()],
outbound_connections: 2,
outbound_connections: 8,
//outbound_connect_timeout: 10,
inbound_connections: usize::MAX,
seeds: vec![seed_addr.clone()],
@@ -110,6 +140,7 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
peers,
allowed_transports: vec!["tcp".to_string()],
node_id: i.to_string(),
anchor_connection_count: 2,
..Default::default()
};
@@ -122,37 +153,39 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
p2p.clone().start().await.unwrap();
}
info!("Waiting until all peers connect");
sleep(15).await;
//info!("Waiting until all peers connect");
sleep(30).await;
//info!("Inspecting hostlists...");
//for p2p in p2p_instances.iter() {
// let hosts = p2p.hosts();
// //assert!(!hosts.is_empty_greylist().await);
// //assert!(!hosts.is_empty_whitelist().await);
// //assert!(!hosts.is_empty_anchorlist().await);
info!("Inspecting hostlists...");
for p2p in p2p_instances.iter() {
let hosts = p2p.hosts();
//assert!(!hosts.is_empty_greylist().await);
//assert!(!hosts.is_empty_whitelist().await);
//assert!(!hosts.is_empty_anchorlist().await);
// let greylist = hosts.greylist.read().await;
// let whitelist = hosts.whitelist.read().await;
// let anchorlist = hosts.anchorlist.read().await;
//let greylist = hosts.greylist.read().await;
//let whitelist = hosts.whitelist.read().await;
//let anchorlist = hosts.anchorlist.read().await;
// info!("Node {}", p2p.settings().node_id);
// for (i, (url, last_seen)) in greylist.iter().enumerate() {
// info!("Greylist entry {}: {}, {}", i, url, last_seen);
// }
//info!("Node {}", p2p.settings().node_id);
//for (i, (url, last_seen)) in greylist.iter().enumerate() {
// info!("Greylist entry {}: {}, {}", i, url, last_seen);
//}
// for (i, (url, last_seen)) in whitelist.iter().enumerate() {
// info!("Whitelist entry {}: {}, {}", i, url, last_seen);
// }
//for (i, (url, last_seen)) in whitelist.iter().enumerate() {
// info!("Whitelist entry {}: {}, {}", i, url, last_seen);
//}
// for (i, (url, last_seen)) in anchorlist.iter().enumerate() {
// info!("Anchorlist entry {}: {}, {}", i, url, last_seen);
// }
//}
//for (i, (url, last_seen)) in anchorlist.iter().enumerate() {
// info!("Anchorlist entry {}: {}, {}", i, url, last_seen);
//}
}
// Stop the P2P network
for p2p in p2p_instances.iter() {
info!("Stopping P2P instances...");
//info!("Stopping P2P instances...");
debug!("Stopping P2P instances...");
p2p.clone().stop().await;
debug!("node {} stopped!", p2p.settings().node_id);
}
}

View File

@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use log::trace;
use log::{debug, trace};
use rand::{rngs::OsRng, Rng};
use smol::{
future::{self, Future},
@@ -115,9 +115,11 @@ impl StoppableTask {
/// Can be called multiple times. After the first call, this does nothing.
///
/// Notifies `self.signal`, then awaits `self.barrier.wait()`. A log line is
/// emitted before the notification and again once the barrier wait returns.
pub async fn stop(&self) {
trace!(target: "system::StoppableTask", "Stopping task {}", self.task_id);
// Same message repeated under the "deadlock" log target.
// NOTE(review): looks like temporary instrumentation for tracing shutdown
// hangs - confirm whether it should stay.
debug!(target: "deadlock", "system::StoppableTask Stopping task {}", self.task_id);
// Send the stop signal first...
self.signal.notify();
// ...then wait on the barrier. Presumably the task side reaches the same
// barrier once it has wound down - TODO confirm against the task start code.
self.barrier.wait().await;
trace!(target: "system::StoppableTask", "Stopped task {}", self.task_id);
// Mirror of the trace line above under the "deadlock" target.
debug!(target: "deadlock", "system::StoppableTask Stopped task {}", self.task_id);
}
}