diff --git a/src/net/hosts.rs b/src/net/hosts.rs index 6e12dd080..b4888c411 100644 --- a/src/net/hosts.rs +++ b/src/net/hosts.rs @@ -799,6 +799,9 @@ pub struct Hosts { /// Publisher for notifications of new channels pub(in crate::net) channel_publisher: PublisherPtr>, + /// Publisher listening for network disconnects + pub(in crate::net) disconnect_publisher: PublisherPtr, + /// Keeps track of the last time a connection was made. pub(in crate::net) last_connection: Mutex, @@ -817,6 +820,7 @@ impl Hosts { container: HostContainer::new(), store_publisher: Publisher::new(), channel_publisher: Publisher::new(), + disconnect_publisher: Publisher::new(), last_connection: Mutex::new(Instant::now()), ipv6_available: Mutex::new(true), settings, @@ -996,14 +1000,21 @@ impl Hosts { *last_online = Instant::now(); } + /// Get notified when new hosts have been inserted into a hostlist. pub async fn subscribe_store(&self) -> Subscription { self.store_publisher.clone().subscribe().await } + /// Get notified when a new channel has been created pub async fn subscribe_channel(&self) -> Subscription> { self.channel_publisher.clone().subscribe().await } + /// Get notified when a node has no active connections (is disconnected) + pub async fn subscribe_disconnect(&self) -> Subscription { + self.disconnect_publisher.clone().subscribe().await + } + // Verify whether a URL is local. // NOTE: This function is stateless and not specific to // `Hosts`. For this reason, it might make more sense diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index 05a97b681..40cb2a3d8 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -56,6 +56,7 @@ pub type SessionWeakPtr = Weak; /// is received. pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: SessionBitFlag) { debug!(target: "net::session::remove_sub_on_stop()", "[START]"); + let hosts = p2p.hosts(); let addr = channel.address(); // Subscribe to stop events @@ -77,13 +78,16 @@ pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: Sessi "Downgrading {}", addr, ); - let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap(); - p2p.hosts().move_host(addr, last_seen, HostColor::Grey).await.unwrap(); + let last_seen = hosts.fetch_last_seen(addr).await.unwrap(); + hosts.move_host(addr, last_seen, HostColor::Grey).await.unwrap(); } // Remove channel from the HostRegistry. Free up this addr for any future operation. - p2p.hosts().unregister(channel.address()).await; + hosts.unregister(channel.address()).await; + if hosts.channels().await.is_empty() { + hosts.disconnect_publisher.notify(true).await; + } debug!(target: "net::session::remove_sub_on_stop()", "[END]"); }