From ec23379a0f977a6cb3153ce28dbcf05ae40a7016 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Sat, 18 Jun 2022 08:26:08 +0300 Subject: [PATCH] bin/ircd: add msg to buffer inside the protocol of privmsg --- bin/ircd/src/main.rs | 28 +++++++++++++++++----------- bin/ircd/src/protocol_privmsg.rs | 15 +++++++++++---- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index 218f8bff9..ff1cdccea 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -11,7 +11,6 @@ use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt}; use fxhash::FxHashMap; use log::{debug, error, info, warn}; use rand::rngs::OsRng; -use ringbuffer::RingBufferWrite; use smol::future; use structopt_toml::StructOptToml; @@ -77,14 +76,12 @@ struct Ircd { impl Ircd { fn new( seen_msg_ids: SeenMsgIds, + privmsgs_buffer: PrivmsgsBuffer, autojoin_chans: Vec, configured_chans: FxHashMap, p2p: net::P2pPtr, p2p_receiver: Receiver, ) -> Self { - let privmsgs_buffer: PrivmsgsBuffer = - Arc::new(Mutex::new(ringbuffer::AllocRingBuffer::with_capacity(SIZE_OF_MSGS_BUFFER))); - Self { seen_msg_ids, privmsgs_buffer, autojoin_chans, configured_chans, p2p, p2p_receiver } } @@ -107,7 +104,6 @@ impl Ircd { ); let p2p_receiver = self.p2p_receiver.clone(); - let privmsgs_buffer = self.privmsgs_buffer.clone(); executor.spawn(async move { loop { @@ -131,11 +127,6 @@ impl Ircd { } } - // add the msg to buffer - { - (*privmsgs_buffer.lock().await).push(msg.clone()); - } - conn.reply(&msg.to_irc_msg()).await?; } err = reader.read_line(&mut line).fuse() => { @@ -143,6 +134,7 @@ impl Ircd { warn!("Read line error. Closing stream for {}: {}", peer_addr, e); return Ok(()) } + info!("Received msg from IRC client: {:?}", line); let irc_msg = match clean_input(line, &peer_addr) { Ok(m) => m, @@ -167,6 +159,8 @@ impl Ircd { async_daemonize!(realmain); async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let seen_msg_ids = Arc::new(Mutex::new(vec![])); + let privmsgs_buffer: PrivmsgsBuffer = + Arc::new(Mutex::new(ringbuffer::AllocRingBuffer::with_capacity(SIZE_OF_MSGS_BUFFER))); if settings.gen_secret { let secret_key = crypto_box::SecretKey::generate(&mut OsRng); @@ -191,11 +185,22 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let registry = p2p.protocol_registry(); let seen_msg_ids_cloned = seen_msg_ids.clone(); + let privmsgs_buffer_cloned = privmsgs_buffer.clone(); registry .register(net::SESSION_ALL, move |channel, p2p| { let sender = p2p_send_channel.clone(); let seen_msg_ids_cloned = seen_msg_ids_cloned.clone(); - async move { ProtocolPrivmsg::init(channel, sender, p2p, seen_msg_ids_cloned).await } + let privmsgs_buffer_cloned = privmsgs_buffer_cloned.clone(); + async move { + ProtocolPrivmsg::init( + channel, + sender, + p2p, + seen_msg_ids_cloned, + privmsgs_buffer_cloned, + ) + .await + } }) .await; @@ -225,6 +230,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { .spawn(async move { let ircd = Ircd::new( seen_msg_ids.clone(), + privmsgs_buffer.clone(), settings.autojoin.clone(), configured_chans.clone(), p2p.clone(), diff --git a/bin/ircd/src/protocol_privmsg.rs b/bin/ircd/src/protocol_privmsg.rs index 7903b1238..da9d1a62d 100644 --- a/bin/ircd/src/protocol_privmsg.rs +++ b/bin/ircd/src/protocol_privmsg.rs @@ -1,20 +1,22 @@ -use async_std::sync::{Arc, Mutex}; +use async_std::sync::Arc; use async_executor::Executor; use async_trait::async_trait; use log::debug; +use ringbuffer::RingBufferWrite; use url::Url; use darkfi::{net, Result}; -use crate::Privmsg; +use crate::privmsg::{Privmsg, PrivmsgsBuffer, SeenMsgIds}; pub struct ProtocolPrivmsg { jobsman: net::ProtocolJobsManagerPtr, notify_queue_sender: async_channel::Sender, msg_sub: net::MessageSubscription, p2p: net::P2pPtr, - msg_ids: Arc>>, + msg_ids: SeenMsgIds, + msgs: PrivmsgsBuffer, channel_address: Url, } @@ -23,7 +25,8 @@ impl ProtocolPrivmsg { channel: net::ChannelPtr, notify_queue_sender: async_channel::Sender, p2p: net::P2pPtr, - msg_ids: Arc>>, + msg_ids: SeenMsgIds, + msgs: PrivmsgsBuffer, ) -> net::ProtocolBasePtr { let message_subsytem = channel.get_message_subsystem(); message_subsytem.add_dispatch::().await; @@ -38,6 +41,7 @@ impl ProtocolPrivmsg { jobsman: net::ProtocolJobsManager::new("ProtocolPrivmsg", channel), p2p, msg_ids, + msgs, channel_address, }) } @@ -55,6 +59,9 @@ impl ProtocolPrivmsg { self.msg_ids.lock().await.push(msg.id); let msg = (*msg).clone(); + // add the msg to the buffer + self.msgs.lock().await.push(msg.clone()); + self.notify_queue_sender.send(msg.clone()).await?; self.p2p.broadcast_with_exclude(msg.clone(), &exclude_list).await?;