net: create a disconnect_publisher that notifies when we have no connectons

We check whether there are any remaining channels when we remove a
channel in remove_sub_on_stop(). If the channel list is empty,
we call notify() on disconnect_pubisher and set its inner value to true.

Note that this only signals when we do not have any connections, and
does not update to false when new connections are formed.
This commit is contained in:
draoi
2024-06-25 08:44:32 +02:00
parent 01d57d5877
commit 2badfbb141
2 changed files with 18 additions and 3 deletions

View File

@@ -799,6 +799,9 @@ pub struct Hosts {
/// Publisher for notifications of new channels
pub(in crate::net) channel_publisher: PublisherPtr<Result<ChannelPtr>>,
/// Publisher listening for network disconnects
pub(in crate::net) disconnect_publisher: PublisherPtr<bool>,
/// Keeps track of the last time a connection was made.
pub(in crate::net) last_connection: Mutex<Instant>,
@@ -817,6 +820,7 @@ impl Hosts {
container: HostContainer::new(),
store_publisher: Publisher::new(),
channel_publisher: Publisher::new(),
disconnect_publisher: Publisher::new(),
last_connection: Mutex::new(Instant::now()),
ipv6_available: Mutex::new(true),
settings,
@@ -996,14 +1000,21 @@ impl Hosts {
*last_online = Instant::now();
}
/// Get notified when new hosts have been inserted into a hostlist.
pub async fn subscribe_store(&self) -> Subscription<usize> {
self.store_publisher.clone().subscribe().await
}
/// Get notified when a new channel has been created
pub async fn subscribe_channel(&self) -> Subscription<Result<ChannelPtr>> {
self.channel_publisher.clone().subscribe().await
}
/// Get notified when a node has no active connections (is disconnected)
pub async fn subscribe_disconnect(&self) -> Subscription<bool> {
self.disconnect_publisher.clone().subscribe().await
}
// Verify whether a URL is local.
// NOTE: This function is stateless and not specific to
// `Hosts`. For this reason, it might make more sense

View File

@@ -56,6 +56,7 @@ pub type SessionWeakPtr = Weak<dyn Session + Send + Sync + 'static>;
/// is received.
pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: SessionBitFlag) {
debug!(target: "net::session::remove_sub_on_stop()", "[START]");
let hosts = p2p.hosts();
let addr = channel.address();
// Subscribe to stop events
@@ -77,13 +78,16 @@ pub async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr, type_id: Sessi
"Downgrading {}", addr,
);
let last_seen = p2p.hosts().fetch_last_seen(addr).await.unwrap();
p2p.hosts().move_host(addr, last_seen, HostColor::Grey).await.unwrap();
let last_seen = hosts.fetch_last_seen(addr).await.unwrap();
hosts.move_host(addr, last_seen, HostColor::Grey).await.unwrap();
}
// Remove channel from the HostRegistry. Free up this addr for any future operation.
p2p.hosts().unregister(channel.address()).await;
hosts.unregister(channel.address()).await;
if hosts.channels().await.is_empty() {
hosts.disconnect_publisher.notify(true).await;
}
debug!(target: "net::session::remove_sub_on_stop()", "[END]");
}