event-graph: Sprinkle some logs around the code.

This commit is contained in:
parazyd
2023-09-07 19:25:08 +02:00
parent 0320a6ce02
commit 382604342e
3 changed files with 80 additions and 36 deletions

View File

@@ -25,7 +25,7 @@ use std::{
use async_recursion::async_recursion;
use darkfi_serial::{deserialize_async, serialize_async};
use log::debug;
use log::{debug, info};
use num_bigint::BigUint;
use smol::{
lock::{Mutex, RwLock},
@@ -55,12 +55,13 @@ mod tests;
/// Initial genesis timestamp (07 Sep 2023, 00:00:00 UTC)
/// Must always be UTC midnight.
const INITIAL_GENESIS: u64 = 1694044800;
/// Genesis event contents
const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53];
/// The number of parents an event is supposed to have.
const N_EVENT_PARENTS: usize = 5;
/// Allowed timestamp drift in seconds
const EVENT_TIME_DRIFT: u64 = 60;
// Allowed orphan age limit in seconds
//const ORPHAN_AGE_LIMIT: u64 = 60 * 5;
/// Null event ID
const NULL_ID: blake3::Hash = blake3::Hash::from_bytes([0x00; blake3::OUT_LEN]);
@@ -113,8 +114,11 @@ impl EventGraph {
// Check if we have it in our DAG.
// If not, we can prune the DAG and insert this new genesis event.
if !dag.contains_key(current_genesis.id().as_bytes())? {
info!(
target: "event_graph::new()",
"[EVENTGRAPH] DAG does not contain current genesis, pruning existing data",
);
dag.clear()?;
sled_db.flush_async().await?;
self_.dag_insert(&current_genesis).await?;
}
@@ -153,11 +157,7 @@ impl EventGraph {
// Calculate the timestamp of the most recent event
let timestamp = INITIAL_GENESIS + (rotations_since_genesis * days_rotation * DAY as u64);
Event {
timestamp,
content: vec![0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53],
parents: [NULL_ID; N_EVENT_PARENTS],
}
Event { timestamp, content: GENESIS_CONTENTS.to_vec(), parents: [NULL_ID; N_EVENT_PARENTS] }
}
/// Sync the DAG from connected peers
@@ -185,6 +185,8 @@ impl EventGraph {
// parameter. By pruning, we should deterministically replace the
// genesis event (can use a deterministic timestamp) and drop everything
// in the DAG, leaving just the new genesis event.
debug!(target: "event_graph::dag_prune()", "Spawned background DAG pruning task");
loop {
// Find the next rotation timestamp:
let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, days_rotation);
@@ -192,7 +194,7 @@ impl EventGraph {
// Prepare the new genesis event
let current_genesis = Event {
timestamp: next_rotation,
content: vec![0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53],
content: GENESIS_CONTENTS.to_vec(),
parents: [NULL_ID; N_EVENT_PARENTS],
};
@@ -204,6 +206,7 @@ impl EventGraph {
self.dag.clear()?;
self.dag_insert(&current_genesis).await?;
debug!(target: "event_graph::dag_prune()", "DAG pruned successfully");
}
}
@@ -216,10 +219,13 @@ impl EventGraph {
/// some sensible time has passed after broadcasting the event.
pub async fn dag_insert(&self, event: &Event) -> Result<blake3::Hash> {
let event_id = event.id();
debug!(target: "event_graph::dag_insert()", "Inserting event {} into the DAG", event_id);
let s_event = serialize_async(event).await;
// Update the unreferenced DAG tips set
let mut unreferenced_tips = self.unreferenced_tips.write().await;
let mut bcast_ids = self.broadcasted_ids.write().await;
for parent_id in event.parents.iter() {
if parent_id != &NULL_ID {
unreferenced_tips.remove(parent_id);
@@ -227,11 +233,15 @@ impl EventGraph {
}
}
unreferenced_tips.insert(event_id);
drop(unreferenced_tips);
drop(bcast_ids);
self.dag.insert(event_id.as_bytes(), s_event).unwrap();
// We hold the write locks until this point because we insert the event
// into the database above, so we don't want anything to read these until
// that insertion is complete.
drop(unreferenced_tips);
drop(bcast_ids);
Ok(event_id)
}

View File

@@ -26,7 +26,7 @@ use std::{
};
use darkfi_serial::{async_trait, deserialize_async, SerialDecodable, SerialEncodable};
use log::{debug, error};
use log::{debug, error, trace, warn};
use smol::Executor;
use super::{Event, EventGraphPtr, NULL_ID};
@@ -128,12 +128,19 @@ impl ProtocolEventGraph {
}))
}
/// Protocol function handling `EventPut`.
/// This is triggered whenever someone broadcasts (or relays) a new
/// event on the network.
async fn handle_event_put(self: Arc<Self>) -> Result<()> {
loop {
let event = match self.ev_put_sub.receive().await {
Ok(v) => v.0.clone(),
Err(_) => continue,
};
trace!(
target: "event_graph::protocol::handle_event_put()",
"Got EventPut: {} [{}]", event.id(), self.channel.address(),
);
// We received an event. Check if we already have it in our DAG.
// Also check if we have the event's parents. In the case we do
@@ -148,7 +155,7 @@ impl ProtocolEventGraph {
let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
if malicious_count + 1 == MALICIOUS_THRESHOLD {
error!(
target: "event_graph::handle_event_put()",
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
self.channel.address(),
);
@@ -156,24 +163,30 @@ impl ProtocolEventGraph {
return Err(Error::ChannelStopped)
}
warn!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.address(),
);
continue
}
// If we have already seen the event, we'll stay quiet.
let event_id = event.id();
if self.event_graph.dag.contains_key(event_id.as_bytes()).unwrap() {
debug!(target: "event_graph::handle_event_put()", "Got known event");
debug!(
target: "event_graph::protocol::handle_event_put()",
"Event {} is already known", event_id,
);
continue
}
// At this point, this is a new event to us. Let's see if we
// have all of its parents.
/*
info!(
target: "event_graph::handle_event_put()",
"[EVENTGRAPH] Got new event"
debug!(
target: "event_graph::protocol::handle_event_put()",
"Event {} is new", event_id,
);
*/
let mut missing_parents = HashSet::new();
for parent_id in event.parents.iter() {
// `event.validate()` should have already made sure that
@@ -191,21 +204,21 @@ impl ProtocolEventGraph {
// fetch them from this peer.
if !missing_parents.is_empty() {
debug!(
target: "event_graph::handle_event_put()",
target: "event_graph::protocol::handle_event_put()",
"Event has {} missing parents. Requesting...", missing_parents.len(),
);
let mut received_events = HashMap::new();
for parent_id in missing_parents.iter() {
debug!(
target: "event_graph::handle_event_put()",
"Requesting {}", parent_id,
target: "event_graph::protocol::handle_event_put()",
"Requesting {}...", parent_id,
);
self.channel.send(&EventReq(*parent_id)).await?;
let parent = match timeout(REPLY_TIMEOUT, self.ev_rep_sub.receive()).await {
Ok(parent) => parent?,
Err(_) => {
error!(
target: "event_graph::handle_event_put()",
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Timeout while waiting for parent {} from {}",
parent_id, self.channel.address(),
);
@@ -217,7 +230,7 @@ impl ProtocolEventGraph {
if &parent.id() != parent_id {
error!(
target: "event_graph::handle_event_put()",
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} replied with a wrong event: {}",
self.channel.address(), parent.id(),
);
@@ -226,7 +239,7 @@ impl ProtocolEventGraph {
}
debug!(
target: "event_graph::handle_event_put()",
target: "event_graph::protocol::handle_event_put()",
"Got correct parent event {}", parent.id(),
);
@@ -242,9 +255,13 @@ impl ProtocolEventGraph {
// If we're here, we have all the parents, and we can now
// add the actual event to the DAG.
debug!(
target: "event_graph::protocol::handle_event_put()",
"Got all parents necessary for insertion",
);
self.event_graph.dag_insert(&event).await.unwrap();
// Relay the event to other peers
// Relay the event to other peers.
self.event_graph
.p2p
.broadcast_with_exclude(&EventPut(event), &[self.channel.address().clone()])
@@ -252,12 +269,18 @@ impl ProtocolEventGraph {
}
}
/// Protocol function handling `EventReq`.
/// This is triggered whenever someone requests an event from us.
async fn handle_event_req(self: Arc<Self>) -> Result<()> {
loop {
let event_id = match self.ev_req_sub.receive().await {
Ok(v) => v.0,
Err(_) => continue,
};
trace!(
target: "event_graph::protocol::handle_event_req()",
"Got EventReq: {} [{}]", event_id, self.channel.address(),
);
// We received an event request from somebody.
// If we do have it, we will send it back to them as `EventRep`.
@@ -274,7 +297,7 @@ impl ProtocolEventGraph {
let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
if malicious_count + 1 == MALICIOUS_THRESHOLD {
error!(
target: "event_graph::handle_event_req()",
target: "event_graph::protocol::handle_event_req()",
"[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
self.channel.address(),
);
@@ -282,6 +305,11 @@ impl ProtocolEventGraph {
return Err(Error::ChannelStopped)
}
warn!(
target: "event_graph::protocol::handle_event_req()",
"[EVENTGRAPH] Peer {} requested an unexpected event {}",
self.channel.address(), event_id,
);
continue
}
@@ -309,9 +337,16 @@ impl ProtocolEventGraph {
}
}
/// Protocol function handling `TipReq`.
/// This is triggered when someone requests the current unreferenced
/// tips of our DAG.
async fn handle_tip_req(self: Arc<Self>) -> Result<()> {
loop {
self.tip_req_sub.receive().await?;
trace!(
target: "event_graph::protocol::handle_tip_req()",
"Got TipReq [{}]", self.channel.address(),
);
// TODO: Rate limit

View File

@@ -32,12 +32,11 @@ use crate::{
system::sleep,
};
/// Number of nodes to spawn
const N_NODES: usize = 50;
//const N_NODES: usize = 2;
/// Number of peers each node connects to
const N_CONNS: usize = N_NODES / 3;
//const N_CONNS: usize = 1;
// Number of nodes to spawn and number of peers each node connects to
const N_NODES: usize = 5;
const N_CONNS: usize = 2;
//const N_NODES: usize = 50;
//const N_CONNS: usize = N_NODES / 3;
#[test]
#[ignore]
@@ -53,8 +52,8 @@ fn eventgraph_propagation() {
cfg.add_filter_ignore("net::channel::send()".to_string());
simplelog::TermLogger::init(
simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Info,
simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace,
cfg.build(),
simplelog::TerminalMode::Mixed,