seedsync: refactor to enable reseed on CondVar.notify()

We introduce a new abstraction called Slot which runs a connector for
each seed specified in Settings. On `p2p.start()`, seed slots are
started but sit in a pending state until a subsequent call to
`notify()`.

This change allows us to have more fine-grained control over the `Connector`
(such as allowing us to stop the `Connector` when it is in progress).

Notably there has been a change to the error handling since we no longer
return a success or failure from the seedsync process. Instead, we store
success or failure of a given `Slot` in an `AtomicBool`, which allows us to
notify other parts of the codebase if all attempts to seed have failed.
This commit is contained in:
draoi
2024-05-16 12:23:27 +02:00
parent e93b6cca95
commit f4d93104b5
3 changed files with 221 additions and 130 deletions

View File

@@ -31,7 +31,7 @@ use super::{
protocol::{protocol_registry::ProtocolRegistry, register_default_protocols},
session::{
InboundSession, InboundSessionPtr, ManualSession, ManualSessionPtr, OutboundSession,
OutboundSessionPtr, RefineSession, RefineSessionPtr, SeedSyncSession,
OutboundSessionPtr, RefineSession, RefineSessionPtr, SeedSyncSession, SeedSyncSessionPtr,
},
settings::{Settings, SettingsPtr},
};
@@ -61,7 +61,8 @@ pub struct P2p {
session_outbound: OutboundSessionPtr,
/// Reference to configured [`RefineSession`]
session_refine: RefineSessionPtr,
/// Reference to configured [`SeedSyncSession`]
session_seedsync: SeedSyncSessionPtr,
/// Enable network debugging
pub dnet_enabled: Mutex<bool>,
/// The subscriber for which we can give dnet info over
@@ -89,6 +90,7 @@ impl P2p {
session_inbound: InboundSession::new(),
session_outbound: OutboundSession::new(),
session_refine: RefineSession::new(),
session_seedsync: SeedSyncSession::new(),
dnet_enabled: Mutex::new(false),
dnet_subscriber: Subscriber::new(),
@@ -98,6 +100,7 @@ impl P2p {
self_.session_inbound.p2p.init(self_.clone());
self_.session_outbound.p2p.init(self_.clone());
self_.session_refine.p2p.init(self_.clone());
self_.session_seedsync.p2p.init(self_.clone());
register_default_protocols(self_.clone()).await;
@@ -124,6 +127,10 @@ impl P2p {
// Start the refine session
self.session_refine().start().await;
// Start the seedsync session. Seed connections will not
// activate yet; they wait for a call to notify().
self.session_seedsync().start().await;
// Start the outbound session
self.session_outbound().start().await;
@@ -132,17 +139,14 @@ impl P2p {
}
/// Reseed the P2P network.
pub async fn seed(self: Arc<Self>) -> Result<()> {
pub async fn seed(self: Arc<Self>) {
debug!(target: "net::p2p::seed()", "P2P::seed() [BEGIN]");
info!(target: "net::p2p::seed()", "[P2P] Seeding P2P subsystem");
// Start seed session
let seed = SeedSyncSession::new(Arc::downgrade(&self));
// This will block until all seed queries have finished
seed.start().await?;
// Activate the seed session.
self.session_seedsync().notify().await;
debug!(target: "net::p2p::seed()", "P2P::seed() [END]");
Ok(())
}
/// Stop the running P2P subsystem
@@ -157,6 +161,7 @@ impl P2p {
self.session_inbound().stop().await;
self.session_refine().stop().await;
self.session_outbound().stop().await;
self.session_seedsync().stop().await;
}
/// Broadcasts a message concurrently across all active channels.
@@ -246,6 +251,11 @@ impl P2p {
self.session_refine.clone()
}
/// Get pointer to seedsync session
pub fn session_seedsync(&self) -> SeedSyncSessionPtr {
self.session_seedsync.clone()
}
/// Enable network debugging
pub async fn dnet_enable(&self) {
*self.dnet_enabled.lock().await = true;

View File

@@ -651,19 +651,13 @@ impl PeerDiscoveryBase for PeerDiscovery {
state: "seed",
});
match p2p.clone().seed().await {
Ok(()) => {
info!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Seeding hosts successful."
);
}
Err(err) => {
error!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Network reseed failed: {}", err,
);
}
p2p.clone().seed().await;
if p2p.clone().session_seedsync().failed().await {
error!(
target: "net::outbound_session::peer_discovery()",
"[P2P] Network reseed failed!"
);
}
}

View File

@@ -18,9 +18,14 @@
//! Seed sync session creates a connection to the seed nodes specified in settings.
//! A new seed sync session is created every time we call [`P2p::start()`]. The
//! seed sync session loops through all the configured seeds and tries to connect
//! to them using a [`Connector`]. Seed sync either connects successfully, fails
//! with an error, or times out.
//! seed sync session loops through all the configured seeds and creates a corresponding
//! `Slot`. `Slot`s are started, but sit in a suspended state until they are activated
//! by a call to notify (see: `p2p.seed()`).
//!
//! When a `Slot` has been activated by a call to `notify()`, it will try to connect
//! to the given seed address using a [`Connector`]. This will either connect successfully
//! or fail with a warning. We gather the results of each `Slot` in an `AtomicBool`
//! so that we can handle the error elsewhere in the code base.
//!
//! If a seed node connects successfully, it runs a version exchange protocol,
//! stores the channel in the p2p list of channels, and disconnects, removing
@@ -38,14 +43,14 @@
//! p2p list of channels, and subscribes to a stop signal.
use std::sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, Ordering::SeqCst},
Arc, Weak,
};
use async_trait::async_trait;
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use log::{debug, info, warn};
use smol::Executor;
use smol::lock::Mutex;
use url::Url;
use super::{
@@ -53,143 +58,225 @@ use super::{
connector::Connector,
hosts::HostColor,
p2p::{P2p, P2pPtr},
settings::SettingsPtr,
},
Session, SessionBitFlag, SESSION_SEED,
};
use crate::{Error, Result};
use crate::{
system::{CondVar, LazyWeak, StoppableTask, StoppableTaskPtr},
Error,
};
pub type SeedSyncSessionPtr = Arc<SeedSyncSession>;
/// Defines seed connections session
pub struct SeedSyncSession {
p2p: Weak<P2p>,
pub(in crate::net) p2p: LazyWeak<P2p>,
slots: Mutex<Vec<Arc<Slot>>>,
}
impl SeedSyncSession {
/// Create a new seed sync session instance
pub fn new(p2p: Weak<P2p>) -> SeedSyncSessionPtr {
Arc::new(Self { p2p })
pub(crate) fn new() -> SeedSyncSessionPtr {
Arc::new(Self { p2p: LazyWeak::new(), slots: Mutex::new(Vec::new()) })
}
/// Start the seed sync session. Creates a new task for every seed
/// connection and starts the seed on each task.
pub async fn start(self: Arc<Self>) -> Result<()> {
debug!(target: "net::session::seedsync_session", "SeedSyncSession::start() [START]");
let settings = self.p2p().settings();
/// Initialize the seedsync session. Each slot is suspended while it waits
/// for a call to notify().
pub(crate) async fn start(self: Arc<Self>) {
// Activate mutex lock on connection slots.
let mut slots = self.slots.lock().await;
if settings.seeds.is_empty() {
warn!(
target: "net::session::seedsync_session",
"[P2P] Skipping seed sync process since no seeds are configured.",
);
let mut futures = FuturesUnordered::new();
return Ok(())
let self_ = Arc::downgrade(&self);
// Initialize a slot for each configured seed.
// Connections will be started but not yet activated.
for seed in &self.p2p().settings().seeds {
let slot = Slot::new(self_.clone(), seed.clone(), self.p2p().settings());
futures.push(slot.clone().start());
slots.push(slot);
}
// Gather tasks so we can execute concurrently
let executor = self.p2p().executor();
let mut tasks = Vec::with_capacity(settings.seeds.len());
let failed = Arc::new(AtomicUsize::new(0));
for (i, seed) in settings.seeds.iter().enumerate() {
let ex_ = executor.clone();
let self_ = self.clone();
let failed_ = failed.clone();
tasks.push(async move {
if let Err(e) = self_.clone().start_seed(i, seed.clone(), ex_.clone()).await {
warn!(
target: "net::session::seedsync_session",
"[P2P] Seed #{} connection failed: {}", i, e,
);
failed_.fetch_add(1, Ordering::SeqCst);
}
});
}
// Poll concurrently
join_all(tasks).await;
if failed.load(Ordering::SeqCst) == settings.seeds.len() {
return Err(Error::SeedFailed)
}
// Seed process complete
if self.p2p().hosts().container.is_empty(HostColor::Grey).await {
warn!(target: "net::session::seedsync_session", "[P2P] Greylist empty after seeding");
}
debug!(target: "net::session::seedsync_session", "SeedSyncSession::start() [END]");
Ok(())
while (futures.next().await).is_some() {}
}
/// Connects to a seed socket address
async fn start_seed(
self: Arc<Self>,
seed_index: usize,
seed: Url,
ex: Arc<Executor<'_>>,
) -> Result<()> {
debug!(
target: "net::session::seedsync_session", "SeedSyncSession::start_seed(i={}) [START]",
seed_index
);
/// Wake every slot so that it proceeds with its seed connection attempt.
/// Called in `p2p.seed()`.
pub(crate) async fn notify(&self) {
    // The guard is held for the whole pass over the slot list.
    let guard = self.slots.lock().await;
    guard.iter().for_each(|slot| slot.notify());
}
let settings = self.p2p.upgrade().unwrap().settings();
let parent = Arc::downgrade(&self);
let connector = Connector::new(settings.clone(), parent);
/// Stop the seedsync session.
pub(crate) async fn stop(&self) {
let slots = &*self.slots.lock().await;
let mut futures = FuturesUnordered::new();
match connector.connect(&seed).await {
Ok((url, ch)) => {
info!(
target: "net::session::seedsync_session",
"[P2P] Connected seed #{} [{}]", seed_index, url,
);
if let Err(e) = self.clone().register_channel(ch.clone(), ex.clone()).await {
warn!(
target: "net::session::seedsync_session",
"[P2P] Failure during sync seed session #{} [{}]: {}",
seed_index, url, e,
);
}
info!(
target: "net::session::seedsync_session",
"[P2P] Disconnecting from seed #{} [{}]",
seed_index, url,
);
ch.stop().await;
}
Err(e) => {
warn!(
target: "net::session:seedsync_session",
"[P2P] Failure contacting seed #{} [{}]: {}",
seed_index, seed, e
);
return Err(e)
}
for slot in slots {
futures.push(slot.clone().stop());
}
debug!(
target: "net::session::seedsync_session",
"SeedSyncSession::start_seed(i={}) [END]",
seed_index
);
while (futures.next().await).is_some() {}
}
Ok(())
/// Report whether the seed sync as a whole has failed.
///
/// Returns `true` only when at least one slot exists and *every* slot has
/// its failure flag set, i.e. no configured seed could be used. A single
/// failing seed among several does not count as a failed reseed, and having
/// no slots at all is not reported as a failure.
pub(crate) async fn failed(&self) -> bool {
    let slots = self.slots.lock().await;
    // `all()` is vacuously true on an empty list, hence the explicit guard.
    !slots.is_empty() && slots.iter().all(|slot| slot.failed())
}
}
#[async_trait]
impl Session for SeedSyncSession {
fn p2p(&self) -> P2pPtr {
self.p2p.upgrade().unwrap()
self.p2p.upgrade()
}
fn type_id(&self) -> SessionBitFlag {
SESSION_SEED
}
}
/// A single seed connection slot. One is created per configured seed
/// address and drives the connection attempts to that seed.
struct Slot {
/// Address of the seed this slot connects to.
addr: Url,
/// Task handle for the slot's connection loop (`run()`).
process: StoppableTaskPtr,
/// Condition variable the connection loop waits on; signalled by
/// `notify()` and cleared again by `reset()`.
wakeup_self: CondVar,
/// Weak reference to the parent seed sync session.
session: Weak<SeedSyncSession>,
/// Connector used to open the connection to `addr`.
connector: Connector,
/// Set to `true` when connecting to the seed or registering the channel
/// fails, and back to `false` when a channel is registered successfully.
failed: AtomicBool,
}
impl Slot {
    /// Create a new slot for the seed at `addr`.
    ///
    /// The slot keeps a weak reference to its parent `session` and builds its
    /// own [`Connector`] from `settings`. The failure flag starts as `false`.
    fn new(session: Weak<SeedSyncSession>, addr: Url, settings: SettingsPtr) -> Arc<Self> {
        Arc::new(Self {
            addr,
            process: StoppableTask::new(),
            wakeup_self: CondVar::new(),
            session: session.clone(),
            connector: Connector::new(settings, session),
            failed: AtomicBool::new(false),
        })
    }

    /// Hand the slot's connection loop (`run()`) to the slot's task handle,
    /// using the executor obtained from the parent `P2p` instance.
    async fn start(self: Arc<Self>) {
        let ex = self.p2p().executor();

        self.process.clone().start(
            async move {
                // `run()` loops forever, so control never gets past this await.
                self.run().await;
                unreachable!();
            },
            // Ignore stop handler
            |_| async {},
            Error::NetworkServiceStopped,
            ex,
        );
    }

    /// Main seedsync connection process that is started on `p2p.start()` but does
    /// not proceed until it receives a call to `notify()` (called in `p2p.seed()`).
    /// Resets the CondVar after each run to re-suspend the connection process until
    /// `notify()` is called again.
    ///
    /// The outcome of each attempt is recorded in the `failed` flag: it is set
    /// when the connection or the channel registration fails, and cleared when
    /// the channel is registered successfully.
    async fn run(self: Arc<Self>) {
        let ex = self.p2p().executor();

        loop {
            // Wait for a signal from notify() before proceeding with the seedsync.
            self.wait().await;

            debug!(
                target: "net::session::seedsync_session", "SeedSyncSession::start_seed() [START]",
            );

            match self.connector.connect(&self.addr).await {
                Ok((url, ch)) => {
                    info!(
                        target: "net::session::seedsync_session",
                        "[P2P] Connected seed [{}]", url,
                    );

                    match self.session().register_channel(ch.clone(), ex.clone()).await {
                        Ok(()) => {
                            self.failed.store(false, SeqCst);
                        }

                        Err(e) => {
                            warn!(
                                target: "net::session::seedsync_session",
                                "[P2P] Failure during sync seed session [{}]: {}",
                                url, e,
                            );
                            self.failed.store(true, SeqCst);
                        }
                    }

                    info!(
                        target: "net::session::seedsync_session",
                        "[P2P] Disconnecting from seed [{}]",
                        url,
                    );
                    ch.stop().await;
                }

                Err(e) => {
                    // Same log target as the rest of the module, so that
                    // target-based filtering also catches this message.
                    warn!(
                        target: "net::session::seedsync_session",
                        "[P2P] Failure contacting seed [{}]: {}",
                        self.addr, e
                    );

                    self.failed.store(true, SeqCst);

                    // Reset the CondVar for future use.
                    self.reset();

                    continue
                }
            }

            // Seed process complete
            if self.p2p().hosts().container.is_empty(HostColor::Grey).await {
                warn!(target: "net::session::seedsync_session",
                "[P2P] Greylist empty after seeding");
            }

            // Reset the CondVar for future use.
            self.reset();

            debug!(
                target: "net::session::seedsync_session",
                "SeedSyncSession::start_seed() [END]",
            );
        }
    }

    /// Current value of the failure flag, as last written by `run()`.
    pub fn failed(&self) -> bool {
        self.failed.load(SeqCst)
    }

    /// Upgrade the weak reference to the parent session.
    ///
    /// Panics if the parent session has already been dropped.
    fn session(&self) -> SeedSyncSessionPtr {
        self.session.upgrade().unwrap()
    }

    /// The `P2p` instance of the parent session.
    fn p2p(&self) -> P2pPtr {
        self.session().p2p()
    }

    /// Wait on the slot's CondVar.
    async fn wait(&self) {
        self.wakeup_self.wait().await;
    }

    /// Reset the slot's CondVar.
    fn reset(&self) {
        self.wakeup_self.reset()
    }

    /// Signal the slot's CondVar.
    fn notify(&self) {
        self.wakeup_self.notify()
    }

    /// Stop the slot's task.
    async fn stop(self: Arc<Self>) {
        self.process.stop().await
    }
}