bin/ircd: send messages in buffer to the client after JOIN command

This commit is contained in:
ghassmo
2022-06-28 01:43:53 +03:00
parent d5339ece5f
commit 06db050229
3 changed files with 21 additions and 19 deletions

View File

@@ -10,7 +10,6 @@ use futures::{io::BufReader, AsyncBufReadExt, AsyncReadExt, FutureExt};
use fxhash::FxHashMap;
use log::{error, info, warn};
use rand::rngs::OsRng;
use ringbuffer::RingBufferExt;
use smol::future;
use structopt_toml::StructOptToml;
@@ -104,11 +103,6 @@ impl Ircd {
receiver.get_id(),
);
// Send messages in buffer
for msg in self.privmsgs_buffer.lock().await.to_vec() {
receiver.self_notify(msg).await;
}
executor
.spawn(async move {
loop {

View File

@@ -5,7 +5,7 @@ use futures::{io::WriteHalf, AsyncWriteExt};
use fxhash::FxHashMap;
use log::{debug, info, warn};
use rand::{rngs::OsRng, RngCore};
use ringbuffer::RingBufferWrite;
use ringbuffer::{RingBufferExt, RingBufferWrite};
use darkfi::{net::P2pPtr, system::SubscriberPtr, Error, Result};
@@ -132,6 +132,13 @@ impl IrcServerConnection {
let chan_info = self.configured_chans.get_mut(chan).unwrap();
chan_info.joined = true;
}
// Send messages in buffer
for msg in self.privmsgs_buffer.lock().await.to_vec() {
if msg.channel == chan {
self.senders.notify_with_id(msg, self.subscriber_id).await;
}
}
}
}
"PART" => {

View File

@@ -11,7 +11,6 @@ pub type SubscriptionId = u64;
pub struct Subscription<T> {
id: SubscriptionId,
recv_queue: async_channel::Receiver<T>,
send_queue: async_channel::Sender<T>,
parent: Arc<Subscriber<T>>,
}
@@ -31,15 +30,6 @@ impl<T: Clone> Subscription<T> {
}
}
pub async fn self_notify(&self, message: T) {
match self.send_queue.send(message).await {
Ok(_) => {}
Err(err) => {
panic!("MessageSubscription::self_notify() send_queue failed! {}", err);
}
}
}
// Must be called manually since async Drop is not possible in Rust
pub async fn unsubscribe(&self) {
self.parent.clone().unsubscribe(self.id).await
@@ -66,9 +56,9 @@ impl<T: Clone> Subscriber<T> {
let sub_id = Self::random_id();
self.subs.lock().await.insert(sub_id, sender.clone());
self.subs.lock().await.insert(sub_id, sender);
Subscription { id: sub_id, recv_queue: recvr, send_queue: sender, parent: self.clone() }
Subscription { id: sub_id, recv_queue: recvr, parent: self.clone() }
}
async fn unsubscribe(self: Arc<Self>, sub_id: SubscriptionId) {
@@ -86,6 +76,17 @@ impl<T: Clone> Subscriber<T> {
}
}
pub async fn notify_with_id(&self, message_result: T, id: u64) {
if let Some(sub) = (*self.subs.lock().await).get(&id) {
match sub.send(message_result.clone()).await {
Ok(()) => {}
Err(err) => {
panic!("Error returned sending message in notify() call! {}", err);
}
}
}
}
pub async fn notify_with_exclude(&self, message_result: T, exclude_list: &[SubscriptionId]) {
for (id, sub) in (*self.subs.lock().await).iter() {
if exclude_list.contains(id) {