diff --git a/bin/ircd2/src/main.rs b/bin/ircd2/src/main.rs index 8a0bc60b6..83472280f 100644 --- a/bin/ircd2/src/main.rs +++ b/bin/ircd2/src/main.rs @@ -1,5 +1,8 @@ -use async_std::net::{TcpListener, TcpStream}; -use std::{net::SocketAddr, sync::Arc}; +use async_std::{ + net::{TcpListener, TcpStream}, + sync::{Arc, Mutex}, +}; +use std::net::SocketAddr; use async_channel::Receiver; use async_executor::Executor; @@ -33,11 +36,14 @@ use crate::{ settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS}, }; +pub type SeenMsgId = Arc>>; + async fn process_user_input( mut line: String, peer_addr: SocketAddr, conn: &mut IrcServerConnection, sender: async_channel::Sender, + seen_msg_id: SeenMsgId, ) -> Result<()> { if line.is_empty() { warn!("Received empty line from {}. Closing connection.", peer_addr); @@ -51,7 +57,7 @@ async fn process_user_input( debug!("Received '{}' from {}", line, peer_addr); - if let Err(e) = conn.update(line, sender).await { + if let Err(e) = conn.update(line, sender, seen_msg_id).await { warn!("Connection error: {} for {}", e, peer_addr); return Err(Error::ChannelStopped) } @@ -64,6 +70,7 @@ async fn process( stream: TcpStream, peer_addr: SocketAddr, sender: async_channel::Sender, + seen_msg_id: SeenMsgId, ) -> Result<()> { let (reader, writer) = stream.split(); @@ -75,6 +82,15 @@ async fn process( futures::select! { privmsg = receiver.recv().fuse() => { let msg = privmsg?; + + let mut smi = seen_msg_id.lock().await; + if smi.contains(&msg.id) { + continue + } + + smi.push(msg.id); + drop(smi); + debug!("ABOUT TO SEND: {:?}", msg); let irc_msg = format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", msg.nickname, @@ -91,7 +107,7 @@ async fn process( return Ok(()) } - process_user_input(line, peer_addr, &mut conn, sender.clone()).await?; + process_user_input(line, peer_addr, &mut conn, sender.clone(), seen_msg_id.clone()).await?; } }; } @@ -105,6 +121,8 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let datastore_path = expand_path(&settings.datastore)?; + let seen_msg_id: SeenMsgId = Arc::new(Mutex::new(vec![])); + let net_settings = settings.net; // //Raft @@ -149,7 +167,13 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { info!("Accepted client: {}", peer_addr); executor_cloned - .spawn(process(commits.clone(), stream, peer_addr, raft_sender.clone())) + .spawn(process( + commits.clone(), + stream, + peer_addr, + raft_sender.clone(), + seen_msg_id.clone(), + )) .detach(); } }); diff --git a/bin/ircd2/src/server.rs b/bin/ircd2/src/server.rs index 2adbd76bf..736ce44b2 100644 --- a/bin/ircd2/src/server.rs +++ b/bin/ircd2/src/server.rs @@ -32,6 +32,7 @@ impl IrcServerConnection { &mut self, line: String, sender: async_channel::Sender, + seen_msg_id: crate::SeenMsgId, ) -> Result<()> { let mut tokens = line.split_ascii_whitespace(); // Commands can begin with :garbage but we will reject clients doing @@ -92,6 +93,10 @@ impl IrcServerConnection { message: message.to_string(), }; + let mut smi = seen_msg_id.lock().await; + smi.push(random_id); + drop(smi); + sender.send(protocol_msg).await?; } "QUIT" => {