Add a protocol registry which properly handles creating the protocols in

a suspended sleep state, then activating them once the channel is
operating.
This commit is contained in:
narodnik
2022-02-03 08:09:58 +01:00
parent 533ee4a63c
commit d1d184c0ea
13 changed files with 194 additions and 45 deletions

View File

@@ -145,7 +145,7 @@ async fn start(executor: Arc<Executor<'_>>, options: ProgramOptions) -> Result<(
let seen_privmsg_ids = SeenPrivMsgIds::new();
let p2p = net::P2p::new(options.network_settings);
let p2p = net::P2p::new(options.network_settings).await;
// Performs seed session
p2p.clone().start(executor.clone()).await?;
// Actual main p2p session

View File

@@ -18,6 +18,7 @@ use crate::{
net::{
message_subscriber::{MessageSubscription, MessageSubsystem},
messages,
protocols::{ProtocolBase, ProtocolBasePtr},
},
system::{StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr, Subscription},
};

View File

@@ -11,6 +11,7 @@ use crate::{
error::{Error, Result},
net::{
messages::Message,
protocols::{register_default_protocols, ProtocolRegistry},
sessions::{InboundSession, ManualSession, OutboundSession, SeedSession},
Channel, ChannelPtr, Hosts, HostsPtr, Settings, SettingsPtr,
},
@@ -32,21 +33,28 @@ pub struct P2p {
// Used both internally and externally
stop_subscriber: SubscriberPtr<Error>,
hosts: HostsPtr,
protocol_registry: ProtocolRegistry,
settings: SettingsPtr,
}
impl P2p {
/// Create a new p2p network.
pub fn new(settings: Settings) -> Arc<Self> {
pub async fn new(settings: Settings) -> Arc<Self> {
let settings = Arc::new(settings);
Arc::new(Self {
let self_ = Arc::new(Self {
pending: Mutex::new(HashSet::new()),
channels: Mutex::new(HashMap::new()),
channel_subscriber: Subscriber::new(),
stop_subscriber: Subscriber::new(),
hosts: Hosts::new(),
protocol_registry: ProtocolRegistry::new(),
settings,
})
});
register_default_protocols(self_.clone()).await;
self_
}
/// Invoke startup and seeding sequence. Call from constructing thread.
@@ -140,6 +148,10 @@ impl P2p {
self.hosts.clone()
}
pub fn protocol_registry(&self) -> &ProtocolRegistry {
&self.protocol_registry
}
/// Subscribe to a channel.
pub async fn subscribe_channel(&self) -> Subscription<Result<ChannelPtr>> {
self.channel_subscriber.clone().subscribe().await

View File

@@ -46,8 +46,22 @@ pub mod protocol_seed;
/// other node and sending the version acknowledgement.
pub mod protocol_version;
pub mod protocol_base;
pub mod protocol_registry;
pub use protocol_address::ProtocolAddress;
pub use protocol_jobs_manager::{ProtocolJobsManager, ProtocolJobsManagerPtr};
pub use protocol_ping::ProtocolPing;
pub use protocol_seed::ProtocolSeed;
pub use protocol_version::ProtocolVersion;
pub use protocol_base::{ProtocolBase, ProtocolBasePtr};
pub use protocol_registry::ProtocolRegistry;
use crate::net::P2pPtr;
pub async fn register_default_protocols(p2p: P2pPtr) {
let registry = p2p.protocol_registry();
registry.register(ProtocolPing::new2).await;
registry.register(ProtocolAddress::new2).await;
}

View File

@@ -1,14 +1,15 @@
use log::*;
use log::{error, debug};
use smol::Executor;
use std::sync::Arc;
use async_trait::async_trait;
use crate::{
error::Result,
net::{
message_subscriber::MessageSubscription,
messages,
protocols::{ProtocolJobsManager, ProtocolJobsManagerPtr},
ChannelPtr, HostsPtr,
protocols::{ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr},
ChannelPtr, HostsPtr, P2pPtr,
},
};
@@ -48,19 +49,30 @@ impl ProtocolAddress {
})
}
/// Starts the address protocol. Runs receive address and get address
/// protocols on the protocol task manager. Then sends get-address
/// message.
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) {
debug!(target: "net", "ProtocolAddress::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_get_addrs(), executor).await;
pub async fn new2(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
let hosts = p2p.hosts();
// Send get_address message.
let get_addrs = messages::GetAddrsMessage {};
let _ = self.channel.clone().send(get_addrs).await;
debug!(target: "net", "ProtocolAddress::start() [END]");
// Creates a subscription to address message.
let addrs_sub = channel
.clone()
.subscribe_msg::<messages::AddrsMessage>()
.await
.expect("Missing addrs dispatcher!");
// Creates a subscription to get-address message.
let get_addrs_sub = channel
.clone()
.subscribe_msg::<messages::GetAddrsMessage>()
.await
.expect("Missing getaddrs dispatcher!");
Arc::new(Self {
channel: channel.clone(),
addrs_sub,
get_addrs_sub,
hosts,
jobsman: ProtocolJobsManager::new("ProtocolAddress", channel),
})
}
/// Handles receiving the address message. Loops to continually recieve
@@ -107,3 +119,22 @@ impl ProtocolAddress {
}
}
}
#[async_trait]
impl ProtocolBase for ProtocolAddress {
/// Starts the address protocol. Runs receive address and get address
/// protocols on the protocol task manager. Then sends get-address
/// message.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) {
debug!(target: "net", "ProtocolAddress::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_get_addrs(), executor).await;
// Send get_address message.
let get_addrs = messages::GetAddrsMessage {};
let _ = self.channel.clone().send(get_addrs).await;
debug!(target: "net", "ProtocolAddress::start() [END]");
}
}

View File

@@ -0,0 +1,10 @@
use std::sync::Arc;
use async_trait::async_trait;
use smol::Executor;
pub type ProtocolBasePtr = Arc<dyn 'static + ProtocolBase + Send + Sync>;
#[async_trait]
pub trait ProtocolBase {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>);
}

View File

@@ -1,14 +1,15 @@
use log::*;
use log::{error, debug};
use rand::Rng;
use smol::Executor;
use std::{sync::Arc, time::Instant};
use async_trait::async_trait;
use crate::{
error::{Error, Result},
net::{
messages,
protocols::{ProtocolJobsManager, ProtocolJobsManagerPtr},
ChannelPtr, SettingsPtr,
protocols::{ProtocolBase, ProtocolBasePtr, ProtocolJobsManager, ProtocolJobsManagerPtr},
ChannelPtr, P2pPtr, SettingsPtr,
},
util::sleep,
};
@@ -22,7 +23,9 @@ pub struct ProtocolPing {
impl ProtocolPing {
/// Create a new ping-pong protocol.
pub fn new(channel: ChannelPtr, settings: SettingsPtr) -> Arc<Self> {
pub fn new(channel: ChannelPtr, p2p: P2pPtr) -> Arc<Self> {
let settings = p2p.settings();
Arc::new(Self {
channel: channel.clone(),
settings,
@@ -30,15 +33,14 @@ impl ProtocolPing {
})
}
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
/// protocol task manager, then queues the reply. Sends out a ping and
/// waits for pong reply. Waits for ping and replies with a pong.
pub async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) {
debug!(target: "net", "ProtocolPing::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().run_ping_pong(), executor.clone()).await;
self.jobsman.clone().spawn(self.reply_to_ping(), executor).await;
debug!(target: "net", "ProtocolPing::start() [END]");
pub async fn new2(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
let settings = p2p.settings();
Arc::new(Self {
channel: channel.clone(),
settings,
jobsman: ProtocolJobsManager::new("ProtocolPing", channel),
})
}
/// Runs ping-pong protocol. Creates a subscription to pong, then starts a
@@ -110,3 +112,18 @@ impl ProtocolPing {
rng.gen()
}
}
#[async_trait]
impl ProtocolBase for ProtocolPing {
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
/// protocol task manager, then queues the reply. Sends out a ping and
/// waits for pong reply. Waits for ping and replies with a pong.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) {
debug!(target: "net", "ProtocolPing::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().run_ping_pong(), executor.clone()).await;
self.jobsman.clone().spawn(self.reply_to_ping(), executor).await;
debug!(target: "net", "ProtocolPing::start() [END]");
}
}

View File

@@ -0,0 +1,48 @@
use async_std::sync::Mutex;
use futures::future::BoxFuture;
use std::future::Future;
use super::protocol_base::ProtocolBase;
use std::sync::Arc;
use super::protocol_base::ProtocolBasePtr;
use crate::net::{ChannelPtr, P2pPtr};
type Constructor =
Box<dyn Fn(ChannelPtr, P2pPtr) -> BoxFuture<'static,
Arc<dyn 'static + ProtocolBase + Send + Sync>
> + Send + Sync>;
pub struct ProtocolRegistry {
protocol_constructors: Mutex<Vec<Constructor>>,
}
impl ProtocolRegistry {
pub fn new() -> Self {
Self {
protocol_constructors: Mutex::new(Vec::new()),
}
}
// add_protocol()?
pub async fn register<C, F>(&self, constructor: C)
where
C: 'static + Fn(ChannelPtr, P2pPtr) -> F + Send + Sync,
F: 'static + Future<Output = Arc<dyn 'static + ProtocolBase + Send + Sync>> + Send,
{
let constructor = move |channel, p2p| {
Box::pin(constructor(channel, p2p)) as BoxFuture<'static, ProtocolBasePtr>
};
self.protocol_constructors.lock().await.push(Box::new(constructor));
}
pub async fn attach(&self, channel: ChannelPtr, p2p: P2pPtr) -> Vec<ProtocolBasePtr> {
let mut protocols: Vec<Arc<dyn ProtocolBase + 'static + Send + Sync>> = Vec::new();
for construct in self.protocol_constructors.lock().await.iter() {
let protocol: Arc<dyn ProtocolBase + 'static + Send + Sync> =
construct(channel.clone(), p2p.clone()).await;
protocols.push(protocol)
}
protocols
}
}

View File

@@ -8,7 +8,7 @@ use std::{
use crate::{
error::{Error, Result},
net::{
protocols::{ProtocolAddress, ProtocolPing},
protocols::{ProtocolAddress, ProtocolPing, ProtocolBase},
sessions::Session,
Acceptor, AcceptorPtr, ChannelPtr, P2p,
},
@@ -105,10 +105,9 @@ impl InboundSession {
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let settings = self.p2p().settings().clone();
let hosts = self.p2p().hosts();
let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone());
let protocol_ping = ProtocolPing::new(channel.clone(), self.p2p());
let protocol_addr = ProtocolAddress::new(channel, hosts).await;
protocol_ping.start(executor.clone()).await;

View File

@@ -9,7 +9,7 @@ use std::{
use crate::{
error::{Error, Result},
net::{
protocols::{ProtocolAddress, ProtocolPing},
protocols::{ProtocolAddress, ProtocolPing, ProtocolBase},
sessions::Session,
ChannelPtr, Connector, P2p,
},
@@ -118,10 +118,9 @@ impl ManualSession {
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let settings = self.p2p().settings().clone();
let hosts = self.p2p().hosts();
let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone());
let protocol_ping = ProtocolPing::new(channel.clone(), self.p2p());
let protocol_addr = ProtocolAddress::new(channel, hosts).await;
protocol_ping.start(executor.clone()).await;

View File

@@ -9,7 +9,7 @@ use std::{
use crate::{
error::{Error, Result},
net::{
protocols::{ProtocolAddress, ProtocolPing},
protocols::{ProtocolAddress, ProtocolPing, ProtocolBase},
sessions::Session,
ChannelPtr, Connector, P2p,
},
@@ -157,10 +157,9 @@ impl OutboundSession {
channel: ChannelPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let settings = self.p2p().settings().clone();
let hosts = self.p2p().hosts();
let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone());
let protocol_ping = ProtocolPing::new(channel.clone(), self.p2p());
let protocol_addr = ProtocolAddress::new(channel, hosts).await;
protocol_ping.start(executor.clone()).await;

View File

@@ -9,7 +9,7 @@ use std::{
use crate::{
error::{Error, Result},
net::{
protocols::{ProtocolPing, ProtocolSeed},
protocols::{ProtocolPing, ProtocolSeed, ProtocolBase},
sessions::Session,
ChannelPtr, Connector, HostsPtr, P2p, SettingsPtr,
},
@@ -120,7 +120,7 @@ impl SeedSession {
settings: SettingsPtr,
executor: Arc<Executor<'_>>,
) -> Result<()> {
let protocol_ping = ProtocolPing::new(channel.clone(), settings.clone());
let protocol_ping = ProtocolPing::new(channel.clone(), self.p2p());
protocol_ping.start(executor.clone()).await;
let protocol_seed = ProtocolSeed::new(channel.clone(), hosts, settings.clone());

View File

@@ -37,15 +37,34 @@ pub trait Session: Sync {
) -> Result<()> {
debug!(target: "net", "Session::register_channel() [START]");
// Protocols should all be initialized but not started
// We do this so that the protocols can begin receiving and buffering messages
// while the handshake protocol is ongoing.
// They are currently in sleep mode.
let p2p = self.p2p();
let protocols = p2p.protocol_registry().attach(channel.clone(), p2p.clone()).await;
// Perform the handshake protocol
let protocol_version = ProtocolVersion::new(channel.clone(), self.p2p().settings()).await;
let handshake_task =
self.perform_handshake_protocols(protocol_version, channel.clone(), executor.clone());
// start channel
channel.start(executor);
// Switch on the channel
channel.start(executor.clone());
// Wait for handshake to finish.
handshake_task.await?;
// Now the channel is ready
// Now start all the protocols
// They are responsible for managing their own lifetimes and
// correctly self destructing when the channel ends.
for protocol in protocols {
// Activate protocol
protocol.start(executor.clone()).await;
}
debug!(target: "net", "Session::register_channel() [END]");
Ok(())
}