net/channel: introduced Message pre-serialization for minor optimization of p2p.broadcast()

This commit is contained in:
skoupidi
2024-07-22 19:29:02 +03:00
parent d63f675fba
commit 151aa186c4
3 changed files with 41 additions and 26 deletions

View File

@@ -41,7 +41,7 @@ use super::{
dnet::{self, dnetev, DnetEvent},
hosts::HostColor,
message,
message::{VersionMessage, MAGIC_BYTES},
message::{SerializedMessage, VersionMessage, MAGIC_BYTES},
message_publisher::{MessageSubscription, MessageSubsystem},
p2p::P2pPtr,
session::{
@@ -186,13 +186,20 @@ impl Channel {
self.stopped.load(SeqCst)
}
/// Sends a message across a channel. Calls `send_message` that creates
/// a new payload and sends it over the network transport as a packet.
/// Sends a message across a channel. First it converts the message
/// into a `SerializedMessage` and then calls `send_serialized` to send it.
/// Returns an error if something goes wrong.
pub async fn send<M: message::Message>(&self, message: &M) -> Result<()> {
self.send_serialized(&SerializedMessage::new(message).await).await
}
/// Sends the encoded payload of provided `SerializedMessage` across the channel.
/// Calls `send_message` that creates a new payload and sends it over the
/// network transport as a packet. Returns an error if something goes wrong.
pub async fn send_serialized(&self, message: &SerializedMessage) -> Result<()> {
debug!(
target: "net::channel::send()", "[START] command={} {:?}",
M::NAME, self,
message.command, self,
);
if self.is_stopped() {
@@ -213,25 +220,23 @@ impl Channel {
debug!(
target: "net::channel::send()", "[END] command={} {:?}",
M::NAME, self
message.command, self
);
Ok(())
}
/// Sends an outbound Message by writing data to the given async stream.
async fn send_message<M: message::Message>(&self, payload: &M) -> Result<()> {
let command = M::NAME.to_string();
assert!(!command.is_empty());
assert!(std::mem::size_of::<usize>() <= std::mem::size_of::<u64>());
/// Sends the encoded payload of provided `SerializedMessage` by writing
/// the data to the channel async stream.
async fn send_message(&self, message: &SerializedMessage) -> Result<()> {
assert!(!message.command.is_empty());
let stream = &mut *self.writer.lock().await;
let mut buffer = Vec::<u8>::new();
let mut written: usize = 0;
dnetev!(self, SendMessage, {
chan: self.info.clone(),
cmd: command,
cmd: message.command.clone(),
time: NanoTimestamp::current_time(),
});
@@ -240,21 +245,18 @@ impl Channel {
trace!(target: "net::channel::send_message()", "Sent magic");
trace!(target: "net::channel::send_message()", "Sending command...");
written += M::NAME.to_string().encode_async(stream).await?;
trace!(target: "net::channel::send_message()", "Sent command: {}", M::NAME.to_string());
written += message.command.encode_async(stream).await?;
trace!(target: "net::channel::send_message()", "Sent command: {}", message.command);
trace!(target: "net::channel::send_message()", "Sending payload...");
// First encode the payload to an intermediate buffer.
payload.encode_async(&mut buffer).await?;
// Then extract the length of the intermediate buffer as a VarInt
// and write to the stream. This is the length of the payload.
// Then encode the payload itself to the stream.
written += VarInt(buffer.len() as u64).encode_async(stream).await?;
written += payload.encode_async(stream).await?;
// First extract the length of the payload as a VarInt and write it to the stream.
written += VarInt(message.payload.len() as u64).encode_async(stream).await?;
// Then write the encoded payload itself to the stream.
stream.write_all(&message.payload).await?;
written += message.payload.len();
trace!(target: "net::channel::send_message()", "Sent payload {} bytes, total bytes {}",
buffer.len(), written);
message.payload.len(), written);
stream.flush().await?;

View File

@@ -17,7 +17,7 @@
*/
use darkfi_serial::{
async_trait, AsyncDecodable, AsyncEncodable, SerialDecodable, SerialEncodable,
async_trait, serialize_async, AsyncDecodable, AsyncEncodable, SerialDecodable, SerialEncodable,
};
use url::Url;
@@ -28,6 +28,18 @@ pub trait Message: 'static + Send + Sync + AsyncDecodable + AsyncEncodable {
const NAME: &'static str;
}
/// Generic serialized message template.
pub struct SerializedMessage {
pub command: String,
pub payload: Vec<u8>,
}
impl SerializedMessage {
pub async fn new<M: Message>(message: &M) -> Self {
Self { command: M::NAME.to_string(), payload: serialize_async(message).await }
}
}
#[macro_export]
macro_rules! impl_p2p_message {
($st:ty, $nm:expr) => {

View File

@@ -35,7 +35,7 @@ use super::{
channel::ChannelPtr,
dnet::DnetEvent,
hosts::{Hosts, HostsPtr},
message::Message,
message::{Message, SerializedMessage},
protocol::{protocol_registry::ProtocolRegistry, register_default_protocols},
session::{
InboundSession, InboundSessionPtr, ManualSession, ManualSessionPtr, OutboundSession,
@@ -194,10 +194,11 @@ impl P2p {
return
}
let message = SerializedMessage::new(message).await;
let futures = FuturesUnordered::new();
for channel in channel_list {
futures.push(channel.send(message).map_err(|e| {
futures.push(channel.send_serialized(&message).map_err(|e| {
error!(
target: "net::p2p::broadcast()",
"[P2P] Broadcasting message to {} failed: {}",