diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index 30e5f1d92..dae7d63fc 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -68,10 +68,7 @@ fn clean_input(mut line: String, peer_addr: &SocketAddr) -> Result { Ok(line) } -async fn process( - // server - stream: TcpStream, - peer_addr: SocketAddr, +struct Ircd { // msgs seen_msg_ids: SeenMsgIds, privmsgs_buffer: PrivmsgsBuffer, @@ -81,75 +78,102 @@ async fn process( // p2p p2p: net::P2pPtr, p2p_receiver: Receiver, -) -> Result<()> { - let (reader, writer) = stream.split(); +} - let mut reader = BufReader::new(reader); - let mut conn = IrcServerConnection::new( - writer, - seen_msg_ids.clone(), - privmsgs_buffer.clone(), - autojoin_chans, - configured_chans, - p2p.clone(), - ); +impl Ircd { + fn new( + seen_msg_ids: SeenMsgIds, + autojoin_chans: Vec, + configured_chans: FxHashMap, + p2p: net::P2pPtr, + p2p_receiver: Receiver, + ) -> Self { + let privmsgs_buffer: PrivmsgsBuffer = + Arc::new(Mutex::new(ringbuffer::AllocRingBuffer::with_capacity(SIZE_OF_MSGS_BUFFER))); - loop { - let mut line = String::new(); - futures::select! { - privmsg = p2p_receiver.recv().fuse() => { - let mut msg = privmsg?; - info!("Received msg from P2p network: {:?}", msg); + Self { seen_msg_ids, privmsgs_buffer, autojoin_chans, configured_chans, p2p, p2p_receiver } + } - // Try to potentially decrypt the incoming message. - if conn.configured_chans.contains_key(&msg.channel) { - let chan_info = conn.configured_chans.get(&msg.channel).unwrap(); - if !chan_info.joined.load(Ordering::Relaxed) { - continue + async fn process( + &self, + executor: Arc>, + stream: TcpStream, + peer_addr: SocketAddr, + ) -> Result<()> { + let (reader, writer) = stream.split(); + + let mut reader = BufReader::new(reader); + let mut conn = IrcServerConnection::new( + writer, + self.seen_msg_ids.clone(), + self.privmsgs_buffer.clone(), + self.autojoin_chans.clone(), + self.configured_chans.clone(), + self.p2p.clone(), + ); + + let p2p_receiver = self.p2p_receiver.clone(); + let privmsgs_buffer = self.privmsgs_buffer.clone(); + + executor.spawn(async move { + loop { + let mut line = String::new(); + futures::select! { + privmsg = p2p_receiver.recv().fuse() => { + let mut msg = privmsg?; + info!("Received msg from P2p network: {:?}", msg); + + // Try to potentially decrypt the incoming message. + if conn.configured_chans.contains_key(&msg.channel) { + let chan_info = conn.configured_chans.get(&msg.channel).unwrap(); + if !chan_info.joined.load(Ordering::Relaxed) { + continue + } + if let Some(salt_box) = &chan_info.salt_box { + if let Some(decrypted_msg) = try_decrypt_message(salt_box, &msg.message) { + msg.message = decrypted_msg; + info!("Decrypted received message: {:?}", msg); + } + } + } + + // add the msg to buffer + { + (*privmsgs_buffer.lock().await).push(msg.clone()); + } + + let irc_msg = build_irc_msg(&msg); + conn.reply(&irc_msg).await?; } - if let Some(salt_box) = &chan_info.salt_box { - if let Some(decrypted_msg) = try_decrypt_message(salt_box, &msg.message) { - msg.message = decrypted_msg; - info!("Decrypted received message: {:?}", msg); + err = reader.read_line(&mut line).fuse() => { + if let Err(e) = err { + warn!("Read line error. Closing stream for {}: {}", peer_addr, e); + return Ok(()) + } + info!("Received msg from IRC client: {:?}", line); + let irc_msg = match clean_input(line, &peer_addr) { + Ok(m) => m, + Err(e) => return Err(e) + }; + + info!("Send msg to IRC client '{}' from {}", irc_msg, peer_addr); + + if let Err(e) = conn.update(irc_msg).await { + warn!("Connection error: {} for {}", e, peer_addr); + return Err(Error::ChannelStopped) } } - } - - // add the msg to buffer - { - (*privmsgs_buffer.lock().await).push(msg.clone()); - } - - let irc_msg = build_irc_msg(&msg); - conn.reply(&irc_msg).await?; - } - err = reader.read_line(&mut line).fuse() => { - if let Err(e) = err { - warn!("Read line error. Closing stream for {}: {}", peer_addr, e); - return Ok(()) - } - info!("Received msg from IRC client: {:?}", line); - let irc_msg = match clean_input(line, &peer_addr) { - Ok(m) => m, - Err(e) => return Err(e) }; - - info!("Send msg to IRC client '{}' from {}", irc_msg, peer_addr); - - if let Err(e) = conn.update(irc_msg).await { - warn!("Connection error: {} for {}", e, peer_addr); - return Err(Error::ChannelStopped) - } } - }; + }).detach(); + + Ok(()) } } async_daemonize!(realmain); async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let seen_msg_ids = Arc::new(Mutex::new(vec![])); - let privmsgs_buffer: PrivmsgsBuffer = - Arc::new(Mutex::new(ringbuffer::AllocRingBuffer::with_capacity(SIZE_OF_MSGS_BUFFER))); if settings.gen_secret { let secret_key = crypto_box::SecretKey::generate(&mut OsRng); @@ -206,29 +230,31 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let executor_cloned = executor.clone(); executor .spawn(async move { + let ircd = Ircd::new( + seen_msg_ids.clone(), + settings.autojoin.clone(), + configured_chans.clone(), + p2p.clone(), + p2p_recv_channel.clone(), + ); + loop { let (stream, peer_addr) = match listener.accept().await { Ok((s, a)) => (s, a), Err(e) => { - error!("Failed accepting new connections: {}", e); + error!("failed accepting new connections: {}", e); continue } }; - info!("IRC Accepted client: {}", peer_addr); + let result = ircd.process(executor_cloned.clone(), stream, peer_addr).await; - executor_cloned - .spawn(process( - stream, - peer_addr, - seen_msg_ids.clone(), - privmsgs_buffer.clone(), - settings.autojoin.clone(), - configured_chans.clone(), - p2p.clone(), - p2p_recv_channel.clone(), - )) - .detach(); + if let Err(e) = result { + error!("failed process the {} connections: {}", peer_addr, e); + continue + }; + + info!("IRC Accepted new client: {}", peer_addr); } }) .detach();