From 151aa186c43cbf05f97652e95a7de430ba13b8d8 Mon Sep 17 00:00:00 2001 From: skoupidi Date: Mon, 22 Jul 2024 19:29:02 +0300 Subject: [PATCH] net/channel: introduced Message pre-serialization for minor optimization of p2p.broadcast() --- src/net/channel.rs | 48 ++++++++++++++++++++++++---------------------- src/net/message.rs | 14 +++++++++++++- src/net/p2p.rs | 5 +++-- 3 files changed, 41 insertions(+), 26 deletions(-) diff --git a/src/net/channel.rs b/src/net/channel.rs index 2a51ddf66..4190032a1 100644 --- a/src/net/channel.rs +++ b/src/net/channel.rs @@ -41,7 +41,7 @@ use super::{ dnet::{self, dnetev, DnetEvent}, hosts::HostColor, message, - message::{VersionMessage, MAGIC_BYTES}, + message::{SerializedMessage, VersionMessage, MAGIC_BYTES}, message_publisher::{MessageSubscription, MessageSubsystem}, p2p::P2pPtr, session::{ @@ -186,13 +186,20 @@ impl Channel { self.stopped.load(SeqCst) } - /// Sends a message across a channel. Calls `send_message` that creates - /// a new payload and sends it over the network transport as a packet. + /// Sends a message across a channel. First it converts the message + /// into a `SerializedMessage` and then calls `send_serialized` to send it. /// Returns an error if something goes wrong. pub async fn send(&self, message: &M) -> Result<()> { + self.send_serialized(&SerializedMessage::new(message).await).await + } + + /// Sends the encoded payload of provided `SerializedMessage` across the channel. + /// Calls `send_message` that creates a new payload and sends it over the + /// network transport as a packet. Returns an error if something goes wrong. + pub async fn send_serialized(&self, message: &SerializedMessage) -> Result<()> { debug!( target: "net::channel::send()", "[START] command={} {:?}", - M::NAME, self, + message.command, self, ); if self.is_stopped() { @@ -213,25 +220,23 @@ impl Channel { debug!( target: "net::channel::send()", "[END] command={} {:?}", - M::NAME, self + message.command, self ); Ok(()) } - /// Sends an outbound Message by writing data to the given async stream. - async fn send_message(&self, payload: &M) -> Result<()> { - let command = M::NAME.to_string(); - assert!(!command.is_empty()); - assert!(std::mem::size_of::() <= std::mem::size_of::()); + /// Sends the encoded payload of provided `SerializedMessage` by writing + /// the data to the channel async stream. + async fn send_message(&self, message: &SerializedMessage) -> Result<()> { + assert!(!message.command.is_empty()); let stream = &mut *self.writer.lock().await; - let mut buffer = Vec::::new(); let mut written: usize = 0; dnetev!(self, SendMessage, { chan: self.info.clone(), - cmd: command, + cmd: message.command.clone(), time: NanoTimestamp::current_time(), }); @@ -240,21 +245,18 @@ impl Channel { trace!(target: "net::channel::send_message()", "Sent magic"); trace!(target: "net::channel::send_message()", "Sending command..."); - written += M::NAME.to_string().encode_async(stream).await?; - trace!(target: "net::channel::send_message()", "Sent command: {}", M::NAME.to_string()); + written += message.command.encode_async(stream).await?; + trace!(target: "net::channel::send_message()", "Sent command: {}", message.command); trace!(target: "net::channel::send_message()", "Sending payload..."); - // First encode the payload to an intermediate buffer. - payload.encode_async(&mut buffer).await?; - - // Then extract the length of the intermediate buffer as a VarInt - // and write to the stream. This is the length of the payload. - // Then encode the payload itself to the stream. - written += VarInt(buffer.len() as u64).encode_async(stream).await?; - written += payload.encode_async(stream).await?; + // First extract the length of the payload as a VarInt and write it to the stream. + written += VarInt(message.payload.len() as u64).encode_async(stream).await?; + // Then write the encoded payload itself to the stream. + stream.write_all(&message.payload).await?; + written += message.payload.len(); trace!(target: "net::channel::send_message()", "Sent payload {} bytes, total bytes {}", - buffer.len(), written); + message.payload.len(), written); stream.flush().await?; diff --git a/src/net/message.rs b/src/net/message.rs index 99b1257dc..7406d6dcb 100644 --- a/src/net/message.rs +++ b/src/net/message.rs @@ -17,7 +17,7 @@ */ use darkfi_serial::{ - async_trait, AsyncDecodable, AsyncEncodable, SerialDecodable, SerialEncodable, + async_trait, serialize_async, AsyncDecodable, AsyncEncodable, SerialDecodable, SerialEncodable, }; use url::Url; @@ -28,6 +28,18 @@ pub trait Message: 'static + Send + Sync + AsyncDecodable + AsyncEncodable { const NAME: &'static str; } +/// Generic serialized message template. +pub struct SerializedMessage { + pub command: String, + pub payload: Vec, +} + +impl SerializedMessage { + pub async fn new(message: &M) -> Self { + Self { command: M::NAME.to_string(), payload: serialize_async(message).await } + } +} + #[macro_export] macro_rules! impl_p2p_message { ($st:ty, $nm:expr) => { diff --git a/src/net/p2p.rs b/src/net/p2p.rs index d968512f4..8ce54a23c 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -35,7 +35,7 @@ use super::{ channel::ChannelPtr, dnet::DnetEvent, hosts::{Hosts, HostsPtr}, - message::Message, + message::{Message, SerializedMessage}, protocol::{protocol_registry::ProtocolRegistry, register_default_protocols}, session::{ InboundSession, InboundSessionPtr, ManualSession, ManualSessionPtr, OutboundSession, @@ -194,10 +194,11 @@ impl P2p { return } + let message = SerializedMessage::new(message).await; let futures = FuturesUnordered::new(); for channel in channel_list { - futures.push(channel.send(message).map_err(|e| { + futures.push(channel.send_serialized(&message).map_err(|e| { error!( target: "net::p2p::broadcast()", "[P2P] Broadcasting message to {} failed: {}",