chore: cleanup

fix warnings + debug statements
This commit is contained in:
lunar-mining
2024-01-10 08:49:29 +01:00
parent 96cad54d81
commit 736459aa51
9 changed files with 71 additions and 193 deletions

View File

@@ -69,30 +69,23 @@ impl GreylistRefinery {
pub async fn stop(self: Arc<Self>) {
match self.p2p().hosts().save_hosts().await {
Ok(()) => {
debug!(target: "deadlock", "saved hosts node {}", self.p2p().settings().node_id);
debug!(target: "net::refinery::stop()", "Save hosts successful!");
}
Err(e) => {
debug!(target: "deadlock", "ERROR saving hosts node {}", self.p2p().settings().node_id);
warn!(target: "net::refinery::stop()", "Error saving hosts {}", e);
}
}
debug!(target: "deadlock", "Stopping refinery process node {}", self.p2p().settings().node_id);
self.process.stop().await;
debug!(target: "deadlock", "Refinery process stopped node {}", self.p2p().settings().node_id);
}
// Randomly select a peer on the greylist and probe it.
async fn run(self: Arc<Self>) {
//debug!(target: "net::refinery::run()", "START");
debug!(target: "deadlock", "refinery START node {}", self.p2p().settings().node_id);
loop {
sleep(self.p2p().settings().greylist_refinery_interval).await;
let hosts = self.p2p().hosts();
if hosts.is_empty_greylist().await {
debug!(target: "deadlock", "Greylist is empty! Cannot start refinery node {}", self.p2p().settings().node_id);
warn!(target: "net::refinery::run()",
"Greylist is empty! Cannot start refinery process");
@@ -105,23 +98,18 @@ impl GreylistRefinery {
if !ping_node(url, self.p2p().clone()).await {
let mut greylist = hosts.greylist.write().await;
greylist.remove(position);
//debug!(target: "net::refinery::run()", "Peer {} is not response. Removed from greylist", url);
debug!(target: "deadlock", "Peer {} is not response. Removed from greylist node {}", url, self.p2p().settings().node_id);
debug!(target: "net::refinery::run()", "Peer {} is not response. Removed from greylist", url);
continue
}
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
debug!(target: "deadlock", "whitelist store or update node {}", self.p2p().settings().node_id);
// Append to the whitelist.
hosts.whitelist_store_or_update(&[(url.clone(), last_seen)]).await.unwrap();
debug!(target: "deadlock", "greylist remove node {}", self.p2p().settings().node_id);
// Remove whitelisted peer from the greylist.
hosts.greylist_remove(url, position).await;
debug!(target: "deadlock", "refinery STOP node {}", self.p2p().settings().node_id);
}
}
@@ -132,7 +120,6 @@ impl GreylistRefinery {
// Ping a node to check it's online.
pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
debug!(target: "deadlock", "ping node START node {}", p2p.settings().node_id);
let session_outbound = p2p.session_outbound();
let parent = Arc::downgrade(&session_outbound);
let connector = Connector::new(p2p.settings(), parent);
@@ -154,13 +141,11 @@ pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
match handshake_task.await {
Ok(()) => {
debug!(target: "net::refinery::ping_node()", "Handshake success! Stopping channel.");
debug!(target: "deadlock", "ping node STOP node {} -> true", p2p.settings().node_id);
channel.stop().await;
true
}
Err(e) => {
debug!(target: "net::refinery::ping_node()", "Handshake failure! {}", e);
debug!(target: "deadlock", "ping node STOP node {} -> false", p2p.settings().node_id);
channel.stop().await;
false
}
@@ -169,7 +154,6 @@ pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool {
Err(e) => {
debug!(target: "net::refinery::ping_node()", "Failed to connect to {}, ({})", addr, e);
debug!(target: "deadlock", "ping node STOP node {} -> false", p2p.settings().node_id);
false
}
}

View File

@@ -94,11 +94,7 @@ impl Hosts {
/// our own inbound address, then checks whether it is already connected
/// (exists) or connecting (pending).
/// Lastly adds matching address to the pending list.
pub async fn greylist_fetch_address(
&self,
p2p: P2pPtr,
transports: &[String],
) -> Vec<(Url, u64)> {
pub async fn greylist_fetch_address(&self, transports: &[String]) -> Vec<(Url, u64)> {
trace!(target: "store", "greylist_fetch_address() [START]");
// Collect hosts
let mut hosts = vec![];
@@ -141,11 +137,7 @@ impl Hosts {
/// our own inbound address, then checks whether it is already connected
/// (exists) or connecting (pending).
/// Lastly adds matching address to the pending list.
pub async fn whitelist_fetch_address(
&self,
p2p: P2pPtr,
transports: &[String],
) -> Vec<(Url, u64)> {
pub async fn whitelist_fetch_address(&self, transports: &[String]) -> Vec<(Url, u64)> {
trace!(target: "store", "whitelist_fetch_address() [START]");
// Collect hosts
let mut hosts = vec![];
@@ -191,11 +183,7 @@ impl Hosts {
/// our own inbound address, then checks whether it is already connected
/// (exists) or connecting (pending).
/// Lastly adds matching address to the pending list.
pub async fn anchorlist_fetch_address(
&self,
p2p: P2pPtr,
transports: &[String],
) -> Vec<(Url, u64)> {
pub async fn anchorlist_fetch_address(&self, transports: &[String]) -> Vec<(Url, u64)> {
trace!(target: "store", "anchorlist_fetch_address() [START]");
// Collect hosts
let mut hosts = vec![];
@@ -361,7 +349,7 @@ impl Hosts {
debug!(target: "store::anchorlist_store_or_update()",
"We have this entry in the anchorlist. Updating last seen...");
let (index, entry) = self.get_anchorlist_entry_at_addr(addr).await?;
let index = self.get_anchorlist_index_at_addr(addr).await?;
self.anchorlist_update_last_seen(addr, last_seen.clone(), index).await;
}
}
@@ -774,7 +762,7 @@ impl Hosts {
/// Get the index for a given addr on the anchorlist.
pub async fn get_anchorlist_index_at_addr(&self, addr: &Url) -> Result<usize> {
let anchorlist = self.anchorlist.read().await;
for (i, (url, time)) in anchorlist.iter().enumerate() {
for (i, (url, _time)) in anchorlist.iter().enumerate() {
if url == addr {
return Ok(i)
}
@@ -1153,11 +1141,7 @@ impl Hosts {
#[cfg(test)]
mod tests {
use super::{
super::super::{settings::Settings, P2p},
*,
};
use smol::Executor;
use super::{super::super::settings::Settings, *};
use std::{sync::Arc, time::UNIX_EPOCH};
#[test]

View File

@@ -180,7 +180,6 @@ impl P2p {
self.session_outbound().stop().await;
// Stop greylist refinery process
debug!(target: "deadlock", "Killing greylist refinery node: {}", self.settings().node_id);
self.greylist_refinery().stop().await;
}

View File

@@ -58,7 +58,6 @@ impl ProtocolVersion {
/// info and wait for version ack. Wait for version info and send
/// version ack.
pub async fn run(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
//debug!(target: "deadlock", "protocol_version::START => address={}", self.channel.address());
debug!(target: "net::protocol_version::run()", "START => address={}", self.channel.address());
// Start timer
// Send version, wait for verack
@@ -83,8 +82,7 @@ impl ProtocolVersion {
return Err(Error::ChannelTimeout)
}
//debug!(target: "deadlock", "protocol_version::END => address={}", self.channel.address());
//debug!(target: "net::protocol_version::run()", "END => address={}", self.channel.address());
debug!(target: "net::protocol_version::run()", "END => address={}", self.channel.address());
Ok(())
}

View File

@@ -32,7 +32,7 @@
use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, info, warn};
use log::{info, warn};
use smol::lock::Mutex;
use url::Url;

View File

@@ -147,8 +147,6 @@ pub trait Session: Sync {
protocol_version.run(executor.clone()).await?;
if self.type_id() != SESSION_INBOUND {
//debug!(target: "deadlock", "perform_handshake_protocols adding to anchorlist channel {}, node {}",
// channel.clone().address(), self.p2p().settings().node_id);
// Channel is now initialized. Timestamp this.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
@@ -159,33 +157,15 @@ pub trait Session: Sync {
}
// Add channel to p2p
//debug!(target: "deadlock", "Storing channel in p2p, channel {} node {}",
//channel.clone().address(), self.p2p().settings().node_id);
//debug!(target: "net::session::register_channel()", "perform_handshake_protocol {}", channel.clone().address());
self.p2p().store(channel.clone()).await;
// Subscribe to stop, so we can remove from p2p
//debug!(target: "deadlock", "Waiting for a stop signal channel {} node {}",
//channel.clone().address(), self.p2p().settings().node_id);
executor.spawn(remove_sub_on_stop(self.p2p(), channel, self.type_id())).detach();
// Channel is ready for use
Ok(())
}
async fn perform_local_handshake(
&self,
protocol_version: Arc<ProtocolVersion>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
// Perform handshake
protocol_version.run(executor.clone()).await?;
// Channel is ready for use
Ok(())
}
/// Returns a pointer to the p2p network interface
fn p2p(&self) -> P2pPtr;

View File

@@ -31,7 +31,7 @@ use std::{
atomic::{AtomicU32, Ordering},
Arc, Weak,
},
time::{Duration, Instant, UNIX_EPOCH},
time::{Duration, Instant},
};
use async_trait::async_trait;
@@ -108,14 +108,10 @@ impl OutboundSession {
let slots = &*self.slots.lock().await;
for slot in slots {
debug!(target: "deadlock", "Killing channel {:?}, slot: {:?}, node {}",
slot.channel_id, slot.slot, self.p2p().settings().node_id);
slot.clone().stop().await;
}
debug!(target: "deadlock", "Killing peer discovery node {}", self.p2p().settings().node_id);
self.peer_discovery.clone().stop().await;
debug!(target: "deadlock", "Killed all outbound processes node {}", self.p2p().settings().node_id);
}
pub async fn slot_info(&self) -> Vec<u32> {
@@ -203,9 +199,9 @@ impl Slot {
let addrs = {
if slot_count < self.p2p().settings().anchor_connection_count {
debug!(target: "outbound_session::fetch_address()",
debug!(target: "net::outbound_session::fetch_address()",
"First two connections- prefer anchor connections");
hosts.anchorlist_fetch_address(self.p2p(), transports).await
hosts.anchorlist_fetch_address(transports).await
}
// Up to white_connection_percent connections:
//
@@ -213,18 +209,18 @@ impl Slot {
// If the whitelist is empty, select from the greylist
// If the greylist is empty, do peer discovery
else if slot_count < white_count {
debug!(target: "outbound_session::fetch_address()",
debug!(target: "net::outbound_session::fetch_address()",
"Next N connections- prefer white connections");
hosts.whitelist_fetch_address(self.p2p(), transports).await
hosts.whitelist_fetch_address(transports).await
}
// All other connections:
//
// Select from the greylist
// If the greylist is empty, do peer discovery
else {
debug!(target: "outbound_session::fetch_address()",
debug!(target: "net::outbound_session::fetch_address()",
"All other connections- get grey connections");
hosts.greylist_fetch_address(self.p2p(), transports).await
hosts.greylist_fetch_address(transports).await
}
};
@@ -242,7 +238,6 @@ impl Slot {
async fn run(self: Arc<Self>) {
let hosts = self.p2p().hosts();
let slot_count = self.p2p().settings().outbound_connections;
let white_count = slot_count * self.p2p().settings().white_connection_percent / 100;
loop {
// Activate the slot
@@ -252,23 +247,12 @@ impl Slot {
self.slot,
);
debug!(
target: "deadlock",
"Finding a host to connect to for outbound slot {}, node {}",
self.slot, self.p2p().settings().node_id,
);
// Retrieve outbound transports
let transports = &self.p2p().settings().allowed_transports;
// Do peer discovery if we don't have a hostlist (first time connecting
// to the network).
if hosts.is_empty_hostlist().await {
debug!(
target: "deadlock",
"Empty hostlist: activating peer discovery on outbound slot {}, node {}",
self.slot, self.p2p().settings().node_id,
);
dnetev!(self, OutboundSlotSleeping, {
slot: self.slot,
});
@@ -282,17 +266,14 @@ impl Slot {
}
let addr = if let Some(addr) = self.fetch_address(slot_count, transports).await {
debug!(target: "outbound_session::run()", "Fetched address: {:?}", addr);
debug!(target: "net::outbound_session::run()", "Fetched address: {:?}", addr);
addr
} else {
debug!(target: "outbound_session::run()", "No address found! Activating peer discovery...");
debug!(target: "net::outbound_session::run()", "No address found! Activating peer discovery...");
dnetev!(self, OutboundSlotSleeping, {
slot: self.slot,
});
debug!(target: "deadlock", "sleeping slot {}, node {}", self.slot, self.p2p().settings().node_id);
//sleep(5).await;
self.wakeup_self.reset();
// Peer discovery
self.session().wakeup_peer_discovery();
@@ -310,12 +291,6 @@ impl Slot {
slot, host,
);
debug!(
target: "deadlock",
"connecting outbound slot {}, node {}",
slot, self.p2p().settings().node_id,
);
dnetev!(self, OutboundSlotConnecting, {
slot: slot,
addr: host.clone(),
@@ -324,17 +299,12 @@ impl Slot {
let (addr, channel) = match self.try_connect(host.clone()).await {
Ok(connect_info) => connect_info,
Err(err) => {
//debug!(
// target: "deadlock",
// "[P2P] Outbound slot #{} connection failed: {}, node {}",
// slot, err, self.p2p().settings().node_id
//);
debug!(
target: "deadlock",
"connection failed: slot {}, node {}",
slot, self.p2p().settings().node_id
target: "net::outbound_session::try_connect()",
"[P2P] Outbound slot #{} connection failed: {}, node {}",
slot, err, self.p2p().settings().node_id
);
dnetev!(self, OutboundSlotDisconnected, {
slot,
err: err.to_string()
@@ -351,12 +321,6 @@ impl Slot {
slot, addr
);
debug!(
target: "deadlock",
"Created channel {} slot {}, node {}",
channel.info.id, slot, self.p2p().settings().node_id
);
dnetev!(self, OutboundSlotConnected, {
slot: self.slot,
addr: addr.clone(),
@@ -372,11 +336,6 @@ impl Slot {
slot, err
);
debug!(
target: "deadlock",
"disconnected slot {}, node {}",
slot, self.p2p().settings().node_id,
);
dnetev!(self, OutboundSlotDisconnected, {
slot: self.slot,
err: err.to_string()
@@ -409,30 +368,18 @@ impl Slot {
Ok((addr_final, channel)) => Ok((addr_final, channel)),
Err(e) => {
//debug!(
// target: "TODO",
// "[P2P] Unable to connect outbound slot #{} [{}]: {}",
// self.slot, addr, e
//);
debug!(
target: "deadlock",
"[P2P] Unable to connect outbound slot #{} [{}]: {} node {}",
self.slot, addr, e, self.p2p().settings().node_id
target: "net::outbound_session::try_connect()",
"[P2P] Unable to connect outbound slot #{} [{}]: {}",
self.slot, addr, e
);
// At this point we've failed to connect.
// If the host is in the anchorlist or whitelist, downgrade it to greylist.
self.p2p().hosts().downgrade_host(&addr).await?;
debug!(target: "deadlock", "removing channel... slot {} node {}",
self.slot, self.p2p().settings().node_id);
debug!(target: "net::outbound_session::try_connect", "removing channel...");
// Remove connection from pending
self.p2p().remove_pending(&addr).await;
debug!(target: "net::outbound_session::try_connect", "channel removed!");
debug!(target: "deadlock", "channel removed! slot {} node {}",
self.slot, self.p2p().settings().node_id);
// Notify that channel processing failed
self.session().channel_subscriber.notify(Err(Error::ConnectFailed)).await;
@@ -513,12 +460,10 @@ impl PeerDiscovery {
async fn run(self: Arc<Self>) {
let mut current_attempt = 0;
loop {
debug!(target: "deadlock", "peer discovery START node {} current attempt {}",
self.p2p().settings().node_id, current_attempt);
//dnetev!(self, OutboundPeerDiscovery, {
// attempt: current_attempt,
// state: "wait",
//});
dnetev!(self, OutboundPeerDiscovery, {
attempt: current_attempt,
state: "wait",
});
// wait to be woken up by notify()
let sleep_was_instant = self.wait().await;

View File

@@ -18,7 +18,6 @@
// cargo +nightly test --release --features=net --lib p2p -- --include-ignored
use simplelog::ThreadLogMode;
use std::sync::Arc;
use log::{debug, info};
@@ -32,36 +31,32 @@ use crate::{
};
// Number of nodes to spawn and number of peers each node connects to
const N_NODES: usize = 20;
//const N_CONNS: usize = 3;
const N_NODES: usize = 10;
const N_CONNS: usize = 3;
#[test]
fn p2p_test() {
let mut cfg = simplelog::ConfigBuilder::new();
//cfg.set_thread_mode(ThreadLogMode::Both);
cfg.add_filter_ignore("sled".to_string());
//cfg.add_filter_ignore("deadlock".to_string());
cfg.add_filter_ignore("net::protocol_ping".to_string());
//cfg.add_filter_ignore("sled".to_string());
//cfg.add_filter_ignore("net::protocol_ping".to_string());
//cfg.add_filter_ignore("net::channel::subscribe_stop()".to_string());
cfg.add_filter_ignore("net::hosts".to_string());
cfg.add_filter_ignore("net::inbound_session".to_string());
cfg.add_filter_ignore("outbound_session".to_string());
cfg.add_filter_ignore("net::outbound_session".to_string());
cfg.add_filter_ignore("net::session::outbound_session".to_string());
cfg.add_filter_ignore("net::session".to_string());
cfg.add_filter_ignore("net::refinery".to_string());
cfg.add_filter_ignore("net::message_subscriber".to_string());
cfg.add_filter_ignore("net::protocol_address".to_string());
cfg.add_filter_ignore("net::protocol_jobs_manager".to_string());
cfg.add_filter_ignore("net::protocol_version".to_string());
cfg.add_filter_ignore("net::protocol_registry".to_string());
cfg.add_filter_ignore("net::protocol_seed".to_string());
cfg.add_filter_ignore("net::channel".to_string());
cfg.add_filter_ignore("net::p2p::seed".to_string());
cfg.add_filter_ignore("net::p2p::start".to_string());
cfg.add_filter_ignore("store".to_string());
cfg.add_filter_ignore("net::store".to_string());
//cfg.add_filter_ignore("net::hosts".to_string());
//cfg.add_filter_ignore("net::inbound_session".to_string());
//cfg.add_filter_ignore("net::outbound_session".to_string());
//cfg.add_filter_ignore("net::session".to_string());
//cfg.add_filter_ignore("net::refinery".to_string());
//cfg.add_filter_ignore("net::message_subscriber".to_string());
//cfg.add_filter_ignore("net::protocol_address".to_string());
//cfg.add_filter_ignore("net::protocol_jobs_manager".to_string());
//cfg.add_filter_ignore("net::protocol_version".to_string());
//cfg.add_filter_ignore("net::protocol_registry".to_string());
//cfg.add_filter_ignore("net::protocol_seed".to_string());
//cfg.add_filter_ignore("net::channel".to_string());
//cfg.add_filter_ignore("net::p2p::seed".to_string());
//cfg.add_filter_ignore("net::p2p::start".to_string());
//cfg.add_filter_ignore("store".to_string());
//cfg.add_filter_ignore("net::store".to_string());
//cfg.add_filter_ignore("net::channel::send()".to_string());
//cfg.add_filter_ignore("net::channel::start()".to_string());
//cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string());
@@ -121,13 +116,13 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
for i in 0..N_NODES {
// Everyone will connect to N_CONNS random peers.
let mut peers = vec![];
//for _ in 0..N_CONNS {
// let mut port = 13200 + i;
// while port == 13200 + i {
// port = 13200 + rng.gen_range(0..N_NODES);
// }
// peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
//}
for _ in 0..N_CONNS {
let mut port = 13200 + i;
while port == 13200 + i {
port = 13200 + rng.gen_range(0..N_NODES);
}
peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
}
let settings = Settings {
localnet: true,
inbound_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + i)).unwrap()],
@@ -149,7 +144,6 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
}
// Start the P2P network
for p2p in p2p_instances.iter() {
//assert!(p2p.settings().advertise == true);
p2p.clone().start().await.unwrap();
}
@@ -159,33 +153,29 @@ async fn hostlist_propagation(ex: Arc<Executor<'static>>) {
info!("Inspecting hostlists...");
for p2p in p2p_instances.iter() {
let hosts = p2p.hosts();
//assert!(!hosts.is_empty_greylist().await);
//assert!(!hosts.is_empty_whitelist().await);
//assert!(!hosts.is_empty_anchorlist().await);
//let greylist = hosts.greylist.read().await;
//let whitelist = hosts.whitelist.read().await;
//let anchorlist = hosts.anchorlist.read().await;
let greylist = hosts.greylist.read().await;
let whitelist = hosts.whitelist.read().await;
let anchorlist = hosts.anchorlist.read().await;
//info!("Node {}", p2p.settings().node_id);
//for (i, (url, last_seen)) in greylist.iter().enumerate() {
// info!("Greylist entry {}: {}, {}", i, url, last_seen);
//}
info!("Node {}", p2p.settings().node_id);
for (i, (url, last_seen)) in greylist.iter().enumerate() {
info!("Greylist entry {}: {}, {}", i, url, last_seen);
}
//for (i, (url, last_seen)) in whitelist.iter().enumerate() {
// info!("Whitelist entry {}: {}, {}", i, url, last_seen);
//}
for (i, (url, last_seen)) in whitelist.iter().enumerate() {
info!("Whitelist entry {}: {}, {}", i, url, last_seen);
}
//for (i, (url, last_seen)) in anchorlist.iter().enumerate() {
// info!("Anchorlist entry {}: {}, {}", i, url, last_seen);
//}
for (i, (url, last_seen)) in anchorlist.iter().enumerate() {
info!("Anchorlist entry {}: {}, {}", i, url, last_seen);
}
}
// Stop the P2P network
for p2p in p2p_instances.iter() {
//info!("Stopping P2P instances...");
debug!("Stopping P2P instances...");
p2p.clone().stop().await;
debug!("node {} stopped!", p2p.settings().node_id);
debug!("Node {} stopped!", p2p.settings().node_id);
}
}

View File

@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use log::{debug, trace};
use log::trace;
use rand::{rngs::OsRng, Rng};
use smol::{
future::{self, Future},
@@ -115,11 +115,9 @@ impl StoppableTask {
/// Can be called multiple times. After the first call, this does nothing.
pub async fn stop(&self) {
trace!(target: "system::StoppableTask", "Stopping task {}", self.task_id);
debug!(target: "deadlock", "system::StoppableTask Stopping task {}", self.task_id);
self.signal.notify();
self.barrier.wait().await;
trace!(target: "system::StoppableTask", "Stopped task {}", self.task_id);
debug!(target: "deadlock", "system::StoppableTask Stopped task {}", self.task_id);
}
}