From b3f7a9a8769fee69abb09c1a5774e3754c4b129e Mon Sep 17 00:00:00 2001 From: dasman Date: Thu, 11 Jul 2024 04:28:31 +0300 Subject: [PATCH] darkirc: queue user privmsgs and send them once DAG is synced --- bin/darkirc/src/irc/client.rs | 95 +++++++++++++++++++++++------------ bin/darkirc/src/main.rs | 54 ++++++++++---------- 2 files changed, 89 insertions(+), 60 deletions(-) diff --git a/bin/darkirc/src/irc/client.rs b/bin/darkirc/src/irc/client.rs index 3c3310484..c0548d9f5 100644 --- a/bin/darkirc/src/irc/client.rs +++ b/bin/darkirc/src/irc/client.rs @@ -17,7 +17,7 @@ */ use std::{ - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet, VecDeque}, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, Arc, @@ -146,6 +146,8 @@ impl Client { // Our buffer for the client line let mut line = String::new(); + let mut args_queue: VecDeque<_> = VecDeque::new(); + loop { futures::select! { // Process message from the IRC client @@ -165,28 +167,30 @@ impl Client { // We'll be strict here and disconnect the client // in case line processing failed in any way. - match self.process_client_line(&line, &mut writer).await { + match self.process_client_line(&line, &mut writer, &mut args_queue).await { // If we got an event back, we should broadcast it. // This means we add it to our DAG, and the DAG will // handle the rest of the propagation. - Ok(Some(event)) => { - // Update the last sent event. - let event_id = event.id(); - *self.last_sent.write().await = event_id; + Ok(Some(events)) => { + for event in events { + // Update the last sent event. + let event_id = event.id(); + *self.last_sent.write().await = event_id; - // If it fails for some reason, for now, we just note it - // and pass. - if let Err(e) = self.server.darkirc.event_graph.dag_insert(&[event.clone()]).await { - error!("[IRC CLIENT] Failed inserting new event to DAG: {}", e); - } else { - // We sent this, so it should be considered seen. - if let Err(e) = self.mark_seen(&event_id).await { - error!("[IRC CLIENT] (multiplex_connection) self.mark_seen({}) failed: {}", event_id, e); - return Err(e) + // If it fails for some reason, for now, we just note it + // and pass. + if let Err(e) = self.server.darkirc.event_graph.dag_insert(&[event.clone()]).await { + error!("[IRC CLIENT] Failed inserting new event to DAG: {}", e); + } else { + // We sent this, so it should be considered seen. + if let Err(e) = self.mark_seen(&event_id).await { + error!("[IRC CLIENT] (multiplex_connection) self.mark_seen({}) failed: {}", event_id, e); + return Err(e) + } + + // Otherwise, broadcast it + self.server.darkirc.p2p.broadcast(&EventPut(event)).await; } - - // Otherwise, broadcast it - self.server.darkirc.p2p.broadcast(&EventPut(event)).await; } } @@ -302,7 +306,12 @@ impl Client { } /// Handle the incoming line given sent by the IRC client - async fn process_client_line(&self, line: &str, writer: &mut W) -> Result> + async fn process_client_line( + &self, + line: &str, + writer: &mut W, + args_queue: &mut VecDeque, + ) -> Result>> where W: AsyncWrite + Unpin, { @@ -372,28 +381,48 @@ impl Client { // TODO: the best place to do it. Patches welcome. It's also a bit fragile // since we assume that `handle_cmd_privmsg()` won't return any replies. if cmd.as_str() == "PRIVMSG" && replies.is_empty() { - let channel = args.split_ascii_whitespace().next().unwrap().to_string(); - let msg_offset = args.find(':').unwrap() + 1; - let (_, msg) = args.split_at(msg_offset); - let mut privmsg = Privmsg { - channel, - nick: self.nickname.read().await.to_string(), - msg: msg.to_string(), - }; + // If the DAG is not synced yet, queue client lines + // Once synced, send queued lines and continue as normal + if !*self.server.darkirc.event_graph.synced.read().await { + debug!("DAG is still syncing, queuing and skipping..."); + args_queue.push_back(args); + return Ok(None) + } - // Encrypt the Privmsg if an encryption method is available. - self.server.try_encrypt(&mut privmsg).await; + // Check if we have queued PRIVMSGs, if we do send all of them first. + let mut pending_events = vec![]; + if !args_queue.is_empty() { + for _ in 0..args_queue.len() { + let args = args_queue.pop_front().unwrap(); + pending_events.push(self.privmsg_to_event(args).await); + } + return Ok(Some(pending_events)) + } - // Build a DAG event and return it. - let event = - Event::new(serialize_async(&privmsg).await, &self.server.darkirc.event_graph).await; + // If queue is empty, create an event and return it + let event = self.privmsg_to_event(args).await; - return Ok(Some(event)) + return Ok(Some(vec![event])) } Ok(None) } + // Internal helper function that creates an Event from PRIVMSG arguments + async fn privmsg_to_event(&self, args: String) -> Event { + let channel = args.split_ascii_whitespace().next().unwrap().to_string(); + let msg_offset = args.find(':').unwrap() + 1; + let (_, msg) = args.split_at(msg_offset); + let mut privmsg = + Privmsg { channel, nick: self.nickname.read().await.to_string(), msg: msg.to_string() }; + + // Encrypt the Privmsg if an encryption method is available. + self.server.try_encrypt(&mut privmsg).await; + + // Build a DAG event and return it. + Event::new(serialize_async(&privmsg).await, &self.server.darkirc.event_graph).await + } + /// Atomically mark a message as seen for this client. pub async fn mark_seen(&self, event_id: &blake3::Hash) -> Result<()> { let db = self diff --git a/bin/darkirc/src/main.rs b/bin/darkirc/src/main.rs index b528c6d28..3e2c39819 100644 --- a/bin/darkirc/src/main.rs +++ b/bin/darkirc/src/main.rs @@ -341,6 +341,33 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.clone(), ); + info!("Starting IRC server"); + let password = args.password.unwrap_or_default(); + let config_path = get_config_path(args.config, CONFIG_FILE)?; + let irc_server = IrcServer::new( + darkirc.clone(), + args.irc_listen, + args.irc_tls_cert, + args.irc_tls_secret, + config_path, + password, + ) + .await?; + + let irc_task = StoppableTask::new(); + let ex_ = ex.clone(); + irc_task.clone().start( + irc_server.clone().listen(ex_), + |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ } + Err(e) => error!("Failed stopping IRC server: {}", e), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); + info!("Starting P2P network"); p2p.clone().start().await?; @@ -371,33 +398,6 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { *event_graph.synced.write().await = true; } - info!("Starting IRC server"); - let password = args.password.unwrap_or_default(); - let config_path = get_config_path(args.config, CONFIG_FILE)?; - let irc_server = IrcServer::new( - darkirc.clone(), - args.irc_listen, - args.irc_tls_cert, - args.irc_tls_secret, - config_path, - password, - ) - .await?; - - let irc_task = StoppableTask::new(); - let ex_ = ex.clone(); - irc_task.clone().start( - irc_server.clone().listen(ex_), - |res| async move { - match res { - Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ } - Err(e) => error!("Failed stopping IRC server: {}", e), - } - }, - Error::DetachedTaskStopped, - ex.clone(), - ); - // Signal handling for graceful termination. let (signals_handler, signals_task) = SignalHandler::new(ex)?; signals_handler.wait_termination(signals_task).await?;