From a9a714aa73cc8f0c5f450af183477fb2c698ae44 Mon Sep 17 00:00:00 2001 From: narodnik Date: Sat, 5 Feb 2022 09:56:34 +0100 Subject: [PATCH] net: when registering protocols, use a bitwise selector to specify sessions it will be activated for. registry.register(SESSION_INBOUND | SESSION_OUTBOUND, ProtocolFoo::new) The constructors accept a channel and p2p instance. For an example of passing additional arguments, you have to use an async closure: let foo2 = foo.clone(); // registry.register( !net::SESSION_SEED, move |channel, p2p| { let foo = foo2.clone(); async move { ProtocolFoo::new(channel, p2p, foo).await } }).await; --- src/net/mod.rs | 3 +- src/net/protocol/mod.rs | 10 ++++-- src/net/protocol/protocol_address.rs | 3 +- src/net/protocol/protocol_base.rs | 6 ++-- src/net/protocol/protocol_ping.rs | 3 +- src/net/protocol/protocol_registry.rs | 25 +++++++++---- src/net/protocol/protocol_seed.rs | 51 +++++++++++++++++---------- src/net/session/inbound_session.rs | 15 +++++--- src/net/session/manual_session.rs | 14 +++++--- src/net/session/mod.rs | 8 +++++ src/net/session/outbound_session.rs | 14 +++++--- src/net/session/seed_session.rs | 14 +++++--- src/net/session/session.rs | 7 ++-- 13 files changed, 118 insertions(+), 55 deletions(-) diff --git a/src/net/mod.rs b/src/net/mod.rs index 37b1d2d7a..61491796f 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -92,5 +92,6 @@ pub use hosts::{Hosts, HostsPtr}; pub use message::Message; pub use message_subscriber::MessageSubscription; pub use p2p::{P2p, P2pPtr}; -pub use protocol::{ProtocolJobsManager, ProtocolJobsManagerPtr}; +pub use protocol::{ProtocolJobsManager, ProtocolJobsManagerPtr, ProtocolBasePtr, ProtocolBase}; +pub use session::{SESSION_ALL, SESSION_INBOUND, SESSION_MANUAL, SESSION_OUTBOUND, SESSION_SEED}; pub use settings::{Settings, SettingsPtr}; diff --git a/src/net/protocol/mod.rs b/src/net/protocol/mod.rs index 0d7308a44..d109de207 100644 --- a/src/net/protocol/mod.rs +++ b/src/net/protocol/mod.rs @@ -58,10 +58,14 @@ pub use protocol_version::ProtocolVersion; pub use protocol_base::{ProtocolBase, ProtocolBasePtr}; pub use protocol_registry::ProtocolRegistry; -use crate::net::P2pPtr; +use crate::net::{ + session::{SESSION_ALL, SESSION_SEED}, + P2pPtr, +}; pub async fn register_default_protocols(p2p: P2pPtr) { let registry = p2p.protocol_registry(); - registry.register(ProtocolPing::new2).await; - registry.register(ProtocolAddress::new2).await; + registry.register(SESSION_ALL, ProtocolPing::new2).await; + registry.register(!SESSION_SEED, ProtocolAddress::new2).await; + registry.register(SESSION_SEED, ProtocolSeed::new2).await; } diff --git a/src/net/protocol/protocol_address.rs b/src/net/protocol/protocol_address.rs index d1e91f60a..f43b0ceab 100644 --- a/src/net/protocol/protocol_address.rs +++ b/src/net/protocol/protocol_address.rs @@ -125,7 +125,7 @@ impl ProtocolBase for ProtocolAddress { /// Starts the address protocol. Runs receive address and get address /// protocols on the protocol task manager. Then sends get-address /// message. - async fn start(self: Arc, executor: Arc>) { + async fn start(self: Arc, executor: Arc>) -> Result<()> { debug!(target: "net", "ProtocolAddress::start() [START]"); self.jobsman.clone().start(executor.clone()); self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), executor.clone()).await; @@ -135,5 +135,6 @@ impl ProtocolBase for ProtocolAddress { let get_addrs = message::GetAddrsMessage {}; let _ = self.channel.clone().send(get_addrs).await; debug!(target: "net", "ProtocolAddress::start() [END]"); + Ok(()) } } diff --git a/src/net/protocol/protocol_base.rs b/src/net/protocol/protocol_base.rs index 990b5a677..bcc4e5830 100644 --- a/src/net/protocol/protocol_base.rs +++ b/src/net/protocol/protocol_base.rs @@ -2,9 +2,11 @@ use async_trait::async_trait; use smol::Executor; use std::sync::Arc; -pub type ProtocolBasePtr = Arc; +use crate::error::Result; + +pub type ProtocolBasePtr = Arc; #[async_trait] pub trait ProtocolBase { - async fn start(self: Arc, executor: Arc>); + async fn start(self: Arc, executor: Arc>) -> Result<()>; } diff --git a/src/net/protocol/protocol_ping.rs b/src/net/protocol/protocol_ping.rs index 3a15b7315..15462f6ec 100644 --- a/src/net/protocol/protocol_ping.rs +++ b/src/net/protocol/protocol_ping.rs @@ -118,11 +118,12 @@ impl ProtocolBase for ProtocolPing { /// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the /// protocol task manager, then queues the reply. Sends out a ping and /// waits for pong reply. Waits for ping and replies with a pong. - async fn start(self: Arc, executor: Arc>) { + async fn start(self: Arc, executor: Arc>) -> Result<()> { debug!(target: "net", "ProtocolPing::start() [START]"); self.jobsman.clone().start(executor.clone()); self.jobsman.clone().spawn(self.clone().run_ping_pong(), executor.clone()).await; self.jobsman.clone().spawn(self.reply_to_ping(), executor).await; debug!(target: "net", "ProtocolPing::start() [END]"); + Ok(()) } } diff --git a/src/net/protocol/protocol_registry.rs b/src/net/protocol/protocol_registry.rs index d1f8f00f3..2a956c654 100644 --- a/src/net/protocol/protocol_registry.rs +++ b/src/net/protocol/protocol_registry.rs @@ -6,7 +6,7 @@ use super::protocol_base::ProtocolBase; use std::sync::Arc; //use super::protocol_base::ProtocolBasePtr; -use crate::net::{ChannelPtr, P2pPtr}; +use crate::net::{session::SessionBitflag, ChannelPtr, P2pPtr}; type ProtocolBasePtr = Arc; @@ -17,7 +17,7 @@ type Constructor = Box< >; pub struct ProtocolRegistry { - protocol_constructors: Mutex>, + protocol_constructors: Mutex>, } impl ProtocolRegistry { @@ -26,20 +26,31 @@ impl ProtocolRegistry { } // add_protocol()? - pub async fn register(&self, constructor: C) + pub async fn register(&self, session_flags: SessionBitflag, constructor: C) where C: 'static + Fn(ChannelPtr, P2pPtr) -> F + Send + Sync, F: 'static + Future> + Send, { let constructor = move |channel, p2p| { - Box::pin(constructor(channel, p2p)) as BoxFuture<'static, Arc> + Box::pin(constructor(channel, p2p)) + as BoxFuture<'static, Arc> }; - self.protocol_constructors.lock().await.push(Box::new(constructor)); + self.protocol_constructors.lock().await.push((session_flags, Box::new(constructor))); } - pub async fn attach(&self, channel: ChannelPtr, p2p: P2pPtr) -> Vec> { + pub async fn attach( + &self, + selector_id: SessionBitflag, + channel: ChannelPtr, + p2p: P2pPtr, + ) -> Vec> { let mut protocols: Vec> = Vec::new(); - for construct in self.protocol_constructors.lock().await.iter() { + for (session_flags, construct) in self.protocol_constructors.lock().await.iter() { + // Skip protocols that are not registered for this session + if selector_id & session_flags == 0 { + continue + } + let protocol: Arc = construct(channel.clone(), p2p.clone()).await; protocols.push(protocol) diff --git a/src/net/protocol/protocol_seed.rs b/src/net/protocol/protocol_seed.rs index e27e267c0..b033c1963 100644 --- a/src/net/protocol/protocol_seed.rs +++ b/src/net/protocol/protocol_seed.rs @@ -1,10 +1,15 @@ -use log::*; +use async_trait::async_trait; +use log::debug; use smol::Executor; use std::sync::Arc; use crate::{ error::Result, - net::{message, ChannelPtr, HostsPtr, SettingsPtr}, + net::{ + message, + protocol::{ProtocolBase, ProtocolBasePtr}, + ChannelPtr, HostsPtr, P2pPtr, SettingsPtr, + }, }; /// Implements the seed protocol. @@ -20,10 +25,35 @@ impl ProtocolSeed { Arc::new(Self { channel, hosts, settings }) } + pub async fn new2(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr { + let hosts = p2p.hosts(); + let settings = p2p.settings(); + + Arc::new(Self { channel, hosts, settings }) + } + + /// Sends own external address over a channel. Imports own external address + /// from settings, then adds that address to an address message and + /// sends it out over the channel. + pub async fn send_self_address(&self) -> Result<()> { + match self.settings.external_addr { + Some(addr) => { + debug!(target: "net", "ProtocolSeed::send_own_address() addr={}", addr); + let addr = message::AddrsMessage { addrs: vec![addr] }; + Ok(self.channel.clone().send(addr).await?) + } + // Do nothing if external address is not configured + None => Ok(()), + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolSeed { /// Starts the seed protocol. Creates a subscription to the address message, /// then sends our address to the seed server. Sends a get-address /// message and receives an address message. - pub async fn start(self: Arc, _executor: Arc>) -> Result<()> { + async fn start(self: Arc, _executor: Arc>) -> Result<()> { debug!(target: "net", "ProtocolSeed::start() [START]"); // Create a subscription to address message. let addr_sub = self @@ -48,19 +78,4 @@ impl ProtocolSeed { debug!(target: "net", "ProtocolSeed::start() [END]"); Ok(()) } - - /// Sends own external address over a channel. Imports own external address - /// from settings, then adds that address to an address message and - /// sends it out over the channel. - pub async fn send_self_address(&self) -> Result<()> { - match self.settings.external_addr { - Some(addr) => { - debug!(target: "net", "ProtocolSeed::send_own_address() addr={}", addr); - let addr = message::AddrsMessage { addrs: vec![addr] }; - Ok(self.channel.clone().send(addr).await?) - } - // Do nothing if external address is not configured - None => Ok(()), - } - } } diff --git a/src/net/session/inbound_session.rs b/src/net/session/inbound_session.rs index 946d07db1..667486e69 100644 --- a/src/net/session/inbound_session.rs +++ b/src/net/session/inbound_session.rs @@ -9,7 +9,7 @@ use crate::{ error::{Error, Result}, net::{ protocol::{ProtocolAddress, ProtocolBase, ProtocolPing}, - session::Session, + session::{Session, SessionBitflag, SESSION_INBOUND}, Acceptor, AcceptorPtr, ChannelPtr, P2p, }, system::{StoppableTask, StoppableTaskPtr}, @@ -96,11 +96,12 @@ impl InboundSession { self.clone().register_channel(channel.clone(), executor.clone()).await?; - self.attach_protocols(channel, executor).await + //self.attach_protocols(channel, executor).await + Ok(()) } - /// Starts sending keep-alive and address messages across the channels. - async fn attach_protocols( + // Starts sending keep-alive and address messages across the channels. + /*async fn attach_protocols( self: Arc, channel: ChannelPtr, executor: Arc>, @@ -114,11 +115,15 @@ impl InboundSession { protocol_addr.start(executor).await; Ok(()) - } + }*/ } impl Session for InboundSession { fn p2p(&self) -> Arc { self.p2p.upgrade().unwrap() } + + fn selector_id(&self) -> SessionBitflag { + SESSION_INBOUND + } } diff --git a/src/net/session/manual_session.rs b/src/net/session/manual_session.rs index 713ebe751..b10f9155c 100644 --- a/src/net/session/manual_session.rs +++ b/src/net/session/manual_session.rs @@ -10,7 +10,7 @@ use crate::{ error::{Error, Result}, net::{ protocol::{ProtocolAddress, ProtocolBase, ProtocolPing}, - session::Session, + session::{Session, SessionBitflag, SESSION_MANUAL}, ChannelPtr, Connector, P2p, }, system::{StoppableTask, StoppableTaskPtr}, @@ -89,7 +89,7 @@ impl ManualSession { // Remove pending lock since register_channel will add the channel to p2p self.p2p().remove_pending(&addr).await; - self.clone().attach_protocols(channel, executor.clone()).await?; + //self.clone().attach_protocols(channel, executor.clone()).await?; // Wait for channel to close stop_sub.receive().await; @@ -112,8 +112,8 @@ impl ManualSession { Ok(()) } - /// Starts sending keep-alive and address messages across the channels. - async fn attach_protocols( + // Starts sending keep-alive and address messages across the channels. + /*async fn attach_protocols( self: Arc, channel: ChannelPtr, executor: Arc>, @@ -127,11 +127,15 @@ impl ManualSession { protocol_addr.start(executor).await; Ok(()) - } + }*/ } impl Session for ManualSession { fn p2p(&self) -> Arc { self.p2p.upgrade().unwrap() } + + fn selector_id(&self) -> SessionBitflag { + SESSION_MANUAL + } } diff --git a/src/net/session/mod.rs b/src/net/session/mod.rs index 9dcf93238..1cad1ffef 100644 --- a/src/net/session/mod.rs +++ b/src/net/session/mod.rs @@ -27,6 +27,14 @@ pub mod outbound_session; /// channel and initializing the channel by performing a network handshake. pub mod session; +// bitwise selectors for the protocol_registry +pub type SessionBitflag = u32; +pub const SESSION_INBOUND: SessionBitflag = 0b0001; +pub const SESSION_OUTBOUND: SessionBitflag = 0b0010; +pub const SESSION_MANUAL: SessionBitflag = 0b0100; +pub const SESSION_SEED: SessionBitflag = 0b1000; +pub const SESSION_ALL: SessionBitflag = 0b1111; + pub use inbound_session::InboundSession; pub use manual_session::ManualSession; pub use outbound_session::OutboundSession; diff --git a/src/net/session/outbound_session.rs b/src/net/session/outbound_session.rs index 654e228d5..8c0f91716 100644 --- a/src/net/session/outbound_session.rs +++ b/src/net/session/outbound_session.rs @@ -10,7 +10,7 @@ use crate::{ error::{Error, Result}, net::{ protocol::{ProtocolAddress, ProtocolBase, ProtocolPing}, - session::Session, + session::{Session, SessionBitflag, SESSION_OUTBOUND}, ChannelPtr, Connector, P2p, }, system::{StoppableTask, StoppableTaskPtr}, @@ -91,7 +91,7 @@ impl OutboundSession { // Remove pending lock since register_channel will add the channel to p2p self.p2p().remove_pending(&addr).await; - self.clone().attach_protocols(channel, executor.clone()).await?; + //self.clone().attach_protocols(channel, executor.clone()).await?; // Wait for channel to close stop_sub.receive().await; @@ -151,8 +151,8 @@ impl OutboundSession { } } - /// Starts sending keep-alive and address messages across the channels. - async fn attach_protocols( + // Starts sending keep-alive and address messages across the channels. + /*async fn attach_protocols( self: Arc, channel: ChannelPtr, executor: Arc>, @@ -166,11 +166,15 @@ impl OutboundSession { protocol_addr.start(executor).await; Ok(()) - } + }*/ } impl Session for OutboundSession { fn p2p(&self) -> Arc { self.p2p.upgrade().unwrap() } + + fn selector_id(&self) -> SessionBitflag { + SESSION_OUTBOUND + } } diff --git a/src/net/session/seed_session.rs b/src/net/session/seed_session.rs index 96daf4566..403d7b0f7 100644 --- a/src/net/session/seed_session.rs +++ b/src/net/session/seed_session.rs @@ -10,7 +10,7 @@ use crate::{ error::{Error, Result}, net::{ protocol::{ProtocolBase, ProtocolPing, ProtocolSeed}, - session::Session, + session::{Session, SessionBitflag, SESSION_SEED}, ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr, }, util::sleep, @@ -100,7 +100,7 @@ impl SeedSession { self.clone().register_channel(channel.clone(), executor.clone()).await?; - self.attach_protocols(channel, hosts, settings, executor).await?; + //self.attach_protocols(channel, hosts, settings, executor).await?; debug!(target: "net", "SeedSession::start_seed(i={}) [END]", seed_index); Ok(()) @@ -112,8 +112,8 @@ impl SeedSession { } } - /// Starts keep-alive messages and seed protocol. - async fn attach_protocols( + // Starts keep-alive messages and seed protocol. + /*async fn attach_protocols( self: Arc, channel: ChannelPtr, hosts: HostsPtr, @@ -130,11 +130,15 @@ impl SeedSession { channel.stop().await; Ok(()) - } + }*/ } impl Session for SeedSession { fn p2p(&self) -> Arc { self.p2p.upgrade().unwrap() } + + fn selector_id(&self) -> SessionBitflag { + SESSION_SEED + } } diff --git a/src/net/session/session.rs b/src/net/session/session.rs index 5f6b615f1..56c55b073 100644 --- a/src/net/session/session.rs +++ b/src/net/session/session.rs @@ -42,7 +42,8 @@ pub trait Session: Sync { // while the handshake protocol is ongoing. // They are currently in sleep mode. let p2p = self.p2p(); - let protocols = p2p.protocol_registry().attach(channel.clone(), p2p.clone()).await; + let protocols = + p2p.protocol_registry().attach(self.selector_id(), channel.clone(), p2p.clone()).await; // Perform the handshake protocol let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await; @@ -62,7 +63,7 @@ pub trait Session: Sync { // correctly self destructing when the channel ends. for protocol in protocols { // Activate protocol - protocol.start(executor.clone()).await; + protocol.start(executor.clone()).await?; } debug!(target: "net", "Session::register_channel() [END]"); @@ -95,4 +96,6 @@ pub trait Session: Sync { /// Returns a pointer to the p2p network interface. fn p2p(&self) -> P2pPtr; + + fn selector_id(&self) -> u32; }