diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index ef8fab2d5..5b9149dea 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -30,16 +30,13 @@ pub mod protocol_privmsg; pub mod rpc; pub mod server; pub mod settings; -pub mod util; use crate::{ - crypto::try_decrypt_message, privmsg::{Privmsg, PrivmsgsBuffer, SeenMsgIds}, protocol_privmsg::ProtocolPrivmsg, rpc::JsonRpcInterface, server::IrcServerConnection, settings::{parse_configured_channels, Args, ChannelInfo, CONFIG_FILE, CONFIG_FILE_CONTENTS}, - util::clean_input, }; const SIZE_OF_MSGS_BUFFER: usize = 4096; @@ -102,11 +99,13 @@ impl Ircd { let mut reader = BufReader::new(reader); let mut conn = IrcServerConnection::new( writer, + peer_addr, self.seen_msg_ids.clone(), self.privmsgs_buffer.clone(), self.autojoin_chans.clone(), self.configured_chans.clone(), self.p2p.clone(), + self.senders.clone(), ); let (sender, receiver) = async_channel::unbounded(); @@ -124,62 +123,24 @@ impl Ircd { let mut line = String::new(); futures::select! { privmsg = receiver.recv().fuse() => { - let mut msg = privmsg?; + let msg = privmsg?; info!("Received msg from P2p network: {:?}", msg); - - // Try to potentially decrypt the incoming message. - if conn.configured_chans.contains_key(&msg.channel) { - let chan_info = conn.configured_chans.get_mut(&msg.channel).unwrap(); - if !chan_info.joined { - continue - } - - let salt_box = chan_info.salt_box.clone(); - - if salt_box.is_some() { - let decrypted_msg = - try_decrypt_message(&salt_box.unwrap(), &msg.message); - - if decrypted_msg.is_none() { - continue - } - - msg.message = decrypted_msg.unwrap(); - info!("Decrypted received message: {:?}", msg); - - } - - // add the nickname to the channel's names - if !chan_info.names.contains(&msg.nickname) { - chan_info.names.push(msg.nickname.clone()); - } - } - - conn.reply(&msg.to_irc_msg()).await?; + conn.process_msg_from_p2p(&msg).await?; } err = reader.read_line(&mut line).fuse() => { if let Err(e) = err { warn!("Read line error. Closing stream for {}: {}", peer_addr, e); return Ok(()) } - - info!("Received msg from IRC client: {:?}", line); - let irc_msg = match clean_input(line, &peer_addr) { - Ok(m) => m, - Err(e) => return Err(e) - }; - - info!("Send msg to IRC client '{}' from {}", irc_msg, peer_addr); - - if let Err(e) = conn.update(irc_msg).await { - warn!("Connection error: {} for {}", e, peer_addr); + if let Err(e) = conn.process_line_from_client(line).await { + error!("Process line from client failed {}: {}", peer_addr, e); return Err(Error::ChannelStopped) } } }; } }) - .detach(); + .detach(); Ok(()) } diff --git a/bin/ircd/src/server.rs b/bin/ircd/src/server.rs index e8bfa1ac5..2421cfa4c 100644 --- a/bin/ircd/src/server.rs +++ b/bin/ircd/src/server.rs @@ -1,4 +1,8 @@ -use async_std::net::TcpStream; +use async_std::{ + net::TcpStream, + sync::{Arc, Mutex}, +}; +use std::net::SocketAddr; use futures::{io::WriteHalf, AsyncWriteExt}; use fxhash::FxHashMap; @@ -9,7 +13,7 @@ use ringbuffer::RingBufferWrite; use darkfi::{net::P2pPtr, Error, Result}; use crate::{ - crypto::encrypt_message, + crypto::{encrypt_message, try_decrypt_message}, privmsg::{Privmsg, PrivmsgsBuffer, SeenMsgIds}, ChannelInfo, }; @@ -21,6 +25,7 @@ const RPL_NAMEREPLY: u32 = 353; pub struct IrcServerConnection { // server stream write_stream: WriteHalf, + peer_address: SocketAddr, // msg ids seen_msg_ids: SeenMsgIds, privmsgs_buffer: PrivmsgsBuffer, @@ -33,19 +38,23 @@ pub struct IrcServerConnection { pub configured_chans: FxHashMap, // p2p p2p: P2pPtr, + senders: Arc>>>, } impl IrcServerConnection { pub fn new( write_stream: WriteHalf, + peer_address: SocketAddr, seen_msg_ids: SeenMsgIds, privmsgs_buffer: PrivmsgsBuffer, auto_channels: Vec, configured_chans: FxHashMap, p2p: P2pPtr, + senders: Arc>>>, ) -> Self { Self { write_stream, + peer_address, seen_msg_ids, privmsgs_buffer, is_nick_init: false, @@ -55,6 +64,7 @@ impl IrcServerConnection { auto_channels, configured_chans, p2p, + senders, } } @@ -206,6 +216,16 @@ impl IrcServerConnection { (*self.privmsgs_buffer.lock().await).push(protocol_msg.clone()) } + let senders = self.senders.lock().await; + for (peer_addr, sender) in senders.iter() { + if peer_addr == &self.peer_address { + continue + } + // TODO this need more robust design + sender.send(protocol_msg.clone()).await?; + } + drop(senders); + debug!(target: "ircd", "PRIVMSG to be sent: {:?}", protocol_msg); self.p2p.broadcast(protocol_msg).await?; } @@ -263,4 +283,68 @@ impl IrcServerConnection { debug!("Sent {}", message); Ok(()) } + + pub async fn process_msg_from_p2p(&mut self, msg: &Privmsg) -> Result<()> { + let mut msg = msg.clone(); + // Try to potentially decrypt the incoming message. + if self.configured_chans.contains_key(&msg.channel) { + let chan_info = self.configured_chans.get_mut(&msg.channel).unwrap(); + if !chan_info.joined { + return Ok(()) + } + + let salt_box = chan_info.salt_box.clone(); + + if salt_box.is_some() { + let decrypted_msg = try_decrypt_message(&salt_box.unwrap(), &msg.message); + + if decrypted_msg.is_none() { + return Ok(()) + } + + msg.message = decrypted_msg.unwrap(); + info!("Decrypted received message: {:?}", msg); + } + + // add the nickname to the channel's names + if !chan_info.names.contains(&msg.nickname) { + chan_info.names.push(msg.nickname.clone()); + } + } + + self.reply(&msg.to_irc_msg()).await?; + Ok(()) + } + + pub async fn process_line_from_client(&mut self, line: String) -> Result<()> { + info!("Received msg from IRC client: {:?}", line); + let irc_msg = self.clean_input_line(line)?; + + info!("Send msg to IRC client '{}' from {}", irc_msg, self.peer_address); + + if let Err(e) = self.update(irc_msg).await { + warn!("Connection error: {} for {}", e, self.peer_address); + return Err(Error::ChannelStopped) + } + Ok(()) + } + + fn clean_input_line(&self, mut line: String) -> Result { + if line.is_empty() { + warn!("Received empty line from {}. ", self.peer_address); + warn!("Closing connection."); + return Err(Error::ChannelStopped) + } + + if &line[(line.len() - 2)..] != "\r\n" { + warn!("Closing connection."); + return Err(Error::ChannelStopped) + } + + // Remove CRLF + line.pop(); + line.pop(); + + Ok(line.clone()) + } } diff --git a/bin/ircd/src/util.rs b/bin/ircd/src/util.rs deleted file mode 100644 index c16a94653..000000000 --- a/bin/ircd/src/util.rs +++ /dev/null @@ -1,24 +0,0 @@ -use std::net::SocketAddr; - -use log::warn; - -use darkfi::{Error, Result}; - -pub fn clean_input(mut line: String, peer_addr: &SocketAddr) -> Result { - if line.is_empty() { - warn!("Received empty line from {}. ", peer_addr); - warn!("Closing connection."); - return Err(Error::ChannelStopped) - } - - if &line[(line.len() - 2)..] != "\r\n" { - warn!("Closing connection."); - return Err(Error::ChannelStopped) - } - - // Remove CRLF - line.pop(); - line.pop(); - - Ok(line) -}