From 653670fc3fcbb653d7fe29fabd6f1fa2e18ca6cb Mon Sep 17 00:00:00 2001 From: dasman Date: Wed, 26 Jun 2024 14:41:16 +0300 Subject: [PATCH] darkirc: revert start IRC after DAG sync and queue user messages until synced successfully --- bin/darkirc/src/irc/client.rs | 34 +++++++++++++++++++++- bin/darkirc/src/main.rs | 54 +++++++++++++++++------------------ 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/bin/darkirc/src/irc/client.rs b/bin/darkirc/src/irc/client.rs index 3c3310484..74ea98d2f 100644 --- a/bin/darkirc/src/irc/client.rs +++ b/bin/darkirc/src/irc/client.rs @@ -17,7 +17,7 @@ */ use std::{ - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet, VecDeque}, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, Arc, @@ -146,6 +146,8 @@ impl Client { // Our buffer for the client line let mut line = String::new(); + let mut event_queue: VecDeque<_> = VecDeque::new(); + loop { futures::select! { // Process message from the IRC client @@ -170,6 +172,36 @@ impl Client { // This means we add it to our DAG, and the DAG will // handle the rest of the propagation. Ok(Some(event)) => { + // If the DAG is not synced yet, queue client lines + // Once synced, send queued lines and continue as normal + if !*self.server.darkirc.event_graph.synced.read().await{ + event_queue.push_back(event); + continue + } + + if !event_queue.is_empty(){ + for _ in 0..event_queue.len(){ + let ev = event_queue.pop_front().unwrap(); + let event_id = ev.id(); + *self.last_sent.write().await = event_id; + + // If it fails for some reason, for now, we just note it + // and pass. + if let Err(e) = self.server.darkirc.event_graph.dag_insert(&[ev.clone()]).await { + error!("[IRC CLIENT] Failed inserting new event to DAG: {}", e); + } else { + // We sent this, so it should be considered seen. + if let Err(e) = self.mark_seen(&event_id).await { + error!("[IRC CLIENT] (multiplex_connection) self.mark_seen({}) failed: {}", event_id, e); + return Err(e) + } + + // Otherwise, broadcast it + self.server.darkirc.p2p.broadcast(&EventPut(ev)).await; + } + } + } + // Update the last sent event. let event_id = event.id(); *self.last_sent.write().await = event_id; diff --git a/bin/darkirc/src/main.rs b/bin/darkirc/src/main.rs index 412254d1d..d96accb24 100644 --- a/bin/darkirc/src/main.rs +++ b/bin/darkirc/src/main.rs @@ -341,6 +341,33 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.clone(), ); + info!("Starting IRC server"); + let password = args.password.unwrap_or_default(); + let config_path = get_config_path(args.config, CONFIG_FILE)?; + let irc_server = IrcServer::new( + darkirc.clone(), + args.irc_listen, + args.irc_tls_cert, + args.irc_tls_secret, + config_path, + password, + ) + .await?; + + let irc_task = StoppableTask::new(); + let ex_ = ex.clone(); + irc_task.clone().start( + irc_server.clone().listen(ex_), + |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ } + Err(e) => error!("Failed stopping IRC server: {}", e), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); + info!("Starting P2P network"); p2p.clone().start().await?; @@ -371,33 +398,6 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { *event_graph.synced.write().await = true; } - info!("Starting IRC server"); - let password = args.password.unwrap_or_default(); - let config_path = get_config_path(args.config, CONFIG_FILE)?; - let irc_server = IrcServer::new( - darkirc.clone(), - args.irc_listen, - args.irc_tls_cert, - args.irc_tls_secret, - config_path, - password, - ) - .await?; - - let irc_task = StoppableTask::new(); - let ex_ = ex.clone(); - irc_task.clone().start( - irc_server.clone().listen(ex_), - |res| async move { - match res { - Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ } - Err(e) => error!("Failed stopping IRC server: {}", e), - } - }, - Error::DetachedTaskStopped, - ex.clone(), - ); - // Signal handling for graceful termination. let (signals_handler, signals_task) = SignalHandler::new(ex)?; signals_handler.wait_termination(signals_task).await?;