bin/ircd: POC messages model based on hashes and chains

This commit is contained in:
ghassmo
2022-09-19 14:40:30 +04:00
parent 6e02b37646
commit d41328844c
3 changed files with 605 additions and 0 deletions

213
bin/ircd/src/chains.rs Normal file
View File

@@ -0,0 +1,213 @@
use async_std::sync::Mutex;
use std::collections::VecDeque;
use chrono::Utc;
use fxhash::FxHashMap;
use ripemd::{Digest, Ripemd256};
use darkfi::util::serial::{SerialDecodable, SerialEncodable};
const MAX_CHAIN_SIZE: usize = 4096;
pub type PrivmsgId = String;
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq, PartialEq)]
pub struct Privmsg {
pub id: PrivmsgId,
pub nickname: String,
pub target: String,
pub message: String,
pub timestamp: i64,
pub read_confirms: u8,
pub prev_msg_id: String,
}
impl Privmsg {
pub fn new(nickname: &str, target: &str, message: &str, prev_msg_id: &str) -> Self {
let timestamp = Utc::now().timestamp();
let id = Self::hash(nickname, target, message, prev_msg_id, timestamp);
let read_confirms = 0;
Self {
id,
nickname: nickname.to_string(),
target: target.to_string(),
message: message.to_string(),
timestamp,
read_confirms,
prev_msg_id: prev_msg_id.to_string(),
}
}
pub fn hash(
nickname: &str,
target: &str,
message: &str,
prev_msg_id: &str,
timestamp: i64,
) -> String {
let mut hasher = Ripemd256::new();
hasher.update(format!("{nickname}{target}{message}{timestamp}{prev_msg_id}"));
hex::encode(hasher.finalize())
}
}
impl std::string::ToString for Privmsg {
fn to_string(&self) -> String {
format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", self.nickname, self.target, self.message)
}
}
pub struct Chain {
buffer: VecDeque<Privmsg>,
hashes: Vec<String>,
}
impl Chain {
pub fn new() -> Self {
Self { buffer: VecDeque::new(), hashes: Vec::new() }
}
pub fn push_hashes(&mut self, hashes: Vec<String>) {
self.hashes.extend(hashes);
}
pub fn push_msg(&mut self, msg: &Privmsg) -> bool {
// Rehash the msg to check if it's valid
let hash = Privmsg::hash(
&msg.nickname,
&msg.target,
&msg.message,
&msg.prev_msg_id,
msg.timestamp,
);
if hash != msg.id {
return false
}
// Prune last messages from the buffer if it has exceeded the MAX_CHAIN_SIZE
if self.buffer.len() >= MAX_CHAIN_SIZE {
self.buffer.pop_front();
}
// Check if the hashes already has the msg id, if so add the msg to the buffer
if self.hashes.contains(&msg.id) {
// TODO: it should do sorting by the msg id in this step
self.buffer.push_back(msg.clone());
return true
}
// Check if the last msg in the chains is equal to the previous_msg_id in privmsg,
// if not and both the chain and previous_msg_id are empty,
// then it will be add it as genesis msg
if let Some(last_hash) = self.last_hash() {
if last_hash != msg.prev_msg_id {
return false
}
} else if !msg.prev_msg_id.is_empty() && !self.hashes.is_empty() {
return false
}
// Push the msg to the chain
self.buffer.push_back(msg.clone());
self.hashes.push(msg.id.clone());
true
}
pub fn last_msg(&self) -> Option<Privmsg> {
self.buffer.iter().last().cloned()
}
pub fn last_hash(&self) -> Option<String> {
self.hashes.iter().last().cloned()
}
pub fn height(&self) -> usize {
self.hashes.len()
}
pub fn get_msgs(&self, hashes: &[String]) -> Vec<Privmsg> {
self.buffer.iter().filter(|m| hashes.contains(&m.id)).cloned().collect()
}
pub fn get_hashes(&self, height: usize) -> Vec<String> {
if height >= self.height() {
return vec![]
}
self.hashes[height..].to_vec()
}
}
pub struct Chains {
chains: Mutex<FxHashMap<String, Chain>>,
}
impl Chains {
pub fn new(targets: Vec<String>) -> Self {
let mut map = FxHashMap::default();
for target in targets {
map.insert(target, Chain::new());
}
Self { chains: Mutex::new(map) }
}
pub async fn push_hashes(&self, target: String, height: usize, hashes: Vec<String>) -> bool {
let mut chains = self.chains.lock().await;
if !chains.contains_key(&target) {
return false
}
let chain = chains.get_mut(&target).unwrap();
if chain.height() + 1 != height {
return false
}
chain.push_hashes(hashes);
true
}
pub async fn push_msg(&self, msg: &Privmsg) -> bool {
let mut chains = self.chains.lock().await;
if !chains.contains_key(&msg.target) {
return false
}
chains.get_mut(&msg.target).unwrap().push_msg(msg);
true
}
pub async fn get_msgs(&self, target: &str, hashes: &[String]) -> Vec<Privmsg> {
let chains = self.chains.lock().await;
if !chains.contains_key(target) {
return vec![]
}
chains.get(target).unwrap().get_msgs(hashes)
}
pub async fn get_hashes(&self, target: &str, height: usize) -> Vec<String> {
let chains = self.chains.lock().await;
if !chains.contains_key(target) {
return vec![]
}
chains.get(target).unwrap().get_hashes(height)
}
pub async fn get_height(&self, target: &str) -> usize {
let chains = self.chains.lock().await;
if !chains.contains_key(target) {
return 0
}
chains.get(target).unwrap().height()
}
}

View File

@@ -25,10 +25,12 @@ use darkfi::{
};
pub mod buffers;
pub mod chains;
pub mod crypto;
pub mod irc;
pub mod privmsg;
pub mod protocol_privmsg;
pub mod protocol_privmsg2;
pub mod rpc;
pub mod settings;

View File

@@ -0,0 +1,390 @@
use async_std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use async_executor::Executor;
use async_trait::async_trait;
use chrono::Utc;
use fxhash::FxHashMap;
use log::debug;
use rand::{rngs::OsRng, RngCore};
use darkfi::{
net,
util::{
serial::{SerialDecodable, SerialEncodable},
sleep,
},
Result,
};
use crate::{
chains::{Chains, Privmsg},
settings,
};
#[derive(Clone)]
pub struct RingBuffer<T> {
pub items: VecDeque<T>,
}
impl<T: Eq + PartialEq + Clone> RingBuffer<T> {
pub fn new(capacity: usize) -> Self {
let items = VecDeque::with_capacity(capacity);
Self { items }
}
pub fn push(&mut self, val: T) {
if self.items.len() == self.items.capacity() {
self.items.pop_front();
}
self.items.push_back(val);
}
pub fn contains(&self, val: &T) -> bool {
self.items.contains(val)
}
}
pub struct SeenIds {
ids: Mutex<RingBuffer<String>>,
}
impl SeenIds {
pub fn new() -> Self {
Self { ids: Mutex::new(RingBuffer::new(settings::SIZE_OF_IDSS_BUFFER)) }
}
pub async fn push(&self, id: &String) -> bool {
let ids = &mut self.ids.lock().await;
if !ids.contains(id) {
ids.push(id.clone());
return true
}
false
}
}
pub struct UnreadMsgs {
msgs: Mutex<FxHashMap<String, Privmsg>>,
}
impl UnreadMsgs {
pub fn new() -> Self {
Self { msgs: Mutex::new(FxHashMap::default()) }
}
pub async fn contains(&self, key: &str) -> bool {
self.msgs.lock().await.contains_key(key)
}
// Increase the read_confirms for a message, if it has exceeded the MAX_CONFIRM
// then remove it from the hash_map and return Some(msg), otherwise return None
pub async fn inc_read_confirms(&self, key: &str) -> Option<Privmsg> {
let msgs = &mut self.msgs.lock().await;
let mut result = None;
if let Some(msg) = msgs.get_mut(key) {
msg.read_confirms += 1;
if msg.read_confirms >= settings::MAX_CONFIRM {
result = Some(msg.clone())
}
}
if result.is_some() {
msgs.remove(key);
}
result
}
pub async fn insert(&self, msg: &Privmsg) {
let msgs = &mut self.msgs.lock().await;
// prune expired msgs
let mut prune_ids = vec![];
for (id, m) in msgs.iter() {
if m.timestamp + settings::UNREAD_MSG_EXPIRE_TIME < Utc::now().timestamp() {
prune_ids.push(id.clone());
}
}
for id in prune_ids {
msgs.remove(&id);
}
msgs.insert(msg.id.clone(), msg.clone());
}
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct Inv {
id: String,
hash: String,
target: String,
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct GetMsgs {
invs: Vec<String>,
target: String,
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct Hashes {
hashes: Vec<String>,
height: usize,
target: String,
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct SyncHash {
height: usize,
target: String,
}
pub struct ProtocolPrivmsg {
jobsman: net::ProtocolJobsManagerPtr,
notify: async_channel::Sender<Privmsg>,
msg_sub: net::MessageSubscription<Privmsg>,
inv_sub: net::MessageSubscription<Inv>,
getmsgs_sub: net::MessageSubscription<GetMsgs>,
hashes_sub: net::MessageSubscription<Hashes>,
synchash_sub: net::MessageSubscription<SyncHash>,
p2p: net::P2pPtr,
channel: net::ChannelPtr,
chains: Chains,
seen_ids: SeenIds,
unread_msgs: UnreadMsgs,
}
impl ProtocolPrivmsg {
pub async fn init(
channel: net::ChannelPtr,
notify: async_channel::Sender<Privmsg>,
p2p: net::P2pPtr,
chains: Chains,
seen_ids: SeenIds,
unread_msgs: UnreadMsgs,
) -> net::ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Privmsg>().await;
message_subsytem.add_dispatch::<Inv>().await;
message_subsytem.add_dispatch::<GetMsgs>().await;
message_subsytem.add_dispatch::<Hashes>().await;
message_subsytem.add_dispatch::<SyncHash>().await;
let msg_sub =
channel.clone().subscribe_msg::<Privmsg>().await.expect("Missing Privmsg dispatcher!");
let inv_sub = channel.subscribe_msg::<Inv>().await.expect("Missing Inv dispatcher!");
let getmsgs_sub =
channel.clone().subscribe_msg::<GetMsgs>().await.expect("Missing GetMsgs dispatcher!");
let hashes_sub =
channel.clone().subscribe_msg::<Hashes>().await.expect("Missing Hashes dispatcher!");
let synchash_sub = channel
.clone()
.subscribe_msg::<SyncHash>()
.await
.expect("Missing HashSync dispatcher!");
Arc::new(Self {
notify,
msg_sub,
inv_sub,
getmsgs_sub,
hashes_sub,
synchash_sub,
jobsman: net::ProtocolJobsManager::new("ProtocolPrivmsg", channel.clone()),
p2p,
channel,
chains,
seen_ids,
unread_msgs,
})
}
async fn handle_receive_inv(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_inv() [START]");
let exclude_list = vec![self.channel.address()];
loop {
let inv = self.inv_sub.receive().await?;
let inv = (*inv).to_owned();
if !self.seen_ids.push(&inv.id).await {
continue
}
// On receive inv message, if the unread_msgs buffer has the msg's hash then increase
// the read_confirms, if not then send GetMsgs contain the msg's hash
if !self.unread_msgs.contains(&inv.hash).await {
self.send_getmsgs(&inv.target, vec![inv.hash.clone()]).await?;
} else if let Some(msg) = self.unread_msgs.inc_read_confirms(&inv.hash).await {
self.new_msg(&msg).await?;
}
// Either way, broadcast the inv msg
self.p2p.broadcast_with_exclude(inv, &exclude_list).await?;
}
}
async fn handle_receive_msg(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_msg() [START]");
let exclude_list = vec![self.channel.address()];
loop {
let msg = self.msg_sub.receive().await?;
let mut msg = (*msg).to_owned();
if !self.seen_ids.push(&msg.id).await {
continue
}
// If the msg has read_confirms greater or equal to MAX_CONFIRM, it will be added to
// the chains, otherwise increase the msg's read_confirms, add it to unread_msgs, and
// broadcast an Inv msg contain the hash of the message
if msg.read_confirms >= settings::MAX_CONFIRM {
self.new_msg(&msg).await?;
} else {
msg.read_confirms += 1;
self.unread_msgs.insert(&msg).await;
self.send_inv_msg(&msg).await?;
}
// Broadcast the msg
self.p2p.broadcast_with_exclude(msg, &exclude_list).await?;
}
}
async fn handle_receive_getmsgs(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_getmsgs() [START]");
loop {
let getmsgs = self.getmsgs_sub.receive().await?;
// Load the msgs from the chains, and send them back to the sender
let msgs = self.chains.get_msgs(&getmsgs.target, &getmsgs.invs).await;
for msg in msgs {
self.channel.send(msg.clone()).await?;
}
}
}
async fn handle_receive_hashes(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_hashes() [START]");
loop {
let hashmsg = self.hashes_sub.receive().await?;
self.chains
.push_hashes(hashmsg.target.clone(), hashmsg.height, hashmsg.hashes.clone())
.await;
}
}
async fn handle_receive_synchash(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolPrivmsg::handle_receive_synchash() [START]");
loop {
let synchash = self.synchash_sub.receive().await?;
if synchash.height < self.chains.get_height(&synchash.target).await {
let hashes = self.chains.get_hashes(&synchash.target, synchash.height + 1).await;
// send the hashes from the chain
self.channel
.send(Hashes {
target: synchash.target.clone(),
height: synchash.height + 1,
hashes: hashes.clone(),
})
.await?;
// send the msgs from the chain's buffer
let msgs = self.chains.get_msgs(&synchash.target, &hashes).await;
for msg in msgs {
self.channel.send(msg).await?;
}
}
}
}
// every 2 seconds send a SyncHash msg, contain the last_height for each chain
async fn send_sync_hash_loop(self: Arc<Self>) -> Result<()> {
loop {
// TODO loop through preconfigured channels
let height = self.chains.get_height("").await;
self.channel.send(SyncHash { target: "".to_string(), height }).await?;
sleep(2).await;
}
}
async fn new_msg(&self, msg: &Privmsg) -> Result<()> {
if self.chains.push_msg(msg).await {
self.notify.send(msg.clone()).await?;
}
Ok(())
}
async fn send_inv_msg(&self, msg: &Privmsg) -> Result<()> {
let inv_id = OsRng.next_u64().to_string();
self.p2p
.broadcast(Inv { id: inv_id, hash: msg.id.clone(), target: msg.target.clone() })
.await?;
Ok(())
}
async fn send_getmsgs(&self, target: &str, hashes: Vec<String>) -> Result<()> {
self.channel.send(GetMsgs { target: target.to_string(), invs: hashes }).await?;
Ok(())
}
}
#[async_trait]
impl net::ProtocolBase for ProtocolPrivmsg {
/// Starts ping-pong keep-alive messages exchange. Runs ping-pong in the
/// protocol task manager, then queues the reply. Sends out a ping and
/// waits for pong reply. Waits for ping and replies with a pong.
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolPrivmsg::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_msg(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_inv(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_getmsgs(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_hashes(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_synchash(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().send_sync_hash_loop(), executor.clone()).await;
debug!(target: "ircd", "ProtocolPrivmsg::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolPrivmsg"
}
}
impl net::Message for Privmsg {
fn name() -> &'static str {
"privmsg"
}
}
impl net::Message for Inv {
fn name() -> &'static str {
"inv"
}
}
impl net::Message for GetMsgs {
fn name() -> &'static str {
"getmsgs"
}
}
impl net::Message for Hashes {
fn name() -> &'static str {
"hashes"
}
}
impl net::Message for SyncHash {
fn name() -> &'static str {
"synchash"
}
}