From ef7e4411c86536e7cb6662ed20dab487fb5762e4 Mon Sep 17 00:00:00 2001 From: oars Date: Thu, 2 Oct 2025 15:08:53 +0300 Subject: [PATCH] event_graph: add tests --- src/event_graph/event.rs | 2 +- src/event_graph/mod.rs | 10 +- src/event_graph/proto.rs | 10 +- src/event_graph/tests.rs | 369 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 377 insertions(+), 14 deletions(-) diff --git a/src/event_graph/event.rs b/src/event_graph/event.rs index 1ee1e8d3f..061aee014 100644 --- a/src/event_graph/event.rs +++ b/src/event_graph/event.rs @@ -183,7 +183,7 @@ impl Event { return Ok(false) } // Perform validation - self.header.validate(&header_dag, 1, None).await + self.header.validate(header_dag, 1, None).await } /// Validate a new event for the correct layout and enforce relevant age, diff --git a/src/event_graph/mod.rs b/src/event_graph/mod.rs index e28f6bdba..3ec466f7a 100644 --- a/src/event_graph/mod.rs +++ b/src/event_graph/mod.rs @@ -297,7 +297,7 @@ impl DAGStore { // Build the layers map let mut map: LayerUTips = BTreeMap::new(); for tip in tips { - let event = self.fetch_event_from_dag(&tip, &dag).await.unwrap().unwrap(); + let event = self.fetch_event_from_dag(&tip, dag).await.unwrap().unwrap(); if let Some(layer_tips) = map.get_mut(&event.header.layer) { layer_tips.insert(tip); } else { @@ -321,7 +321,7 @@ impl DAGStore { }; let event: Event = deserialize_async(&bytes).await?; - return Ok(Some(event)) + Ok(Some(event)) } } @@ -807,7 +807,7 @@ impl EventGraph { } // Acquire exclusive locks to `broadcasted_ids` - let dag_timestamp = u64::from_str(dag_name).unwrap(); + let dag_timestamp = u64::from_str(dag_name)?; let mut broadcasted_ids = self.broadcasted_ids.write().await; let main_dag = self.dag_store.read().await.get_dag(dag_name); @@ -1085,7 +1085,7 @@ impl EventGraph { let mut vec_tips = vec![]; let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS]; for (i, _) in self.dag_store.read().await.header_dags.iter() { - let (_, tips) = self.get_next_layer_with_parents(&i).await; + let (_, tips) = self.get_next_layer_with_parents(i).await; // Convert the hash to BigUint for sorting let mut sorted: Vec<_> = tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect(); @@ -1218,7 +1218,7 @@ impl EventGraph { let current_genesis = self.current_genesis.read().await; let dag_name = current_genesis.header.timestamp.to_string(); let mut graph = HashMap::new(); - for iter_elem in self.dag_store.read().await.get_dag(&dag_name).iter() { + for iter_elem in self.dag_store.read().await.get_dag(dag_name).iter() { let (id, val) = iter_elem.unwrap(); let hash = Hash::from_bytes((&id as &[u8]).try_into().unwrap()); let event: Event = deserialize_async(&val).await.unwrap(); diff --git a/src/event_graph/proto.rs b/src/event_graph/proto.rs index 1f677be4a..1e548385b 100644 --- a/src/event_graph/proto.rs +++ b/src/event_graph/proto.rs @@ -627,11 +627,11 @@ impl ProtocolEventGraph { continue } // FIXME - if !self.event_graph.fast_mode { - if self.event_graph.dag_insert(&events, &dag_name).await.is_err() { - self.clone().increase_malicious_count().await?; - continue - } + if !self.event_graph.fast_mode && + self.event_graph.dag_insert(&events, &dag_name).await.is_err() + { + self.clone().increase_malicious_count().await?; + continue } } // <-- !missing_parents.is_empty() diff --git a/src/event_graph/tests.rs b/src/event_graph/tests.rs index 56df21b58..715247752 100644 --- a/src/event_graph/tests.rs +++ b/src/event_graph/tests.rs @@ -18,8 +18,14 @@ // cargo test --release --features=event-graph --lib eventgraph_propagation -- --include-ignored -use std::{collections::HashMap, slice, sync::Arc}; +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + slice, + sync::Arc, + time::{Duration, UNIX_EPOCH}, +}; +use darkfi_serial::{deserialize_async, serialize_async}; use rand::{prelude::SliceRandom, rngs::ThreadRng}; use sled_overlay::sled; use smol::{channel, future, Executor}; @@ -27,13 +33,18 @@ use tracing::{info, warn}; use url::Url; use crate::{ + error::Result, event_graph::{ + event::Header, proto::{EventPut, ProtocolEventGraph}, - Event, EventGraph, + util::next_rotation_timestamp, + DAGStore, Event, EventGraph, EventGraphPtr, DAGS_MAX_NUMBER, GENESIS_CONTENTS, + INITIAL_GENESIS, NULL_ID, N_EVENT_PARENTS, }, net::{session::SESSION_DEFAULT, settings::NetworkProfile, P2p, Settings}, - system::{msleep, sleep}, + system::{msleep, sleep, timeout::timeout}, util::logger::{setup_test_logger, Level}, + Error, }; // Number of nodes to spawn and number of peers each node connects to @@ -518,3 +529,355 @@ async fn eventgraph_chaotic_propagation_real(ex: Arc>) { eg.p2p.clone().stop().await; } } + +// DAGStore tests +async fn make_dag_store() -> Result { + let sled_db = sled::Config::new().temporary(true).open()?; + let hours_rotation = 1; + + let dag_store = DAGStore { + db: sled_db.clone(), + header_dags: BTreeMap::default(), + main_dags: BTreeMap::default(), + } + .new(sled_db.clone(), hours_rotation) + .await; + + Ok(dag_store) +} +#[test] +fn header_dags_and_main_dags_length_equals_dags_max_number() -> Result<()> { + smol::block_on(async { + let dag_store = make_dag_store().await?; + assert_eq!(dag_store.header_dags.len() as i8, DAGS_MAX_NUMBER); + assert_eq!(dag_store.main_dags.len() as i8, DAGS_MAX_NUMBER); + + Ok(()) + }) +} + +#[test] +fn all_dag_trees_are_created_on_sled_after_dag_store_creation() -> Result<()> { + smol::block_on(async { + let dag_store = make_dag_store().await?; + let dag_trees: Vec = + dag_store.db.tree_names().iter().map(|n| String::from_utf8_lossy(n).into()).collect(); + + // Should have 2 * DAGS_MAX_NUMBER trees + 1 (the default tree) + assert_eq!(dag_trees.len() as i8, DAGS_MAX_NUMBER * 2 + 1); + + for (dag_timestamp, _) in dag_store.header_dags { + assert!(dag_trees.contains(&format!("headers_{dag_timestamp}"))); + } + + for (dag_timestamp, _) in dag_store.main_dags { + assert!(dag_trees.contains(&dag_timestamp.to_string())); + } + + Ok(()) + }) +} + +#[test] +fn genesis_events_or_headers_are_added_to_all_trees_and_utips() -> Result<()> { + smol::block_on(async { + let dag_store = make_dag_store().await?; + + for (_, (tree, layer_utips)) in dag_store.header_dags { + let genesis_header = tree.first()?; + // A Genesis Header is found in sled tree + assert!(genesis_header.is_some()); + let (genesis_hash, genesis_header) = genesis_header.unwrap(); + let genesis_header: Header = deserialize_async(&genesis_header).await?; + let genesis_hash: blake3::Hash = deserialize_async(&genesis_hash).await?; + assert_eq!(genesis_header.layer, 0); + assert!(genesis_header.parents.iter().all(|p| *p == NULL_ID)); + // The Genesis Header hash is stored as Unreferenced tip + assert!(layer_utips.contains_key(&0)); + assert!(layer_utips.get(&0).unwrap().contains(&genesis_hash)); + } + + for (_, (tree, layer_utips)) in dag_store.main_dags { + let genesis_event = tree.first()?; + // A Genesis Event is found in sled tree + assert!(genesis_event.is_some()); + let (genesis_hash, genesis_event) = genesis_event.unwrap(); + let genesis_event: Event = deserialize_async(&genesis_event).await?; + let genesis_hash: blake3::Hash = deserialize_async(&genesis_hash).await?; + assert_eq!(genesis_event.header.layer, 0); + assert!(genesis_event.header.parents.iter().all(|p| *p == NULL_ID)); + assert_eq!(genesis_event.content, GENESIS_CONTENTS); + // The Genesis Header hash is stored as Unreferenced tip + assert!(layer_utips.contains_key(&0)); + assert!(layer_utips.get(&0).unwrap().contains(&genesis_hash)); + } + + Ok(()) + }) +} + +#[test] +fn adding_new_dag_removes_oldest_dag_tree() -> Result<()> { + smol::block_on(async { + let mut dag_store = make_dag_store().await?; + let oldest_dag_timestamp = dag_store.main_dags.first_key_value().unwrap().0.to_owned(); + // Next dag to add + let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, 1); + let header = + Header { timestamp: next_rotation, parents: [NULL_ID; N_EVENT_PARENTS], layer: 0 }; + let next_genesis = Event { header, content: GENESIS_CONTENTS.to_vec() }; + + dag_store.add_dag(&next_genesis.header.timestamp.to_string(), &next_genesis).await; + + // The length of the dags should stay the same after adding + assert_eq!(dag_store.main_dags.len() as i8, DAGS_MAX_NUMBER); + assert_eq!(dag_store.header_dags.len() as i8, DAGS_MAX_NUMBER); + // We should have an entry with the new dag timestamp + assert!(dag_store.main_dags.contains_key(&next_rotation)); + assert!(dag_store.header_dags.contains_key(&next_rotation)); + // The oldest dag entry should have been removed + assert!(!dag_store.main_dags.contains_key(&oldest_dag_timestamp)); + assert!(!dag_store.header_dags.contains_key(&oldest_dag_timestamp)); + + let dag_trees: Vec = + dag_store.db.tree_names().iter().map(|n| String::from_utf8_lossy(n).into()).collect(); + + // The number of dag trees should stay the same after adding + assert_eq!(dag_trees.len() as i8, 2 * DAGS_MAX_NUMBER + 1); + // We should have a tree with the new dag timestamp value + assert!(dag_trees.contains(&next_rotation.to_string())); + assert!(dag_trees.contains(&format!("headers_{next_rotation}"))); + // The oldest dag sled tree should have been removed + assert!(!dag_trees.contains(&oldest_dag_timestamp.to_string())); + assert!(!dag_trees.contains(&format!("headers_{oldest_dag_timestamp}"))); + + Ok(()) + }) +} + +#[test] +fn sort_moves_current_dag_to_front() -> Result<()> { + smol::block_on(async { + let dag_store = make_dag_store().await?; + + let trees = dag_store.sort_dags().await; + let first_tree_name: String = + String::from_utf8_lossy(&trees.first().unwrap().name()).into(); + assert_eq!( + first_tree_name, + dag_store.main_dags.last_key_value().unwrap().0.to_owned().to_string() + ); + + Ok(()) + }) +} + +#[test] +fn unreferenced_tips_are_found() -> Result<()> { + smol::block_on(async { + let dag_store = make_dag_store().await?; + + let current_dag_tree = dag_store.main_dags.last_key_value().unwrap().1 .0.clone(); + let current_dag_genesis_hash = *dag_store + .main_dags + .last_key_value() + .unwrap() + .1 + .1 + .get(&0) + .unwrap() + .iter() + .next() + .unwrap(); + + let mut parents = [NULL_ID; N_EVENT_PARENTS]; + parents[0] = current_dag_genesis_hash; + let event2 = Event { + header: Header { + timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64, + parents, + layer: 1, + }, + content: "event2".as_bytes().to_vec(), + }; + let event2_hash = event2.id(); + current_dag_tree.insert(event2_hash.as_bytes(), serialize_async(&event2).await)?; + + let mut parents = [NULL_ID; N_EVENT_PARENTS]; + parents[0] = event2_hash; + let event3 = Event { + header: Header { + timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64, + parents, + layer: 2, + }, + content: "event3".as_bytes().to_vec(), + }; + let event3_hash = event3.id(); + current_dag_tree.insert(event3_hash.as_bytes(), serialize_async(&event3).await)?; + + let mut parents = [NULL_ID; N_EVENT_PARENTS]; + parents[0] = current_dag_genesis_hash; + let event4 = Event { + header: Header { + timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64, + parents, + layer: 2, + }, + content: "event4".as_bytes().to_vec(), + }; + let event4_hash = event4.id(); + current_dag_tree.insert(event4_hash.as_bytes(), serialize_async(&event4).await)?; + + let layer_utips = dag_store.find_unreferenced_tips(¤t_dag_tree).await; + // We have unreferenced tips only on the 2nd layer + assert_eq!(layer_utips.len(), 1); + // We have two unreferenced tips + let tip_hashes = layer_utips.get(&2).unwrap(); + assert_eq!(tip_hashes.len(), 2); + // Event3 and Event4 are the only unreferenced tips + assert!(tip_hashes.contains(&event3_hash)); + assert!(tip_hashes.contains(&event4_hash)); + + Ok(()) + }) +} + +// EventGraph tests +async fn make_event_graph() -> Result { + let ex = Arc::new(Executor::new()); + let p2p = P2p::new(Settings::default(), ex.clone()).await?; + let sled_db = sled::Config::new().temporary(true).open()?; + EventGraph::new(p2p, sled_db, "/tmp".into(), false, false, 1, ex).await +} + +#[test] +fn dag_insert_on_invalid_dag_name() -> Result<()> { + smol::block_on(async { + let event_graph = make_event_graph().await?; + + let new_event = Event::new("new_event".as_bytes().to_vec(), &event_graph).await; + // Using a dag name that is not a u64 timestamp gives an error + let res = event_graph.dag_insert(&[new_event], "non_timestamp_dag_name").await; + assert!(res.is_err()); + let err = res.unwrap_err(); + match err { + Error::ParseIntError(_) => {} + _ => panic!("expected parse error"), + } + + Ok(()) + }) +} + +#[test] +fn invalid_header_dag_insert() -> Result<()> { + smol::block_on(async { + let event_graph = make_event_graph().await?; + let dag_name = event_graph + .dag_store + .read() + .await + .main_dags + .last_key_value() + .unwrap() + .0 + .clone() + .to_string(); + + let new_event = Event::new("new_event".as_bytes().to_vec(), &event_graph).await; + // Inserting an invalid event gives an error + let mut event_timestamp_too_old = new_event.clone(); + event_timestamp_too_old.header.timestamp = 1000; + + let res = + event_graph.header_dag_insert(vec![event_timestamp_too_old.header], &dag_name).await; + assert!(res.is_err()); + + let err = res.unwrap_err(); + match err { + Error::HeaderIsInvalid => {} + _ => panic!("expected invalid header error"), + } + + Ok(()) + }) +} + +#[test] +fn dag_insert_without_inserting_header() -> Result<()> { + smol::block_on(async { + let event_graph = make_event_graph().await?; + let dag_name = event_graph + .dag_store + .read() + .await + .main_dags + .last_key_value() + .unwrap() + .0 + .clone() + .to_string(); + + let new_event = Event::new("new_event".as_bytes().to_vec(), &event_graph).await; + let res = event_graph.dag_insert(slice::from_ref(&new_event), &dag_name).await; + // Inserting event without inserting its header first gets skipped + assert!(res.is_ok() && res.unwrap().is_empty()); + Ok(()) + }) +} + +#[test] +fn dag_insert_duplicate_event() -> Result<()> { + smol::block_on(async { + let event_graph = make_event_graph().await?; + let dag_name = event_graph + .dag_store + .read() + .await + .main_dags + .last_key_value() + .unwrap() + .0 + .clone() + .to_string(); + + let new_event = Event::new("new_event".as_bytes().to_vec(), &event_graph).await; + event_graph.header_dag_insert(vec![new_event.header.clone()], &dag_name).await?; + let res = event_graph.dag_insert(slice::from_ref(&new_event), &dag_name).await; + // Proper insertion + assert!(res.is_ok() && res.unwrap().len() == 1); + // Inserting duplicate event gets skipped + let res = event_graph.dag_insert(&[new_event], &dag_name).await; + assert!(res.is_ok() && res.unwrap().is_empty()); + + Ok(()) + }) +} + +#[test] +fn dag_insert_valid_event() -> Result<()> { + smol::block_on(async { + let event_graph = make_event_graph().await?; + let dag_name = *event_graph.dag_store.read().await.main_dags.last_key_value().unwrap().0; + let new_event_sub = event_graph.event_pub.clone().subscribe().await; + + let new_event = Event::new("new_event".as_bytes().to_vec(), &event_graph).await; + event_graph + .header_dag_insert(vec![new_event.header.clone()], &dag_name.to_string()) + .await?; + let res = event_graph.dag_insert(slice::from_ref(&new_event), &dag_name.to_string()).await; + assert!(res.is_ok() && res.unwrap().len() == 1); + // Unreferenced tips is updated + let layer_utips = + event_graph.dag_store.read().await.main_dags.get(&dag_name).unwrap().1.clone(); + assert!(layer_utips.get(&1).unwrap().contains(&new_event.id())); + // The new event notification is sent to subscriber + let dur = Duration::from_secs(1); + let Ok(res) = timeout(dur, new_event_sub.receive()).await else { + panic!("Event is not sent to subscriber") + }; + assert_eq!(res.id(), new_event.id()); + Ok(()) + }) +}