diff --git a/script/research/generic-node/src/main.rs b/script/research/generic-node/src/main.rs index 5b82fb561..57949ef3b 100644 --- a/script/research/generic-node/src/main.rs +++ b/script/research/generic-node/src/main.rs @@ -79,6 +79,18 @@ struct GenericNumberMessage { } impl_p2p_message!(GenericNumberMessage, "generic_number_message"); +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] +struct GenericRequestMessage { + msg: String, +} +impl_p2p_message!(GenericRequestMessage, "generic_request_message"); + +#[derive(Clone, Debug, SerialEncodable, SerialDecodable)] +struct GenericResponseMessage { + msg: String, +} +impl_p2p_message!(GenericResponseMessage, "generic_response_message"); + /// Generic daemon structure struct Genericd { /// Node ID, used in the dummy messages @@ -86,9 +98,14 @@ struct Genericd { /// P2P network pointer p2p: P2pPtr, /// GenericStringMessage handler - generic_string_msg_handler: ProtocolGenericHandlerPtr, + generic_string_msg_handler: + ProtocolGenericHandlerPtr, /// GenericNumberMessage handler - generic_number_msg_handler: ProtocolGenericHandlerPtr, + generic_number_msg_handler: + ProtocolGenericHandlerPtr, + /// GenericRequestMessage handler + generic_request_msg_handler: + ProtocolGenericHandlerPtr, /// Broadcasting messages task broadcast_task: StoppableTaskPtr, } @@ -111,6 +128,10 @@ impl Genericd { let generic_number_msg_handler = ProtocolGenericHandler::new(&p2p, "ProtocolGenericNumber", SESSION_DEFAULT).await; + // Add a generic protocol for GenericRequestMessage + let generic_request_msg_handler = + ProtocolGenericHandler::new(&p2p, "ProtocolGenericRequest", SESSION_DEFAULT).await; + let broadcast_task = StoppableTask::new(); Ok(Self { @@ -118,12 +139,13 @@ impl Genericd { p2p, generic_string_msg_handler, generic_number_msg_handler, + generic_request_msg_handler, broadcast_task, }) } /// Start all daemon background tasks. - async fn start(&self, executor: &Arc>) -> Result<()> { + async fn start(&self) -> Result<()> { info!(target: "genericd", "Starting tasks..."); self.generic_string_msg_handler.task.clone().start( @@ -135,7 +157,7 @@ impl Genericd { } }, Error::DetachedTaskStopped, - executor.clone(), + self.p2p.executor(), ); self.generic_number_msg_handler.task.clone().start( @@ -147,7 +169,19 @@ impl Genericd { } }, Error::DetachedTaskStopped, - executor.clone(), + self.p2p.executor(), + ); + + self.generic_request_msg_handler.task.clone().start( + handle_generic_request_msg(self.node_id, self.generic_request_msg_handler.clone()), + |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => error!(target: "genericd", "Failed starting protocol generic request handler task: {e}"), + } + }, + Error::DetachedTaskStopped, + self.p2p.executor(), ); self.p2p.clone().start().await?; @@ -161,7 +195,7 @@ impl Genericd { } }, Error::DetachedTaskStopped, - executor.clone(), + self.p2p.executor(), ); info!(target: "genericd", "All tasks started!"); @@ -171,16 +205,18 @@ impl Genericd { /// Stop all daemon background tasks. async fn stop(&self) { info!(target: "genericd", "Terminating tasks..."); + self.broadcast_task.stop().await; self.p2p.stop().await; self.generic_string_msg_handler.task.stop().await; self.generic_number_msg_handler.task.stop().await; + self.generic_request_msg_handler.task.stop().await; info!(target: "genericd", "All tasks terminated!"); } } /// Background handler function for GenericStringMessage. async fn handle_generic_string_msg( - handler: ProtocolGenericHandlerPtr, + handler: ProtocolGenericHandlerPtr, ) -> Result<()> { let mut seen = HashSet::new(); loop { @@ -192,7 +228,7 @@ async fn handle_generic_string_msg( continue } - info!("Received string message from channel {channel}: {}", msg.msg); + info!(target: "handle_generic_string_msg", "Received string message from channel {channel}: {}", msg.msg); seen.insert(msg.msg); handler.send_action(channel, ProtocolGenericAction::Broadcast).await; @@ -201,7 +237,7 @@ async fn handle_generic_string_msg( /// Background handler function for GenericNumberMessage. async fn handle_generic_number_msg( - handler: ProtocolGenericHandlerPtr, + handler: ProtocolGenericHandlerPtr, ) -> Result<()> { let mut seen = HashSet::new(); loop { @@ -213,27 +249,68 @@ async fn handle_generic_number_msg( continue } - info!("Received string message from channel {channel}: {}", msg.num); + info!(target: "handle_generic_number_msg", "Received number message from channel {channel}: {}", msg.num); seen.insert(msg.num); handler.send_action(channel, ProtocolGenericAction::Broadcast).await; } } +/// Background handler function for GenericRequestMessage. +async fn handle_generic_request_msg( + node_id: u64, + handler: ProtocolGenericHandlerPtr, +) -> Result<()> { + let response = GenericResponseMessage { msg: format!("Pong from node {node_id}!") }; + loop { + // Wait for a new message + let (channel, msg) = handler.receiver.recv().await?; + + info!(target: "handle_generic_request_msg", "Received request message from channel {channel}: {}", msg.msg); + + handler.send_action(channel, ProtocolGenericAction::Response(response.clone())).await; + } +} + /// Background function to send messages at random intervals. async fn broadcast_messages(node_id: u64, p2p: P2pPtr) -> Result<()> { + let comms_timeout = p2p.settings().read().await.outbound_connect_timeout; + let request = GenericRequestMessage { msg: format!("Ping from node {node_id}!") }; let mut counter = 0; loop { let sleep_time = OsRng.gen_range(1..=10); - info!("Sleeping {sleep_time} till next broadcast..."); + info!(target: "broadcast_messages", "Sleeping {sleep_time} till next broadcast..."); sleep(sleep_time).await; - info!("Broacasting messages..."); + info!(target: "broadcast_messages", "Broacasting messages..."); + // Broadcast a generic string message let string_msg = GenericStringMessage { msg: format!("Hello from node {node_id}({counter})!") }; - let number_msg = GenericNumberMessage { num: node_id + counter }; p2p.broadcast(&string_msg).await; + + // Broadcast a generic number message + let number_msg = GenericNumberMessage { num: node_id + counter }; p2p.broadcast(&number_msg).await; + + // Perform a direct request to each peer and grab their response + let peers = p2p.hosts().channels(); + for peer in peers { + info!(target: "broadcast_messages", "Sending request message to peer {peer:?}: {}", request.msg); + let Ok(response_sub) = peer.subscribe_msg::().await else { + error!(target: "broadcast_messages", "Failure during `GenericResponseMessage` communication setup with peer: {peer:?}"); + continue + }; + if let Err(e) = peer.send(&request).await { + error!(target: "broadcast_messages", "Failure during `GenericResponseMessage` send to peer {peer:?}: {e}"); + continue + }; + let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else { + error!(target: "broadcast_messages", "Timeout while waiting for `GenericResponseMessage` from peer: {peer:?}"); + continue + }; + info!(target: "broadcast_messages", "Received response message from peer {peer:?}: {}", response.msg); + } + counter += 1; } } @@ -243,7 +320,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!(target: "generic-node", "Initializing generic node..."); let genericd = Genericd::new(args.node_id, &args.net.into(), &ex).await?; - genericd.start(&ex).await?; + genericd.start().await?; // Signal handling for graceful termination. let (signals_handler, signals_task) = SignalHandler::new(ex)?; diff --git a/src/net/protocol/protocol_generic.rs b/src/net/protocol/protocol_generic.rs index 64686c954..7b8718759 100644 --- a/src/net/protocol/protocol_generic.rs +++ b/src/net/protocol/protocol_generic.rs @@ -16,7 +16,7 @@ * along with this program. If not, see . */ -use std::{clone::Clone, collections::HashMap, sync::Arc}; +use std::{clone::Clone, collections::HashMap, fmt::Debug, sync::Arc}; use async_trait::async_trait; use log::debug; @@ -42,22 +42,24 @@ use crate::{ /// Defines generic messages protocol action signal. #[derive(Debug)] -pub enum ProtocolGenericAction { +pub enum ProtocolGenericAction { /// Broadcast message to rest nodes Broadcast, + /// Send provided response message to the node + Response(M), /// Skip message broadcast Skip, /// Stop the channel entirely Stop, } -pub type ProtocolGenericHandlerPtr = Arc>; +pub type ProtocolGenericHandlerPtr = Arc>; /// Defines a handler for generic protocol messages, consisting /// of a message receiver, action signal senders mapped by each /// channel ID, and a stoppable task to run the handler in the /// background. -pub struct ProtocolGenericHandler { +pub struct ProtocolGenericHandler { // Since smol channels close if all senders or all receivers // get dropped, we will keep one here to remain alive with the // handler. @@ -68,20 +70,20 @@ pub struct ProtocolGenericHandler { pub receiver: Receiver<(u32, M)>, /// Senders mapped by channel ID to propagate the /// action signal after a message retrieval. - senders: RwLock>>, + senders: RwLock>>>, /// Handler background task to run the messages listener /// function with. pub task: StoppableTaskPtr, } -impl ProtocolGenericHandler { +impl ProtocolGenericHandler { /// Generate a new ProtocolGenericHandler for the provided P2P /// instance. The handler also attaches its generic protocol. pub async fn new( p2p: &P2pPtr, name: &'static str, session: SessionBitFlag, - ) -> ProtocolGenericHandlerPtr { + ) -> ProtocolGenericHandlerPtr { // Generate the message queue smol channel let (sender, receiver) = smol::channel::unbounded::<(u32, M)>(); @@ -108,7 +110,11 @@ impl ProtocolGenericHandler { /// Registers a new channel sender to the handler map. /// Additionally, looks for stale(closed) channels and prunes then from it. - async fn register_channel_sender(&self, channel: u32, sender: Sender) { + async fn register_channel_sender( + &self, + channel: u32, + sender: Sender>, + ) { // Register the new channel sender let mut lock = self.senders.write().await; lock.insert(channel, sender); @@ -130,7 +136,7 @@ impl ProtocolGenericHandler { } /// Sends provided protocol generic action to requested channel, if it exists. - pub async fn send_action(&self, channel: u32, action: ProtocolGenericAction) { + pub async fn send_action(&self, channel: u32, action: ProtocolGenericAction) { debug!( target: "net::protocol_generic::ProtocolGenericHandler::send_action", "Sending action {action:?} to channel {channel}..." @@ -162,13 +168,13 @@ impl ProtocolGenericHandler { } /// Defines generic messages protocol. -pub struct ProtocolGeneric { +pub struct ProtocolGeneric { /// The P2P channel message subcription msg_sub: MessageSubscription, /// The generic message smol channel sender sender: Sender<(u32, M)>, /// Action signal smol channel receiver - receiver: Receiver, + receiver: Receiver>, /// The P2P channel the protocol is serving channel: ChannelPtr, /// Pointer to the whole P2P instance @@ -177,12 +183,12 @@ pub struct ProtocolGeneric { jobsman: ProtocolJobsManagerPtr, } -impl ProtocolGeneric { +impl ProtocolGeneric { /// Initialize a new generic protocol. pub async fn init( channel: ChannelPtr, name: &'static str, - handler: ProtocolGenericHandlerPtr, + handler: ProtocolGenericHandlerPtr, p2p: P2pPtr, ) -> Result { debug!( @@ -193,6 +199,7 @@ impl ProtocolGeneric { // Add the message dispatcher let msg_subsystem = channel.message_subsystem(); msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; // Create the message subscription let msg_sub = channel.subscribe_msg::().await?; @@ -263,6 +270,14 @@ impl ProtocolGeneric { ProtocolGenericAction::Broadcast => { self.p2p.broadcast_with_exclude(&msg_copy, &exclude_list).await } + ProtocolGenericAction::Response(r) => { + if let Err(e) = self.channel.send(&r).await { + debug!( + target: "net::protocol_generic::handle_receive_message", + "[{}] Channel send fail: {e}", self.jobsman.clone().name() + ) + }; + } ProtocolGenericAction::Skip => { debug!( target: "net::protocol_generic::handle_receive_message", @@ -279,7 +294,7 @@ impl ProtocolGeneric { } #[async_trait] -impl ProtocolBase for ProtocolGeneric { +impl ProtocolBase for ProtocolGeneric { async fn start(self: Arc, ex: Arc>) -> Result<()> { debug!(target: "net::protocol_generic::start", "START"); self.jobsman.clone().start(ex.clone());