diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index fbc3c8dab..e4240b917 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -145,7 +145,7 @@ async fn start(executor: Arc>, options: ProgramOptions) -> Result<( let seen_privmsg_ids = SeenPrivMsgIds::new(); - let p2p = net::P2p::new(options.network_settings); + let p2p = net::P2p::new(options.network_settings).await; // Performs seed session p2p.clone().start(executor.clone()).await?; // Actual main p2p session diff --git a/src/net/channel.rs b/src/net/channel.rs index 532aa9934..dc53501fb 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -18,6 +18,7 @@ use crate::{ net::{ message_subscriber::{MessageSubscription, MessageSubsystem}, messages, + protocols::{ProtocolBase, ProtocolBasePtr}, }, system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription}, }; diff --git a/src/net/p2p.rs b/src/net/p2p.rs index b8eedb733..686240ef4 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -11,6 +11,7 @@ use crate::{ error::{Error, Result}, net::{ messages::Message, + protocols::{register_default_protocols, ProtocolRegistry}, sessions::{InboundSession, ManualSession, OutboundSession, SeedSession}, Channel, ChannelPtr, Hosts, HostsPtr, Settings, SettingsPtr, }, @@ -32,21 +33,28 @@ pub struct P2p { // Used both internally and externally stop_subscriber: SubscriberPtr, hosts: HostsPtr, + protocol_registry: ProtocolRegistry, settings: SettingsPtr, } impl P2p { /// Create a new p2p network. - pub fn new(settings: Settings) -> Arc { + pub async fn new(settings: Settings) -> Arc { let settings = Arc::new(settings); - Arc::new(Self { + + let self_ = Arc::new(Self { pending: Mutex::new(HashSet::new()), channels: Mutex::new(HashMap::new()), channel_subscriber: Subscriber::new(), stop_subscriber: Subscriber::new(), hosts: Hosts::new(), + protocol_registry: ProtocolRegistry::new(), settings, - }) + }); + + register_default_protocols(self_.clone()).await; + + self_ } /// Invoke startup and seeding sequence. Call from constructing thread. @@ -140,6 +148,10 @@ impl P2p { self.hosts.clone() } + pub fn protocol_registry(&self) -> &ProtocolRegistry { + &self.protocol_registry + } + /// Subscribe to a channel. pub async fn subscribe_channel(&self) -> Subscription> { self.channel_subscriber.clone().subscribe().await diff --git a/src/net/protocols/mod.rs b/src/net/protocols/mod.rs index d395ff560..0d7308a44 100644 --- a/src/net/protocols/mod.rs +++ b/src/net/protocols/mod.rs @@ -46,8 +46,22 @@ pub mod protocol_seed; /// other node and sending the version acknowledgement. pub mod protocol_version; +pub mod protocol_base; +pub mod protocol_registry; + pub use protocol_address::ProtocolAddress; pub use protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr}; pub use protocol_ping::ProtocolPing; pub use protocol_seed::ProtocolSeed; pub use protocol_version::ProtocolVersion; + +pub use protocol_base::{ProtocolBase, ProtocolBasePtr}; +pub use protocol_registry::ProtocolRegistry; + +use crate::net::P2pPtr; + +pub async fn register_default_protocols(p2p: P2pPtr) { + let registry = p2p.protocol_registry(); + registry.register(ProtocolPing::new2).await; + registry.register(ProtocolAddress::new2).await; +} diff --git a/src/net/protocols/protocol_address.rs b/src/net/protocols/protocol_address.rs index 2dc8bd386..b6de1fd15 100644 --- a/src/net/protocols/protocol_address.rs +++ b/src/net/protocols/protocol_address.rs @@ -1,14 +1,15 @@ -use log::*; +use log::{error, debug}; use smol::Executor; use std::sync::Arc; +use async_trait::async_trait; use crate::{ error::Result, net::{ message_subscriber::MessageSubscription, messages, - protocols::{ProtocolJobsManager, ProtocolJobsManagerPtr}, - ChannelPtr, HostsPtr, + protocols::{ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr}, + ChannelPtr, HostsPtr, P2pPtr, }, }; @@ -48,19 +49,30 @@ impl ProtocolAddress { }) } - /// Starts the address protocol. Runs receive address and get address - /// protocols on the protocol task manager. Then sends get-address - /// message. - pub async fn start(self: Arc, executor: Arc>) { - debug!(target: "net", "ProtocolAddress::start() [START]"); - self.jobsman.clone().start(executor.clone()); - self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), executor.clone()).await; - self.jobsman.clone().spawn(self.clone().handle_receive_get_addrs(), executor).await; + pub async fn new2(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr { + let hosts = p2p.hosts(); - // Send get_address message. - let get_addrs = messages::GetAddrsMessage {}; - let _ = self.channel.clone().send(get_addrs).await; - debug!(target: "net", "ProtocolAddress::start() [END]"); + // Creates a subscription to address message. + let addrs_sub = channel + .clone() + .subscribe_msg::() + .await + .expect("Missing addrs dispatcher!"); + + // Creates a subscription to get-address message. + let get_addrs_sub = channel + .clone() + .subscribe_msg::() + .await + .expect("Missing getaddrs dispatcher!"); + + Arc::new(Self { + channel: channel.clone(), + addrs_sub, + get_addrs_sub, + hosts, + jobsman: ProtocolJobsManager::new("ProtocolAddress", channel), + }) } /// Handles receiving the address message. Loops to continually recieve @@ -107,3 +119,22 @@ impl ProtocolAddress { } } } + +#[async_trait] +impl ProtocolBase for ProtocolAddress { + /// Starts the address protocol. Runs receive address and get address + /// protocols on the protocol task manager. Then sends get-address + /// message. + async fn start(self: Arc, executor: Arc>) { + debug!(target: "net", "ProtocolAddress::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), executor.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_receive_get_addrs(), executor).await; + + // Send get_address message. + let get_addrs = messages::GetAddrsMessage {}; + let _ = self.channel.clone().send(get_addrs).await; + debug!(target: "net", "ProtocolAddress::start() [END]"); + } +} + diff --git a/src/net/protocols/protocol_base.rs b/src/net/protocols/protocol_base.rs new file mode 100644 index 000000000..bb83bc3fd --- /dev/null +++ b/src/net/protocols/protocol_base.rs @@ -0,0 +1,10 @@ +use std::sync::Arc; +use async_trait::async_trait; +use smol::Executor; + +pub type ProtocolBasePtr = Arc; + +#[async_trait] +pub trait ProtocolBase { + async fn start(self: Arc, executor: Arc>); +} diff --git a/src/net/protocols/protocol_ping.rs b/src/net/protocols/protocol_ping.rs index aad16efbe..9af624586 100644 --- a/src/net/protocols/protocol_ping.rs +++ b/src/net/protocols/protocol_ping.rs @@ -1,14 +1,15 @@ -use log::*; +use log::{error, debug}; use rand::Rng; use smol::Executor; use std::{sync::Arc, time::Instant}; +use async_trait::async_trait; use crate::{ error::{Error, Result}, net::{ messages, - protocols::{ProtocolJobsManager, ProtocolJobsManagerPtr}, - ChannelPtr, SettingsPtr, + protocols::{ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr}, + ChannelPtr, P2pPtr, SettingsPtr, }, util::sleep, }; @@ -22,7 +23,9 @@ pub struct ProtocolPing { impl ProtocolPing { /// Create a new ping-pong protocol. - pub fn new(channel: ChannelPtr, settings: SettingsPtr) -> Arc { + pub fn new(channel: ChannelPtr, p2p: P2pPtr) -> Arc { + let settings = p2p.settings(); + Arc::new(Self { channel: channel.clone(), settings, @@ -30,15 +33,14 @@ impl ProtocolPing { }) } - /// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the - /// protocol task manager, then queues the reply. Sends out a ping and - /// waits for pong reply. Waits for ping and replies with a pong. - pub async fn start(self: Arc, executor: Arc>) { - debug!(target: "net", "ProtocolPing::start() [START]"); - self.jobsman.clone().start(executor.clone()); - self.jobsman.clone().spawn(self.clone().run_ping_pong(), executor.clone()).await; - self.jobsman.clone().spawn(self.reply_to_ping(), executor).await; - debug!(target: "net", "ProtocolPing::start() [END]"); + pub async fn new2(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr { + let settings = p2p.settings(); + + Arc::new(Self { + channel: channel.clone(), + settings, + jobsman: ProtocolJobsManager::new("ProtocolPing", channel), + }) } /// Runs ping-pong protocol. Creates a subscription to pong, then starts a @@ -110,3 +112,18 @@ impl ProtocolPing { rng.gen() } } + +#[async_trait] +impl ProtocolBase for ProtocolPing { + /// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the + /// protocol task manager, then queues the reply. Sends out a ping and + /// waits for pong reply. Waits for ping and replies with a pong. + async fn start(self: Arc, executor: Arc>) { + debug!(target: "net", "ProtocolPing::start() [START]"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().run_ping_pong(), executor.clone()).await; + self.jobsman.clone().spawn(self.reply_to_ping(), executor).await; + debug!(target: "net", "ProtocolPing::start() [END]"); + } +} + diff --git a/src/net/protocols/protocol_registry.rs b/src/net/protocols/protocol_registry.rs new file mode 100644 index 000000000..c294a6df6 --- /dev/null +++ b/src/net/protocols/protocol_registry.rs @@ -0,0 +1,48 @@ +use async_std::sync::Mutex; +use futures::future::BoxFuture; +use std::future::Future; + +use super::protocol_base::ProtocolBase; +use std::sync::Arc; + +use super::protocol_base::ProtocolBasePtr; +use crate::net::{ChannelPtr, P2pPtr}; + +type Constructor = + Box BoxFuture<'static, + Arc + > + Send + Sync>; + +pub struct ProtocolRegistry { + protocol_constructors: Mutex>, +} + +impl ProtocolRegistry { + pub fn new() -> Self { + Self { + protocol_constructors: Mutex::new(Vec::new()), + } + } + + // add_protocol()? + pub async fn register(&self, constructor: C) + where + C: 'static + Fn(ChannelPtr, P2pPtr) -> F + Send + Sync, + F: 'static + Future> + Send, + { + let constructor = move |channel, p2p| { + Box::pin(constructor(channel, p2p)) as BoxFuture<'static, ProtocolBasePtr> + }; + self.protocol_constructors.lock().await.push(Box::new(constructor)); + } + + pub async fn attach(&self, channel: ChannelPtr, p2p: P2pPtr) -> Vec { + let mut protocols: Vec> = Vec::new(); + for construct in self.protocol_constructors.lock().await.iter() { + let protocol: Arc = + construct(channel.clone(), p2p.clone()).await; + protocols.push(protocol) + } + protocols + } +} diff --git a/src/net/sessions/inbound_session.rs b/src/net/sessions/inbound_session.rs index 9f977e22a..4102a068e 100644 --- a/src/net/sessions/inbound_session.rs +++ b/src/net/sessions/inbound_session.rs @@ -8,7 +8,7 @@ use std::{ use crate::{ error::{Error, Result}, net::{ - protocols::{ProtocolAddress, ProtocolPing}, + protocols::{ProtocolAddress, ProtocolPing, ProtocolBase}, sessions::Session, Acceptor, AcceptorPtr, ChannelPtr, P2p, }, @@ -105,10 +105,9 @@ impl InboundSession { channel: ChannelPtr, executor: Arc>, ) -> Result<()> { - let settings = self.p2p().settings().clone(); let hosts = self.p2p().hosts(); - let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone()); + let protocol_ping = ProtocolPing::new(channel.clone(), self.p2p()); let protocol_addr = ProtocolAddress::new(channel, hosts).await; protocol_ping.start(executor.clone()).await; diff --git a/src/net/sessions/manual_session.rs b/src/net/sessions/manual_session.rs index e625340dd..31237000f 100644 --- a/src/net/sessions/manual_session.rs +++ b/src/net/sessions/manual_session.rs @@ -9,7 +9,7 @@ use std::{ use crate::{ error::{Error, Result}, net::{ - protocols::{ProtocolAddress, ProtocolPing}, + protocols::{ProtocolAddress, ProtocolPing, ProtocolBase}, sessions::Session, ChannelPtr, Connector, P2p, }, @@ -118,10 +118,9 @@ impl ManualSession { channel: ChannelPtr, executor: Arc>, ) -> Result<()> { - let settings = self.p2p().settings().clone(); let hosts = self.p2p().hosts(); - let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone()); + let protocol_ping = ProtocolPing::new(channel.clone(), self.p2p()); let protocol_addr = ProtocolAddress::new(channel, hosts).await; protocol_ping.start(executor.clone()).await; diff --git a/src/net/sessions/outbound_session.rs b/src/net/sessions/outbound_session.rs index c7561c5af..b17423982 100644 --- a/src/net/sessions/outbound_session.rs +++ b/src/net/sessions/outbound_session.rs @@ -9,7 +9,7 @@ use std::{ use crate::{ error::{Error, Result}, net::{ - protocols::{ProtocolAddress, ProtocolPing}, + protocols::{ProtocolAddress, ProtocolPing, ProtocolBase}, sessions::Session, ChannelPtr, Connector, P2p, }, @@ -157,10 +157,9 @@ impl OutboundSession { channel: ChannelPtr, executor: Arc>, ) -> Result<()> { - let settings = self.p2p().settings().clone(); let hosts = self.p2p().hosts(); - let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone()); + let protocol_ping = ProtocolPing::new(channel.clone(), self.p2p()); let protocol_addr = ProtocolAddress::new(channel, hosts).await; protocol_ping.start(executor.clone()).await; diff --git a/src/net/sessions/seed_session.rs b/src/net/sessions/seed_session.rs index f54b5153e..5e7939fd0 100644 --- a/src/net/sessions/seed_session.rs +++ b/src/net/sessions/seed_session.rs @@ -9,7 +9,7 @@ use std::{ use crate::{ error::{Error, Result}, net::{ - protocols::{ProtocolPing, ProtocolSeed}, + protocols::{ProtocolPing, ProtocolSeed, ProtocolBase}, sessions::Session, ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr, }, @@ -120,7 +120,7 @@ impl SeedSession { settings: SettingsPtr, executor: Arc>, ) -> Result<()> { - let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone()); + let protocol_ping = ProtocolPing::new(channel.clone(), self.p2p()); protocol_ping.start(executor.clone()).await; let protocol_seed = ProtocolSeed::new(channel.clone(), hosts, settings.clone()); diff --git a/src/net/sessions/session.rs b/src/net/sessions/session.rs index 01ae07c14..48624410d 100644 --- a/src/net/sessions/session.rs +++ b/src/net/sessions/session.rs @@ -37,15 +37,34 @@ pub trait Session: Sync { ) -> Result<()> { debug!(target: "net", "Session::register_channel() [START]"); + // Protocols should all be initialized but not started + // We do this so that the protocols can begin receiving and buffering messages + // while the handshake protocol is ongoing. + // They are currently in sleep mode. + let p2p = self.p2p(); + let protocols = p2p.protocol_registry().attach(channel.clone(), p2p.clone()).await; + + // Perform the handshake protocol let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await; let handshake_task = self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone()); - // start channel - channel.start(executor); + // Switch on the channel + channel.start(executor.clone()); + // Wait for handshake to finish. handshake_task.await?; + // Now the channel is ready + + // Now start all the protocols + // They are responsible for managing their own lifetimes and + // correctly self destructing when the channel ends. + for protocol in protocols { + // Activate protocol + protocol.start(executor.clone()).await; + } + debug!(target: "net", "Session::register_channel() [END]"); Ok(()) }