event-graph: Implement an event subscriber that notifies of new events.

This commit is contained in:
parazyd
2023-09-16 17:26:50 +02:00
parent 44482105c8
commit 3561205b2d
3 changed files with 32 additions and 10 deletions

View File

@@ -56,6 +56,11 @@ impl Event {
hasher.finalize()
}
/// Return a reference to the event's content
pub fn content(&self) -> &[u8] {
&self.content
}
/*
/// Check if an [`Event`] is considered too old.
fn is_too_old(&self) -> bool {

View File

@@ -19,7 +19,10 @@
use std::{
cmp::Ordering,
collections::{HashMap, HashSet, VecDeque},
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
Arc,
},
time::UNIX_EPOCH,
};
@@ -34,7 +37,7 @@ use smol::{
use crate::{
net::P2pPtr,
system::{sleep, timeout::timeout, StoppableTask, StoppableTaskPtr},
system::{sleep, timeout::timeout, StoppableTask, StoppableTaskPtr, Subscriber, SubscriberPtr},
Error, Result,
};
@@ -64,7 +67,7 @@ const N_EVENT_PARENTS: usize = 5;
/// Allowed timestamp drift in seconds
const EVENT_TIME_DRIFT: u64 = 60;
/// Null event ID
const NULL_ID: blake3::Hash = blake3::Hash::from_bytes([0x00; blake3::OUT_LEN]);
pub const NULL_ID: blake3::Hash = blake3::Hash::from_bytes([0x00; blake3::OUT_LEN]);
/// Atomic pointer to an [`EventGraph`] instance.
pub type EventGraphPtr = Arc<EventGraph>;
@@ -83,8 +86,13 @@ pub struct EventGraph {
/// or not. Additionally it is also used when we broadcast the
/// `TipRep` message telling peers about our unreferenced tips.
broadcasted_ids: RwLock<HashSet<blake3::Hash>>,
/// Marker telling us if we consider the DAG synced
dag_synced: AtomicBool,
/// DAG Pruning Task
prune_task: Mutex<Option<StoppableTaskPtr>>,
/// Event subscriber, this notifies whenever an event is
/// inserted into the DAG
pub event_sub: SubscriberPtr<Event>,
}
impl EventGraph {
@@ -100,13 +108,16 @@ impl EventGraph {
let dag = sled_db.open_tree(dag_tree_name)?;
let unreferenced_tips = RwLock::new(HashSet::new());
let broadcasted_ids = RwLock::new(HashSet::new());
let event_sub = Subscriber::new();
let self_ = Arc::new(Self {
p2p,
dag: dag.clone(),
unreferenced_tips,
broadcasted_ids,
dag_synced: AtomicBool::new(false),
prune_task: Mutex::new(None),
event_sub,
});
// Create the current genesis event based on the `days_rotation`
@@ -120,7 +131,7 @@ impl EventGraph {
"[EVENTGRAPH] DAG does not contain current genesis, pruning existing data",
);
dag.clear()?;
self_.dag_insert(&current_genesis).await?;
self_.dag_insert(current_genesis).await?;
}
// Find the unreferenced tips in the current DAG state.
@@ -380,10 +391,11 @@ impl EventGraph {
// We should add them to the DAG.
// TODO: FIXME: Also validate these events.
for event in received_events.iter().rev() {
self.dag_insert(event).await.unwrap();
self.dag_insert(event.clone()).await.unwrap();
}
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!");
self.dag_synced.store(true, SeqCst);
Ok(())
}
@@ -414,7 +426,7 @@ impl EventGraph {
*self.unreferenced_tips.write().await = HashSet::new();
self.dag.clear()?;
self.dag_insert(&current_genesis).await?;
self.dag_insert(current_genesis).await?;
debug!(target: "event_graph::dag_prune()", "DAG pruned successfully");
}
}
@@ -426,10 +438,10 @@ impl EventGraph {
/// knows that any requests for them are actually legitimate.
/// TODO: The `broadcasted_ids` set should periodically be pruned, when
/// some sensible time has passed after broadcasting the event.
pub async fn dag_insert(&self, event: &Event) -> Result<blake3::Hash> {
pub async fn dag_insert(&self, event: Event) -> Result<blake3::Hash> {
let event_id = event.id();
debug!(target: "event_graph::dag_insert()", "Inserting event {} into the DAG", event_id);
let s_event = serialize_async(event).await;
let s_event = serialize_async(&event).await;
// Update the unreferenced DAG tips set
let mut unreferenced_tips = self.unreferenced_tips.write().await;
@@ -451,6 +463,9 @@ impl EventGraph {
drop(unreferenced_tips);
drop(bcast_ids);
// Notify about the event on the event subscriber
self.event_sub.notify(event).await;
Ok(event_id)
}

View File

@@ -16,6 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
// TODO: FIXME: Some of the protocols should block operations until DAG is synced.
use std::{
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
@@ -279,7 +281,7 @@ impl ProtocolEventGraph {
// We should add them to the DAG.
// TODO: FIXME: Also validate these events.
for event in received_events.iter().rev() {
self.event_graph.dag_insert(event).await.unwrap();
self.event_graph.dag_insert(event.clone()).await.unwrap();
}
} // <-- !missing_parents.is_empty()
@@ -289,7 +291,7 @@ impl ProtocolEventGraph {
target: "event_graph::protocol::handle_event_put()",
"Got all parents necessary for insertion",
);
self.event_graph.dag_insert(&event).await.unwrap();
self.event_graph.dag_insert(event.clone()).await.unwrap();
// Relay the event to other peers.
self.event_graph