diff --git a/src/error.rs b/src/error.rs index 76bf88690..18f2aaef7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -16,20 +16,6 @@ * along with this program. If not, see . */ -// assert_err!(your_func(), Err(Error::UrlParsingFailed(_))); -// assert_err!(your_func(), Err(Error::CanonicalizationFailed(_))); -// assert_err!(your_func(), Err(Error::FileOpenFailed(er)) if er.kind() == ErrorKind::NotFound); -#[macro_export] -macro_rules! assert_err { - ($expression:expr, $($pattern:tt)+) => { - match $expression { - $($pattern)+ => (), - ref e => panic!("expected `{}` but got `{:?}`", stringify!($($pattern)+), e), - } - } -} -pub use assert_err; - // Hello developer. Please add your error to the according subsection // that is commented, or make a new subsection. Keep it clean. diff --git a/src/net/p2p.rs b/src/net/p2p.rs index 5e7123ea4..46d6668b2 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -40,11 +40,8 @@ use super::{ settings::{Settings, SettingsPtr}, }; use crate::{ - error::assert_err, - system::{ - sleep_forever, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription, - }, - Error, Result, + system::{Subscriber, SubscriberPtr, Subscription}, + Result, }; /// Set of channels that are awaiting connection @@ -80,9 +77,6 @@ pub struct P2p { /// Reference to configured [`OutboundSession`] session_outbound: Mutex>>, - /// Main process that starts and stops all the sessions - process: StoppableTaskPtr, - /// Enable network debugging pub dnet_enabled: Mutex, /// The subscriber for which we can give dnet info over @@ -115,8 +109,6 @@ impl P2p { session_inbound: Mutex::new(None), session_outbound: Mutex::new(None), - process: StoppableTask::new(), - dnet_enabled: Mutex::new(false), dnet_subscriber: Subscriber::new(), }); @@ -132,28 +124,10 @@ impl P2p { self_ } - /// Invoke startup and seeding sequence. Call from constructing thread. + /// Starts inbound, outbound, and manual sessions. pub async fn start(self: Arc) -> Result<()> { - let self_ = self.clone(); - self.process.clone().start( - self.clone()._run(), - |result| self_.handle_stop(result), - Error::NetworkServiceStopped, - self.executor(), - ); - Ok(()) - } - - // TODO: fix this, see outbound session also - async fn _run(self: Arc) -> Result<()> { - self.run().await; - Ok(()) - } - /// Runs the network. Starts inbound, outbound, and manual sessions. - /// Waits for a stop signal and stops the network if received. - async fn run(self: Arc) { - debug!(target: "net::p2p::run()", "P2P::run() [BEGIN]"); - info!(target: "net::p2p::run()", "[P2P] Running P2P subsystem"); + debug!(target: "net::p2p::start()", "P2P::start() [BEGIN]"); + info!(target: "net::p2p::start()", "[P2P] Starting P2P subsystem"); // First attempt any set manual connections let manual = self.session_manual().await; @@ -164,35 +138,22 @@ impl P2p { // Start the inbound session let inbound = self.session_inbound().await; if let Err(err) = inbound.clone().start().await { - error!(target: "net::p2p::run()", "Failed to start inbound session!: {}", err); + error!(target: "net::p2p::start()", "Failed to start inbound session!: {}", err); manual.stop().await; - return + return Err(err) } // Start the outbound session let outbound = self.session_outbound().await; if let Err(err) = outbound.clone().start().await { - error!(target: "net::p2p::run()", "Failed to start outbound session!: {}", err); + error!(target: "net::p2p::start()", "Failed to start outbound session!: {}", err); manual.stop().await; inbound.stop().await; - return + return Err(err) } - info!(target: "net::p2p::run()", "[P2P] P2P subsystem started"); - - // Wait for stop signal - sleep_forever().await; - unreachable!(); - } - - async fn handle_stop(self: Arc, result: Result<()>) { - assert!(result.is_err()); - assert_err!(result, Err(Error::NetworkServiceStopped)); - info!(target: "net::p2p::handle_stop()", "[P2P] Received stop signal. Shutting down."); - // Stop the sessions - self.session_manual().await.stop().await; - self.session_inbound().await.stop().await; - self.session_outbound().await.stop().await; + info!(target: "net::p2p::start()", "[P2P] P2P subsystem started"); + Ok(()) } /// Reseed the P2P network. @@ -211,7 +172,10 @@ impl P2p { /// Stop the running P2P subsystem pub async fn stop(&self) { - self.process.stop().await; + // Stop the sessions + self.session_manual().await.stop().await; + self.session_inbound().await.stop().await; + self.session_outbound().await.stop().await; } /// Broadcasts a message concurrently across all active channels. diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index 40a73cf38..72fb57146 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -33,7 +33,7 @@ use std::{ use async_trait::async_trait; use log::{debug, error, info, warn}; -use smol::{lock::Mutex, Executor}; +use smol::lock::Mutex; use url::Url; use super::{ @@ -148,23 +148,23 @@ impl Slot { async fn start(self: Arc) { // TODO: way too many clones, look into making this implicit. See implicit-clone crate + let ex = self.p2p().executor(); + self.process.clone().start( - self.clone()._run(), + async move { + self.run().await; + Ok(()) + }, // Ignore stop handler |_| async {}, Error::NetworkServiceStopped, - self.p2p().executor(), + ex, ); } async fn stop(self: Arc) { self.process.stop().await } - // TODO: need to fix StoppableTask so it accepts arbitrary function signatures - async fn _run(self: Arc) -> Result<()> { - self.run().await; - Ok(()) - } async fn run(self: Arc) { assert_eq!(*self.state.lock().await, SlotState::Inactive); // This is the main outbound connection loop where we try to establish @@ -262,6 +262,11 @@ impl Slot { "[P2P] Outbound slot #{} disconnected: {}", self.slot, err ); + + dnetev!(self, OutboundDisconnected, { + slot: self.slot, + err: err.to_string() + }); continue } // Wait for channel to close