From 39350f87409beeec888808573f9506fcccb05ade Mon Sep 17 00:00:00 2001 From: draoi Date: Thu, 9 May 2024 14:42:17 +0200 Subject: [PATCH] outbound_session: start() and stop() slots concurrently use FuturesUnordered to poll slot.start() and slot.stop() in a non-blocking manner. This should be more efficient than waiting for each stop(), start() future to complete. --- src/net/session/outbound_session.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index c9114d549..aaecb403e 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -35,6 +35,7 @@ use std::{ }; use async_trait::async_trait; +use futures::stream::{FuturesUnordered, StreamExt}; use log::{debug, error, info, warn}; use smol::lock::Mutex; use url::Url; @@ -86,14 +87,18 @@ impl OutboundSession { // Activate mutex lock on connection slots. let mut slots = self.slots.lock().await; + let mut futures = FuturesUnordered::new(); + let self_ = Arc::downgrade(&self); for i in 0..n_slots as u32 { let slot = Slot::new(self_.clone(), i); - slot.clone().start().await; + futures.push(slot.clone().start()); slots.push(slot); } + while (futures.next().await).is_some() {} + self.peer_discovery.clone().start().await; } @@ -102,10 +107,18 @@ impl OutboundSession { debug!(target: "net::outbound_session", "Stopping outbound session"); let slots = &*self.slots.lock().await; + let mut futures = FuturesUnordered::new(); + for slot in slots { - slot.clone().stop().await; + futures.push(slot.clone().stop()); } + while (futures.next().await).is_some() {} + + // TODO/ FIXME: Shutting down the slots triggers a seed sync in peer discovery + // (see outbound_session.rs:633). + // We should implement an Atomic Bool called stopped() + // (see channel.rs). self.peer_discovery.clone().stop().await; }