From 06db0502294afa06ee400e4429ef4742067f563a Mon Sep 17 00:00:00 2001 From: ghassmo Date: Tue, 28 Jun 2022 01:43:53 +0300 Subject: [PATCH] bin/ircd: send messages in buffer to the client after JOIN command --- bin/ircd/src/main.rs | 6 ------ bin/ircd/src/server.rs | 9 ++++++++- src/system/subscriber.rs | 25 +++++++++++++------------ 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/bin/ircd/src/main.rs b/bin/ircd/src/main.rs index c529f900b..473ff9b1f 100644 --- a/bin/ircd/src/main.rs +++ b/bin/ircd/src/main.rs @@ -10,7 +10,6 @@ use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt}; use fxhash::FxHashMap; use log::{error, info, warn}; use rand::rngs::OsRng; -use ringbuffer::RingBufferExt; use smol::future; use structopt_toml::StructOptToml; @@ -104,11 +103,6 @@ impl Ircd { receiver.get_id(), ); - // Send messages in buffer - for msg in self.privmsgs_buffer.lock().await.to_vec() { - receiver.self_notify(msg).await; - } - executor .spawn(async move { loop { diff --git a/bin/ircd/src/server.rs b/bin/ircd/src/server.rs index 46eaac9ed..1ebca300d 100644 --- a/bin/ircd/src/server.rs +++ b/bin/ircd/src/server.rs @@ -5,7 +5,7 @@ use futures::{io::WriteHalf, AsyncWriteExt}; use fxhash::FxHashMap; use log::{debug, info, warn}; use rand::{rngs::OsRng, RngCore}; -use ringbuffer::RingBufferWrite; +use ringbuffer::{RingBufferExt, RingBufferWrite}; use darkfi::{net::P2pPtr, system::SubscriberPtr, Error, Result}; @@ -132,6 +132,13 @@ impl IrcServerConnection { let chan_info = self.configured_chans.get_mut(chan).unwrap(); chan_info.joined = true; } + + // Send messages in buffer + for msg in self.privmsgs_buffer.lock().await.to_vec() { + if msg.channel == chan { + self.senders.notify_with_id(msg, self.subscriber_id).await; + } + } } } "PART" => { diff --git a/src/system/subscriber.rs b/src/system/subscriber.rs index 16e2aa47f..ea8f71bd4 100644 --- a/src/system/subscriber.rs +++ b/src/system/subscriber.rs @@ -11,7 +11,6 @@ pub type SubscriptionId = u64; pub struct Subscription { id: SubscriptionId, recv_queue: async_channel::Receiver, - send_queue: async_channel::Sender, parent: Arc>, } @@ -31,15 +30,6 @@ impl Subscription { } } - pub async fn self_notify(&self, message: T) { - match self.send_queue.send(message).await { - Ok(_) => {} - Err(err) => { - panic!("MessageSubscription::self_notify() send_queue failed! {}", err); - } - } - } - // Must be called manually since async Drop is not possible in Rust pub async fn unsubscribe(&self) { self.parent.clone().unsubscribe(self.id).await @@ -66,9 +56,9 @@ impl Subscriber { let sub_id = Self::random_id(); - self.subs.lock().await.insert(sub_id, sender.clone()); + self.subs.lock().await.insert(sub_id, sender); - Subscription { id: sub_id, recv_queue: recvr, send_queue: sender, parent: self.clone() } + Subscription { id: sub_id, recv_queue: recvr, parent: self.clone() } } async fn unsubscribe(self: Arc, sub_id: SubscriptionId) { @@ -86,6 +76,17 @@ impl Subscriber { } } + pub async fn notify_with_id(&self, message_result: T, id: u64) { + if let Some(sub) = (*self.subs.lock().await).get(&id) { + match sub.send(message_result.clone()).await { + Ok(()) => {} + Err(err) => { + panic!("Error returned sending message in notify() call! {}", err); + } + } + } + } + pub async fn notify_with_exclude(&self, message_result: T, exclude_list: &[SubscriptionId]) { for (id, sub) in (*self.subs.lock().await).iter() { if exclude_list.contains(id) {