bin/ircd: push the privmsg with the same term to the buffer

This commit is contained in:
ghassmo
2022-09-04 09:03:37 +04:00
parent 0395727e32
commit f5611d7212
5 changed files with 38 additions and 78 deletions

View File

@@ -43,7 +43,7 @@ impl<T: Eq + PartialEq + Clone> RingBuffer<T> {
self.items.make_contiguous()
}
pub fn iter(&self) -> impl Iterator<Item = &T> {
pub fn iter(&self) -> impl Iterator<Item = &T> + DoubleEndedIterator {
self.items.iter()
}
@@ -74,22 +74,14 @@ impl PrivmsgsBuffer {
}
pub fn push(&mut self, privmsg: &Privmsg) {
if self.buffer.contains(privmsg) {
return
}
match privmsg.term.cmp(&(self.last_term() + 1)) {
Ordering::Equal => self.buffer.push(privmsg.clone()),
Ordering::Equal | Ordering::Less => self.buffer.push(privmsg.clone()),
Ordering::Greater => self.orphans.push(Orphan::new(privmsg)),
Ordering::Less => {
if !self.term_exist(privmsg.term) {
self.orphans.push(Orphan::new(privmsg))
}
}
}
self.update();
}
pub fn iter(&self) -> impl Iterator<Item = &Privmsg> {
pub fn iter(&self) -> impl Iterator<Item = &Privmsg> + DoubleEndedIterator {
self.buffer.iter()
}
@@ -100,43 +92,45 @@ impl PrivmsgsBuffer {
}
}
pub fn update(&mut self) {
fn update(&mut self) {
self.sort_orphans();
self.push_orphans();
self.sort();
self.update_orphans();
self.sort_buffer();
}
fn term_exist(&self, term: u64) -> bool {
self.buffer.items.iter().any(|p| p.term == term)
}
fn sort(&mut self) {
self.buffer.as_slice().sort_by(|a, b| a.term.cmp(&b.term));
fn sort_buffer(&mut self) {
self.buffer.as_slice().sort_by(|a, b| match a.term.cmp(&b.term) {
Ordering::Equal => a.timestamp.cmp(&b.timestamp),
o => o,
});
}
fn sort_orphans(&mut self) {
self.orphans.as_slice().sort_by(|a, b| a.msg.term.cmp(&b.msg.term));
self.orphans.as_slice().sort_by(|a, b| match a.msg.term.cmp(&b.msg.term) {
Ordering::Equal => a.msg.timestamp.cmp(&b.msg.timestamp),
o => o,
});
}
fn push_orphans(&mut self) {
fn oprhan_is_valid(&mut self, orphan: &Orphan) -> bool {
(orphan.timestamp + LIFETIME_FOR_ORPHAN) > Utc::now().timestamp()
}
fn update_orphans(&mut self) {
for orphan in self.orphans.clone().iter() {
let privmsg = orphan.msg.clone();
if !self.oprhan_is_valid(orphan) {
self.orphans.remove(orphan);
continue
}
match privmsg.term.cmp(&(self.last_term() + 1)) {
Ordering::Equal => {
Ordering::Equal | Ordering::Less => {
self.buffer.push(privmsg.clone());
self.orphans.remove(orphan);
}
Ordering::Less => {
if !self.term_exist(privmsg.term) {
self.buffer.push(privmsg.clone());
}
self.orphans.remove(orphan);
}
Ordering::Greater => {
if (orphan.timestamp + LIFETIME_FOR_ORPHAN) < Utc::now().timestamp() {
self.orphans.remove(orphan);
}
}
Ordering::Greater => {}
}
}
}
@@ -198,8 +192,6 @@ mod tests {
pms.push(&privmsg);
}
pms.update();
assert_eq!(pms.buffer.len(), 3000);
assert_eq!(pms.last_term(), 3000);
assert_eq!(pms.orphans.len(), 0);
@@ -217,9 +209,7 @@ mod tests {
pms.push(&privmsg);
}
pms.update();
assert_eq!(pms.buffer.len(), 4000);
assert_eq!(pms.buffer.len(), SIZE_OF_MSGS_BUFFER);
assert_eq!(pms.last_term(), 4000);
assert_eq!(pms.orphans.len(), 0);
@@ -235,32 +225,8 @@ mod tests {
pms.push(&privmsg);
}
pms.update();
assert_eq!(pms.buffer.len(), SIZE_OF_MSGS_BUFFER);
assert_eq!(pms.last_term(), 7000);
assert_eq!(pms.orphans.len(), 0);
//
// Fill the buffer with random generated terms in range 7001..10001
// This will occasionally update the buffer
// At the end, the messages in the buffer have to be in correct order
//
let mut terms: Vec<u64> = (7001..10001).collect();
terms.shuffle(&mut thread_rng());
for term in terms {
let privmsg = Privmsg::new("nick", "#dev", &format!("message_{}", term), term);
pms.push(&privmsg);
if rand::random() {
pms.update();
}
}
pms.update();
assert_eq!(pms.buffer.len(), SIZE_OF_MSGS_BUFFER);
assert_eq!(pms.last_term(), 10000);
assert_eq!(pms.orphans.len(), 0);
}
}

View File

@@ -1,9 +1,7 @@
use chrono::Utc;
use rand::{rngs::OsRng, RngCore};
use darkfi::util::{
serial::{SerialDecodable, SerialEncodable},
Timestamp,
};
use darkfi::util::serial::{SerialDecodable, SerialEncodable};
pub type PrivmsgId = u64;
@@ -16,14 +14,14 @@ pub struct Privmsg {
pub nickname: String,
pub target: String,
pub message: String,
pub timestamp: Timestamp,
pub timestamp: i64,
pub term: u64,
}
impl Privmsg {
pub fn new(nickname: &str, target: &str, message: &str, term: u64) -> Self {
let id = OsRng.next_u64();
let timestamp = Timestamp::current_time();
let timestamp = Utc::now().timestamp();
Self {
id,
nickname: nickname.to_string(),

View File

@@ -51,8 +51,7 @@ impl ProtocolPrivmsg {
let exclude_list = vec![self.channel.address()];
// once a channel get started
let mut msgs_buffer = self.msgs.lock().await;
msgs_buffer.update();
let msgs_buffer = self.msgs.lock().await;
for m in msgs_buffer.iter() {
self.channel.send(m.clone()).await?;
}
@@ -72,7 +71,6 @@ impl ProtocolPrivmsg {
// add the msg to the buffer
let mut msgs = self.msgs.lock().await;
msgs.push(&msg);
msgs.update();
drop(msgs);
self.notify.send(msg.clone()).await?;

View File

@@ -210,9 +210,8 @@ impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcServerConnection<C>
info!("(Plain) PRIVMSG {} :{}", target, message);
let mut privmsgs_buffer = self.privmsgs_buffer.lock().await;
privmsgs_buffer.update();
let last_term = privmsgs_buffer.last_term();
let privmsgs_buffer = self.privmsgs_buffer.lock().await;
let last_term = privmsgs_buffer.last_term() + 1;
drop(privmsgs_buffer);
let mut privmsg = Privmsg::new(&self.nickname, target, &message, last_term);

View File

@@ -187,8 +187,7 @@ impl<C: AsyncRead + AsyncWrite + Send + Unpin + 'static> IrcServerConnection<C>
}
// Send dm messages in buffer
let mut privmsgs_buffer = self.privmsgs_buffer.lock().await;
privmsgs_buffer.update();
let privmsgs_buffer = self.privmsgs_buffer.lock().await;
for msg in privmsgs_buffer.iter() {
let is_dm = msg.target == self.nickname ||
(msg.nickname == self.nickname && !msg.target.starts_with('#'));