bin/ircd: WIP refactoring and cleaning up main.rs

This commit is contained in:
ghassmo
2022-06-17 04:18:11 +03:00
parent 40e20ba078
commit e1ffadaa15

View File

@@ -68,10 +68,7 @@ fn clean_input(mut line: String, peer_addr: &SocketAddr) -> Result<String> {
Ok(line)
}
async fn process(
// server
stream: TcpStream,
peer_addr: SocketAddr,
struct Ircd {
// msgs
seen_msg_ids: SeenMsgIds,
privmsgs_buffer: PrivmsgsBuffer,
@@ -81,75 +78,102 @@ async fn process(
// p2p
p2p: net::P2pPtr,
p2p_receiver: Receiver<Privmsg>,
) -> Result<()> {
let (reader, writer) = stream.split();
}
let mut reader = BufReader::new(reader);
let mut conn = IrcServerConnection::new(
writer,
seen_msg_ids.clone(),
privmsgs_buffer.clone(),
autojoin_chans,
configured_chans,
p2p.clone(),
);
impl Ircd {
fn new(
seen_msg_ids: SeenMsgIds,
autojoin_chans: Vec<String>,
configured_chans: FxHashMap<String, ChannelInfo>,
p2p: net::P2pPtr,
p2p_receiver: Receiver<Privmsg>,
) -> Self {
let privmsgs_buffer: PrivmsgsBuffer =
Arc::new(Mutex::new(ringbuffer::AllocRingBuffer::with_capacity(SIZE_OF_MSGS_BUFFER)));
loop {
let mut line = String::new();
futures::select! {
privmsg = p2p_receiver.recv().fuse() => {
let mut msg = privmsg?;
info!("Received msg from P2p network: {:?}", msg);
Self { seen_msg_ids, privmsgs_buffer, autojoin_chans, configured_chans, p2p, p2p_receiver }
}
// Try to potentially decrypt the incoming message.
if conn.configured_chans.contains_key(&msg.channel) {
let chan_info = conn.configured_chans.get(&msg.channel).unwrap();
if !chan_info.joined.load(Ordering::Relaxed) {
continue
async fn process(
&self,
executor: Arc<Executor<'_>>,
stream: TcpStream,
peer_addr: SocketAddr,
) -> Result<()> {
let (reader, writer) = stream.split();
let mut reader = BufReader::new(reader);
let mut conn = IrcServerConnection::new(
writer,
self.seen_msg_ids.clone(),
self.privmsgs_buffer.clone(),
self.autojoin_chans.clone(),
self.configured_chans.clone(),
self.p2p.clone(),
);
let p2p_receiver = self.p2p_receiver.clone();
let privmsgs_buffer = self.privmsgs_buffer.clone();
executor.spawn(async move {
loop {
let mut line = String::new();
futures::select! {
privmsg = p2p_receiver.recv().fuse() => {
let mut msg = privmsg?;
info!("Received msg from P2p network: {:?}", msg);
// Try to potentially decrypt the incoming message.
if conn.configured_chans.contains_key(&msg.channel) {
let chan_info = conn.configured_chans.get(&msg.channel).unwrap();
if !chan_info.joined.load(Ordering::Relaxed) {
continue
}
if let Some(salt_box) = &chan_info.salt_box {
if let Some(decrypted_msg) = try_decrypt_message(salt_box, &msg.message) {
msg.message = decrypted_msg;
info!("Decrypted received message: {:?}", msg);
}
}
}
// add the msg to buffer
{
(*privmsgs_buffer.lock().await).push(msg.clone());
}
let irc_msg = build_irc_msg(&msg);
conn.reply(&irc_msg).await?;
}
if let Some(salt_box) = &chan_info.salt_box {
if let Some(decrypted_msg) = try_decrypt_message(salt_box, &msg.message) {
msg.message = decrypted_msg;
info!("Decrypted received message: {:?}", msg);
err = reader.read_line(&mut line).fuse() => {
if let Err(e) = err {
warn!("Read line error. Closing stream for {}: {}", peer_addr, e);
return Ok(())
}
info!("Received msg from IRC client: {:?}", line);
let irc_msg = match clean_input(line, &peer_addr) {
Ok(m) => m,
Err(e) => return Err(e)
};
info!("Send msg to IRC client '{}' from {}", irc_msg, peer_addr);
if let Err(e) = conn.update(irc_msg).await {
warn!("Connection error: {} for {}", e, peer_addr);
return Err(Error::ChannelStopped)
}
}
}
// add the msg to buffer
{
(*privmsgs_buffer.lock().await).push(msg.clone());
}
let irc_msg = build_irc_msg(&msg);
conn.reply(&irc_msg).await?;
}
err = reader.read_line(&mut line).fuse() => {
if let Err(e) = err {
warn!("Read line error. Closing stream for {}: {}", peer_addr, e);
return Ok(())
}
info!("Received msg from IRC client: {:?}", line);
let irc_msg = match clean_input(line, &peer_addr) {
Ok(m) => m,
Err(e) => return Err(e)
};
info!("Send msg to IRC client '{}' from {}", irc_msg, peer_addr);
if let Err(e) = conn.update(irc_msg).await {
warn!("Connection error: {} for {}", e, peer_addr);
return Err(Error::ChannelStopped)
}
}
};
}).detach();
Ok(())
}
}
async_daemonize!(realmain);
async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let seen_msg_ids = Arc::new(Mutex::new(vec![]));
let privmsgs_buffer: PrivmsgsBuffer =
Arc::new(Mutex::new(ringbuffer::AllocRingBuffer::with_capacity(SIZE_OF_MSGS_BUFFER)));
if settings.gen_secret {
let secret_key = crypto_box::SecretKey::generate(&mut OsRng);
@@ -206,29 +230,31 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let executor_cloned = executor.clone();
executor
.spawn(async move {
let ircd = Ircd::new(
seen_msg_ids.clone(),
settings.autojoin.clone(),
configured_chans.clone(),
p2p.clone(),
p2p_recv_channel.clone(),
);
loop {
let (stream, peer_addr) = match listener.accept().await {
Ok((s, a)) => (s, a),
Err(e) => {
error!("Failed accepting new connections: {}", e);
error!("failed accepting new connections: {}", e);
continue
}
};
info!("IRC Accepted client: {}", peer_addr);
let result = ircd.process(executor_cloned.clone(), stream, peer_addr).await;
executor_cloned
.spawn(process(
stream,
peer_addr,
seen_msg_ids.clone(),
privmsgs_buffer.clone(),
settings.autojoin.clone(),
configured_chans.clone(),
p2p.clone(),
p2p_recv_channel.clone(),
))
.detach();
if let Err(e) = result {
error!("failed process the {} connections: {}", peer_addr, e);
continue
};
info!("IRC Accepted new client: {}", peer_addr);
}
})
.detach();