mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-04-28 03:00:18 -04:00
ircd2: avoid duplicate messages
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
use async_std::net::{TcpListener, TcpStream};
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use async_std::{
|
||||
net::{TcpListener, TcpStream},
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use async_channel::Receiver;
|
||||
use async_executor::Executor;
|
||||
@@ -33,11 +36,14 @@ use crate::{
|
||||
settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS},
|
||||
};
|
||||
|
||||
pub type SeenMsgId = Arc<Mutex<Vec<u32>>>;
|
||||
|
||||
async fn process_user_input(
|
||||
mut line: String,
|
||||
peer_addr: SocketAddr,
|
||||
conn: &mut IrcServerConnection,
|
||||
sender: async_channel::Sender<Privmsg>,
|
||||
seen_msg_id: SeenMsgId,
|
||||
) -> Result<()> {
|
||||
if line.is_empty() {
|
||||
warn!("Received empty line from {}. Closing connection.", peer_addr);
|
||||
@@ -51,7 +57,7 @@ async fn process_user_input(
|
||||
|
||||
debug!("Received '{}' from {}", line, peer_addr);
|
||||
|
||||
if let Err(e) = conn.update(line, sender).await {
|
||||
if let Err(e) = conn.update(line, sender, seen_msg_id).await {
|
||||
warn!("Connection error: {} for {}", e, peer_addr);
|
||||
return Err(Error::ChannelStopped)
|
||||
}
|
||||
@@ -64,6 +70,7 @@ async fn process(
|
||||
stream: TcpStream,
|
||||
peer_addr: SocketAddr,
|
||||
sender: async_channel::Sender<Privmsg>,
|
||||
seen_msg_id: SeenMsgId,
|
||||
) -> Result<()> {
|
||||
let (reader, writer) = stream.split();
|
||||
|
||||
@@ -75,6 +82,15 @@ async fn process(
|
||||
futures::select! {
|
||||
privmsg = receiver.recv().fuse() => {
|
||||
let msg = privmsg?;
|
||||
|
||||
let mut smi = seen_msg_id.lock().await;
|
||||
if smi.contains(&msg.id) {
|
||||
continue
|
||||
}
|
||||
|
||||
smi.push(msg.id);
|
||||
drop(smi);
|
||||
|
||||
debug!("ABOUT TO SEND: {:?}", msg);
|
||||
let irc_msg = format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n",
|
||||
msg.nickname,
|
||||
@@ -91,7 +107,7 @@ async fn process(
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
process_user_input(line, peer_addr, &mut conn, sender.clone()).await?;
|
||||
process_user_input(line, peer_addr, &mut conn, sender.clone(), seen_msg_id.clone()).await?;
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -105,6 +121,8 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
|
||||
let datastore_path = expand_path(&settings.datastore)?;
|
||||
|
||||
let seen_msg_id: SeenMsgId = Arc::new(Mutex::new(vec![]));
|
||||
|
||||
let net_settings = settings.net;
|
||||
//
|
||||
//Raft
|
||||
@@ -149,7 +167,13 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
|
||||
info!("Accepted client: {}", peer_addr);
|
||||
|
||||
executor_cloned
|
||||
.spawn(process(commits.clone(), stream, peer_addr, raft_sender.clone()))
|
||||
.spawn(process(
|
||||
commits.clone(),
|
||||
stream,
|
||||
peer_addr,
|
||||
raft_sender.clone(),
|
||||
seen_msg_id.clone(),
|
||||
))
|
||||
.detach();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -32,6 +32,7 @@ impl IrcServerConnection {
|
||||
&mut self,
|
||||
line: String,
|
||||
sender: async_channel::Sender<Privmsg>,
|
||||
seen_msg_id: crate::SeenMsgId,
|
||||
) -> Result<()> {
|
||||
let mut tokens = line.split_ascii_whitespace();
|
||||
// Commands can begin with :garbage but we will reject clients doing
|
||||
@@ -92,6 +93,10 @@ impl IrcServerConnection {
|
||||
message: message.to_string(),
|
||||
};
|
||||
|
||||
let mut smi = seen_msg_id.lock().await;
|
||||
smi.push(random_id);
|
||||
drop(smi);
|
||||
|
||||
sender.send(protocol_msg).await?;
|
||||
}
|
||||
"QUIT" => {
|
||||
|
||||
Reference in New Issue
Block a user