p2p: greatly simplify & delet kod. p2p doesnt need a StoppableTask - the sessions themselves create their own StoppableTasks when calling start() and yield immediately.

This commit is contained in:
x
2023-08-24 11:36:31 +02:00
parent 21df1a2f52
commit 2db408de14
3 changed files with 28 additions and 73 deletions

View File

@@ -16,20 +16,6 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
// assert_err!(your_func(), Err(Error::UrlParsingFailed(_)));
// assert_err!(your_func(), Err(Error::CanonicalizationFailed(_)));
// assert_err!(your_func(), Err(Error::FileOpenFailed(er)) if er.kind() == ErrorKind::NotFound);
#[macro_export]
macro_rules! assert_err {
($expression:expr, $($pattern:tt)+) => {
match $expression {
$($pattern)+ => (),
ref e => panic!("expected `{}` but got `{:?}`", stringify!($($pattern)+), e),
}
}
}
pub use assert_err;
// Hello developer. Please add your error to the according subsection
// that is commented, or make a new subsection. Keep it clean.

View File

@@ -40,11 +40,8 @@ use super::{
settings::{Settings, SettingsPtr},
};
use crate::{
error::assert_err,
system::{
sleep_forever, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription,
},
Error, Result,
system::{Subscriber, SubscriberPtr, Subscription},
Result,
};
/// Set of channels that are awaiting connection
@@ -80,9 +77,6 @@ pub struct P2p {
/// Reference to configured [`OutboundSession`]
session_outbound: Mutex<Option<Arc<OutboundSession>>>,
/// Main process that starts and stops all the sessions
process: StoppableTaskPtr,
/// Enable network debugging
pub dnet_enabled: Mutex<bool>,
/// The subscriber for which we can give dnet info over
@@ -115,8 +109,6 @@ impl P2p {
session_inbound: Mutex::new(None),
session_outbound: Mutex::new(None),
process: StoppableTask::new(),
dnet_enabled: Mutex::new(false),
dnet_subscriber: Subscriber::new(),
});
@@ -132,28 +124,10 @@ impl P2p {
self_
}
/// Invoke startup and seeding sequence. Call from constructing thread.
/// Starts inbound, outbound, and manual sessions.
pub async fn start(self: Arc<Self>) -> Result<()> {
let self_ = self.clone();
self.process.clone().start(
self.clone()._run(),
|result| self_.handle_stop(result),
Error::NetworkServiceStopped,
self.executor(),
);
Ok(())
}
// TODO: fix this, see outbound session also
async fn _run(self: Arc<Self>) -> Result<()> {
self.run().await;
Ok(())
}
/// Runs the network. Starts inbound, outbound, and manual sessions.
/// Waits for a stop signal and stops the network if received.
async fn run(self: Arc<Self>) {
debug!(target: "net::p2p::run()", "P2P::run() [BEGIN]");
info!(target: "net::p2p::run()", "[P2P] Running P2P subsystem");
debug!(target: "net::p2p::start()", "P2P::start() [BEGIN]");
info!(target: "net::p2p::start()", "[P2P] Starting P2P subsystem");
// First attempt any set manual connections
let manual = self.session_manual().await;
@@ -164,35 +138,22 @@ impl P2p {
// Start the inbound session
let inbound = self.session_inbound().await;
if let Err(err) = inbound.clone().start().await {
error!(target: "net::p2p::run()", "Failed to start inbound session!: {}", err);
error!(target: "net::p2p::start()", "Failed to start inbound session!: {}", err);
manual.stop().await;
return
return Err(err)
}
// Start the outbound session
let outbound = self.session_outbound().await;
if let Err(err) = outbound.clone().start().await {
error!(target: "net::p2p::run()", "Failed to start outbound session!: {}", err);
error!(target: "net::p2p::start()", "Failed to start outbound session!: {}", err);
manual.stop().await;
inbound.stop().await;
return
return Err(err)
}
info!(target: "net::p2p::run()", "[P2P] P2P subsystem started");
// Wait for stop signal
sleep_forever().await;
unreachable!();
}
async fn handle_stop(self: Arc<Self>, result: Result<()>) {
assert!(result.is_err());
assert_err!(result, Err(Error::NetworkServiceStopped));
info!(target: "net::p2p::handle_stop()", "[P2P] Received stop signal. Shutting down.");
// Stop the sessions
self.session_manual().await.stop().await;
self.session_inbound().await.stop().await;
self.session_outbound().await.stop().await;
info!(target: "net::p2p::start()", "[P2P] P2P subsystem started");
Ok(())
}
/// Reseed the P2P network.
@@ -211,7 +172,10 @@ impl P2p {
/// Stop the running P2P subsystem
pub async fn stop(&self) {
self.process.stop().await;
// Stop the sessions
self.session_manual().await.stop().await;
self.session_inbound().await.stop().await;
self.session_outbound().await.stop().await;
}
/// Broadcasts a message concurrently across all active channels.

View File

@@ -33,7 +33,7 @@ use std::{
use async_trait::async_trait;
use log::{debug, error, info, warn};
use smol::{lock::Mutex, Executor};
use smol::lock::Mutex;
use url::Url;
use super::{
@@ -148,23 +148,23 @@ impl Slot {
async fn start(self: Arc<Self>) {
// TODO: way too many clones, look into making this implicit. See implicit-clone crate
let ex = self.p2p().executor();
self.process.clone().start(
self.clone()._run(),
async move {
self.run().await;
Ok(())
},
// Ignore stop handler
|_| async {},
Error::NetworkServiceStopped,
self.p2p().executor(),
ex,
);
}
async fn stop(self: Arc<Self>) {
self.process.stop().await
}
// TODO: need to fix StoppableTask so it accepts arbitrary function signatures
async fn _run(self: Arc<Self>) -> Result<()> {
self.run().await;
Ok(())
}
async fn run(self: Arc<Self>) {
assert_eq!(*self.state.lock().await, SlotState::Inactive);
// This is the main outbound connection loop where we try to establish
@@ -262,6 +262,11 @@ impl Slot {
"[P2P] Outbound slot #{} disconnected: {}",
self.slot, err
);
dnetev!(self, OutboundDisconnected, {
slot: self.slot,
err: err.to_string()
});
continue
}
// Wait for channel to close