bin/ircd: refactoring irc server connection

This commit is contained in:
ghassmo
2022-06-25 14:40:01 +03:00
parent fcc6964692
commit 7a54e4172f
3 changed files with 93 additions and 72 deletions

View File

@@ -30,16 +30,13 @@ pub mod protocol_privmsg;
pub mod rpc;
pub mod server;
pub mod settings;
pub mod util;
use crate::{
crypto::try_decrypt_message,
privmsg::{Privmsg, PrivmsgsBuffer, SeenMsgIds},
protocol_privmsg::ProtocolPrivmsg,
rpc::JsonRpcInterface,
server::IrcServerConnection,
settings::{parse_configured_channels, Args, ChannelInfo, CONFIG_FILE, CONFIG_FILE_CONTENTS},
util::clean_input,
};
const SIZE_OF_MSGS_BUFFER: usize = 4096;
@@ -102,11 +99,13 @@ impl Ircd {
let mut reader = BufReader::new(reader);
let mut conn = IrcServerConnection::new(
writer,
peer_addr,
self.seen_msg_ids.clone(),
self.privmsgs_buffer.clone(),
self.autojoin_chans.clone(),
self.configured_chans.clone(),
self.p2p.clone(),
self.senders.clone(),
);
let (sender, receiver) = async_channel::unbounded();
@@ -124,62 +123,24 @@ impl Ircd {
let mut line = String::new();
futures::select! {
privmsg = receiver.recv().fuse() => {
let mut msg = privmsg?;
let msg = privmsg?;
info!("Received msg from P2p network: {:?}", msg);
// Try to potentially decrypt the incoming message.
if conn.configured_chans.contains_key(&msg.channel) {
let chan_info = conn.configured_chans.get_mut(&msg.channel).unwrap();
if !chan_info.joined {
continue
}
let salt_box = chan_info.salt_box.clone();
if salt_box.is_some() {
let decrypted_msg =
try_decrypt_message(&salt_box.unwrap(), &msg.message);
if decrypted_msg.is_none() {
continue
}
msg.message = decrypted_msg.unwrap();
info!("Decrypted received message: {:?}", msg);
}
// add the nickname to the channel's names
if !chan_info.names.contains(&msg.nickname) {
chan_info.names.push(msg.nickname.clone());
}
}
conn.reply(&msg.to_irc_msg()).await?;
conn.process_msg_from_p2p(&msg).await?;
}
err = reader.read_line(&mut line).fuse() => {
if let Err(e) = err {
warn!("Read line error. Closing stream for {}: {}", peer_addr, e);
return Ok(())
}
info!("Received msg from IRC client: {:?}", line);
let irc_msg = match clean_input(line, &peer_addr) {
Ok(m) => m,
Err(e) => return Err(e)
};
info!("Send msg to IRC client '{}' from {}", irc_msg, peer_addr);
if let Err(e) = conn.update(irc_msg).await {
warn!("Connection error: {} for {}", e, peer_addr);
if let Err(e) = conn.process_line_from_client(line).await {
error!("Process line from client failed {}: {}", peer_addr, e);
return Err(Error::ChannelStopped)
}
}
};
}
})
.detach();
.detach();
Ok(())
}

View File

@@ -1,4 +1,8 @@
use async_std::net::TcpStream;
use async_std::{
net::TcpStream,
sync::{Arc, Mutex},
};
use std::net::SocketAddr;
use futures::{io::WriteHalf, AsyncWriteExt};
use fxhash::FxHashMap;
@@ -9,7 +13,7 @@ use ringbuffer::RingBufferWrite;
use darkfi::{net::P2pPtr, Error, Result};
use crate::{
crypto::encrypt_message,
crypto::{encrypt_message, try_decrypt_message},
privmsg::{Privmsg, PrivmsgsBuffer, SeenMsgIds},
ChannelInfo,
};
@@ -21,6 +25,7 @@ const RPL_NAMEREPLY: u32 = 353;
pub struct IrcServerConnection {
// server stream
write_stream: WriteHalf<TcpStream>,
peer_address: SocketAddr,
// msg ids
seen_msg_ids: SeenMsgIds,
privmsgs_buffer: PrivmsgsBuffer,
@@ -33,19 +38,23 @@ pub struct IrcServerConnection {
pub configured_chans: FxHashMap<String, ChannelInfo>,
// p2p
p2p: P2pPtr,
senders: Arc<Mutex<FxHashMap<SocketAddr, async_channel::Sender<Privmsg>>>>,
}
impl IrcServerConnection {
pub fn new(
write_stream: WriteHalf<TcpStream>,
peer_address: SocketAddr,
seen_msg_ids: SeenMsgIds,
privmsgs_buffer: PrivmsgsBuffer,
auto_channels: Vec<String>,
configured_chans: FxHashMap<String, ChannelInfo>,
p2p: P2pPtr,
senders: Arc<Mutex<FxHashMap<SocketAddr, async_channel::Sender<Privmsg>>>>,
) -> Self {
Self {
write_stream,
peer_address,
seen_msg_ids,
privmsgs_buffer,
is_nick_init: false,
@@ -55,6 +64,7 @@ impl IrcServerConnection {
auto_channels,
configured_chans,
p2p,
senders,
}
}
@@ -206,6 +216,16 @@ impl IrcServerConnection {
(*self.privmsgs_buffer.lock().await).push(protocol_msg.clone())
}
let senders = self.senders.lock().await;
for (peer_addr, sender) in senders.iter() {
if peer_addr == &self.peer_address {
continue
}
// TODO this need more robust design
sender.send(protocol_msg.clone()).await?;
}
drop(senders);
debug!(target: "ircd", "PRIVMSG to be sent: {:?}", protocol_msg);
self.p2p.broadcast(protocol_msg).await?;
}
@@ -263,4 +283,68 @@ impl IrcServerConnection {
debug!("Sent {}", message);
Ok(())
}
pub async fn process_msg_from_p2p(&mut self, msg: &Privmsg) -> Result<()> {
let mut msg = msg.clone();
// Try to potentially decrypt the incoming message.
if self.configured_chans.contains_key(&msg.channel) {
let chan_info = self.configured_chans.get_mut(&msg.channel).unwrap();
if !chan_info.joined {
return Ok(())
}
let salt_box = chan_info.salt_box.clone();
if salt_box.is_some() {
let decrypted_msg = try_decrypt_message(&salt_box.unwrap(), &msg.message);
if decrypted_msg.is_none() {
return Ok(())
}
msg.message = decrypted_msg.unwrap();
info!("Decrypted received message: {:?}", msg);
}
// add the nickname to the channel's names
if !chan_info.names.contains(&msg.nickname) {
chan_info.names.push(msg.nickname.clone());
}
}
self.reply(&msg.to_irc_msg()).await?;
Ok(())
}
pub async fn process_line_from_client(&mut self, line: String) -> Result<()> {
info!("Received msg from IRC client: {:?}", line);
let irc_msg = self.clean_input_line(line)?;
info!("Send msg to IRC client '{}' from {}", irc_msg, self.peer_address);
if let Err(e) = self.update(irc_msg).await {
warn!("Connection error: {} for {}", e, self.peer_address);
return Err(Error::ChannelStopped)
}
Ok(())
}
fn clean_input_line(&self, mut line: String) -> Result<String> {
if line.is_empty() {
warn!("Received empty line from {}. ", self.peer_address);
warn!("Closing connection.");
return Err(Error::ChannelStopped)
}
if &line[(line.len() - 2)..] != "\r\n" {
warn!("Closing connection.");
return Err(Error::ChannelStopped)
}
// Remove CRLF
line.pop();
line.pop();
Ok(line.clone())
}
}

View File

@@ -1,24 +0,0 @@
use std::net::SocketAddr;
use log::warn;
use darkfi::{Error, Result};
pub fn clean_input(mut line: String, peer_addr: &SocketAddr) -> Result<String> {
if line.is_empty() {
warn!("Received empty line from {}. ", peer_addr);
warn!("Closing connection.");
return Err(Error::ChannelStopped)
}
if &line[(line.len() - 2)..] != "\r\n" {
warn!("Closing connection.");
return Err(Error::ChannelStopped)
}
// Remove CRLF
line.pop();
line.pop();
Ok(line)
}