net: Minor cleanups for idiomatic naming.

This commit is contained in:
parazyd
2022-03-17 13:35:18 +01:00
parent 50edabb426
commit 36fa2e020c
10 changed files with 133 additions and 117 deletions

View File

@@ -41,6 +41,12 @@ impl InfoList {
}
}
impl Default for InfoList {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct NodeInfo {
pub outbound: Vec<OutboundInfo>,
@@ -54,6 +60,12 @@ impl NodeInfo {
}
}
impl Default for NodeInfo {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Debug, PartialEq, Deserialize, Eq, Hash)]
pub struct ManualInfo {
pub key: u64,

View File

@@ -139,7 +139,7 @@ async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<(
.register(!net::SESSION_SEED, move |channel, p2p| {
let sender = sender2.clone();
let seen_privmsg_ids = seen_privmsg_ids2.clone();
async move { ProtocolPrivMsg::new(channel, sender, seen_privmsg_ids, p2p).await }
async move { ProtocolPrivMsg::init(channel, sender, seen_privmsg_ids, p2p).await }
})
.await;

View File

@@ -16,7 +16,7 @@ pub struct ProtocolPrivMsg {
}
impl ProtocolPrivMsg {
pub async fn new(
pub async fn init(
channel: net::ChannelPtr,
notify_queue_sender: async_channel::Sender<Arc<PrivMsg>>,
seen_privmsg_ids: SeenPrivMsgIdsPtr,

View File

@@ -65,7 +65,7 @@ use crate::net::{
pub async fn register_default_protocols(p2p: P2pPtr) {
let registry = p2p.protocol_registry();
registry.register(SESSION_ALL, ProtocolPing::new).await;
registry.register(!SESSION_SEED, ProtocolAddress::new).await;
registry.register(SESSION_SEED, ProtocolSeed::new).await;
registry.register(SESSION_ALL, ProtocolPing::init).await;
registry.register(!SESSION_SEED, ProtocolAddress::init).await;
registry.register(SESSION_SEED, ProtocolSeed::init).await;
}

View File

@@ -26,7 +26,7 @@ pub struct ProtocolAddress {
impl ProtocolAddress {
/// Create a new address protocol. Makes an address and get-address
/// subscription and adds them to the address protocol instance.
pub async fn new(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
let hosts = p2p.hosts();
// Creates a subscription to address message.

View File

@@ -26,7 +26,7 @@ pub struct ProtocolPing {
impl ProtocolPing {
/// Create a new ping-pong protocol.
pub async fn new(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
let settings = p2p.settings();
// Creates a subscription to ping message.

View File

@@ -12,6 +12,12 @@ pub struct ProtocolRegistry {
protocol_constructors: Mutex<Vec<(SessionBitflag, Constructor)>>,
}
impl Default for ProtocolRegistry {
fn default() -> Self {
Self::new()
}
}
impl ProtocolRegistry {
pub fn new() -> Self {
Self { protocol_constructors: Mutex::new(Vec::new()) }

View File

@@ -21,7 +21,7 @@ pub struct ProtocolSeed {
impl ProtocolSeed {
/// Create a new seed protocol.
pub async fn new(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
let hosts = p2p.hosts();
let settings = p2p.settings();

View File

@@ -1,3 +1,13 @@
use async_trait::async_trait;
use log::debug;
use smol::Executor;
use std::sync::Arc;
use crate::{
error::Result,
net::{p2p::P2pPtr, protocol::ProtocolVersion, ChannelPtr},
};
/// Seed connections session. Manages the creation of seed sessions. Used on
/// first time connecting to the network. The seed node stores a list of other
/// nodes in the network.
@@ -23,10 +33,6 @@ pub mod inbound_session;
/// no other part of the program uses the slots at the same time.
pub mod outbound_session;
/// Defines methods that are used across sessions. Implements registering the
/// channel and initializing the channel by performing a network handshake.
pub mod session;
// bitwise selectors for the protocol_registry
pub type SessionBitflag = u32;
pub const SESSION_INBOUND: SessionBitflag = 0b0001;
@@ -39,4 +45,100 @@ pub use inbound_session::InboundSession;
pub use manual_session::ManualSession;
pub use outbound_session::OutboundSession;
pub use seed_session::SeedSession;
pub use session::Session;
/// Removes channel from the list of connected channels when a stop signal is
/// received.
async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) {
debug!(target: "net", "remove_sub_on_stop() [START]");
// Subscribe to stop events
let stop_sub = channel.clone().subscribe_stop().await;
// Wait for a stop event
let _ = stop_sub.receive().await;
debug!(target: "net",
"remove_sub_on_stop(): received stop event. Removing channel {}",
channel.address()
);
// Remove channel from p2p
p2p.remove(channel).await;
debug!(target: "net", "remove_sub_on_stop() [END]");
}
#[async_trait]
/// Session trait.
/// Defines methods that are used across sessions. Implements registering the
/// channel and initializing the channel by performing a network handshake.
pub trait Session: Sync {
/// Registers a new channel with the session. Performs a network handshake
/// and starts the channel.
async fn register_channel(
self: Arc<Self>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
debug!(target: "net", "Session::register_channel() [START]");
// Protocols should all be initialized but not started
// We do this so that the protocols can begin receiving and buffering messages
// while the handshake protocol is ongoing.
// They are currently in sleep mode.
let p2p = self.p2p();
let protocols =
p2p.protocol_registry().attach(self.selector_id(), channel.clone(), p2p.clone()).await;
// Perform the handshake protocol
let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await;
let handshake_task =
self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
// Switch on the channel
channel.start(executor.clone());
// Wait for handshake to finish.
handshake_task.await?;
// Now the channel is ready
debug!(target: "net", "Session handshake complete. Activating remaining protocols");
// Now start all the protocols
// They are responsible for managing their own lifetimes and
// correctly self destructing when the channel ends.
for protocol in protocols {
// Activate protocol
protocol.start(executor.clone()).await?;
}
debug!(target: "net", "Session::register_channel() [END]");
Ok(())
}
/// Performs network handshake to initialize channel. Adds the channel to
/// the list of connected channels, and prepares to remove the channel
/// when a stop signal is received.
async fn perform_handshake_protocols(
&self,
protocol_version: Arc<ProtocolVersion>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
// Perform handshake
protocol_version.run(executor.clone()).await?;
// Channel is now initialized
// Add channel to p2p
self.p2p().store(channel.clone()).await;
// Subscribe to stop, so can remove from p2p
executor.spawn(remove_sub_on_stop(self.p2p(), channel)).detach();
// Channel is ready for use
Ok(())
}
async fn get_info(&self) -> serde_json::Value;
/// Returns a pointer to the p2p network interface.
fn p2p(&self) -> P2pPtr;
fn selector_id(&self) -> u32;
}

View File

@@ -1,104 +0,0 @@
use async_trait::async_trait;
use log::debug;
use smol::Executor;
use std::sync::Arc;
use crate::{
error::Result,
net::{p2p::P2pPtr, protocol::ProtocolVersion, ChannelPtr},
};
/// Removes channel from the list of connected channels when a stop signal is
/// received.
async fn remove_sub_on_stop(p2p: P2pPtr, channel: ChannelPtr) {
debug!(target: "net", "remove_sub_on_stop() [START]");
// Subscribe to stop events
let stop_sub = channel.clone().subscribe_stop().await;
// Wait for a stop event
let _ = stop_sub.receive().await;
debug!(target: "net",
"remove_sub_on_stop(): received stop event. Removing channel {}",
channel.address()
);
// Remove channel from p2p
p2p.remove(channel).await;
debug!(target: "net", "remove_sub_on_stop() [END]");
}
#[async_trait]
/// Session trait.
pub trait Session: Sync {
/// Registers a new channel with the session. Performs a network handshake
/// and starts the channel.
async fn register_channel(
self: Arc<Self>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
debug!(target: "net", "Session::register_channel() [START]");
// Protocols should all be initialized but not started
// We do this so that the protocols can begin receiving and buffering messages
// while the handshake protocol is ongoing.
// They are currently in sleep mode.
let p2p = self.p2p();
let protocols =
p2p.protocol_registry().attach(self.selector_id(), channel.clone(), p2p.clone()).await;
// Perform the handshake protocol
let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await;
let handshake_task =
self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
// Switch on the channel
channel.start(executor.clone());
// Wait for handshake to finish.
handshake_task.await?;
// Now the channel is ready
debug!(target: "net", "Session handshake complete. Activating remaining protocols");
// Now start all the protocols
// They are responsible for managing their own lifetimes and
// correctly self destructing when the channel ends.
for protocol in protocols {
// Activate protocol
protocol.start(executor.clone()).await?;
}
debug!(target: "net", "Session::register_channel() [END]");
Ok(())
}
/// Performs network handshake to initialize channel. Adds the channel to
/// the list of connected channels, and prepares to remove the channel
/// when a stop signal is received.
async fn perform_handshake_protocols(
&self,
protocol_version: Arc<ProtocolVersion>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
// Perform handshake
protocol_version.run(executor.clone()).await?;
// Channel is now initialized
// Add channel to p2p
self.p2p().store(channel.clone()).await;
// Subscribe to stop, so can remove from p2p
executor.spawn(remove_sub_on_stop(self.p2p(), channel)).detach();
// Channel is ready for use
Ok(())
}
async fn get_info(&self) -> serde_json::Value;
/// Returns a pointer to the p2p network interface.
fn p2p(&self) -> P2pPtr;
fn selector_id(&self) -> u32;
}