darkirc: queue user privmsgs and send them once DAG is synced

This commit is contained in:
dasman
2024-07-11 04:28:31 +03:00
parent 327c435e8f
commit b3f7a9a876
2 changed files with 89 additions and 60 deletions

View File

@@ -17,7 +17,7 @@
*/
use std::{
collections::{HashMap, HashSet},
collections::{HashMap, HashSet, VecDeque},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
Arc,
@@ -146,6 +146,8 @@ impl Client {
// Our buffer for the client line
let mut line = String::new();
let mut args_queue: VecDeque<_> = VecDeque::new();
loop {
futures::select! {
// Process message from the IRC client
@@ -165,28 +167,30 @@ impl Client {
// We'll be strict here and disconnect the client
// in case line processing failed in any way.
match self.process_client_line(&line, &mut writer).await {
match self.process_client_line(&line, &mut writer, &mut args_queue).await {
// If we got an event back, we should broadcast it.
// This means we add it to our DAG, and the DAG will
// handle the rest of the propagation.
Ok(Some(event)) => {
// Update the last sent event.
let event_id = event.id();
*self.last_sent.write().await = event_id;
Ok(Some(events)) => {
for event in events {
// Update the last sent event.
let event_id = event.id();
*self.last_sent.write().await = event_id;
// If it fails for some reason, for now, we just note it
// and pass.
if let Err(e) = self.server.darkirc.event_graph.dag_insert(&[event.clone()]).await {
error!("[IRC CLIENT] Failed inserting new event to DAG: {}", e);
} else {
// We sent this, so it should be considered seen.
if let Err(e) = self.mark_seen(&event_id).await {
error!("[IRC CLIENT] (multiplex_connection) self.mark_seen({}) failed: {}", event_id, e);
return Err(e)
// If it fails for some reason, for now, we just note it
// and pass.
if let Err(e) = self.server.darkirc.event_graph.dag_insert(&[event.clone()]).await {
error!("[IRC CLIENT] Failed inserting new event to DAG: {}", e);
} else {
// We sent this, so it should be considered seen.
if let Err(e) = self.mark_seen(&event_id).await {
error!("[IRC CLIENT] (multiplex_connection) self.mark_seen({}) failed: {}", event_id, e);
return Err(e)
}
// Otherwise, broadcast it
self.server.darkirc.p2p.broadcast(&EventPut(event)).await;
}
// Otherwise, broadcast it
self.server.darkirc.p2p.broadcast(&EventPut(event)).await;
}
}
@@ -302,7 +306,12 @@ impl Client {
}
/// Handle the incoming line given sent by the IRC client
async fn process_client_line<W>(&self, line: &str, writer: &mut W) -> Result<Option<Event>>
async fn process_client_line<W>(
&self,
line: &str,
writer: &mut W,
args_queue: &mut VecDeque<String>,
) -> Result<Option<Vec<Event>>>
where
W: AsyncWrite + Unpin,
{
@@ -372,28 +381,48 @@ impl Client {
// TODO: the best place to do it. Patches welcome. It's also a bit fragile
// since we assume that `handle_cmd_privmsg()` won't return any replies.
if cmd.as_str() == "PRIVMSG" && replies.is_empty() {
let channel = args.split_ascii_whitespace().next().unwrap().to_string();
let msg_offset = args.find(':').unwrap() + 1;
let (_, msg) = args.split_at(msg_offset);
let mut privmsg = Privmsg {
channel,
nick: self.nickname.read().await.to_string(),
msg: msg.to_string(),
};
// If the DAG is not synced yet, queue client lines
// Once synced, send queued lines and continue as normal
if !*self.server.darkirc.event_graph.synced.read().await {
debug!("DAG is still syncing, queuing and skipping...");
args_queue.push_back(args);
return Ok(None)
}
// Encrypt the Privmsg if an encryption method is available.
self.server.try_encrypt(&mut privmsg).await;
// Check if we have queued PRIVMSGs, if we do send all of them first.
let mut pending_events = vec![];
if !args_queue.is_empty() {
for _ in 0..args_queue.len() {
let args = args_queue.pop_front().unwrap();
pending_events.push(self.privmsg_to_event(args).await);
}
return Ok(Some(pending_events))
}
// Build a DAG event and return it.
let event =
Event::new(serialize_async(&privmsg).await, &self.server.darkirc.event_graph).await;
// If queue is empty, create an event and return it
let event = self.privmsg_to_event(args).await;
return Ok(Some(event))
return Ok(Some(vec![event]))
}
Ok(None)
}
// Internal helper function that creates an Event from PRIVMSG arguments
async fn privmsg_to_event(&self, args: String) -> Event {
let channel = args.split_ascii_whitespace().next().unwrap().to_string();
let msg_offset = args.find(':').unwrap() + 1;
let (_, msg) = args.split_at(msg_offset);
let mut privmsg =
Privmsg { channel, nick: self.nickname.read().await.to_string(), msg: msg.to_string() };
// Encrypt the Privmsg if an encryption method is available.
self.server.try_encrypt(&mut privmsg).await;
// Build a DAG event and return it.
Event::new(serialize_async(&privmsg).await, &self.server.darkirc.event_graph).await
}
/// Atomically mark a message as seen for this client.
pub async fn mark_seen(&self, event_id: &blake3::Hash) -> Result<()> {
let db = self

View File

@@ -341,6 +341,33 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);
info!("Starting IRC server");
let password = args.password.unwrap_or_default();
let config_path = get_config_path(args.config, CONFIG_FILE)?;
let irc_server = IrcServer::new(
darkirc.clone(),
args.irc_listen,
args.irc_tls_cert,
args.irc_tls_secret,
config_path,
password,
)
.await?;
let irc_task = StoppableTask::new();
let ex_ = ex.clone();
irc_task.clone().start(
irc_server.clone().listen(ex_),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ }
Err(e) => error!("Failed stopping IRC server: {}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
info!("Starting P2P network");
p2p.clone().start().await?;
@@ -371,33 +398,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
*event_graph.synced.write().await = true;
}
info!("Starting IRC server");
let password = args.password.unwrap_or_default();
let config_path = get_config_path(args.config, CONFIG_FILE)?;
let irc_server = IrcServer::new(
darkirc.clone(),
args.irc_listen,
args.irc_tls_cert,
args.irc_tls_secret,
config_path,
password,
)
.await?;
let irc_task = StoppableTask::new();
let ex_ = ex.clone();
irc_task.clone().start(
irc_server.clone().listen(ex_),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ }
Err(e) => error!("Failed stopping IRC server: {}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;