From 382604342e5b349c246ccaad928f2cf47abc12a9 Mon Sep 17 00:00:00 2001 From: parazyd Date: Thu, 7 Sep 2023 19:25:08 +0200 Subject: [PATCH] event-graph: Sprinkle some logs around the code. --- src/event_graph2/mod.rs | 34 +++++++++++++------- src/event_graph2/proto.rs | 67 +++++++++++++++++++++++++++++---------- src/event_graph2/tests.rs | 15 ++++----- 3 files changed, 80 insertions(+), 36 deletions(-) diff --git a/src/event_graph2/mod.rs b/src/event_graph2/mod.rs index 73fedf77a..954e62905 100644 --- a/src/event_graph2/mod.rs +++ b/src/event_graph2/mod.rs @@ -25,7 +25,7 @@ use std::{ use async_recursion::async_recursion; use darkfi_serial::{deserialize_async, serialize_async}; -use log::debug; +use log::{debug, info}; use num_bigint::BigUint; use smol::{ lock::{Mutex, RwLock}, @@ -55,12 +55,13 @@ mod tests; /// Initial genesis timestamp (07 Sep 2023, 00:00:00 UTC) /// Must always be UTC midnight. const INITIAL_GENESIS: u64 = 1694044800; +/// Genesis event contents +const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53]; + /// The number of parents an event is supposed to have. const N_EVENT_PARENTS: usize = 5; /// Allowed timestamp drift in seconds const EVENT_TIME_DRIFT: u64 = 60; -// Allowed orphan age limit in seconds -//const ORPHAN_AGE_LIMIT: u64 = 60 * 5; /// Null event ID const NULL_ID: blake3::Hash = blake3::Hash::from_bytes([0x00; blake3::OUT_LEN]); @@ -113,8 +114,11 @@ impl EventGraph { // Check if we have it in our DAG. // If not, we can prune the DAG and insert this new genesis event. if !dag.contains_key(current_genesis.id().as_bytes())? { + info!( + target: "event_graph::new()", + "[EVENTGRAPH] DAG does not contain current genesis, pruning existing data", + ); dag.clear()?; - sled_db.flush_async().await?; self_.dag_insert(¤t_genesis).await?; } @@ -153,11 +157,7 @@ impl EventGraph { // Calculate the timestamp of the most recent event let timestamp = INITIAL_GENESIS + (rotations_since_genesis * days_rotation * DAY as u64); - Event { - timestamp, - content: vec![0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53], - parents: [NULL_ID; N_EVENT_PARENTS], - } + Event { timestamp, content: GENESIS_CONTENTS.to_vec(), parents: [NULL_ID; N_EVENT_PARENTS] } } /// Sync the DAG from connected peers @@ -185,6 +185,8 @@ impl EventGraph { // parameter. By pruning, we should deterministically replace the // genesis event (can use a deterministic timestamp) and drop everything // in the DAG, leaving just the new genesis event. + debug!(target: "event_graph::dag_prune()", "Spawned background DAG pruning task"); + loop { // Find the next rotation timestamp: let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, days_rotation); @@ -192,7 +194,7 @@ impl EventGraph { // Prepare the new genesis event let current_genesis = Event { timestamp: next_rotation, - content: vec![0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53], + content: GENESIS_CONTENTS.to_vec(), parents: [NULL_ID; N_EVENT_PARENTS], }; @@ -204,6 +206,7 @@ impl EventGraph { self.dag.clear()?; self.dag_insert(¤t_genesis).await?; + debug!(target: "event_graph::dag_prune()", "DAG pruned successfully"); } } @@ -216,10 +219,13 @@ impl EventGraph { /// some sensible time has passed after broadcasting the event. pub async fn dag_insert(&self, event: &Event) -> Result { let event_id = event.id(); + debug!(target: "event_graph::dag_insert()", "Inserting event {} into the DAG", event_id); let s_event = serialize_async(event).await; + // Update the unreferenced DAG tips set let mut unreferenced_tips = self.unreferenced_tips.write().await; let mut bcast_ids = self.broadcasted_ids.write().await; + for parent_id in event.parents.iter() { if parent_id != &NULL_ID { unreferenced_tips.remove(parent_id); @@ -227,11 +233,15 @@ impl EventGraph { } } unreferenced_tips.insert(event_id); - drop(unreferenced_tips); - drop(bcast_ids); self.dag.insert(event_id.as_bytes(), s_event).unwrap(); + // We hold the write locks until this point because we insert the event + // into the database above, so we don't want anything to read these until + // that insertion is complete. + drop(unreferenced_tips); + drop(bcast_ids); + Ok(event_id) } diff --git a/src/event_graph2/proto.rs b/src/event_graph2/proto.rs index 71241f9ea..b85227cf6 100644 --- a/src/event_graph2/proto.rs +++ b/src/event_graph2/proto.rs @@ -26,7 +26,7 @@ use std::{ }; use darkfi_serial::{async_trait, deserialize_async, SerialDecodable, SerialEncodable}; -use log::{debug, error}; +use log::{debug, error, trace, warn}; use smol::Executor; use super::{Event, EventGraphPtr, NULL_ID}; @@ -128,12 +128,19 @@ impl ProtocolEventGraph { })) } + /// Protocol function handling `EventPut`. + /// This is triggered whenever someone broadcasts (or relays) a new + /// event on the network. async fn handle_event_put(self: Arc) -> Result<()> { loop { let event = match self.ev_put_sub.receive().await { Ok(v) => v.0.clone(), Err(_) => continue, }; + trace!( + target: "event_graph::protocol::handle_event_put()", + "Got EventPut: {} [{}]", event.id(), self.channel.address(), + ); // We received an event. Check if we already have it in our DAG. // Also check if we have the event's parents. In the case we do @@ -148,7 +155,7 @@ impl ProtocolEventGraph { let malicious_count = self.malicious_count.fetch_add(1, SeqCst); if malicious_count + 1 == MALICIOUS_THRESHOLD { error!( - target: "event_graph::handle_event_put()", + target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.", self.channel.address(), ); @@ -156,24 +163,30 @@ impl ProtocolEventGraph { return Err(Error::ChannelStopped) } + warn!( + target: "event_graph::protocol::handle_event_put()", + "[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.address(), + ); continue } // If we have already seen the event, we'll stay quiet. let event_id = event.id(); if self.event_graph.dag.contains_key(event_id.as_bytes()).unwrap() { - debug!(target: "event_graph::handle_event_put()", "Got known event"); + debug!( + target: "event_graph::protocol::handle_event_put()", + "Event {} is already known", event_id, + ); continue } // At this point, this is a new event to us. Let's see if we // have all of its parents. - /* - info!( - target: "event_graph::handle_event_put()", - "[EVENTGRAPH] Got new event" + debug!( + target: "event_graph::protocol::handle_event_put()", + "Event {} is new", event_id, ); - */ + let mut missing_parents = HashSet::new(); for parent_id in event.parents.iter() { // `event.validate()` should have already made sure that @@ -191,21 +204,21 @@ impl ProtocolEventGraph { // fetch them from this peer. if !missing_parents.is_empty() { debug!( - target: "event_graph::handle_event_put()", + target: "event_graph::protocol::handle_event_put()", "Event has {} missing parents. Requesting...", missing_parents.len(), ); let mut received_events = HashMap::new(); for parent_id in missing_parents.iter() { debug!( - target: "event_graph::handle_event_put()", - "Requesting {}", parent_id, + target: "event_graph::protocol::handle_event_put()", + "Requesting {}...", parent_id, ); self.channel.send(&EventReq(*parent_id)).await?; let parent = match timeout(REPLY_TIMEOUT, self.ev_rep_sub.receive()).await { Ok(parent) => parent?, Err(_) => { error!( - target: "event_graph::handle_event_put()", + target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Timeout while waiting for parent {} from {}", parent_id, self.channel.address(), ); @@ -217,7 +230,7 @@ impl ProtocolEventGraph { if &parent.id() != parent_id { error!( - target: "event_graph::handle_event_put()", + target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Peer {} replied with a wrong event: {}", self.channel.address(), parent.id(), ); @@ -226,7 +239,7 @@ impl ProtocolEventGraph { } debug!( - target: "event_graph::handle_event_put()", + target: "event_graph::protocol::handle_event_put()", "Got correct parent event {}", parent.id(), ); @@ -242,9 +255,13 @@ impl ProtocolEventGraph { // If we're here, we have all the parents, and we can now // add the actual event to the DAG. + debug!( + target: "event_graph::protocol::handle_event_put()", + "Got all parents necessary for insertion", + ); self.event_graph.dag_insert(&event).await.unwrap(); - // Relay the event to other peers + // Relay the event to other peers. self.event_graph .p2p .broadcast_with_exclude(&EventPut(event), &[self.channel.address().clone()]) @@ -252,12 +269,18 @@ impl ProtocolEventGraph { } } + /// Protocol function handling `EventReq`. + /// This is triggered whenever someone requests an event from us. async fn handle_event_req(self: Arc) -> Result<()> { loop { let event_id = match self.ev_req_sub.receive().await { Ok(v) => v.0, Err(_) => continue, }; + trace!( + target: "event_graph::protocol::handle_event_req()", + "Got EventReq: {} [{}]", event_id, self.channel.address(), + ); // We received an event request from somebody. // If we do have it, we will send it back to them as `EventRep`. @@ -274,7 +297,7 @@ impl ProtocolEventGraph { let malicious_count = self.malicious_count.fetch_add(1, SeqCst); if malicious_count + 1 == MALICIOUS_THRESHOLD { error!( - target: "event_graph::handle_event_req()", + target: "event_graph::protocol::handle_event_req()", "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.", self.channel.address(), ); @@ -282,6 +305,11 @@ impl ProtocolEventGraph { return Err(Error::ChannelStopped) } + warn!( + target: "event_graph::protocol::handle_event_req()", + "[EVENTGRAPH] Peer {} requested an unexpected event {}", + self.channel.address(), event_id, + ); continue } @@ -309,9 +337,16 @@ impl ProtocolEventGraph { } } + /// Protocol function handling `TipReq`. + /// This is triggered when someone requests the current unreferenced + /// tips of our DAG. async fn handle_tip_req(self: Arc) -> Result<()> { loop { self.tip_req_sub.receive().await?; + trace!( + target: "event_graph::protocol::handle_tip_req()", + "Got TipReq [{}]", self.channel.address(), + ); // TODO: Rate limit diff --git a/src/event_graph2/tests.rs b/src/event_graph2/tests.rs index 5a920b1f4..14a74b28b 100644 --- a/src/event_graph2/tests.rs +++ b/src/event_graph2/tests.rs @@ -32,12 +32,11 @@ use crate::{ system::sleep, }; -/// Number of nodes to spawn -const N_NODES: usize = 50; -//const N_NODES: usize = 2; -/// Number of peers each node connects to -const N_CONNS: usize = N_NODES / 3; -//const N_CONNS: usize = 1; +// Number of nodes to spawn and number of peers each node connects to +const N_NODES: usize = 5; +const N_CONNS: usize = 2; +//const N_NODES: usize = 50; +//const N_CONNS: usize = N_NODES / 3; #[test] #[ignore] @@ -53,8 +52,8 @@ fn eventgraph_propagation() { cfg.add_filter_ignore("net::channel::send()".to_string()); simplelog::TermLogger::init( - simplelog::LevelFilter::Info, - //simplelog::LevelFilter::Debug, + //simplelog::LevelFilter::Info, + simplelog::LevelFilter::Debug, //simplelog::LevelFilter::Trace, cfg.build(), simplelog::TerminalMode::Mixed,