src/net/protorocl/protocol_generic: added response action

This commit is contained in:
skoupidi
2024-09-17 16:41:46 +03:00
parent c699f9da44
commit 19eb32f29b
2 changed files with 120 additions and 28 deletions

View File

@@ -79,6 +79,18 @@ struct GenericNumberMessage {
} }
impl_p2p_message!(GenericNumberMessage, "generic_number_message"); impl_p2p_message!(GenericNumberMessage, "generic_number_message");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
struct GenericRequestMessage {
msg: String,
}
impl_p2p_message!(GenericRequestMessage, "generic_request_message");
#[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
struct GenericResponseMessage {
msg: String,
}
impl_p2p_message!(GenericResponseMessage, "generic_response_message");
/// Generic daemon structure /// Generic daemon structure
struct Genericd { struct Genericd {
/// Node ID, used in the dummy messages /// Node ID, used in the dummy messages
@@ -86,9 +98,14 @@ struct Genericd {
/// P2P network pointer /// P2P network pointer
p2p: P2pPtr, p2p: P2pPtr,
/// GenericStringMessage handler /// GenericStringMessage handler
generic_string_msg_handler: ProtocolGenericHandlerPtr<GenericStringMessage>, generic_string_msg_handler:
ProtocolGenericHandlerPtr<GenericStringMessage, GenericStringMessage>,
/// GenericNumberMessage handler /// GenericNumberMessage handler
generic_number_msg_handler: ProtocolGenericHandlerPtr<GenericNumberMessage>, generic_number_msg_handler:
ProtocolGenericHandlerPtr<GenericNumberMessage, GenericNumberMessage>,
/// GenericRequestMessage handler
generic_request_msg_handler:
ProtocolGenericHandlerPtr<GenericRequestMessage, GenericResponseMessage>,
/// Broadcasting messages task /// Broadcasting messages task
broadcast_task: StoppableTaskPtr, broadcast_task: StoppableTaskPtr,
} }
@@ -111,6 +128,10 @@ impl Genericd {
let generic_number_msg_handler = let generic_number_msg_handler =
ProtocolGenericHandler::new(&p2p, "ProtocolGenericNumber", SESSION_DEFAULT).await; ProtocolGenericHandler::new(&p2p, "ProtocolGenericNumber", SESSION_DEFAULT).await;
// Add a generic protocol for GenericRequestMessage
let generic_request_msg_handler =
ProtocolGenericHandler::new(&p2p, "ProtocolGenericRequest", SESSION_DEFAULT).await;
let broadcast_task = StoppableTask::new(); let broadcast_task = StoppableTask::new();
Ok(Self { Ok(Self {
@@ -118,12 +139,13 @@ impl Genericd {
p2p, p2p,
generic_string_msg_handler, generic_string_msg_handler,
generic_number_msg_handler, generic_number_msg_handler,
generic_request_msg_handler,
broadcast_task, broadcast_task,
}) })
} }
/// Start all daemon background tasks. /// Start all daemon background tasks.
async fn start(&self, executor: &Arc<Executor<'static>>) -> Result<()> { async fn start(&self) -> Result<()> {
info!(target: "genericd", "Starting tasks..."); info!(target: "genericd", "Starting tasks...");
self.generic_string_msg_handler.task.clone().start( self.generic_string_msg_handler.task.clone().start(
@@ -135,7 +157,7 @@ impl Genericd {
} }
}, },
Error::DetachedTaskStopped, Error::DetachedTaskStopped,
executor.clone(), self.p2p.executor(),
); );
self.generic_number_msg_handler.task.clone().start( self.generic_number_msg_handler.task.clone().start(
@@ -147,7 +169,19 @@ impl Genericd {
} }
}, },
Error::DetachedTaskStopped, Error::DetachedTaskStopped,
executor.clone(), self.p2p.executor(),
);
self.generic_request_msg_handler.task.clone().start(
handle_generic_request_msg(self.node_id, self.generic_request_msg_handler.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "genericd", "Failed starting protocol generic request handler task: {e}"),
}
},
Error::DetachedTaskStopped,
self.p2p.executor(),
); );
self.p2p.clone().start().await?; self.p2p.clone().start().await?;
@@ -161,7 +195,7 @@ impl Genericd {
} }
}, },
Error::DetachedTaskStopped, Error::DetachedTaskStopped,
executor.clone(), self.p2p.executor(),
); );
info!(target: "genericd", "All tasks started!"); info!(target: "genericd", "All tasks started!");
@@ -171,16 +205,18 @@ impl Genericd {
/// Stop all daemon background tasks. /// Stop all daemon background tasks.
async fn stop(&self) { async fn stop(&self) {
info!(target: "genericd", "Terminating tasks..."); info!(target: "genericd", "Terminating tasks...");
self.broadcast_task.stop().await;
self.p2p.stop().await; self.p2p.stop().await;
self.generic_string_msg_handler.task.stop().await; self.generic_string_msg_handler.task.stop().await;
self.generic_number_msg_handler.task.stop().await; self.generic_number_msg_handler.task.stop().await;
self.generic_request_msg_handler.task.stop().await;
info!(target: "genericd", "All tasks terminated!"); info!(target: "genericd", "All tasks terminated!");
} }
} }
/// Background handler function for GenericStringMessage. /// Background handler function for GenericStringMessage.
async fn handle_generic_string_msg( async fn handle_generic_string_msg(
handler: ProtocolGenericHandlerPtr<GenericStringMessage>, handler: ProtocolGenericHandlerPtr<GenericStringMessage, GenericStringMessage>,
) -> Result<()> { ) -> Result<()> {
let mut seen = HashSet::new(); let mut seen = HashSet::new();
loop { loop {
@@ -192,7 +228,7 @@ async fn handle_generic_string_msg(
continue continue
} }
info!("Received string message from channel {channel}: {}", msg.msg); info!(target: "handle_generic_string_msg", "Received string message from channel {channel}: {}", msg.msg);
seen.insert(msg.msg); seen.insert(msg.msg);
handler.send_action(channel, ProtocolGenericAction::Broadcast).await; handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
@@ -201,7 +237,7 @@ async fn handle_generic_string_msg(
/// Background handler function for GenericNumberMessage. /// Background handler function for GenericNumberMessage.
async fn handle_generic_number_msg( async fn handle_generic_number_msg(
handler: ProtocolGenericHandlerPtr<GenericNumberMessage>, handler: ProtocolGenericHandlerPtr<GenericNumberMessage, GenericNumberMessage>,
) -> Result<()> { ) -> Result<()> {
let mut seen = HashSet::new(); let mut seen = HashSet::new();
loop { loop {
@@ -213,27 +249,68 @@ async fn handle_generic_number_msg(
continue continue
} }
info!("Received string message from channel {channel}: {}", msg.num); info!(target: "handle_generic_number_msg", "Received number message from channel {channel}: {}", msg.num);
seen.insert(msg.num); seen.insert(msg.num);
handler.send_action(channel, ProtocolGenericAction::Broadcast).await; handler.send_action(channel, ProtocolGenericAction::Broadcast).await;
} }
} }
/// Background handler function for GenericRequestMessage.
async fn handle_generic_request_msg(
node_id: u64,
handler: ProtocolGenericHandlerPtr<GenericRequestMessage, GenericResponseMessage>,
) -> Result<()> {
let response = GenericResponseMessage { msg: format!("Pong from node {node_id}!") };
loop {
// Wait for a new message
let (channel, msg) = handler.receiver.recv().await?;
info!(target: "handle_generic_request_msg", "Received request message from channel {channel}: {}", msg.msg);
handler.send_action(channel, ProtocolGenericAction::Response(response.clone())).await;
}
}
/// Background function to send messages at random intervals. /// Background function to send messages at random intervals.
async fn broadcast_messages(node_id: u64, p2p: P2pPtr) -> Result<()> { async fn broadcast_messages(node_id: u64, p2p: P2pPtr) -> Result<()> {
let comms_timeout = p2p.settings().read().await.outbound_connect_timeout;
let request = GenericRequestMessage { msg: format!("Ping from node {node_id}!") };
let mut counter = 0; let mut counter = 0;
loop { loop {
let sleep_time = OsRng.gen_range(1..=10); let sleep_time = OsRng.gen_range(1..=10);
info!("Sleeping {sleep_time} till next broadcast..."); info!(target: "broadcast_messages", "Sleeping {sleep_time} till next broadcast...");
sleep(sleep_time).await; sleep(sleep_time).await;
info!("Broacasting messages..."); info!(target: "broadcast_messages", "Broacasting messages...");
// Broadcast a generic string message
let string_msg = let string_msg =
GenericStringMessage { msg: format!("Hello from node {node_id}({counter})!") }; GenericStringMessage { msg: format!("Hello from node {node_id}({counter})!") };
let number_msg = GenericNumberMessage { num: node_id + counter };
p2p.broadcast(&string_msg).await; p2p.broadcast(&string_msg).await;
// Broadcast a generic number message
let number_msg = GenericNumberMessage { num: node_id + counter };
p2p.broadcast(&number_msg).await; p2p.broadcast(&number_msg).await;
// Perform a direct request to each peer and grab their response
let peers = p2p.hosts().channels();
for peer in peers {
info!(target: "broadcast_messages", "Sending request message to peer {peer:?}: {}", request.msg);
let Ok(response_sub) = peer.subscribe_msg::<GenericResponseMessage>().await else {
error!(target: "broadcast_messages", "Failure during `GenericResponseMessage` communication setup with peer: {peer:?}");
continue
};
if let Err(e) = peer.send(&request).await {
error!(target: "broadcast_messages", "Failure during `GenericResponseMessage` send to peer {peer:?}: {e}");
continue
};
let Ok(response) = response_sub.receive_with_timeout(comms_timeout).await else {
error!(target: "broadcast_messages", "Timeout while waiting for `GenericResponseMessage` from peer: {peer:?}");
continue
};
info!(target: "broadcast_messages", "Received response message from peer {peer:?}: {}", response.msg);
}
counter += 1; counter += 1;
} }
} }
@@ -243,7 +320,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
info!(target: "generic-node", "Initializing generic node..."); info!(target: "generic-node", "Initializing generic node...");
let genericd = Genericd::new(args.node_id, &args.net.into(), &ex).await?; let genericd = Genericd::new(args.node_id, &args.net.into(), &ex).await?;
genericd.start(&ex).await?; genericd.start().await?;
// Signal handling for graceful termination. // Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?; let (signals_handler, signals_task) = SignalHandler::new(ex)?;

View File

@@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
use std::{clone::Clone, collections::HashMap, sync::Arc}; use std::{clone::Clone, collections::HashMap, fmt::Debug, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use log::debug; use log::debug;
@@ -42,22 +42,24 @@ use crate::{
/// Defines generic messages protocol action signal. /// Defines generic messages protocol action signal.
#[derive(Debug)] #[derive(Debug)]
pub enum ProtocolGenericAction { pub enum ProtocolGenericAction<M> {
/// Broadcast message to rest nodes /// Broadcast message to rest nodes
Broadcast, Broadcast,
/// Send provided response message to the node
Response(M),
/// Skip message broadcast /// Skip message broadcast
Skip, Skip,
/// Stop the channel entirely /// Stop the channel entirely
Stop, Stop,
} }
pub type ProtocolGenericHandlerPtr<M> = Arc<ProtocolGenericHandler<M>>; pub type ProtocolGenericHandlerPtr<M, R> = Arc<ProtocolGenericHandler<M, R>>;
/// Defines a handler for generic protocol messages, consisting /// Defines a handler for generic protocol messages, consisting
/// of a message receiver, action signal senders mapped by each /// of a message receiver, action signal senders mapped by each
/// channel ID, and a stoppable task to run the handler in the /// channel ID, and a stoppable task to run the handler in the
/// background. /// background.
pub struct ProtocolGenericHandler<M: Message + Clone> { pub struct ProtocolGenericHandler<M: Message + Clone, R: Message + Clone + Debug> {
// Since smol channels close if all senders or all receivers // Since smol channels close if all senders or all receivers
// get dropped, we will keep one here to remain alive with the // get dropped, we will keep one here to remain alive with the
// handler. // handler.
@@ -68,20 +70,20 @@ pub struct ProtocolGenericHandler<M: Message + Clone> {
pub receiver: Receiver<(u32, M)>, pub receiver: Receiver<(u32, M)>,
/// Senders mapped by channel ID to propagate the /// Senders mapped by channel ID to propagate the
/// action signal after a message retrieval. /// action signal after a message retrieval.
senders: RwLock<HashMap<u32, Sender<ProtocolGenericAction>>>, senders: RwLock<HashMap<u32, Sender<ProtocolGenericAction<R>>>>,
/// Handler background task to run the messages listener /// Handler background task to run the messages listener
/// function with. /// function with.
pub task: StoppableTaskPtr, pub task: StoppableTaskPtr,
} }
impl<M: Message + Clone> ProtocolGenericHandler<M> { impl<M: Message + Clone, R: Message + Clone + Debug> ProtocolGenericHandler<M, R> {
/// Generate a new ProtocolGenericHandler for the provided P2P /// Generate a new ProtocolGenericHandler for the provided P2P
/// instance. The handler also attaches its generic protocol. /// instance. The handler also attaches its generic protocol.
pub async fn new( pub async fn new(
p2p: &P2pPtr, p2p: &P2pPtr,
name: &'static str, name: &'static str,
session: SessionBitFlag, session: SessionBitFlag,
) -> ProtocolGenericHandlerPtr<M> { ) -> ProtocolGenericHandlerPtr<M, R> {
// Generate the message queue smol channel // Generate the message queue smol channel
let (sender, receiver) = smol::channel::unbounded::<(u32, M)>(); let (sender, receiver) = smol::channel::unbounded::<(u32, M)>();
@@ -108,7 +110,11 @@ impl<M: Message + Clone> ProtocolGenericHandler<M> {
/// Registers a new channel sender to the handler map. /// Registers a new channel sender to the handler map.
/// Additionally, looks for stale(closed) channels and prunes then from it. /// Additionally, looks for stale(closed) channels and prunes then from it.
async fn register_channel_sender(&self, channel: u32, sender: Sender<ProtocolGenericAction>) { async fn register_channel_sender(
&self,
channel: u32,
sender: Sender<ProtocolGenericAction<R>>,
) {
// Register the new channel sender // Register the new channel sender
let mut lock = self.senders.write().await; let mut lock = self.senders.write().await;
lock.insert(channel, sender); lock.insert(channel, sender);
@@ -130,7 +136,7 @@ impl<M: Message + Clone> ProtocolGenericHandler<M> {
} }
/// Sends provided protocol generic action to requested channel, if it exists. /// Sends provided protocol generic action to requested channel, if it exists.
pub async fn send_action(&self, channel: u32, action: ProtocolGenericAction) { pub async fn send_action(&self, channel: u32, action: ProtocolGenericAction<R>) {
debug!( debug!(
target: "net::protocol_generic::ProtocolGenericHandler::send_action", target: "net::protocol_generic::ProtocolGenericHandler::send_action",
"Sending action {action:?} to channel {channel}..." "Sending action {action:?} to channel {channel}..."
@@ -162,13 +168,13 @@ impl<M: Message + Clone> ProtocolGenericHandler<M> {
} }
/// Defines generic messages protocol. /// Defines generic messages protocol.
pub struct ProtocolGeneric<M: Message + Clone> { pub struct ProtocolGeneric<M: Message + Clone, R: Message + Clone + Debug> {
/// The P2P channel message subcription /// The P2P channel message subcription
msg_sub: MessageSubscription<M>, msg_sub: MessageSubscription<M>,
/// The generic message smol channel sender /// The generic message smol channel sender
sender: Sender<(u32, M)>, sender: Sender<(u32, M)>,
/// Action signal smol channel receiver /// Action signal smol channel receiver
receiver: Receiver<ProtocolGenericAction>, receiver: Receiver<ProtocolGenericAction<R>>,
/// The P2P channel the protocol is serving /// The P2P channel the protocol is serving
channel: ChannelPtr, channel: ChannelPtr,
/// Pointer to the whole P2P instance /// Pointer to the whole P2P instance
@@ -177,12 +183,12 @@ pub struct ProtocolGeneric<M: Message + Clone> {
jobsman: ProtocolJobsManagerPtr, jobsman: ProtocolJobsManagerPtr,
} }
impl<M: Message + Clone> ProtocolGeneric<M> { impl<M: Message + Clone, R: Message + Clone + Debug> ProtocolGeneric<M, R> {
/// Initialize a new generic protocol. /// Initialize a new generic protocol.
pub async fn init( pub async fn init(
channel: ChannelPtr, channel: ChannelPtr,
name: &'static str, name: &'static str,
handler: ProtocolGenericHandlerPtr<M>, handler: ProtocolGenericHandlerPtr<M, R>,
p2p: P2pPtr, p2p: P2pPtr,
) -> Result<ProtocolBasePtr> { ) -> Result<ProtocolBasePtr> {
debug!( debug!(
@@ -193,6 +199,7 @@ impl<M: Message + Clone> ProtocolGeneric<M> {
// Add the message dispatcher // Add the message dispatcher
let msg_subsystem = channel.message_subsystem(); let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<M>().await; msg_subsystem.add_dispatch::<M>().await;
msg_subsystem.add_dispatch::<R>().await;
// Create the message subscription // Create the message subscription
let msg_sub = channel.subscribe_msg::<M>().await?; let msg_sub = channel.subscribe_msg::<M>().await?;
@@ -263,6 +270,14 @@ impl<M: Message + Clone> ProtocolGeneric<M> {
ProtocolGenericAction::Broadcast => { ProtocolGenericAction::Broadcast => {
self.p2p.broadcast_with_exclude(&msg_copy, &exclude_list).await self.p2p.broadcast_with_exclude(&msg_copy, &exclude_list).await
} }
ProtocolGenericAction::Response(r) => {
if let Err(e) = self.channel.send(&r).await {
debug!(
target: "net::protocol_generic::handle_receive_message",
"[{}] Channel send fail: {e}", self.jobsman.clone().name()
)
};
}
ProtocolGenericAction::Skip => { ProtocolGenericAction::Skip => {
debug!( debug!(
target: "net::protocol_generic::handle_receive_message", target: "net::protocol_generic::handle_receive_message",
@@ -279,7 +294,7 @@ impl<M: Message + Clone> ProtocolGeneric<M> {
} }
#[async_trait] #[async_trait]
impl<M: Message + Clone> ProtocolBase for ProtocolGeneric<M> { impl<M: Message + Clone, R: Message + Clone + Debug> ProtocolBase for ProtocolGeneric<M, R> {
async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> { async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net::protocol_generic::start", "START"); debug!(target: "net::protocol_generic::start", "START");
self.jobsman.clone().start(ex.clone()); self.jobsman.clone().start(ex.clone());