[WIP] event_graph added to src/ and ircd2 utilize it from there

This commit is contained in:
Dastan-glitch
2023-02-19 05:24:52 +03:00
parent 4292006959
commit cd5dfbed0e
15 changed files with 151 additions and 96 deletions

View File

@@ -0,0 +1,40 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::Arc;
use crate::{event_graph::model::Event, Error, Result};
pub type EventsQueuePtr = Arc<EventsQueue>;
pub struct EventsQueue(smol::channel::Sender<Event>, smol::channel::Receiver<Event>);
impl EventsQueue {
pub fn new() -> EventsQueuePtr {
let (sn, rv) = smol::channel::unbounded();
Arc::new(Self(sn, rv))
}
pub async fn fetch(&self) -> Result<Event> {
self.1.recv().await.map_err(Error::from)
}
pub async fn dispatch(&self, event: &Event) -> Result<()> {
self.0.send(event.clone()).await.map_err(Error::from)
}
}

77
src/event_graph/mod.rs Normal file
View File

@@ -0,0 +1,77 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::io;
use darkfi_serial::{Decodable, Encodable, ReadExt, SerialDecodable, SerialEncodable};
pub mod events_queue;
pub mod model;
pub mod protocol_event;
pub mod view;
#[derive(SerialEncodable, SerialDecodable, Clone)]
pub struct PrivMsgEvent {
pub nick: String,
pub msg: String,
pub target: String,
}
#[derive(Clone)]
pub enum EventAction {
PrivMsg(PrivMsgEvent),
}
impl std::string::ToString for PrivMsgEvent {
fn to_string(&self) -> String {
format!(":{}!anon@dark.fi PRIVMSG {} :{}\r\n", self.nick, self.target, self.msg)
}
}
impl Encodable for EventAction {
fn encode<S: io::Write>(&self, mut s: S) -> core::result::Result<usize, io::Error> {
match self {
Self::PrivMsg(event) => {
let mut len = 0;
len += 0u8.encode(&mut s)?;
len += event.encode(s)?;
Ok(len)
}
}
}
}
impl Decodable for EventAction {
fn decode<D: io::Read>(mut d: D) -> core::result::Result<Self, io::Error> {
let type_id = d.read_u8()?;
match type_id {
0 => Ok(Self::PrivMsg(PrivMsgEvent::decode(d)?)),
_ => Err(io::Error::new(io::ErrorKind::Other, "Bad type ID byte for Event")),
}
}
}
pub fn get_current_time() -> u64 {
let start = std::time::SystemTime::now();
start
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis()
.try_into()
.unwrap()
}

629
src/event_graph/model.rs Normal file
View File

@@ -0,0 +1,629 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{cmp::Ordering, collections::HashMap, fmt};
use async_std::sync::{Arc, Mutex};
use darkfi_serial::{Encodable, SerialDecodable, SerialEncodable};
use log::error;
use ripemd::{Digest, Ripemd256};
use crate::event_graph::events_queue::EventsQueuePtr;
use super::{EventAction, PrivMsgEvent};
pub type EventId = [u8; 32];
const MAX_DEPTH: u32 = 300;
const MAX_HEIGHT: u32 = 300;
#[derive(SerialEncodable, SerialDecodable, Clone)]
pub struct Event {
pub previous_event_hash: EventId,
pub action: EventAction,
pub timestamp: u64,
pub read_confirms: u8,
}
impl Event {
pub fn hash(&self) -> EventId {
let mut bytes = Vec::new();
let mut event_to_be_hashed = self.clone();
event_to_be_hashed.read_confirms = 0;
event_to_be_hashed.encode(&mut bytes).expect("serialize failed!");
let mut hasher = Ripemd256::new();
hasher.update(bytes);
let bytes = hasher.finalize().to_vec();
let mut result = [0u8; 32];
result.copy_from_slice(bytes.as_slice());
result
}
}
impl fmt::Debug for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.action {
EventAction::PrivMsg(event) => {
write!(f, "PRIVMSG {}: {} ({})", event.nick, event.msg, self.timestamp)
}
}
}
}
#[derive(Debug, Clone)]
struct EventNode {
// Only current root has this set to None
parent: Option<EventId>,
event: Event,
children: Vec<EventId>,
}
pub type ModelPtr = Arc<Mutex<Model>>;
pub struct Model {
// This is periodically updated so we discard old nodes
current_root: EventId,
orphans: HashMap<EventId, Event>,
event_map: HashMap<EventId, EventNode>,
events_queue: EventsQueuePtr,
}
impl Model {
pub fn new(events_queue: EventsQueuePtr) -> Self {
let root_node = EventNode {
parent: None,
event: Event {
previous_event_hash: [0u8; 32],
action: EventAction::PrivMsg(PrivMsgEvent {
nick: "root".to_string(),
msg: "Let there be dark".to_string(),
target: "root".to_string(),
}),
timestamp: 1674512021323,
read_confirms: 0,
},
children: Vec::new(),
};
let root_node_id = root_node.event.hash();
let mut event_map = HashMap::new();
event_map.insert(root_node_id, root_node);
Self { current_root: root_node_id, orphans: HashMap::new(), event_map, events_queue }
}
pub fn get_head_hash(&self) -> EventId {
self.find_head()
}
pub async fn add(&mut self, event: Event) {
self.orphans.insert(event.hash(), event);
self.reorganize().await;
}
pub fn is_orphan(&self, event: &Event) -> bool {
!self.event_map.contains_key(&event.previous_event_hash)
}
pub fn find_leaves(&self) -> Vec<EventId> {
// collect the leaves in the tree
let mut leaves = vec![];
for (event_hash, node) in self.event_map.iter() {
// check if the node is a leaf
if node.children.is_empty() {
leaves.push(*event_hash);
}
}
leaves
}
pub fn get_event(&self, event: &EventId) -> Option<Event> {
self.event_map.get(event).map(|en| en.event.clone())
}
pub fn get_offspring(&self, event: &EventId) -> Vec<Event> {
let mut offspring = vec![];
let mut event = *event;
let head = self.find_head();
loop {
if event == head {
break
}
if let Some(ev) = self.event_map.get(&event) {
for child in ev.children.iter() {
let child = self.event_map.get(child).unwrap();
offspring.push(child.event.clone());
event = child.event.hash();
}
} else {
break
}
}
offspring
}
async fn reorganize(&mut self) {
for (_, orphan) in std::mem::take(&mut self.orphans) {
// if self.is_orphan(&orphan) {
// // TODO should we remove orphan if it's too old
// continue
// }
let prev_event = orphan.previous_event_hash;
let node =
EventNode { parent: Some(prev_event), event: orphan.clone(), children: Vec::new() };
let node_hash = node.event.hash();
let parent = match self.event_map.get_mut(&prev_event) {
Some(parent) => parent,
None => {
error!("No parent found, Orphan is not relinked");
self.orphans.insert(orphan.hash(), orphan);
continue
}
};
parent.children.push(node_hash);
self.event_map.insert(node_hash, node.clone());
self.events_queue.dispatch(&node.event).await.expect("error dispatching the event");
// clean up the tree from old eventnodes
self.prune_chains();
self.update_root();
}
}
fn prune_chains(&mut self) {
let head = self.find_head();
let leaves = self.find_leaves();
// Reject events which attach to chains too low in the chain
// At some point we ignore all events from old branches
for leaf in leaves {
// skip the head event
if leaf == head {
continue
}
let depth = self.diff_depth(leaf, head);
if depth > MAX_DEPTH {
self.remove_node(leaf);
}
}
}
fn update_root(&mut self) {
let head = self.find_head();
let leaves = self.find_leaves();
// find the common ancestor for each leaf and the head event
let mut ancestors = vec![];
for leaf in leaves {
if leaf == head {
continue
}
let ancestor = self.find_ancestor(leaf, head);
ancestors.push(ancestor);
}
// find the highest ancestor
let highest_ancestor = ancestors
.iter()
.max_by(|&a, &b| self.find_depth(*a, &head).cmp(&self.find_depth(*b, &head)));
// set the new root
if let Some(ancestor) = highest_ancestor {
// the ancestor must have at least height > MAX_HEIGHT
let ancestor_height = self.find_height(&self.current_root, ancestor).unwrap();
if ancestor_height < MAX_HEIGHT {
return
}
// removing the parents of the new root node
let mut root = self.event_map.get(&self.current_root).unwrap();
loop {
let root_hash = root.event.hash();
if &root_hash == ancestor {
break
}
let root_childs = &root.children;
assert_eq!(root_childs.len(), 1);
let child = *root_childs.first().unwrap();
self.event_map.remove(&root_hash);
root = self.event_map.get(&child).unwrap();
}
self.current_root = *ancestor;
}
}
fn remove_node(&mut self, mut event_id: EventId) {
loop {
if !self.event_map.contains_key(&event_id) {
break
}
let node = self.event_map.get(&event_id).unwrap().clone();
self.event_map.remove(&event_id);
let parent = self.event_map.get_mut(&node.parent.unwrap()).unwrap();
let index = parent.children.iter().position(|&n| n == event_id).unwrap();
parent.children.remove(index);
if !parent.children.is_empty() {
break
}
event_id = parent.event.hash();
}
}
// find_head
// -> recursively call itself
// -> + 1 for every recursion, return self if no children
// -> select max from returned values
// Gets the lead node with the maximal number of events counting from root
fn find_head(&self) -> EventId {
self.find_longest_chain(&self.current_root, 0).0
}
fn find_longest_chain(&self, parent_node: &EventId, i: u32) -> (EventId, u32) {
let children = &self.event_map.get(parent_node).unwrap().children;
if children.is_empty() {
return (*parent_node, i)
}
let mut current_max = 0;
let mut current_node = None;
for node in children.iter() {
let (grandchild_node, grandchild_i) = self.find_longest_chain(node, i + 1);
match &grandchild_i.cmp(&current_max) {
Ordering::Greater => {
current_max = grandchild_i;
current_node = Some(grandchild_node);
}
Ordering::Equal => {
// Break ties using the timestamp
let grandchild_node_timestamp =
self.event_map.get(&grandchild_node).unwrap().event.timestamp;
let current_node_timestamp =
self.event_map.get(&current_node.unwrap()).unwrap().event.timestamp;
if grandchild_node_timestamp > current_node_timestamp {
current_max = grandchild_i;
current_node = Some(grandchild_node);
}
}
Ordering::Less => {
// Left a todo here, not sure if it should be handled
continue
}
}
}
assert_ne!(current_max, 0);
(current_node.expect("internal logic error"), current_max)
}
fn find_depth(&self, mut node: EventId, ancestor_id: &EventId) -> u32 {
let mut depth = 0;
while &node != ancestor_id {
depth += 1;
if let Some(parent) = self.event_map.get(&node).unwrap().parent {
node = parent
} else {
break
}
}
depth
}
fn find_height(&self, node: &EventId, child_id: &EventId) -> Option<u32> {
let mut height = 0;
if node == child_id {
return Some(height)
}
height += 1;
let children = &self.event_map.get(node).unwrap().children;
if children.is_empty() {
return None
}
for child in children.iter() {
if let Some(h) = self.find_height(child, child_id) {
return Some(height + h)
}
}
None
}
fn find_ancestor(&self, mut node_a: EventId, mut node_b: EventId) -> EventId {
// node_a is a child of node_b
let is_child = node_b == self.event_map.get(&node_a).unwrap().parent.unwrap();
if is_child {
return node_b
}
while node_a != node_b {
let node_a_parent = self.event_map.get(&node_a).unwrap().parent.unwrap();
let node_b_parent = self.event_map.get(&node_b).unwrap().parent.unwrap();
if node_a_parent == self.current_root || node_b_parent == self.current_root {
return self.current_root
}
node_a = node_a_parent;
node_b = node_b_parent;
}
node_a
}
fn diff_depth(&self, node_a: EventId, node_b: EventId) -> u32 {
let ancestor = self.find_ancestor(node_a, node_b);
let node_a_depth = self.find_depth(node_a, &ancestor);
let node_b_depth = self.find_depth(node_b, &ancestor);
(node_b_depth + 1) - node_a_depth
}
fn _debug(&self) {
for (event_id, event_node) in &self.event_map {
let depth = self.find_depth(*event_id, &self.current_root);
println!("{}: {:?} [depth={}]", hex::encode(event_id), event_node.event, depth);
}
println!("root: {}", hex::encode(self.current_root));
println!("head: {}", hex::encode(self.find_head()));
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event_graph::{events_queue::EventsQueue, get_current_time};
fn create_message(
previous_event_hash: EventId,
nick: &str,
msg: &str,
timestamp: u64,
) -> Event {
Event {
previous_event_hash,
action: EventAction::PrivMsg(PrivMsgEvent {
nick: nick.to_string(),
msg: msg.to_string(),
target: "".to_string(),
}),
timestamp,
read_confirms: 4,
}
}
/* THIS IS FAILING
#[test]
fn test_update_root() {
let events_queue = EventsQueue::new();
let mut model = Model::new(events_queue);
let root_id = model.current_root;
// event_node 1
// Fill this node with MAX_HEIGHT events
let mut id1 = root_id;
for x in 0..MAX_HEIGHT {
let timestamp = get_current_time() + 1;
let node = create_message(id1, &format!("chain 1 msg {}", x), "message", timestamp);
id1 = node.hash();
model.add(node);
}
// event_node 2
// Fill this node with MAX_HEIGHT + 10 events
let mut id2 = root_id;
for x in 0..(MAX_HEIGHT + 10) {
let timestamp = get_current_time() + 1;
let node = create_message(id2, &format!("chain 2 msg {}", x), "message", timestamp);
id2 = node.hash();
model.add(node);
}
// Fill id2 node with MAX_HEIGHT / 2
let mut id3 = id2;
for x in (MAX_HEIGHT + 10)..(MAX_HEIGHT * 2) {
let timestamp = get_current_time() + 1;
let node =
create_message(id3, &format!("chain 2 branch 1 msg {}", x), "message", timestamp);
id3 = node.hash();
model.add(node);
}
// Fill id2 node with 9 events
let mut id4 = id2;
for x in (MAX_HEIGHT + 10)..(MAX_HEIGHT * 2 + 30) {
let timestamp = get_current_time() + 1;
let node =
create_message(id4, &format!("chain 2 branch 2 msg {}", x), "message", timestamp);
id4 = node.hash();
model.add(node);
}
assert_eq!(model.find_height(&model.current_root, &id2).unwrap(), 0);
assert_eq!(model.find_height(&model.current_root, &id3).unwrap(), (MAX_HEIGHT - 10));
assert_eq!(model.find_height(&model.current_root, &id4).unwrap(), (MAX_HEIGHT + 20));
assert_eq!(model.current_root, id2);
}
#[test]
fn test_find_height() {
let events_queue = EventsQueue::new();
let mut model = Model::new(events_queue);
let root_id = model.current_root;
// event_node 1
// Fill this node with 8 events
let mut id1 = root_id;
for x in 0..8 {
let timestamp = get_current_time() + 1;
let node = create_message(id1, &format!("chain 1 msg {}", x), "message", timestamp);
id1 = node.hash();
model.add(node);
}
// event_node 2
// Fill this node with 14 events
let mut id2 = root_id;
for x in 0..14 {
let timestamp = get_current_time() + 1;
let node = create_message(id2, &format!("chain 2 msg {}", x), "message", timestamp);
id2 = node.hash();
model.add(node);
}
assert_eq!(model.find_height(&model.current_root, &id1).unwrap(), 8);
assert_eq!(model.find_height(&model.current_root, &id2).unwrap(), 14);
}
#[test]
fn test_prune_chains() {
let events_queue = EventsQueue::new();
let mut model = Model::new(events_queue);
let root_id = model.current_root;
// event_node 1
// Fill this node with 3 events
let mut event_node_1_ids = vec![];
let mut id1 = root_id;
for x in 0..3 {
let timestamp = get_current_time() + 1;
let node = create_message(id1, &format!("chain 1 msg {}", x), "message", timestamp);
id1 = node.hash();
model.add(node);
event_node_1_ids.push(id1);
}
// event_node 2
// Start from the root_id and fill the node with 14 events
// All the events from event_node_1 should get removed from the tree
let mut id2 = root_id;
for x in 0..(MAX_DEPTH + 10) {
let timestamp = get_current_time() + 1;
let node = create_message(id2, &format!("chain 2 msg {}", x), "message", timestamp);
id2 = node.hash();
model.add(node);
}
assert_eq!(model.find_head(), id2);
for id in event_node_1_ids {
assert!(!model.event_map.contains_key(&id));
}
assert_eq!(model.event_map.len(), (MAX_DEPTH + 11) as usize);
}
#[test]
fn test_diff_depth() {
let events_queue = EventsQueue::new();
let mut model = Model::new(events_queue);
let root_id = model.current_root;
// event_node 1
// Fill this node with (MAX_DEPTH / 2) events
let mut id1 = root_id;
for x in 0..(MAX_DEPTH / 2) {
let timestamp = get_current_time() + 1;
let node = create_message(id1, &format!("chain 1 msg {}", x), "message", timestamp);
id1 = node.hash();
model.add(node);
}
// event_node 2
// Start from the root_id and fill the node with (MAX_DEPTH + 10) events
// all the events must be added since the depth between id1
// and the last head is less than MAX_DEPTH
let mut id2 = root_id;
for x in 0..(MAX_DEPTH + 10) {
let timestamp = get_current_time() + 1;
let node = create_message(id2, &format!("chain 2 msg {}", x), "message", timestamp);
id2 = node.hash();
model.add(node);
}
assert_eq!(model.find_head(), id2);
// event_node 3
// This will start as new chain, but no events will be added
// since the last event's depth is MAX_DEPTH + 10
let mut id3 = root_id;
for x in 0..30 {
let timestamp = get_current_time() + 1;
let node = create_message(id3, &format!("chain 3 msg {}", x), "message", timestamp);
id3 = node.hash();
model.add(node);
// ensure events are not added
assert!(!model.event_map.contains_key(&id3));
}
assert_eq!(model.find_head(), id2);
// Add more events to the event_node 1
// At the end this chain must overtake the event_node 2
for x in (MAX_DEPTH / 2)..(MAX_DEPTH + 15) {
let timestamp = get_current_time() + 1;
let node = create_message(id1, &format!("chain 1 msg {}", x), "message", timestamp);
id1 = node.hash();
model.add(node);
}
assert_eq!(model.find_head(), id1);
}
*/
#[test]
fn test_event_hash() {
let events_queue = EventsQueue::new();
let model = Model::new(events_queue);
let root_id = model.current_root;
let timestamp = get_current_time() + 1;
let event = create_message(root_id, "msg", "message", timestamp);
let mut event2 = event.clone();
let event_hash = event.hash();
event2.read_confirms += 3;
let event2_hash = event2.hash();
assert_eq!(event2_hash, event_hash);
assert_ne!(event2.read_confirms, event.read_confirms);
}
}

View File

@@ -0,0 +1,394 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::collections::{HashMap, VecDeque};
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use darkfi_serial::{SerialDecodable, SerialEncodable};
use log::debug;
use rand::{rngs::OsRng, RngCore};
use super::get_current_time;
use crate::{
event_graph::model::{Event, EventId, ModelPtr},
net,
util::async_util::sleep,
Result,
};
const UNREAD_EVENT_EXPIRE_TIME: u64 = 3600; // in seconds
const SIZE_OF_SEEN_BUFFER: usize = 65536;
const MAX_CONFIRM: u8 = 3;
#[derive(Clone)]
struct RingBuffer<T> {
pub items: VecDeque<T>,
}
impl<T: Eq + PartialEq + Clone> RingBuffer<T> {
pub fn new(capacity: usize) -> Self {
let items = VecDeque::with_capacity(capacity);
Self { items }
}
pub fn push(&mut self, val: T) {
if self.items.len() == self.items.capacity() {
self.items.pop_front();
}
self.items.push_back(val);
}
pub fn contains(&self, val: &T) -> bool {
self.items.contains(val)
}
}
type InvId = u64;
#[derive(SerialEncodable, SerialDecodable, Clone, Debug, PartialEq, Eq, Hash)]
struct InvItem {
id: InvId,
hash: EventId,
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct Inv {
invs: Vec<InvItem>,
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct SyncEvent {
leaves: Vec<EventId>,
}
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
struct GetData {
events: Vec<EventId>,
}
pub type SeenPtr<T> = Arc<Seen<T>>;
pub struct Seen<T> {
seen: Mutex<RingBuffer<T>>,
}
impl<T: Eq + PartialEq + Clone> Seen<T> {
pub fn new() -> SeenPtr<T> {
Arc::new(Self { seen: Mutex::new(RingBuffer::new(SIZE_OF_SEEN_BUFFER)) })
}
pub async fn push(&self, item: &T) -> bool {
let seen = &mut self.seen.lock().await;
if !seen.contains(item) {
seen.push(item.clone());
return true
}
false
}
}
pub type UnreadEventsPtr = Arc<Mutex<UnreadEvents>>;
#[derive(Debug)]
pub struct UnreadEvents {
pub events: HashMap<EventId, Event>,
}
impl UnreadEvents {
pub fn new() -> UnreadEventsPtr {
Arc::new(Mutex::new(Self { events: HashMap::new() }))
}
fn contains(&self, key: &EventId) -> bool {
self.events.contains_key(key)
}
fn _get(&self, key: &EventId) -> Option<Event> {
self.events.get(key).cloned()
}
// Increase the read_confirms for an event, if it has exceeded the MAX_CONFIRM
// then remove it from the hash_map and return Some(event), otherwise return None
fn inc_read_confirms(&mut self, key: &EventId) -> Option<Event> {
let mut result = None;
if let Some(event) = self.events.get_mut(key) {
event.read_confirms += 1;
if event.read_confirms >= MAX_CONFIRM {
result = Some(event.clone())
}
}
if result.is_some() {
self.events.remove(key);
}
result
}
pub fn insert(&mut self, event: &Event) {
// prune expired events
let mut prune_ids = vec![];
for (id, e) in self.events.iter() {
if e.timestamp + (UNREAD_EVENT_EXPIRE_TIME * 1000) < get_current_time() {
prune_ids.push(*id);
}
}
for id in prune_ids {
self.events.remove(&id);
}
self.events.insert(event.hash(), event.clone());
}
}
pub struct ProtocolEvent {
jobsman: net::ProtocolJobsManagerPtr,
event_sub: net::MessageSubscription<Event>,
inv_sub: net::MessageSubscription<Inv>,
getdata_sub: net::MessageSubscription<GetData>,
syncevent_sub: net::MessageSubscription<SyncEvent>,
p2p: net::P2pPtr,
channel: net::ChannelPtr,
model: ModelPtr,
seen_event: SeenPtr<EventId>,
seen_inv: SeenPtr<InvId>,
unread_events: UnreadEventsPtr,
}
impl ProtocolEvent {
pub async fn init(
channel: net::ChannelPtr,
p2p: net::P2pPtr,
model: ModelPtr,
seen_event: SeenPtr<EventId>,
seen_inv: SeenPtr<InvId>,
unread_events: UnreadEventsPtr,
) -> net::ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<Event>().await;
message_subsytem.add_dispatch::<Inv>().await;
message_subsytem.add_dispatch::<GetData>().await;
message_subsytem.add_dispatch::<SyncEvent>().await;
let event_sub =
channel.clone().subscribe_msg::<Event>().await.expect("Missing Event dispatcher!");
let inv_sub = channel.subscribe_msg::<Inv>().await.expect("Missing Inv dispatcher!");
let getdata_sub =
channel.clone().subscribe_msg::<GetData>().await.expect("Missing GetData dispatcher!");
let syncevent_sub = channel
.clone()
.subscribe_msg::<SyncEvent>()
.await
.expect("Missing SyncEvent dispatcher!");
Arc::new(Self {
jobsman: net::ProtocolJobsManager::new("ProtocolEvent", channel.clone()),
event_sub,
inv_sub,
getdata_sub,
syncevent_sub,
p2p,
channel,
model,
seen_event,
seen_inv,
unread_events,
})
}
async fn handle_receive_event(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolEvent::handle_receive_event() [START]");
// let exclude_list = vec![self.channel.address()];
loop {
let event = self.event_sub.receive().await?;
let mut event = (*event).to_owned();
// This could be better
if !self.seen_event.push(&event.hash()).await {
continue
}
event.read_confirms += 1;
if event.read_confirms >= MAX_CONFIRM {
self.new_event(&event).await?;
} else {
self.unread_events.lock().await.insert(&event);
}
self.send_inv(&event).await?;
// Broadcast the msg
// self.p2p.broadcast_with_exclude(event, &exclude_list).await?;
}
}
async fn handle_receive_inv(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolEvent::handle_receive_inv() [START]");
let exclude_list = vec![self.channel.address()];
loop {
let inv = self.inv_sub.receive().await?;
let inv = (*inv).to_owned();
let inv_item = inv.invs[0].clone();
// for inv in inv.invs.iter() {
if !self.seen_inv.push(&inv_item.id).await {
continue
}
{
let mut unread_events = self.unread_events.lock().await;
if !unread_events.contains(&inv_item.hash) &&
self.model.lock().await.get_event(&inv_item.hash).is_none()
{
self.send_getdata(vec![inv_item.hash]).await?;
} else if let Some(event) = unread_events.inc_read_confirms(&inv_item.hash) {
self.new_event(&event).await?;
}
}
// }
// Broadcast the inv msg
self.p2p.broadcast_with_exclude(inv, &exclude_list).await?;
}
}
async fn handle_receive_getdata(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolEvent::handle_receive_getdata() [START]");
loop {
let getdata = self.getdata_sub.receive().await?;
let events = (*getdata).to_owned().events;
for event_id in events {
// let unread_event = self.unread_events.lock().await.get(&event_id);
// if let Some(event) = unread_event {
// self.channel.send(event).await?;
// continue
// }
let model_event = self.model.lock().await.get_event(&event_id);
if let Some(event) = model_event {
self.channel.send(event).await?;
}
}
}
}
async fn handle_receive_syncevent(self: Arc<Self>) -> Result<()> {
debug!(target: "ircd", "ProtocolEvent::handle_receive_syncevent() [START]");
loop {
let syncevent = self.syncevent_sub.receive().await?;
let model = self.model.lock().await;
let leaves = model.find_leaves();
if leaves == syncevent.leaves {
continue
}
for leaf in syncevent.leaves.iter() {
if leaves.contains(leaf) {
continue
}
let children = model.get_offspring(leaf);
for child in children {
self.channel.send(child).await?;
}
}
}
}
// every 6 seconds send a SyncEvent msg
async fn send_sync_hash_loop(self: Arc<Self>) -> Result<()> {
loop {
sleep(6).await;
let leaves = self.model.lock().await.find_leaves();
self.channel.send(SyncEvent { leaves }).await?;
}
}
async fn new_event(&self, event: &Event) -> Result<()> {
let mut model = self.model.lock().await;
model.add(event.clone()).await;
Ok(())
}
async fn send_inv(&self, event: &Event) -> Result<()> {
let id = OsRng.next_u64();
self.p2p.broadcast(Inv { invs: vec![InvItem { id, hash: event.hash() }] }).await?;
Ok(())
}
async fn send_getdata(&self, events: Vec<EventId>) -> Result<()> {
self.channel.send(GetData { events }).await?;
Ok(())
}
}
#[async_trait]
impl net::ProtocolBase for ProtocolEvent {
async fn start(self: Arc<Self>, executor: Arc<smol::Executor<'_>>) -> Result<()> {
debug!(target: "ircd", "ProtocolEvent::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_event(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_inv(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_getdata(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_receive_syncevent(), executor.clone()).await;
self.jobsman.clone().spawn(self.clone().send_sync_hash_loop(), executor.clone()).await;
debug!(target: "ircd", "ProtocolEvent::start() [END]");
Ok(())
}
fn name(&self) -> &'static str {
"ProtocolEvent"
}
}
impl net::Message for Event {
fn name() -> &'static str {
"event"
}
}
impl net::Message for Inv {
fn name() -> &'static str {
"inv"
}
}
impl net::Message for SyncEvent {
fn name() -> &'static str {
"syncevent"
}
}
impl net::Message for GetData {
fn name() -> &'static str {
"getdata"
}
}

48
src/event_graph/view.rs Normal file
View File

@@ -0,0 +1,48 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_std::sync::{Arc, Mutex};
use std::collections::HashMap;
use crate::{
event_graph::{
events_queue::EventsQueuePtr,
model::{Event, EventId},
},
Result,
};
pub type ViewPtr = Arc<Mutex<View>>;
pub struct View {
pub seen: HashMap<EventId, Event>,
pub events_queue: EventsQueuePtr,
}
impl View {
pub fn new(events_queue: EventsQueuePtr) -> Self {
Self { seen: HashMap::new(), events_queue }
}
pub async fn process(&mut self) -> Result<Event> {
// loop {
let new_event = self.events_queue.fetch().await?;
Ok(new_event)
// }
}
}

View File

@@ -30,6 +30,9 @@ pub mod consensus;
#[cfg(feature = "dht")]
pub mod dht;
#[cfg(feature = "event-graph")]
pub mod event_graph;
#[cfg(feature = "net")]
pub mod net;