net: when registering protocols, use a bitwise selector to specify

sessions it will be activated for.

    registry.register(SESSION_INBOUND | SESSION_OUTBOUND, ProtocolFoo::new)

The constructors accept a channel and p2p instance.
For an example of passing additional arguments, you have to use an async
closure:
    let foo2 = foo.clone();

    //

    registry.register(
        !net::SESSION_SEED,
        move |channel, p2p| {
            let foo = foo2.clone();
            async move {
                ProtocolFoo::new(channel, p2p, foo).await
            }
        }).await;
This commit is contained in:
narodnik
2022-02-05 09:56:34 +01:00
parent f4026bf8a2
commit a9a714aa73
13 changed files with 118 additions and 55 deletions

View File

@@ -92,5 +92,6 @@ pub use hosts::{Hosts, HostsPtr};
pub use message::Message;
pub use message_subscriber::MessageSubscription;
pub use p2p::{P2p, P2pPtr};
pub use protocol::{ProtocolJobsManager, ProtocolJobsManagerPtr};
pub use protocol::{ProtocolJobsManager, ProtocolJobsManagerPtr, ProtocolBasePtr, ProtocolBase};
pub use session::{SESSION_ALL, SESSION_INBOUND, SESSION_MANUAL, SESSION_OUTBOUND, SESSION_SEED};
pub use settings::{Settings, SettingsPtr};

View File

@@ -58,10 +58,14 @@ pub use protocol_version::ProtocolVersion;
pub use protocol_base::{ProtocolBase, ProtocolBasePtr};
pub use protocol_registry::ProtocolRegistry;
use crate::net::P2pPtr;
use crate::net::{
session::{SESSION_ALL, SESSION_SEED},
P2pPtr,
};
pub async fn register_default_protocols(p2p: P2pPtr) {
let registry = p2p.protocol_registry();
registry.register(ProtocolPing::new2).await;
registry.register(ProtocolAddress::new2).await;
registry.register(SESSION_ALL, ProtocolPing::new2).await;
registry.register(!SESSION_SEED, ProtocolAddress::new2).await;
registry.register(SESSION_SEED, ProtocolSeed::new2).await;
}

View File

@@ -125,7 +125,7 @@ impl ProtocolBase for ProtocolAddress {
/// Starts the address protocol. Runs receive address and get address
/// protocols on the protocol task manager. Then sends get-address
/// message.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net", "ProtocolAddress::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), executor.clone()).await;
@@ -135,5 +135,6 @@ impl ProtocolBase for ProtocolAddress {
let get_addrs = message::GetAddrsMessage {};
let _ = self.channel.clone().send(get_addrs).await;
debug!(target: "net", "ProtocolAddress::start() [END]");
Ok(())
}
}

View File

@@ -2,9 +2,11 @@ use async_trait::async_trait;
use smol::Executor;
use std::sync::Arc;
pub type ProtocolBasePtr = Arc<dyn 'static + ProtocolBase + Send + Sync>;
use crate::error::Result;
pub type ProtocolBasePtr = Arc<dyn ProtocolBase + Send + Sync>;
#[async_trait]
pub trait ProtocolBase {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>);
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()>;
}

View File

@@ -118,11 +118,12 @@ impl ProtocolBase for ProtocolPing {
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
/// protocol task manager, then queues the reply. Sends out a ping and
/// waits for pong reply. Waits for ping and replies with a pong.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net", "ProtocolPing::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().run_ping_pong(), executor.clone()).await;
self.jobsman.clone().spawn(self.reply_to_ping(), executor).await;
debug!(target: "net", "ProtocolPing::start() [END]");
Ok(())
}
}

View File

@@ -6,7 +6,7 @@ use super::protocol_base::ProtocolBase;
use std::sync::Arc;
//use super::protocol_base::ProtocolBasePtr;
use crate::net::{ChannelPtr, P2pPtr};
use crate::net::{session::SessionBitflag, ChannelPtr, P2pPtr};
type ProtocolBasePtr = Arc<dyn ProtocolBase + Send + Sync>;
@@ -17,7 +17,7 @@ type Constructor = Box<
>;
pub struct ProtocolRegistry {
protocol_constructors: Mutex<Vec<Constructor>>,
protocol_constructors: Mutex<Vec<(SessionBitflag, Constructor)>>,
}
impl ProtocolRegistry {
@@ -26,20 +26,31 @@ impl ProtocolRegistry {
}
// add_protocol()?
pub async fn register<C, F>(&self, constructor: C)
pub async fn register<C, F>(&self, session_flags: SessionBitflag, constructor: C)
where
C: 'static + Fn(ChannelPtr, P2pPtr) -> F + Send + Sync,
F: 'static + Future<Output = Arc<dyn ProtocolBase + Send + Sync>> + Send,
{
let constructor = move |channel, p2p| {
Box::pin(constructor(channel, p2p)) as BoxFuture<'static, Arc<dyn ProtocolBase + Send + Sync>>
Box::pin(constructor(channel, p2p))
as BoxFuture<'static, Arc<dyn ProtocolBase + Send + Sync>>
};
self.protocol_constructors.lock().await.push(Box::new(constructor));
self.protocol_constructors.lock().await.push((session_flags, Box::new(constructor)));
}
pub async fn attach(&self, channel: ChannelPtr, p2p: P2pPtr) -> Vec<Arc<dyn ProtocolBase + Send + Sync>> {
pub async fn attach(
&self,
selector_id: SessionBitflag,
channel: ChannelPtr,
p2p: P2pPtr,
) -> Vec<Arc<dyn ProtocolBase + Send + Sync>> {
let mut protocols: Vec<Arc<dyn ProtocolBase + Send + Sync>> = Vec::new();
for construct in self.protocol_constructors.lock().await.iter() {
for (session_flags, construct) in self.protocol_constructors.lock().await.iter() {
// Skip protocols that are not registered for this session
if selector_id & session_flags == 0 {
continue
}
let protocol: Arc<dyn ProtocolBase + Send + Sync> =
construct(channel.clone(), p2p.clone()).await;
protocols.push(protocol)

View File

@@ -1,10 +1,15 @@
use log::*;
use async_trait::async_trait;
use log::debug;
use smol::Executor;
use std::sync::Arc;
use crate::{
error::Result,
net::{message, ChannelPtr, HostsPtr, SettingsPtr},
net::{
message,
protocol::{ProtocolBase, ProtocolBasePtr},
ChannelPtr, HostsPtr, P2pPtr, SettingsPtr,
},
};
/// Implements the seed protocol.
@@ -20,10 +25,35 @@ impl ProtocolSeed {
Arc::new(Self { channel, hosts, settings })
}
pub async fn new2(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
let hosts = p2p.hosts();
let settings = p2p.settings();
Arc::new(Self { channel, hosts, settings })
}
/// Sends own external address over a channel. Imports own external address
/// from settings, then adds that address to an address message and
/// sends it out over the channel.
pub async fn send_self_address(&self) -> Result<()> {
match self.settings.external_addr {
Some(addr) => {
debug!(target: "net", "ProtocolSeed::send_own_address() addr={}", addr);
let addr = message::AddrsMessage { addrs: vec![addr] };
Ok(self.channel.clone().send(addr).await?)
}
// Do nothing if external address is not configured
None => Ok(()),
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolSeed {
/// Starts the seed protocol. Creates a subscription to the address message,
/// then sends our address to the seed server. Sends a get-address
/// message and receives an address message.
pub async fn start(self: Arc<Self>, _executor: Arc<Executor<'_>>) -> Result<()> {
async fn start(self: Arc<Self>, _executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net", "ProtocolSeed::start() [START]");
// Create a subscription to address message.
let addr_sub = self
@@ -48,19 +78,4 @@ impl ProtocolSeed {
debug!(target: "net", "ProtocolSeed::start() [END]");
Ok(())
}
/// Sends own external address over a channel. Imports own external address
/// from settings, then adds that address to an address message and
/// sends it out over the channel.
pub async fn send_self_address(&self) -> Result<()> {
match self.settings.external_addr {
Some(addr) => {
debug!(target: "net", "ProtocolSeed::send_own_address() addr={}", addr);
let addr = message::AddrsMessage { addrs: vec![addr] };
Ok(self.channel.clone().send(addr).await?)
}
// Do nothing if external address is not configured
None => Ok(()),
}
}
}

View File

@@ -9,7 +9,7 @@ use crate::{
error::{Error, Result},
net::{
protocol::{ProtocolAddress, ProtocolBase, ProtocolPing},
session::Session,
session::{Session, SessionBitflag, SESSION_INBOUND},
Acceptor, AcceptorPtr, ChannelPtr, P2p,
},
system::{StoppableTask, StoppableTaskPtr},
@@ -96,11 +96,12 @@ impl InboundSession {
self.clone().register_channel(channel.clone(), executor.clone()).await?;
self.attach_protocols(channel, executor).await
//self.attach_protocols(channel, executor).await
Ok(())
}
/// Starts sending keep-alive and address messages across the channels.
async fn attach_protocols(
// Starts sending keep-alive and address messages across the channels.
/*async fn attach_protocols(
self: Arc<Self>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
@@ -114,11 +115,15 @@ impl InboundSession {
protocol_addr.start(executor).await;
Ok(())
}
}*/
}
impl Session for InboundSession {
fn p2p(&self) -> Arc<P2p> {
self.p2p.upgrade().unwrap()
}
fn selector_id(&self) -> SessionBitflag {
SESSION_INBOUND
}
}

View File

@@ -10,7 +10,7 @@ use crate::{
error::{Error, Result},
net::{
protocol::{ProtocolAddress, ProtocolBase, ProtocolPing},
session::Session,
session::{Session, SessionBitflag, SESSION_MANUAL},
ChannelPtr, Connector, P2p,
},
system::{StoppableTask, StoppableTaskPtr},
@@ -89,7 +89,7 @@ impl ManualSession {
// Remove pending lock since register_channel will add the channel to p2p
self.p2p().remove_pending(&addr).await;
self.clone().attach_protocols(channel, executor.clone()).await?;
//self.clone().attach_protocols(channel, executor.clone()).await?;
// Wait for channel to close
stop_sub.receive().await;
@@ -112,8 +112,8 @@ impl ManualSession {
Ok(())
}
/// Starts sending keep-alive and address messages across the channels.
async fn attach_protocols(
// Starts sending keep-alive and address messages across the channels.
/*async fn attach_protocols(
self: Arc<Self>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
@@ -127,11 +127,15 @@ impl ManualSession {
protocol_addr.start(executor).await;
Ok(())
}
}*/
}
impl Session for ManualSession {
fn p2p(&self) -> Arc<P2p> {
self.p2p.upgrade().unwrap()
}
fn selector_id(&self) -> SessionBitflag {
SESSION_MANUAL
}
}

View File

@@ -27,6 +27,14 @@ pub mod outbound_session;
/// channel and initializing the channel by performing a network handshake.
pub mod session;
// bitwise selectors for the protocol_registry
pub type SessionBitflag = u32;
pub const SESSION_INBOUND: SessionBitflag = 0b0001;
pub const SESSION_OUTBOUND: SessionBitflag = 0b0010;
pub const SESSION_MANUAL: SessionBitflag = 0b0100;
pub const SESSION_SEED: SessionBitflag = 0b1000;
pub const SESSION_ALL: SessionBitflag = 0b1111;
pub use inbound_session::InboundSession;
pub use manual_session::ManualSession;
pub use outbound_session::OutboundSession;

View File

@@ -10,7 +10,7 @@ use crate::{
error::{Error, Result},
net::{
protocol::{ProtocolAddress, ProtocolBase, ProtocolPing},
session::Session,
session::{Session, SessionBitflag, SESSION_OUTBOUND},
ChannelPtr, Connector, P2p,
},
system::{StoppableTask, StoppableTaskPtr},
@@ -91,7 +91,7 @@ impl OutboundSession {
// Remove pending lock since register_channel will add the channel to p2p
self.p2p().remove_pending(&addr).await;
self.clone().attach_protocols(channel, executor.clone()).await?;
//self.clone().attach_protocols(channel, executor.clone()).await?;
// Wait for channel to close
stop_sub.receive().await;
@@ -151,8 +151,8 @@ impl OutboundSession {
}
}
/// Starts sending keep-alive and address messages across the channels.
async fn attach_protocols(
// Starts sending keep-alive and address messages across the channels.
/*async fn attach_protocols(
self: Arc<Self>,
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
@@ -166,11 +166,15 @@ impl OutboundSession {
protocol_addr.start(executor).await;
Ok(())
}
}*/
}
impl Session for OutboundSession {
fn p2p(&self) -> Arc<P2p> {
self.p2p.upgrade().unwrap()
}
fn selector_id(&self) -> SessionBitflag {
SESSION_OUTBOUND
}
}

View File

@@ -10,7 +10,7 @@ use crate::{
error::{Error, Result},
net::{
protocol::{ProtocolBase, ProtocolPing, ProtocolSeed},
session::Session,
session::{Session, SessionBitflag, SESSION_SEED},
ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr,
},
util::sleep,
@@ -100,7 +100,7 @@ impl SeedSession {
self.clone().register_channel(channel.clone(), executor.clone()).await?;
self.attach_protocols(channel, hosts, settings, executor).await?;
//self.attach_protocols(channel, hosts, settings, executor).await?;
debug!(target: "net", "SeedSession::start_seed(i={}) [END]", seed_index);
Ok(())
@@ -112,8 +112,8 @@ impl SeedSession {
}
}
/// Starts keep-alive messages and seed protocol.
async fn attach_protocols(
// Starts keep-alive messages and seed protocol.
/*async fn attach_protocols(
self: Arc<Self>,
channel: ChannelPtr,
hosts: HostsPtr,
@@ -130,11 +130,15 @@ impl SeedSession {
channel.stop().await;
Ok(())
}
}*/
}
impl Session for SeedSession {
fn p2p(&self) -> Arc<P2p> {
self.p2p.upgrade().unwrap()
}
fn selector_id(&self) -> SessionBitflag {
SESSION_SEED
}
}

View File

@@ -42,7 +42,8 @@ pub trait Session: Sync {
// while the handshake protocol is ongoing.
// They are currently in sleep mode.
let p2p = self.p2p();
let protocols = p2p.protocol_registry().attach(channel.clone(), p2p.clone()).await;
let protocols =
p2p.protocol_registry().attach(self.selector_id(), channel.clone(), p2p.clone()).await;
// Perform the handshake protocol
let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await;
@@ -62,7 +63,7 @@ pub trait Session: Sync {
// correctly self destructing when the channel ends.
for protocol in protocols {
// Activate protocol
protocol.start(executor.clone()).await;
protocol.start(executor.clone()).await?;
}
debug!(target: "net", "Session::register_channel() [END]");
@@ -95,4 +96,6 @@ pub trait Session: Sync {
/// Returns a pointer to the p2p network interface.
fn p2p(&self) -> P2pPtr;
fn selector_id(&self) -> u32;
}