From 3561205b2dee18923d8a9f77eefad234c6992d03 Mon Sep 17 00:00:00 2001 From: parazyd Date: Sat, 16 Sep 2023 17:26:50 +0200 Subject: [PATCH] event-graph: Implement an event subscriber that notifies of new events. --- src/event_graph2/event.rs | 5 +++++ src/event_graph2/mod.rs | 31 +++++++++++++++++++++++-------- src/event_graph2/proto.rs | 6 ++++-- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/src/event_graph2/event.rs b/src/event_graph2/event.rs index 39c6a8aa7..c853a05cf 100644 --- a/src/event_graph2/event.rs +++ b/src/event_graph2/event.rs @@ -56,6 +56,11 @@ impl Event { hasher.finalize() } + /// Return a reference to the event's content + pub fn content(&self) -> &[u8] { + &self.content + } + /* /// Check if an [`Event`] is considered too old. fn is_too_old(&self) -> bool { diff --git a/src/event_graph2/mod.rs b/src/event_graph2/mod.rs index 725e9e2bc..713f94d94 100644 --- a/src/event_graph2/mod.rs +++ b/src/event_graph2/mod.rs @@ -19,7 +19,10 @@ use std::{ cmp::Ordering, collections::{HashMap, HashSet, VecDeque}, - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }, time::UNIX_EPOCH, }; @@ -34,7 +37,7 @@ use smol::{ use crate::{ net::P2pPtr, - system::{sleep, timeout::timeout, StoppableTask, StoppableTaskPtr}, + system::{sleep, timeout::timeout, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr}, Error, Result, }; @@ -64,7 +67,7 @@ const N_EVENT_PARENTS: usize = 5; /// Allowed timestamp drift in seconds const EVENT_TIME_DRIFT: u64 = 60; /// Null event ID -const NULL_ID: blake3::Hash = blake3::Hash::from_bytes([0x00; blake3::OUT_LEN]); +pub const NULL_ID: blake3::Hash = blake3::Hash::from_bytes([0x00; blake3::OUT_LEN]); /// Atomic pointer to an [`EventGraph`] instance. pub type EventGraphPtr = Arc; @@ -83,8 +86,13 @@ pub struct EventGraph { /// or not. Additionally it is also used when we broadcast the /// `TipRep` message telling peers about our unreferenced tips. broadcasted_ids: RwLock>, + /// Marker telling us if we consider the DAG synced + dag_synced: AtomicBool, /// DAG Pruning Task prune_task: Mutex>, + /// Event subscriber, this notifies whenever an event is + /// inserted into the DAG + pub event_sub: SubscriberPtr, } impl EventGraph { @@ -100,13 +108,16 @@ impl EventGraph { let dag = sled_db.open_tree(dag_tree_name)?; let unreferenced_tips = RwLock::new(HashSet::new()); let broadcasted_ids = RwLock::new(HashSet::new()); + let event_sub = Subscriber::new(); let self_ = Arc::new(Self { p2p, dag: dag.clone(), unreferenced_tips, broadcasted_ids, + dag_synced: AtomicBool::new(false), prune_task: Mutex::new(None), + event_sub, }); // Create the current genesis event based on the `days_rotation` @@ -120,7 +131,7 @@ impl EventGraph { "[EVENTGRAPH] DAG does not contain current genesis, pruning existing data", ); dag.clear()?; - self_.dag_insert(¤t_genesis).await?; + self_.dag_insert(current_genesis).await?; } // Find the unreferenced tips in the current DAG state. @@ -380,10 +391,11 @@ impl EventGraph { // We should add them to the DAG. // TODO: FIXME: Also validate these events. for event in received_events.iter().rev() { - self.dag_insert(event).await.unwrap(); + self.dag_insert(event.clone()).await.unwrap(); } info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!"); + self.dag_synced.store(true, SeqCst); Ok(()) } @@ -414,7 +426,7 @@ impl EventGraph { *self.unreferenced_tips.write().await = HashSet::new(); self.dag.clear()?; - self.dag_insert(¤t_genesis).await?; + self.dag_insert(current_genesis).await?; debug!(target: "event_graph::dag_prune()", "DAG pruned successfully"); } } @@ -426,10 +438,10 @@ impl EventGraph { /// knows that any requests for them are actually legitimate. /// TODO: The `broadcasted_ids` set should periodically be pruned, when /// some sensible time has passed after broadcasting the event. - pub async fn dag_insert(&self, event: &Event) -> Result { + pub async fn dag_insert(&self, event: Event) -> Result { let event_id = event.id(); debug!(target: "event_graph::dag_insert()", "Inserting event {} into the DAG", event_id); - let s_event = serialize_async(event).await; + let s_event = serialize_async(&event).await; // Update the unreferenced DAG tips set let mut unreferenced_tips = self.unreferenced_tips.write().await; @@ -451,6 +463,9 @@ impl EventGraph { drop(unreferenced_tips); drop(bcast_ids); + // Notify about the event on the event subscriber + self.event_sub.notify(event).await; + Ok(event_id) } diff --git a/src/event_graph2/proto.rs b/src/event_graph2/proto.rs index 0a4cc94fd..3e18d76d0 100644 --- a/src/event_graph2/proto.rs +++ b/src/event_graph2/proto.rs @@ -16,6 +16,8 @@ * along with this program. If not, see . */ +// TODO: FIXME: Some of the protocols should block operations until DAG is synced. + use std::{ sync::{ atomic::{AtomicUsize, Ordering::SeqCst}, @@ -279,7 +281,7 @@ impl ProtocolEventGraph { // We should add them to the DAG. // TODO: FIXME: Also validate these events. for event in received_events.iter().rev() { - self.event_graph.dag_insert(event).await.unwrap(); + self.event_graph.dag_insert(event.clone()).await.unwrap(); } } // <-- !missing_parents.is_empty() @@ -289,7 +291,7 @@ impl ProtocolEventGraph { target: "event_graph::protocol::handle_event_put()", "Got all parents necessary for insertion", ); - self.event_graph.dag_insert(&event).await.unwrap(); + self.event_graph.dag_insert(event.clone()).await.unwrap(); // Relay the event to other peers. self.event_graph