bin/ircd: add msg to buffer inside the protocol of privmsg

This commit is contained in:
ghassmo
2022-06-18 08:26:08 +03:00
parent 62e84c017a
commit ec23379a0f
2 changed files with 28 additions and 15 deletions

View File

@@ -11,7 +11,6 @@ use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt};
use fxhash::FxHashMap;
use log::{debug, error, info, warn};
use rand::rngs::OsRng;
use ringbuffer::RingBufferWrite;
use smol::future;
use structopt_toml::StructOptToml;
@@ -77,14 +76,12 @@ struct Ircd {
impl Ircd {
fn new(
seen_msg_ids: SeenMsgIds,
privmsgs_buffer: PrivmsgsBuffer,
autojoin_chans: Vec<String>,
configured_chans: FxHashMap<String, ChannelInfo>,
p2p: net::P2pPtr,
p2p_receiver: Receiver<Privmsg>,
) -> Self {
let privmsgs_buffer: PrivmsgsBuffer =
Arc::new(Mutex::new(ringbuffer::AllocRingBuffer::with_capacity(SIZE_OF_MSGS_BUFFER)));
Self { seen_msg_ids, privmsgs_buffer, autojoin_chans, configured_chans, p2p, p2p_receiver }
}
@@ -107,7 +104,6 @@ impl Ircd {
);
let p2p_receiver = self.p2p_receiver.clone();
let privmsgs_buffer = self.privmsgs_buffer.clone();
executor.spawn(async move {
loop {
@@ -131,11 +127,6 @@ impl Ircd {
}
}
// add the msg to buffer
{
(*privmsgs_buffer.lock().await).push(msg.clone());
}
conn.reply(&msg.to_irc_msg()).await?;
}
err = reader.read_line(&mut line).fuse() => {
@@ -143,6 +134,7 @@ impl Ircd {
warn!("Read line error. Closing stream for {}: {}", peer_addr, e);
return Ok(())
}
info!("Received msg from IRC client: {:?}", line);
let irc_msg = match clean_input(line, &peer_addr) {
Ok(m) => m,
@@ -167,6 +159,8 @@ impl Ircd {
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let seen_msg_ids = Arc::new(Mutex::new(vec![]));
let privmsgs_buffer: PrivmsgsBuffer =
Arc::new(Mutex::new(ringbuffer::AllocRingBuffer::with_capacity(SIZE_OF_MSGS_BUFFER)));
if settings.gen_secret {
let secret_key = crypto_box::SecretKey::generate(&mut OsRng);
@@ -191,11 +185,22 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let registry = p2p.protocol_registry();
let seen_msg_ids_cloned = seen_msg_ids.clone();
let privmsgs_buffer_cloned = privmsgs_buffer.clone();
registry
.register(net::SESSION_ALL, move |channel, p2p| {
let sender = p2p_send_channel.clone();
let seen_msg_ids_cloned = seen_msg_ids_cloned.clone();
async move { ProtocolPrivmsg::init(channel, sender, p2p, seen_msg_ids_cloned).await }
let privmsgs_buffer_cloned = privmsgs_buffer_cloned.clone();
async move {
ProtocolPrivmsg::init(
channel,
sender,
p2p,
seen_msg_ids_cloned,
privmsgs_buffer_cloned,
)
.await
}
})
.await;
@@ -225,6 +230,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
.spawn(async move {
let ircd = Ircd::new(
seen_msg_ids.clone(),
privmsgs_buffer.clone(),
settings.autojoin.clone(),
configured_chans.clone(),
p2p.clone(),

View File

@@ -1,20 +1,22 @@
use async_std::sync::{Arc, Mutex};
use async_std::sync::Arc;
use async_executor::Executor;
use async_trait::async_trait;
use log::debug;
use ringbuffer::RingBufferWrite;
use url::Url;
use darkfi::{net, Result};
use crate::Privmsg;
use crate::privmsg::{Privmsg, PrivmsgsBuffer, SeenMsgIds};
pub struct ProtocolPrivmsg {
jobsman: net::ProtocolJobsManagerPtr,
notify_queue_sender: async_channel::Sender<Privmsg>,
msg_sub: net::MessageSubscription<Privmsg>,
p2p: net::P2pPtr,
msg_ids: Arc<Mutex<Vec<u64>>>,
msg_ids: SeenMsgIds,
msgs: PrivmsgsBuffer,
channel_address: Url,
}
@@ -23,7 +25,8 @@ impl ProtocolPrivmsg {
channel: net::ChannelPtr,
notify_queue_sender: async_channel::Sender<Privmsg>,
p2p: net::P2pPtr,
msg_ids: Arc<Mutex<Vec<u64>>>,
msg_ids: SeenMsgIds,
msgs: PrivmsgsBuffer,
) -> net::ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Privmsg>().await;
@@ -38,6 +41,7 @@ impl ProtocolPrivmsg {
jobsman: net::ProtocolJobsManager::new("ProtocolPrivmsg", channel),
p2p,
msg_ids,
msgs,
channel_address,
})
}
@@ -55,6 +59,9 @@ impl ProtocolPrivmsg {
self.msg_ids.lock().await.push(msg.id);
let msg = (*msg).clone();
// add the msg to the buffer
self.msgs.lock().await.push(msg.clone());
self.notify_queue_sender.send(msg.clone()).await?;
self.p2p.broadcast_with_exclude(msg.clone(), &exclude_list).await?;