session: manually stop the connector on slot.stop()

This change required adding a new abstraction called Slot that stores a
Connector, and refactoring ManualSession significantly such that it more
closely resembles the format of other sessions.

We also use FuturesUnordered to stop process concurrently wherever
relevant.
This commit is contained in:
draoi
2024-05-20 15:58:56 +02:00
parent eca97916f6
commit 55e9cc21d0
6 changed files with 131 additions and 91 deletions

View File

@@ -87,23 +87,20 @@ impl Connector {
pin_mut!(stop_fut);
pin_mut!(dial_fut);
let result = {
match select(dial_fut, stop_fut).await {
Either::Left((Ok(ptstream), _)) => {
let channel = Channel::new(
ptstream,
Some(endpoint.clone()),
url.clone(),
self.session.clone(),
)
.await;
Ok((endpoint, channel))
}
Either::Left((Err(e), _)) => Err(e.into()),
Either::Right((_, _)) => return Err(Error::ConnectorStopped),
match select(dial_fut, stop_fut).await {
Either::Left((Ok(ptstream), _)) => {
let channel = Channel::new(
ptstream,
Some(endpoint.clone()),
url.clone(),
self.session.clone(),
)
.await;
Ok((endpoint, channel))
}
};
result
Either::Left((Err(e), _)) => Err(e),
Either::Right((_, _)) => Err(Error::ConnectorStopped),
}
}
pub(crate) fn stop(&self) {

View File

@@ -113,9 +113,7 @@ impl P2p {
info!(target: "net::p2p::start()", "[P2P] Starting P2P subsystem");
// First attempt any set manual connections
for peer in &self.settings.peers {
self.session_manual().connect(peer.clone()).await;
}
self.session_manual().start().await;
// Start the inbound session
if let Err(err) = self.session_inbound().start().await {

View File

@@ -31,9 +31,10 @@
//! and insures that no other part of the program uses the slots at the
//! same time.
use std::sync::Arc;
use std::sync::{Arc, Weak};
use async_trait::async_trait;
use futures::stream::{FuturesUnordered, StreamExt};
use log::{debug, error, info, warn};
use smol::lock::Mutex;
use url::Url;
@@ -46,7 +47,7 @@ use super::{
Session, SessionBitFlag, SESSION_MANUAL,
};
use crate::{
net::hosts::HostState,
net::{hosts::HostState, settings::SettingsPtr},
system::{sleep, LazyWeak, StoppableTask, StoppableTaskPtr},
Error, Result,
};
@@ -56,66 +57,116 @@ pub type ManualSessionPtr = Arc<ManualSession>;
/// Defines manual connections session.
pub struct ManualSession {
pub(in crate::net) p2p: LazyWeak<P2p>,
connect_slots: Mutex<Vec<StoppableTaskPtr>>,
slots: Mutex<Vec<Arc<Slot>>>,
}
impl ManualSession {
/// Create a new manual session.
pub fn new() -> ManualSessionPtr {
Arc::new(Self { p2p: LazyWeak::new(), connect_slots: Mutex::new(Vec::new()) })
Arc::new(Self { p2p: LazyWeak::new(), slots: Mutex::new(Vec::new()) })
}
pub(crate) async fn start(self: Arc<Self>) {
// Activate mutex lock on connection slots.
let mut slots = self.slots.lock().await;
let mut futures = FuturesUnordered::new();
let self_ = Arc::downgrade(&self);
// Initialize a slot for each configured peer.
// Connections will be started by not yet activated.
for peer in &self.p2p().settings().peers {
let slot = Slot::new(self_.clone(), peer.clone(), self.p2p().settings());
futures.push(slot.clone().start());
slots.push(slot);
}
while (futures.next().await).is_some() {}
}
/// Stops the manual session.
pub async fn stop(&self) {
let connect_slots = &*self.connect_slots.lock().await;
let slots = &*self.slots.lock().await;
let mut futures = FuturesUnordered::new();
for slot in connect_slots {
slot.stop().await;
for slot in slots {
futures.push(slot.stop());
}
while (futures.next().await).is_some() {}
}
}
#[async_trait]
impl Session for ManualSession {
fn p2p(&self) -> P2pPtr {
self.p2p.upgrade()
}
/// Connect the manual session to the given address
pub async fn connect(self: Arc<Self>, addr: Url) {
let ex = self.p2p().executor();
let task = StoppableTask::new();
self.connect_slots.lock().await.push(task.clone());
fn type_id(&self) -> SessionBitFlag {
SESSION_MANUAL
}
}
task.start(
self.clone().channel_connect_loop(addr),
// Ignore stop handler
|_| async {},
struct Slot {
addr: Url,
process: StoppableTaskPtr,
session: Weak<ManualSession>,
connector: Connector,
}
impl Slot {
fn new(session: Weak<ManualSession>, addr: Url, settings: SettingsPtr) -> Arc<Self> {
Arc::new(Self {
addr,
process: StoppableTask::new(),
session: session.clone(),
connector: Connector::new(settings, session),
})
}
async fn start(self: Arc<Self>) {
let ex = self.p2p().executor();
self.process.clone().start(
self.run(),
|res| async {
match res {
Ok(()) | Err(Error::NetworkServiceStopped) => {}
Err(e) => error!("net::manual_session {}", e),
}
},
Error::NetworkServiceStopped,
ex,
);
}
/// Creates a connector object and tries to connect using it
pub async fn channel_connect_loop(self: Arc<Self>, addr: Url) -> Result<()> {
/// Attempts a connection on the associated Connector object.
async fn run(self: Arc<Self>) -> Result<()> {
let ex = self.p2p().executor();
let parent = Arc::downgrade(&self);
let settings = self.p2p().settings();
let connector = Connector::new(settings.clone(), parent);
let mut attempts = 0;
loop {
attempts += 1;
info!(
target: "net::manual_session",
"[P2P] Connecting to manual outbound [{}] (attempt #{})",
addr, attempts
self.addr, attempts
);
// Do not establish a connection to a host that is also configured as a seed.
// This indicates a user misconfiguration.
if settings.seeds.contains(&addr) {
if self.p2p().settings().seeds.contains(&self.addr) {
error!(target: "net::manual_session",
"[P2P] Suspending manual connection to seed [{}]", addr.clone());
"[P2P] Suspending manual connection to seed [{}]", self.addr.clone());
return Ok(())
}
match self.p2p().hosts().try_register(addr.clone(), HostState::Connect).await {
match self.p2p().hosts().try_register(self.addr.clone(), HostState::Connect).await {
Ok(_) => {
match connector.connect(&addr).await {
match self.connector.connect(&self.addr).await {
Ok((url, channel)) => {
info!(
target: "net::manual_session",
@@ -130,7 +181,7 @@ impl ManualSession {
// Channel is now connected but not yet setup
// Register the new channel
self.register_channel(channel.clone(), ex.clone()).await?;
self.session().register_channel(channel.clone(), ex.clone()).await?;
// Wait for channel to close
stop_sub.receive().await;
@@ -147,39 +198,41 @@ impl ManualSession {
warn!(
target: "net::manual_session",
"[P2P] Unable to connect to manual outbound [{}]: {}",
addr, e,
self.addr, e,
);
// Stop tracking this peer, to avoid it getting stuck in the Connect
// state. This is safe since we have failed to connect at this point.
self.p2p().hosts().unregister(&addr).await;
self.p2p().hosts().unregister(&self.addr).await;
}
}
}
// This address is currently unavailable.
Err(e) => {
debug!(target: "net::manual_session", "[P2P] Unable to connect to manual
outbound [{}]: {}", addr.clone(), e);
outbound [{}]: {}", self.addr.clone(), e);
}
}
info!(
target: "net::manual_session",
"[P2P] Waiting {} seconds until next manual outbound connection attempt [{}]",
settings.outbound_connect_timeout, addr,
self.p2p().settings().outbound_connect_timeout, self.addr,
);
sleep(settings.outbound_connect_timeout).await;
sleep(self.p2p().settings().outbound_connect_timeout).await;
}
}
}
#[async_trait]
impl Session for ManualSession {
fn session(&self) -> ManualSessionPtr {
self.session.upgrade().unwrap()
}
fn p2p(&self) -> P2pPtr {
self.p2p.upgrade()
self.session().p2p()
}
fn type_id(&self) -> SessionBitFlag {
SESSION_MANUAL
async fn stop(&self) {
self.connector.stop();
self.process.stop().await;
}
}

View File

@@ -104,9 +104,7 @@ impl OutboundSession {
/// Stops the outbound session.
pub(crate) async fn stop(&self) {
debug!(target: "net::outbound_session", "Stopping outbound session");
let slots = &*self.slots.lock().await;
let mut futures = FuturesUnordered::new();
for slot in slots {
@@ -115,10 +113,6 @@ impl OutboundSession {
while (futures.next().await).is_some() {}
// TODO/ FIXME: Shutting down the slots triggers a seed sync in peer discovery
// (see outbound_session.rs:633).
// We should implement an Atomic Bool called stopped()
// (see channel.rs).
self.peer_discovery.clone().stop().await;
}
@@ -172,17 +166,20 @@ struct Slot {
process: StoppableTaskPtr,
wakeup_self: CondVar,
session: Weak<OutboundSession>,
connector: Connector,
// For debugging
channel_id: AtomicU32,
}
impl Slot {
fn new(session: Weak<OutboundSession>, slot: u32) -> Arc<Self> {
let settings = session.upgrade().unwrap().p2p().settings();
Arc::new(Self {
slot,
process: StoppableTask::new(),
wakeup_self: CondVar::new(),
session,
session: session.clone(),
connector: Connector::new(settings, session),
channel_id: AtomicU32::new(0),
})
}
@@ -191,18 +188,20 @@ impl Slot {
let ex = self.p2p().executor();
self.process.clone().start(
async move {
self.run().await;
unreachable!();
self.run(),
|res| async {
match res {
Ok(()) | Err(Error::NetworkServiceStopped) => {}
Err(e) => error!("net::outbound_session {}", e),
}
},
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
ex,
);
}
async fn stop(self: Arc<Self>) {
self.process.stop().await
self.connector.stop();
self.process.stop().await;
}
/// Address selection algorithm that works as follows: up to
@@ -300,7 +299,7 @@ impl Slot {
// We first try to make connections to the addresses on our anchor list. We then find some
// whitelist connections according to the whitelist percent default. Finally, any remaining
// connections we make from the greylist.
async fn run(self: Arc<Self>) {
async fn run(self: Arc<Self>) -> Result<()> {
let hosts = self.p2p().hosts();
loop {
@@ -378,6 +377,7 @@ impl Slot {
});
self.channel_id.store(0, Ordering::Relaxed);
continue
}
};
@@ -433,10 +433,7 @@ impl Slot {
/// channel. In case of any failures, a network error is returned and the
/// main connect loop (parent of this function) will iterate again.
async fn try_connect(&self, addr: Url, last_seen: u64) -> Result<(Url, ChannelPtr)> {
let parent = Arc::downgrade(&self.session());
let connector = Connector::new(self.p2p().settings(), parent);
match connector.connect(&addr).await {
match self.connector.connect(&addr).await {
Ok((addr_final, channel)) => Ok((addr_final, channel)),
Err(e) => {
@@ -531,7 +528,7 @@ impl PeerDiscoveryBase for PeerDiscovery {
);
}
async fn stop(self: Arc<Self>) {
self.process.stop().await
self.process.stop().await;
}
/// Activate peer discovery if not active already. For the first two

View File

@@ -42,15 +42,14 @@
//! function. This runs the version exchange protocol, stores the channel in the
//! p2p list of channels, and subscribes to a stop signal.
use std::sync::{
atomic::{AtomicBool, Ordering::SeqCst},
Arc, Weak,
};
use async_trait::async_trait;
use futures::stream::{FuturesUnordered, StreamExt};
use log::{debug, info, warn};
use smol::lock::Mutex;
use std::sync::{
atomic::{AtomicBool, Ordering::SeqCst},
Arc, Weak,
};
use url::Url;
use super::{
@@ -106,6 +105,7 @@ impl SeedSyncSession {
/// Called in `p2p.seed()`.
pub(crate) async fn notify(&self) {
let slots = &*self.slots.lock().await;
for slot in slots {
slot.notify();
}
@@ -264,6 +264,7 @@ impl Slot {
fn p2p(&self) -> P2pPtr {
self.session().p2p()
}
async fn wait(&self) {
self.wakeup_self.wait().await;
}
@@ -277,6 +278,7 @@ impl Slot {
}
async fn stop(self: Arc<Self>) {
self.process.stop().await
self.connector.stop();
self.process.stop().await;
}
}

View File

@@ -111,7 +111,6 @@ impl TcpDialer {
match select(connect, timeout).await {
Either::Left((Ok(_), _)) => {
debug!(target: "net::tcp::do_dial", "Connection successful!");
let stream = {
let socket = async_socket.into_inner()?;
std::net::TcpStream::from(socket)
@@ -121,15 +120,9 @@ impl TcpDialer {
Ok(stream)
}
Either::Left((Err(e), _)) => {
debug!(target: "net::tcp::do_dial", "Connection error: {}", e);
return Err(e.into());
}
Either::Left((Err(e), _)) => Err(e),
Either::Right((_, _)) => {
debug!(target: "net::tcp::do_dial", "Connection timeed out!");
return Err(Error::ConnectTimeout)
}
Either::Right((_, _)) => Err(Error::ConnectTimeout),
}
}
None => {