diff --git a/src/net/hosts/refinery.rs b/src/net/hosts/refinery.rs index a191e6cdb..db428a7c9 100644 --- a/src/net/hosts/refinery.rs +++ b/src/net/hosts/refinery.rs @@ -69,48 +69,59 @@ impl GreylistRefinery { pub async fn stop(self: Arc) { match self.p2p().hosts().save_hosts().await { Ok(()) => { + debug!(target: "deadlock", "saved hosts node {}", self.p2p().settings().node_id); debug!(target: "net::refinery::stop()", "Save hosts successful!"); } Err(e) => { + debug!(target: "deadlock", "ERROR saving hosts node {}", self.p2p().settings().node_id); warn!(target: "net::refinery::stop()", "Error saving hosts {}", e); } } - self.process.stop().await + debug!(target: "deadlock", "Stopping refinery process node {}", self.p2p().settings().node_id); + self.process.stop().await; + debug!(target: "deadlock", "Refinery process stopped node {}", self.p2p().settings().node_id); } // Randomly select a peer on the greylist and probe it. async fn run(self: Arc) { - debug!(target: "net::refinery::run()", "START"); + //debug!(target: "net::refinery::run()", "START"); + debug!(target: "deadlock", "refinery START node {}", self.p2p().settings().node_id); loop { + sleep(self.p2p().settings().greylist_refinery_interval).await; + let hosts = self.p2p().hosts(); if hosts.is_empty_greylist().await { + debug!(target: "deadlock", "Greylist is empty! Cannot start refinery node {}", self.p2p().settings().node_id); warn!(target: "net::refinery::run()", "Greylist is empty! Cannot start refinery process"); - } else { - debug!(target: "net::refinery::run()", "Starting refinery process"); - // Randomly select an entry from the greylist. - let (entry, position) = hosts.greylist_fetch_random().await; - let url = &entry.0; - if ping_node(url, self.p2p().clone()).await { - // Peer is responsive. Update last_seen and add it to the whitelist. - let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); - - // Append to the whitelist. - hosts.whitelist_store_or_update(&[(url.clone(), last_seen)]).await.unwrap(); - - // Remove whitelisted peer from the greylist. - hosts.greylist_remove(url, position).await; - } else { - let mut greylist = hosts.greylist.write().await; - greylist.remove(position); - debug!(target: "net::refinery::run()", "Peer {} is not response. Removed from greylist", url); - } + continue } - debug!(target: "net::greylist_refinery::run()", "Sleeping..."); - sleep(self.p2p().settings().greylist_refinery_interval).await; + let (entry, position) = hosts.greylist_fetch_random().await; + let url = &entry.0; + + if !ping_node(url, self.p2p().clone()).await { + let mut greylist = hosts.greylist.write().await; + greylist.remove(position); + //debug!(target: "net::refinery::run()", "Peer {} is not response. Removed from greylist", url); + debug!(target: "deadlock", "Peer {} is not response. Removed from greylist node {}", url, self.p2p().settings().node_id); + + continue + } + + let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); + + debug!(target: "deadlock", "whitelist store or update node {}", self.p2p().settings().node_id); + // Append to the whitelist. + hosts.whitelist_store_or_update(&[(url.clone(), last_seen)]).await.unwrap(); + + debug!(target: "deadlock", "greylist remove node {}", self.p2p().settings().node_id); + // Remove whitelisted peer from the greylist. + hosts.greylist_remove(url, position).await; + + debug!(target: "deadlock", "refinery STOP node {}", self.p2p().settings().node_id); } } @@ -121,6 +132,7 @@ impl GreylistRefinery { // Ping a node to check it's online. pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool { + debug!(target: "deadlock", "ping node START node {}", p2p.settings().node_id); let session_outbound = p2p.session_outbound(); let parent = Arc::downgrade(&session_outbound); let connector = Connector::new(p2p.settings(), parent); @@ -142,11 +154,13 @@ pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool { match handshake_task.await { Ok(()) => { debug!(target: "net::refinery::ping_node()", "Handshake success! Stopping channel."); + debug!(target: "deadlock", "ping node STOP node {} -> true", p2p.settings().node_id); channel.stop().await; true } Err(e) => { debug!(target: "net::refinery::ping_node()", "Handshake failure! {}", e); + debug!(target: "deadlock", "ping node STOP node {} -> false", p2p.settings().node_id); channel.stop().await; false } @@ -155,6 +169,7 @@ pub async fn ping_node(addr: &Url, p2p: P2pPtr) -> bool { Err(e) => { debug!(target: "net::refinery::ping_node()", "Failed to connect to {}, ({})", addr, e); + debug!(target: "deadlock", "ping node STOP node {} -> false", p2p.settings().node_id); false } } diff --git a/src/net/hosts/store.rs b/src/net/hosts/store.rs index 5912592df..1d24775fa 100644 --- a/src/net/hosts/store.rs +++ b/src/net/hosts/store.rs @@ -98,7 +98,7 @@ impl Hosts { &self, p2p: P2pPtr, transports: &[String], - ) -> Option> { + ) -> Vec<(Url, u64)> { trace!(target: "store", "greylist_fetch_address() [START]"); // Collect hosts let mut hosts = vec![]; @@ -133,48 +133,7 @@ impl Hosts { // This is healthier for multiple slots to not compete for the same addrs. hosts.shuffle(&mut OsRng); - Some(hosts) - //// Try to find an unused host in the set. - //for (host, last_seen) in hosts.iter() { - // // Check if we already have this connection established - // if p2p.exists(host).await { - // debug!( - // target: "store::greylist_fetch_address()", - // "Host '{}' exists so skipping", - // host - // ); - // continue - // } - - // // Check if we already have this configured as a manual peer - // if self.settings.peers.contains(host) { - // debug!( - // target: "store::greylist_fetch_address()", - // "Host '{}' configured as manual peer so skipping", - // host - // ); - // continue - // } - - // // Obtain a lock on this address to prevent duplicate connection - // if !p2p.add_pending(host).await { - // debug!( - // target: "store::greylist_fetch_address()", - // "Host '{}' pending so skipping", - // host - // ); - // continue - // } - - // debug!( - // target: "store::greylist_fetch_address()", - // "Found valid host '{}", - // host - // ); - // return Some((host.clone(), last_seen.clone())) - //} - - //None + hosts } /// Loops through whitelist addresses to find an outbound address that we can @@ -186,7 +145,7 @@ impl Hosts { &self, p2p: P2pPtr, transports: &[String], - ) -> Option> { + ) -> Vec<(Url, u64)> { trace!(target: "store", "whitelist_fetch_address() [START]"); // Collect hosts let mut hosts = vec![]; @@ -224,61 +183,7 @@ impl Hosts { // This is healthier for multiple slots to not compete for the same addrs. hosts.shuffle(&mut OsRng); - Some(hosts) - //// Try to find an unused host in the set. - //for (host, last_seen) in hosts.iter() { - // debug!(target: "store::whitelist_fetch_address()", - // "Starting checks"); - // // Check if we already have this connection established - // if p2p.exists(host).await { - // debug!( - // target: "store::whitelist_fetch_address()", - // "Host '{}' exists so skipping", - // host - // ); - // continue - // } - - // debug!(target: "store::whitelist_fetch_address()", - // "Connection is not already established"); - - // // Check if we already have this configured as a manual peer - // if self.settings.peers.contains(host) { - // debug!( - // target: "store::whitelist_fetch_address()", - // "Host '{}' configured as manual peer so skipping", - // host - // ); - // continue - // } - - // debug!(target: "store::whitelist_fetch_address()", - // "Connection not configured as manual peer"); - - // // Obtain a lock on this address to prevent duplicate connection - // if !p2p.add_pending(host).await { - // debug!( - // target: "store::whitelist_fetch_address()", - // "Host '{}' pending so skipping", - // host - // ); - // continue - // } - - // debug!(target: "store::whitelist_fetch_address()", - // "Connection not pending"); - - // debug!( - // target: "store::whitelist_fetch_address()", - // "Found valid host '{}", - // host - // ); - // return Some((host.clone(), last_seen.clone())) - //} - - //debug!(target: "store::whitelist_fetch_address()", - //"Exiting with NONE"); - //None + hosts } /// Loops through anchorlist addresses to find an outbound address that we can @@ -290,7 +195,7 @@ impl Hosts { &self, p2p: P2pPtr, transports: &[String], - ) -> Option> { + ) -> Vec<(Url, u64)> { trace!(target: "store", "anchorlist_fetch_address() [START]"); // Collect hosts let mut hosts = vec![]; @@ -328,64 +233,14 @@ impl Hosts { // This is healthier for multiple slots to not compete for the same addrs. hosts.shuffle(&mut OsRng); - Some(hosts) - //// Try to find an unused host in the set. - //for (host, last_seen) in hosts.iter() { - // debug!(target: "store::anchorlist_fetch_address()", - // "Starting checks"); - // // Check if we already have this connection established - // if p2p.exists(host).await { - // debug!( - // target: "store::anchorlist_fetch_address()", - // "Host '{}' exists so skipping", - // host - // ); - // continue - // } - - // debug!(target: "store::anchorlist_fetch_address()", - // "Connection is not already established"); - - // // Check if we already have this configured as a manual peer - // if self.settings.peers.contains(host) { - // debug!( - // target: "store::anchorlist_fetch_address()", - // "Host '{}' configured as manual peer so skipping", - // host - // ); - // continue - // } - - // debug!(target: "store::anchorlist_fetch_address()", - // "Connection not configured as manual peer"); - - // // Obtain a lock on this address to prevent duplicate connection - // if !p2p.add_pending(host).await { - // debug!( - // target: "store::anchorlist_fetch_address()", - // "Host '{}' pending so skipping", - // host - // ); - // continue - // } - - // debug!(target: "store::anchorlist_fetch_address()", - // "Connection not pending"); - - // debug!( - // target: "store::anchorlist_fetch_address()", - // "Found valid host '{}", - // host - // ); - // return Some((host.clone(), last_seen.clone())) - //} - - //debug!(target: "store::anchorlist_fetch_address()", - //"Exiting with NONE"); - //None + hosts } - pub async fn lock_check(&self, p2p: P2pPtr, hosts: Vec<(Url, u64)>) -> Option<(Url, u64)> { + pub async fn check_address_with_lock( + &self, + p2p: P2pPtr, + hosts: Vec<(Url, u64)>, + ) -> Option<(Url, u64)> { // Try to find an unused host in the set. for (host, last_seen) in hosts { debug!(target: "store::anchorlist_fetch_address()", diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 451b72065..aea4bc1fc 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -180,6 +180,7 @@ impl P2p { self.session_outbound().stop().await; // Stop greylist refinery process + debug!(target: "deadlock", "Killing greylist refinery node: {}", self.settings().node_id); self.greylist_refinery().stop().await; } diff --git a/src/net/protocol/protocol_version.rs b/src/net/protocol/protocol_version.rs index 6bb4658e1..eaaec7f5c 100644 --- a/src/net/protocol/protocol_version.rs +++ b/src/net/protocol/protocol_version.rs @@ -58,6 +58,7 @@ impl ProtocolVersion { /// info and wait for version ack. Wait for version info and send /// version ack. pub async fn run(self: Arc, executor: Arc>) -> Result<()> { + //debug!(target: "deadlock", "protocol_version::START => address={}", self.channel.address()); debug!(target: "net::protocol_version::run()", "START => address={}", self.channel.address()); // Start timer // Send version, wait for verack @@ -82,7 +83,8 @@ impl ProtocolVersion { return Err(Error::ChannelTimeout) } - debug!(target: "net::protocol_version::run()", "END => address={}", self.channel.address()); + //debug!(target: "deadlock", "protocol_version::END => address={}", self.channel.address()); + //debug!(target: "net::protocol_version::run()", "END => address={}", self.channel.address()); Ok(()) } diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index c2bdc6ee2..0c9d2ee95 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -147,6 +147,8 @@ pub trait Session: Sync { protocol_version.run(executor.clone()).await?; if self.type_id() != SESSION_INBOUND { + //debug!(target: "deadlock", "perform_handshake_protocols adding to anchorlist channel {}, node {}", + // channel.clone().address(), self.p2p().settings().node_id); // Channel is now initialized. Timestamp this. let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); @@ -157,16 +159,33 @@ pub trait Session: Sync { } // Add channel to p2p - debug!(target: "net::session::register_channel()", "perform_handshake_protocol {}", channel.clone().address()); + //debug!(target: "deadlock", "Storing channel in p2p, channel {} node {}", + //channel.clone().address(), self.p2p().settings().node_id); + + //debug!(target: "net::session::register_channel()", "perform_handshake_protocol {}", channel.clone().address()); self.p2p().store(channel.clone()).await; // Subscribe to stop, so we can remove from p2p + //debug!(target: "deadlock", "Waiting for a stop signal channel {} node {}", + //channel.clone().address(), self.p2p().settings().node_id); executor.spawn(remove_sub_on_stop(self.p2p(), channel, self.type_id())).detach(); // Channel is ready for use Ok(()) } + async fn perform_local_handshake( + &self, + protocol_version: Arc, + channel: ChannelPtr, + executor: Arc>, + ) -> Result<()> { + // Perform handshake + protocol_version.run(executor.clone()).await?; + // Channel is ready for use + Ok(()) + } + /// Returns a pointer to the p2p network interface fn p2p(&self) -> P2pPtr; diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index 7a7291608..d12c67183 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -108,12 +108,14 @@ impl OutboundSession { let slots = &*self.slots.lock().await; for slot in slots { - debug!(target: "deadlock", "Killing channel {:?}, slot: {:?}, node: {:?}", + debug!(target: "deadlock", "Killing channel {:?}, slot: {:?}, node {}", slot.channel_id, slot.slot, self.p2p().settings().node_id); slot.clone().stop().await; } + debug!(target: "deadlock", "Killing peer discovery node {}", self.p2p().settings().node_id); self.peer_discovery.clone().stop().await; + debug!(target: "deadlock", "Killed all outbound processes node {}", self.p2p().settings().node_id); } pub async fn slot_info(&self) -> Vec { @@ -187,14 +189,10 @@ impl Slot { self.process.stop().await } - async fn fetch_address( - &self, - slot_count: usize, - transports: &[String], - ) -> Option<(Url, u64)> { + async fn fetch_address(&self, slot_count: usize, transports: &[String]) -> Option<(Url, u64)> { let hosts = self.p2p().hosts(); - //let slot_count = self.p2p().settings().outbound_connections; - let white_count = slot_count * self.p2p().settings().white_connection_percent / 100; + let connects = self.p2p().settings().outbound_connections; + let white_count = connects * self.p2p().settings().white_connection_percent / 100; // Up to anchor_connection_count connections: // @@ -227,18 +225,15 @@ impl Slot { debug!(target: "outbound_session::fetch_address()", "All other connections- get grey connections"); hosts.greylist_fetch_address(self.p2p(), transports).await - } + } }; // Check whether: // * we already have this connection established // * we already have this configured as a manual peer // * address is already pending a connection - if addrs.is_some() { - let addr = hosts.lock_check(self.p2p(), addrs.unwrap()).await; - return addr - } - None + let addr = hosts.check_address_with_lock(self.p2p(), addrs).await; + return addr } // We first try to make connections to the addresses on our anchor list. We then find some @@ -414,20 +409,30 @@ impl Slot { Ok((addr_final, channel)) => Ok((addr_final, channel)), Err(e) => { + //debug!( + // target: "TODO", + // "[P2P] Unable to connect outbound slot #{} [{}]: {}", + // self.slot, addr, e + //); + debug!( target: "deadlock", - "[P2P] Unable to connect outbound slot #{} [{}]: {}", - self.slot, addr, e + "[P2P] Unable to connect outbound slot #{} [{}]: {} node {}", + self.slot, addr, e, self.p2p().settings().node_id ); // At this point we've failed to connect. // If the host is in the anchorlist or whitelist, downgrade it to greylist. self.p2p().hosts().downgrade_host(&addr).await?; + debug!(target: "deadlock", "removing channel... slot {} node {}", + self.slot, self.p2p().settings().node_id); debug!(target: "net::outbound_session::try_connect", "removing channel..."); // Remove connection from pending self.p2p().remove_pending(&addr).await; debug!(target: "net::outbound_session::try_connect", "channel removed!"); + debug!(target: "deadlock", "channel removed! slot {} node {}", + self.slot, self.p2p().settings().node_id); // Notify that channel processing failed self.session().channel_subscriber.notify(Err(Error::ConnectFailed)).await; @@ -508,6 +513,8 @@ impl PeerDiscovery { async fn run(self: Arc) { let mut current_attempt = 0; loop { + debug!(target: "deadlock", "peer discovery START node {} current attempt {}", + self.p2p().settings().node_id, current_attempt); //dnetev!(self, OutboundPeerDiscovery, { // attempt: current_attempt, // state: "wait", diff --git a/src/net/settings.rs b/src/net/settings.rs index ee33d6a68..d1152e434 100644 --- a/src/net/settings.rs +++ b/src/net/settings.rs @@ -108,7 +108,7 @@ impl Default for Settings { advertise: true, hostlist, greylist_refinery_interval: 30, - white_connection_percent: 50, + white_connection_percent: 90, anchor_connection_count: 2, } } diff --git a/src/net/tests.rs b/src/net/tests.rs index aab4e8b90..9f60c5db3 100644 --- a/src/net/tests.rs +++ b/src/net/tests.rs @@ -18,9 +18,10 @@ // cargo +nightly test --release --features=net --lib p2p -- --include-ignored +use simplelog::ThreadLogMode; use std::sync::Arc; -use log::info; +use log::{debug, info}; use rand::Rng; use smol::{channel, future, Executor}; use url::Url; @@ -31,16 +32,45 @@ use crate::{ }; // Number of nodes to spawn and number of peers each node connects to -const N_NODES: usize = 6; -const N_CONNS: usize = 1; +const N_NODES: usize = 20; +//const N_CONNS: usize = 3; #[test] fn p2p_test() { let mut cfg = simplelog::ConfigBuilder::new(); + //cfg.set_thread_mode(ThreadLogMode::Both); + cfg.add_filter_ignore("sled".to_string()); + //cfg.add_filter_ignore("deadlock".to_string()); + cfg.add_filter_ignore("net::protocol_ping".to_string()); + //cfg.add_filter_ignore("net::channel::subscribe_stop()".to_string()); + cfg.add_filter_ignore("net::hosts".to_string()); + cfg.add_filter_ignore("net::inbound_session".to_string()); + cfg.add_filter_ignore("outbound_session".to_string()); + cfg.add_filter_ignore("net::outbound_session".to_string()); + cfg.add_filter_ignore("net::session::outbound_session".to_string()); + cfg.add_filter_ignore("net::session".to_string()); + cfg.add_filter_ignore("net::refinery".to_string()); + cfg.add_filter_ignore("net::message_subscriber".to_string()); + cfg.add_filter_ignore("net::protocol_address".to_string()); + cfg.add_filter_ignore("net::protocol_jobs_manager".to_string()); + cfg.add_filter_ignore("net::protocol_version".to_string()); + cfg.add_filter_ignore("net::protocol_registry".to_string()); + cfg.add_filter_ignore("net::protocol_seed".to_string()); + cfg.add_filter_ignore("net::channel".to_string()); + cfg.add_filter_ignore("net::p2p::seed".to_string()); + cfg.add_filter_ignore("net::p2p::start".to_string()); + cfg.add_filter_ignore("store".to_string()); + cfg.add_filter_ignore("net::store".to_string()); + //cfg.add_filter_ignore("net::channel::send()".to_string()); + //cfg.add_filter_ignore("net::channel::start()".to_string()); + //cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string()); + //cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string()); + cfg.add_filter_ignore("net::tcp".to_string()); + simplelog::TermLogger::init( - simplelog::LevelFilter::Info, - //simplelog::LevelFilter::Debug, + //simplelog::LevelFilter::Info, + simplelog::LevelFilter::Debug, //simplelog::LevelFilter::Trace, cfg.build(), simplelog::TerminalMode::Mixed, @@ -73,7 +103,7 @@ async fn hostlist_propagation(ex: Arc>) { localnet: true, inbound_addrs: vec![seed_addr.clone()], external_addrs: vec![seed_addr.clone()], - outbound_connections: 2, + outbound_connections: 0, //outbound_connect_timeout: 10, inbound_connections: usize::MAX, seeds: vec![], @@ -102,7 +132,7 @@ async fn hostlist_propagation(ex: Arc>) { localnet: true, inbound_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + i)).unwrap()], external_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + i)).unwrap()], - outbound_connections: 2, + outbound_connections: 8, //outbound_connect_timeout: 10, inbound_connections: usize::MAX, seeds: vec![seed_addr.clone()], @@ -110,6 +140,7 @@ async fn hostlist_propagation(ex: Arc>) { peers, allowed_transports: vec!["tcp".to_string()], node_id: i.to_string(), + anchor_connection_count: 2, ..Default::default() }; @@ -122,37 +153,39 @@ async fn hostlist_propagation(ex: Arc>) { p2p.clone().start().await.unwrap(); } - info!("Waiting until all peers connect"); - sleep(15).await; + //info!("Waiting until all peers connect"); + sleep(30).await; - //info!("Inspecting hostlists..."); - //for p2p in p2p_instances.iter() { - // let hosts = p2p.hosts(); - // //assert!(!hosts.is_empty_greylist().await); - // //assert!(!hosts.is_empty_whitelist().await); - // //assert!(!hosts.is_empty_anchorlist().await); + info!("Inspecting hostlists..."); + for p2p in p2p_instances.iter() { + let hosts = p2p.hosts(); + //assert!(!hosts.is_empty_greylist().await); + //assert!(!hosts.is_empty_whitelist().await); + //assert!(!hosts.is_empty_anchorlist().await); - // let greylist = hosts.greylist.read().await; - // let whitelist = hosts.whitelist.read().await; - // let anchorlist = hosts.anchorlist.read().await; + //let greylist = hosts.greylist.read().await; + //let whitelist = hosts.whitelist.read().await; + //let anchorlist = hosts.anchorlist.read().await; - // info!("Node {}", p2p.settings().node_id); - // for (i, (url, last_seen)) in greylist.iter().enumerate() { - // info!("Greylist entry {}: {}, {}", i, url, last_seen); - // } + //info!("Node {}", p2p.settings().node_id); + //for (i, (url, last_seen)) in greylist.iter().enumerate() { + // info!("Greylist entry {}: {}, {}", i, url, last_seen); + //} - // for (i, (url, last_seen)) in whitelist.iter().enumerate() { - // info!("Whitelist entry {}: {}, {}", i, url, last_seen); - // } + //for (i, (url, last_seen)) in whitelist.iter().enumerate() { + // info!("Whitelist entry {}: {}, {}", i, url, last_seen); + //} - // for (i, (url, last_seen)) in anchorlist.iter().enumerate() { - // info!("Anchorlist entry {}: {}, {}", i, url, last_seen); - // } - //} + //for (i, (url, last_seen)) in anchorlist.iter().enumerate() { + // info!("Anchorlist entry {}: {}, {}", i, url, last_seen); + //} + } // Stop the P2P network for p2p in p2p_instances.iter() { - info!("Stopping P2P instances..."); + //info!("Stopping P2P instances..."); + debug!("Stopping P2P instances..."); p2p.clone().stop().await; + debug!("node {} stopped!", p2p.settings().node_id); } } diff --git a/src/system/stoppable_task.rs b/src/system/stoppable_task.rs index 6470efb45..c2b5c56f8 100644 --- a/src/system/stoppable_task.rs +++ b/src/system/stoppable_task.rs @@ -16,7 +16,7 @@ * along with this program. If not, see . */ -use log::trace; +use log::{debug, trace}; use rand::{rngs::OsRng, Rng}; use smol::{ future::{self, Future}, @@ -115,9 +115,11 @@ impl StoppableTask { /// Can be called multiple times. After the first call, this does nothing. pub async fn stop(&self) { trace!(target: "system::StoppableTask", "Stopping task {}", self.task_id); + debug!(target: "deadlock", "system::StoppableTask Stopping task {}", self.task_id); self.signal.notify(); self.barrier.wait().await; trace!(target: "system::StoppableTask", "Stopped task {}", self.task_id); + debug!(target: "deadlock", "system::StoppableTask Stopped task {}", self.task_id); } }