From f4d93104b514ed080961d9fa7c24369fa42e94f4 Mon Sep 17 00:00:00 2001 From: draoi Date: Thu, 16 May 2024 12:23:27 +0200 Subject: [PATCH] seedsync: refactor to enable reseed on CondVar.notify() We introduce a new abstraction called Slot which runs a connector for each seed specified in Settings. On `p2p.start()`, seed slots are started but sit in a pending state until a subsequent call to `notify()`. This change allows us to have more fine-grained control over the `Connector` (such as allowing us to stop the `Connector` when it is in progress). Notably there has been a change to the error handling since we no longer return a success or failure from the seedsync process. Instead, we store success or failure of a given `Slot` in an AtomicBool, which allows us to notify other parts of the codebase if all attempts to seed have failed. --- src/net/p2p.rs | 26 ++- src/net/session/outbound_session.rs | 20 +- src/net/session/seedsync_session.rs | 305 ++++++++++++++++++---------- 3 files changed, 221 insertions(+), 130 deletions(-) diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 4cc03c65d..f1bc7dbba 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -31,7 +31,7 @@ use super::{ protocol::{protocol_registry::ProtocolRegistry, register_default_protocols}, session::{ InboundSession, InboundSessionPtr, ManualSession, ManualSessionPtr, OutboundSession, - OutboundSessionPtr, RefineSession, RefineSessionPtr, SeedSyncSession, + OutboundSessionPtr, RefineSession, RefineSessionPtr, SeedSyncSession, SeedSyncSessionPtr, }, settings::{Settings, SettingsPtr}, }; @@ -61,7 +61,8 @@ pub struct P2p { session_outbound: OutboundSessionPtr, /// Reference to configured [`RefineSession`] session_refine: RefineSessionPtr, - + /// Reference to configured [`SeedSyncSession`] + session_seedsync: SeedSyncSessionPtr, /// Enable network debugging pub dnet_enabled: Mutex, /// The subscriber for which we can give dnet info over @@ -89,6 +90,7 @@ impl P2p { session_inbound: InboundSession::new(), 
session_outbound: OutboundSession::new(), session_refine: RefineSession::new(), + session_seedsync: SeedSyncSession::new(), dnet_enabled: Mutex::new(false), dnet_subscriber: Subscriber::new(), @@ -98,6 +100,7 @@ impl P2p { self_.session_inbound.p2p.init(self_.clone()); self_.session_outbound.p2p.init(self_.clone()); self_.session_refine.p2p.init(self_.clone()); + self_.session_seedsync.p2p.init(self_.clone()); register_default_protocols(self_.clone()).await; @@ -124,6 +127,10 @@ impl P2p { // Start the refine session self.session_refine().start().await; + // Start the seedsync session. Seed connections will not + // activate yet- they wait for a call to notify(). + self.session_seedsync().start().await; + // Start the outbound session self.session_outbound().start().await; @@ -132,17 +139,14 @@ impl P2p { } /// Reseed the P2P network. - pub async fn seed(self: Arc) -> Result<()> { + pub async fn seed(self: Arc) { debug!(target: "net::p2p::seed()", "P2P::seed() [BEGIN]"); info!(target: "net::p2p::seed()", "[P2P] Seeding P2P subsystem"); - // Start seed session - let seed = SeedSyncSession::new(Arc::downgrade(&self)); - // This will block until all seed queries have finished - seed.start().await?; + // Activate the seed session. + self.session_seedsync().notify().await; debug!(target: "net::p2p::seed()", "P2P::seed() [END]"); - Ok(()) } /// Stop the running P2P subsystem @@ -157,6 +161,7 @@ impl P2p { self.session_inbound().stop().await; self.session_refine().stop().await; self.session_outbound().stop().await; + self.session_seedsync().stop().await; } /// Broadcasts a message concurrently across all active channels. 
@@ -246,6 +251,11 @@ impl P2p { self.session_refine.clone() } + /// Get pointer to seedsync session + pub fn session_seedsync(&self) -> SeedSyncSessionPtr { + self.session_seedsync.clone() + } + /// Enable network debugging pub async fn dnet_enable(&self) { *self.dnet_enabled.lock().await = true; diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index aaecb403e..ae6e3472e 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -651,19 +651,13 @@ impl PeerDiscoveryBase for PeerDiscovery { state: "seed", }); - match p2p.clone().seed().await { - Ok(()) => { - info!( - target: "net::outbound_session::peer_discovery()", - "[P2P] Seeding hosts successful." - ); - } - Err(err) => { - error!( - target: "net::outbound_session::peer_discovery()", - "[P2P] Network reseed failed: {}", err, - ); - } + p2p.clone().seed().await; + + if p2p.clone().session_seedsync().failed().await { + error!( + target: "net::outbound_session::peer_discovery()", + "[P2P] Network reseed failed!" + ); } } diff --git a/src/net/session/seedsync_session.rs b/src/net/session/seedsync_session.rs index 68ae198f9..ed7fb10f8 100644 --- a/src/net/session/seedsync_session.rs +++ b/src/net/session/seedsync_session.rs @@ -18,9 +18,14 @@ //! Seed sync session creates a connection to the seed nodes specified in settings. //! A new seed sync session is created every time we call [`P2p::start()`]. The -//! seed sync session loops through all the configured seeds and tries to connect -//! to them using a [`Connector`]. Seed sync either connects successfully, fails -//! with an error, or times out. +//! seed sync session loops through all the configured seeds and creates a corresponding +//! `Slot`. `Slot`'s are started, but sit in a suspended state until they are activated +//! by a call to notify (see: `p2p.seed()`). +//! +//! When a `Slot` has been activated by a call to `notify()`, it will try to connect +//! 
to the given seed address using a [`Connector`]. This will either connect successfully +//! or fail with a warning. We gather the results of each `Slot` in an `AtomicBool` +//! so that we can handle the error elsewhere in the code base. //! //! If a seed node connects successfully, it runs a version exchange protocol, //! stores the channel in the p2p list of channels, and disconnects, removing @@ -38,14 +43,14 @@ //! p2p list of channels, and subscribes to a stop signal. use std::sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, Ordering::SeqCst}, Arc, Weak, }; use async_trait::async_trait; -use futures::future::join_all; +use futures::stream::{FuturesUnordered, StreamExt}; use log::{debug, info, warn}; -use smol::Executor; +use smol::lock::Mutex; use url::Url; use super::{ @@ -53,143 +58,225 @@ use super::{ connector::Connector, hosts::HostColor, p2p::{P2p, P2pPtr}, + settings::SettingsPtr, }, Session, SessionBitFlag, SESSION_SEED, }; -use crate::{Error, Result}; +use crate::{ + system::{CondVar, LazyWeak, StoppableTask, StoppableTaskPtr}, + Error, +}; pub type SeedSyncSessionPtr = Arc; /// Defines seed connections session pub struct SeedSyncSession { - p2p: Weak, + pub(in crate::net) p2p: LazyWeak, + slots: Mutex>>, } impl SeedSyncSession { /// Create a new seed sync session instance - pub fn new(p2p: Weak) -> SeedSyncSessionPtr { - Arc::new(Self { p2p }) + pub(crate) fn new() -> SeedSyncSessionPtr { + Arc::new(Self { p2p: LazyWeak::new(), slots: Mutex::new(Vec::new()) }) } - /// Start the seed sync session. Creates a new task for every seed - /// connection and starts the seed on each task. - pub async fn start(self: Arc) -> Result<()> { - debug!(target: "net::session::seedsync_session", "SeedSyncSession::start() [START]"); - let settings = self.p2p().settings(); + /// Initialize the seedsync session. Each slot is suspended while it waits + /// for a call to notify(). 
+ pub(crate) async fn start(self: Arc) { + // Activate mutex lock on connection slots. + let mut slots = self.slots.lock().await; - if settings.seeds.is_empty() { - warn!( - target: "net::session::seedsync_session", - "[P2P] Skipping seed sync process since no seeds are configured.", - ); + let mut futures = FuturesUnordered::new(); - return Ok(()) + let self_ = Arc::downgrade(&self); + + // Initialize a slot for each configured seed. + // Connections will be started but not yet activated. + for seed in &self.p2p().settings().seeds { + let slot = Slot::new(self_.clone(), seed.clone(), self.p2p().settings()); + futures.push(slot.clone().start()); + slots.push(slot); } - // Gather tasks so we can execute concurrently - let executor = self.p2p().executor(); - let mut tasks = Vec::with_capacity(settings.seeds.len()); - - let failed = Arc::new(AtomicUsize::new(0)); - - for (i, seed) in settings.seeds.iter().enumerate() { - let ex_ = executor.clone(); - let self_ = self.clone(); - let failed_ = failed.clone(); - - tasks.push(async move { - if let Err(e) = self_.clone().start_seed(i, seed.clone(), ex_.clone()).await { - warn!( - target: "net::session::seedsync_session", - "[P2P] Seed #{} connection failed: {}", i, e, - ); - failed_.fetch_add(1, Ordering::SeqCst); - } - }); - } - - // Poll concurrently - join_all(tasks).await; - - if failed.load(Ordering::SeqCst) == settings.seeds.len() { - return Err(Error::SeedFailed) - } - - // Seed process complete - if self.p2p().hosts().container.is_empty(HostColor::Grey).await { - warn!(target: "net::session::seedsync_session", "[P2P] Greylist empty after seeding"); - } - - debug!(target: "net::session::seedsync_session", "SeedSyncSession::start() [END]"); - Ok(()) + while (futures.next().await).is_some() {} } - /// Connects to a seed socket address - async fn start_seed( - self: Arc, - seed_index: usize, - seed: Url, - ex: Arc>, - ) -> Result<()> { - debug!( - target: "net::session::seedsync_session", 
"SeedSyncSession::start_seed(i={}) [START]", - seed_index - ); + /// Activate the slots so they can continue with the seedsync process. + /// Called in `p2p.seed()`. + pub(crate) async fn notify(&self) { + let slots = &*self.slots.lock().await; + for slot in slots { + slot.notify(); + } + } - let settings = self.p2p.upgrade().unwrap().settings(); - let parent = Arc::downgrade(&self); - let connector = Connector::new(settings.clone(), parent); + /// Stop the seedsync session. + pub(crate) async fn stop(&self) { + let slots = &*self.slots.lock().await; + let mut futures = FuturesUnordered::new(); - match connector.connect(&seed).await { - Ok((url, ch)) => { - info!( - target: "net::session::seedsync_session", - "[P2P] Connected seed #{} [{}]", seed_index, url, - ); - - if let Err(e) = self.clone().register_channel(ch.clone(), ex.clone()).await { - warn!( - target: "net::session::seedsync_session", - "[P2P] Failure during sync seed session #{} [{}]: {}", - seed_index, url, e, - ); - } - - info!( - target: "net::session::seedsync_session", - "[P2P] Disconnecting from seed #{} [{}]", - seed_index, url, - ); - ch.stop().await; - } - - Err(e) => { - warn!( - target: "net::session:seedsync_session", - "[P2P] Failure contacting seed #{} [{}]: {}", - seed_index, seed, e - ); - return Err(e) - } + for slot in slots { + futures.push(slot.clone().stop()); } - debug!( - target: "net::session::seedsync_session", - "SeedSyncSession::start_seed(i={}) [END]", - seed_index - ); + while (futures.next().await).is_some() {} + } - Ok(()) + pub(crate) async fn failed(&self) -> bool { + let slots = &*self.slots.lock().await; + slots.iter().any(|s| s.failed()) } } #[async_trait] impl Session for SeedSyncSession { fn p2p(&self) -> P2pPtr { - self.p2p.upgrade().unwrap() + self.p2p.upgrade() } fn type_id(&self) -> SessionBitFlag { SESSION_SEED } } + +struct Slot { + addr: Url, + process: StoppableTaskPtr, + wakeup_self: CondVar, + session: Weak, + connector: Connector, + failed: AtomicBool, +} 
+ +impl Slot { + fn new(session: Weak, addr: Url, settings: SettingsPtr) -> Arc { + Arc::new(Self { + addr, + process: StoppableTask::new(), + wakeup_self: CondVar::new(), + session: session.clone(), + connector: Connector::new(settings, session), + failed: AtomicBool::new(false), + }) + } + + async fn start(self: Arc) { + let ex = self.p2p().executor(); + + self.process.clone().start( + async move { + self.run().await; + unreachable!(); + }, + // Ignore stop handler + |_| async {}, + Error::NetworkServiceStopped, + ex, + ); + } + + /// Main seedsync connection process that is started on `p2p.start()` but does + /// not proceed until it receives a call to `notify()` (called in `p2p.seed()`). + /// Resets the CondVar after each run to re-suspend the connection process until + /// `notify()` is called again. + async fn run(self: Arc) { + let ex = self.p2p().executor(); + + loop { + // Wait for a signal from notify() before proceeding with the seedsync. + self.wait().await; + + debug!( + target: "net::session::seedsync_session", "SeedSyncSession::start_seed() [START]", + ); + + match self.connector.connect(&self.addr).await { + Ok((url, ch)) => { + info!( + target: "net::session::seedsync_session", + "[P2P] Connected seed [{}]", url, + ); + + match self.session().register_channel(ch.clone(), ex.clone()).await { + Ok(()) => { + self.failed.store(false, SeqCst); + } + + Err(e) => { + warn!( + target: "net::session::seedsync_session", + "[P2P] Failure during sync seed session [{}]: {}", + url, e, + ); + self.failed.store(true, SeqCst); + } + } + + info!( + target: "net::session::seedsync_session", + "[P2P] Disconnecting from seed [{}]", + url, + ); + ch.stop().await; + } + + Err(e) => { + warn!( + target: "net::session:seedsync_session", + "[P2P] Failure contacting seed [{}]: {}", + self.addr, e + ); + + self.failed.store(true, SeqCst); + + // Reset the CondVar for future use. 
+ self.reset(); + + continue + } + } + + // Seed process complete + if self.p2p().hosts().container.is_empty(HostColor::Grey).await { + warn!(target: "net::session::seedsync_session()", + "[P2P] Greylist empty after seeding"); + } + + // Reset the CondVar for future use. + self.reset(); + + debug!( + target: "net::session::seedsync_session", + "SeedSyncSession::start_seed() [END]", + ); + } + } + + pub fn failed(&self) -> bool { + self.failed.load(SeqCst) + } + + fn session(&self) -> SeedSyncSessionPtr { + self.session.upgrade().unwrap() + } + + fn p2p(&self) -> P2pPtr { + self.session().p2p() + } + async fn wait(&self) { + self.wakeup_self.wait().await; + } + + fn reset(&self) { + self.wakeup_self.reset() + } + + fn notify(&self) { + self.wakeup_self.notify() + } + + async fn stop(self: Arc) { + self.process.stop().await + } +}