darkirc: revert start IRC after DAG sync and queue user messages until synced successfully

This commit is contained in:
dasman
2024-06-26 14:41:16 +03:00
parent 958361c953
commit 653670fc3f
2 changed files with 60 additions and 28 deletions

View File

@@ -17,7 +17,7 @@
*/
use std::{
collections::{HashMap, HashSet},
collections::{HashMap, HashSet, VecDeque},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
Arc,
@@ -146,6 +146,8 @@ impl Client {
// Our buffer for the client line
let mut line = String::new();
let mut event_queue: VecDeque<_> = VecDeque::new();
loop {
futures::select! {
// Process message from the IRC client
@@ -170,6 +172,36 @@ impl Client {
// This means we add it to our DAG, and the DAG will
// handle the rest of the propagation.
Ok(Some(event)) => {
// If the DAG is not synced yet, queue client lines
// Once synced, send queued lines and continue as normal
if !*self.server.darkirc.event_graph.synced.read().await{
event_queue.push_back(event);
continue
}
if !event_queue.is_empty(){
for _ in 0..event_queue.len(){
let ev = event_queue.pop_front().unwrap();
let event_id = ev.id();
*self.last_sent.write().await = event_id;
// If it fails for some reason, for now, we just note it
// and pass.
if let Err(e) = self.server.darkirc.event_graph.dag_insert(&[ev.clone()]).await {
error!("[IRC CLIENT] Failed inserting new event to DAG: {}", e);
} else {
// We sent this, so it should be considered seen.
if let Err(e) = self.mark_seen(&event_id).await {
error!("[IRC CLIENT] (multiplex_connection) self.mark_seen({}) failed: {}", event_id, e);
return Err(e)
}
// Otherwise, broadcast it
self.server.darkirc.p2p.broadcast(&EventPut(ev)).await;
}
}
}
// Update the last sent event.
let event_id = event.id();
*self.last_sent.write().await = event_id;

View File

@@ -341,6 +341,33 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);
info!("Starting IRC server");
let password = args.password.unwrap_or_default();
let config_path = get_config_path(args.config, CONFIG_FILE)?;
let irc_server = IrcServer::new(
darkirc.clone(),
args.irc_listen,
args.irc_tls_cert,
args.irc_tls_secret,
config_path,
password,
)
.await?;
let irc_task = StoppableTask::new();
let ex_ = ex.clone();
irc_task.clone().start(
irc_server.clone().listen(ex_),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ }
Err(e) => error!("Failed stopping IRC server: {}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
info!("Starting P2P network");
p2p.clone().start().await?;
@@ -371,33 +398,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
*event_graph.synced.write().await = true;
}
info!("Starting IRC server");
let password = args.password.unwrap_or_default();
let config_path = get_config_path(args.config, CONFIG_FILE)?;
let irc_server = IrcServer::new(
darkirc.clone(),
args.irc_listen,
args.irc_tls_cert,
args.irc_tls_secret,
config_path,
password,
)
.await?;
let irc_task = StoppableTask::new();
let ex_ = ex.clone();
irc_task.clone().start(
irc_server.clone().listen(ex_),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ }
Err(e) => error!("Failed stopping IRC server: {}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;