diff --git a/bin/dnetview/src/model.rs b/bin/dnetview/src/model.rs index 6cec2ee4b..326776ba1 100644 --- a/bin/dnetview/src/model.rs +++ b/bin/dnetview/src/model.rs @@ -41,6 +41,12 @@ impl InfoList { } } +impl Default for InfoList { + fn default() -> Self { + Self::new() + } +} + #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct NodeInfo { pub outbound: Vec, @@ -54,6 +60,12 @@ impl NodeInfo { } } +impl Default for NodeInfo { + fn default() -> Self { + Self::new() + } +} + #[derive(Clone, Debug, PartialEq, Deserialize, Eq, Hash)] pub struct ManualInfo { pub key: u64, diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index a5a79c875..8df1a7ca2 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -139,7 +139,7 @@ async fn start(executor: Arc>, options: ProgramOptions) -> Result<( .register(!net::SESSION_SEED, move |channel, p2p| { let sender = sender2.clone(); let seen_privmsg_ids = seen_privmsg_ids2.clone(); - async move { ProtocolPrivMsg::new(channel, sender, seen_privmsg_ids, p2p).await } + async move { ProtocolPrivMsg::init(channel, sender, seen_privmsg_ids, p2p).await } }) .await; diff --git a/bin/ircd/src/protocol_privmsg.rs b/bin/ircd/src/protocol_privmsg.rs index 82cd0b3d1..eb0d2559d 100644 --- a/bin/ircd/src/protocol_privmsg.rs +++ b/bin/ircd/src/protocol_privmsg.rs @@ -16,7 +16,7 @@ pub struct ProtocolPrivMsg { } impl ProtocolPrivMsg { - pub async fn new( + pub async fn init( channel: net::ChannelPtr, notify_queue_sender: async_channel::Sender>, seen_privmsg_ids: SeenPrivMsgIdsPtr, diff --git a/src/net/protocol/mod.rs b/src/net/protocol/mod.rs index 1d4216d20..4175ba769 100644 --- a/src/net/protocol/mod.rs +++ b/src/net/protocol/mod.rs @@ -65,7 +65,7 @@ use crate::net::{ pub async fn register_default_protocols(p2p: P2pPtr) { let registry = p2p.protocol_registry(); - registry.register(SESSION_ALL, ProtocolPing::new).await; - registry.register(!SESSION_SEED, ProtocolAddress::new).await; - registry.register(SESSION_SEED, ProtocolSeed::new).await; + registry.register(SESSION_ALL, ProtocolPing::init).await; + registry.register(!SESSION_SEED, ProtocolAddress::init).await; + registry.register(SESSION_SEED, ProtocolSeed::init).await; } diff --git a/src/net/protocol/protocol_address.rs b/src/net/protocol/protocol_address.rs index fbbb26311..fa98cfd4c 100644 --- a/src/net/protocol/protocol_address.rs +++ b/src/net/protocol/protocol_address.rs @@ -26,7 +26,7 @@ pub struct ProtocolAddress { impl ProtocolAddress { /// Create a new address protocol. Makes an address and get-address /// subscription and adds them to the address protocol instance. - pub async fn new(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr { + pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr { let hosts = p2p.hosts(); // Creates a subscription to address message. diff --git a/src/net/protocol/protocol_ping.rs b/src/net/protocol/protocol_ping.rs index 020ec8568..c926ff09c 100644 --- a/src/net/protocol/protocol_ping.rs +++ b/src/net/protocol/protocol_ping.rs @@ -26,7 +26,7 @@ pub struct ProtocolPing { impl ProtocolPing { /// Create a new ping-pong protocol. - pub async fn new(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr { + pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr { let settings = p2p.settings(); // Creates a subscription to ping message. diff --git a/src/net/protocol/protocol_registry.rs b/src/net/protocol/protocol_registry.rs index 67a2826ba..1da681113 100644 --- a/src/net/protocol/protocol_registry.rs +++ b/src/net/protocol/protocol_registry.rs @@ -12,6 +12,12 @@ pub struct ProtocolRegistry { protocol_constructors: Mutex>, } +impl Default for ProtocolRegistry { + fn default() -> Self { + Self::new() + } +} + impl ProtocolRegistry { pub fn new() -> Self { Self { protocol_constructors: Mutex::new(Vec::new()) } diff --git a/src/net/protocol/protocol_seed.rs b/src/net/protocol/protocol_seed.rs index d2de384a8..902dac12c 100644 --- a/src/net/protocol/protocol_seed.rs +++ b/src/net/protocol/protocol_seed.rs @@ -21,7 +21,7 @@ pub struct ProtocolSeed { impl ProtocolSeed { /// Create a new seed protocol. - pub async fn new(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr { + pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr { let hosts = p2p.hosts(); let settings = p2p.settings(); diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index 1cad1ffef..e1f7f159c 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -1,3 +1,13 @@ +use async_trait::async_trait; +use log::debug; +use smol::Executor; +use std::sync::Arc; + +use crate::{ + error::Result, + net::{p2p::P2pPtr, protocol::ProtocolVersion, ChannelPtr}, +}; + /// Seed connections session. Manages the creation of seed sessions. Used on /// first time connecting to the network. The seed node stores a list of other /// nodes in the network. @@ -23,10 +33,6 @@ pub mod inbound_session; /// no other part of the program uses the slots at the same time. pub mod outbound_session; -/// Defines methods that are used across sessions. Implements registering the -/// channel and initializing the channel by performing a network handshake. -pub mod session; - // bitwise selectors for the protocol_registry pub type SessionBitflag = u32; pub const SESSION_INBOUND: SessionBitflag = 0b0001; @@ -39,4 +45,100 @@ pub use inbound_session::InboundSession; pub use manual_session::ManualSession; pub use outbound_session::OutboundSession; pub use seed_session::SeedSession; -pub use session::Session; + +/// Removes channel from the list of connected channels when a stop signal is +/// received. +async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) { + debug!(target: "net", "remove_sub_on_stop() [START]"); + // Subscribe to stop events + let stop_sub = channel.clone().subscribe_stop().await; + // Wait for a stop event + let _ = stop_sub.receive().await; + debug!(target: "net", + "remove_sub_on_stop(): received stop event. Removing channel {}", + channel.address() + ); + // Remove channel from p2p + p2p.remove(channel).await; + debug!(target: "net", "remove_sub_on_stop() [END]"); +} + +#[async_trait] +/// Session trait. +/// Defines methods that are used across sessions. Implements registering the +/// channel and initializing the channel by performing a network handshake. +pub trait Session: Sync { + /// Registers a new channel with the session. Performs a network handshake + /// and starts the channel. + async fn register_channel( + self: Arc, + channel: ChannelPtr, + executor: Arc>, + ) -> Result<()> { + debug!(target: "net", "Session::register_channel() [START]"); + + // Protocols should all be initialized but not started + // We do this so that the protocols can begin receiving and buffering messages + // while the handshake protocol is ongoing. + // They are currently in sleep mode. + let p2p = self.p2p(); + let protocols = + p2p.protocol_registry().attach(self.selector_id(), channel.clone(), p2p.clone()).await; + + // Perform the handshake protocol + let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await; + let handshake_task = + self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone()); + + // Switch on the channel + channel.start(executor.clone()); + + // Wait for handshake to finish. + handshake_task.await?; + + // Now the channel is ready + debug!(target: "net", "Session handshake complete. Activating remaining protocols"); + + // Now start all the protocols + // They are responsible for managing their own lifetimes and + // correctly self destructing when the channel ends. + for protocol in protocols { + // Activate protocol + protocol.start(executor.clone()).await?; + } + + debug!(target: "net", "Session::register_channel() [END]"); + Ok(()) + } + + /// Performs network handshake to initialize channel. Adds the channel to + /// the list of connected channels, and prepares to remove the channel + /// when a stop signal is received. + async fn perform_handshake_protocols( + &self, + protocol_version: Arc, + channel: ChannelPtr, + executor: Arc>, + ) -> Result<()> { + // Perform handshake + protocol_version.run(executor.clone()).await?; + + // Channel is now initialized + + // Add channel to p2p + self.p2p().store(channel.clone()).await; + + // Subscribe to stop, so can remove from p2p + executor.spawn(remove_sub_on_stop(self.p2p(), channel)).detach(); + + // Channel is ready for use + Ok(()) + } + + async fn get_info(&self) -> serde_json::Value; + + /// Returns a pointer to the p2p network interface. + fn p2p(&self) -> P2pPtr; + + fn selector_id(&self) -> u32; +} diff --git a/src/net/session/session.rs b/src/net/session/session.rs deleted file mode 100644 index 1c2e2bd68..000000000 --- a/src/net/session/session.rs +++ /dev/null @@ -1,104 +0,0 @@ -use async_trait::async_trait; -use log::debug; -use smol::Executor; -use std::sync::Arc; - -use crate::{ - error::Result, - net::{p2p::P2pPtr, protocol::ProtocolVersion, ChannelPtr}, -}; - -/// Removes channel from the list of connected channels when a stop signal is -/// received. -async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) { - debug!(target: "net", "remove_sub_on_stop() [START]"); - // Subscribe to stop events - let stop_sub = channel.clone().subscribe_stop().await; - // Wait for a stop event - let _ = stop_sub.receive().await; - debug!(target: "net", - "remove_sub_on_stop(): received stop event. Removing channel {}", - channel.address() - ); - // Remove channel from p2p - p2p.remove(channel).await; - debug!(target: "net", "remove_sub_on_stop() [END]"); -} - -#[async_trait] -/// Session trait. -pub trait Session: Sync { - /// Registers a new channel with the session. Performs a network handshake - /// and starts the channel. - async fn register_channel( - self: Arc, - channel: ChannelPtr, - executor: Arc>, - ) -> Result<()> { - debug!(target: "net", "Session::register_channel() [START]"); - - // Protocols should all be initialized but not started - // We do this so that the protocols can begin receiving and buffering messages - // while the handshake protocol is ongoing. - // They are currently in sleep mode. - let p2p = self.p2p(); - let protocols = - p2p.protocol_registry().attach(self.selector_id(), channel.clone(), p2p.clone()).await; - - // Perform the handshake protocol - let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await; - let handshake_task = - self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone()); - - // Switch on the channel - channel.start(executor.clone()); - - // Wait for handshake to finish. - handshake_task.await?; - - // Now the channel is ready - debug!(target: "net", "Session handshake complete. Activating remaining protocols"); - - // Now start all the protocols - // They are responsible for managing their own lifetimes and - // correctly self destructing when the channel ends. - for protocol in protocols { - // Activate protocol - protocol.start(executor.clone()).await?; - } - - debug!(target: "net", "Session::register_channel() [END]"); - Ok(()) - } - - /// Performs network handshake to initialize channel. Adds the channel to - /// the list of connected channels, and prepares to remove the channel - /// when a stop signal is received. - async fn perform_handshake_protocols( - &self, - protocol_version: Arc, - channel: ChannelPtr, - executor: Arc>, - ) -> Result<()> { - // Perform handshake - protocol_version.run(executor.clone()).await?; - - // Channel is now initialized - - // Add channel to p2p - self.p2p().store(channel.clone()).await; - - // Subscribe to stop, so can remove from p2p - executor.spawn(remove_sub_on_stop(self.p2p(), channel)).detach(); - - // Channel is ready for use - Ok(()) - } - - async fn get_info(&self) -> serde_json::Value; - - /// Returns a pointer to the p2p network interface. - fn p2p(&self) -> P2pPtr; - - fn selector_id(&self) -> u32; -}