event_graph: add tests

This commit is contained in:
oars
2025-10-02 15:08:53 +03:00
committed by dasman
parent 6079b19cd0
commit ef7e4411c8
4 changed files with 377 additions and 14 deletions

View File

@@ -183,7 +183,7 @@ impl Event {
return Ok(false)
}
// Perform validation
self.header.validate(&header_dag, 1, None).await
self.header.validate(header_dag, 1, None).await
}
/// Validate a new event for the correct layout and enforce relevant age,

View File

@@ -297,7 +297,7 @@ impl DAGStore {
// Build the layers map
let mut map: LayerUTips = BTreeMap::new();
for tip in tips {
let event = self.fetch_event_from_dag(&tip, &dag).await.unwrap().unwrap();
let event = self.fetch_event_from_dag(&tip, dag).await.unwrap().unwrap();
if let Some(layer_tips) = map.get_mut(&event.header.layer) {
layer_tips.insert(tip);
} else {
@@ -321,7 +321,7 @@ impl DAGStore {
};
let event: Event = deserialize_async(&bytes).await?;
return Ok(Some(event))
Ok(Some(event))
}
}
@@ -807,7 +807,7 @@ impl EventGraph {
}
// Acquire exclusive locks to `broadcasted_ids`
let dag_timestamp = u64::from_str(dag_name).unwrap();
let dag_timestamp = u64::from_str(dag_name)?;
let mut broadcasted_ids = self.broadcasted_ids.write().await;
let main_dag = self.dag_store.read().await.get_dag(dag_name);
@@ -1085,7 +1085,7 @@ impl EventGraph {
let mut vec_tips = vec![];
let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS];
for (i, _) in self.dag_store.read().await.header_dags.iter() {
let (_, tips) = self.get_next_layer_with_parents(&i).await;
let (_, tips) = self.get_next_layer_with_parents(i).await;
// Convert the hash to BigUint for sorting
let mut sorted: Vec<_> =
tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
@@ -1218,7 +1218,7 @@ impl EventGraph {
let current_genesis = self.current_genesis.read().await;
let dag_name = current_genesis.header.timestamp.to_string();
let mut graph = HashMap::new();
for iter_elem in self.dag_store.read().await.get_dag(&dag_name).iter() {
for iter_elem in self.dag_store.read().await.get_dag(dag_name).iter() {
let (id, val) = iter_elem.unwrap();
let hash = Hash::from_bytes((&id as &[u8]).try_into().unwrap());
let event: Event = deserialize_async(&val).await.unwrap();

View File

@@ -627,11 +627,11 @@ impl ProtocolEventGraph {
continue
}
// FIXME
if !self.event_graph.fast_mode {
if self.event_graph.dag_insert(&events, &dag_name).await.is_err() {
self.clone().increase_malicious_count().await?;
continue
}
if !self.event_graph.fast_mode &&
self.event_graph.dag_insert(&events, &dag_name).await.is_err()
{
self.clone().increase_malicious_count().await?;
continue
}
} // <-- !missing_parents.is_empty()

View File

@@ -18,8 +18,14 @@
// cargo test --release --features=event-graph --lib eventgraph_propagation -- --include-ignored
use std::{collections::HashMap, slice, sync::Arc};
use std::{
collections::{BTreeMap, HashMap, HashSet},
slice,
sync::Arc,
time::{Duration, UNIX_EPOCH},
};
use darkfi_serial::{deserialize_async, serialize_async};
use rand::{prelude::SliceRandom, rngs::ThreadRng};
use sled_overlay::sled;
use smol::{channel, future, Executor};
@@ -27,13 +33,18 @@ use tracing::{info, warn};
use url::Url;
use crate::{
error::Result,
event_graph::{
event::Header,
proto::{EventPut, ProtocolEventGraph},
Event, EventGraph,
util::next_rotation_timestamp,
DAGStore, Event, EventGraph, EventGraphPtr, DAGS_MAX_NUMBER, GENESIS_CONTENTS,
INITIAL_GENESIS, NULL_ID, N_EVENT_PARENTS,
},
net::{session::SESSION_DEFAULT, settings::NetworkProfile, P2p, Settings},
system::{msleep, sleep},
system::{msleep, sleep, timeout::timeout},
util::logger::{setup_test_logger, Level},
Error,
};
// Number of nodes to spawn and number of peers each node connects to
@@ -518,3 +529,355 @@ async fn eventgraph_chaotic_propagation_real(ex: Arc<Executor<'static>>) {
eg.p2p.clone().stop().await;
}
}
// DAGStore tests
async fn make_dag_store() -> Result<DAGStore> {
let sled_db = sled::Config::new().temporary(true).open()?;
let hours_rotation = 1;
let dag_store = DAGStore {
db: sled_db.clone(),
header_dags: BTreeMap::default(),
main_dags: BTreeMap::default(),
}
.new(sled_db.clone(), hours_rotation)
.await;
Ok(dag_store)
}
#[test]
fn header_dags_and_main_dags_length_equals_dags_max_number() -> Result<()> {
smol::block_on(async {
let dag_store = make_dag_store().await?;
assert_eq!(dag_store.header_dags.len() as i8, DAGS_MAX_NUMBER);
assert_eq!(dag_store.main_dags.len() as i8, DAGS_MAX_NUMBER);
Ok(())
})
}
#[test]
fn all_dag_trees_are_created_on_sled_after_dag_store_creation() -> Result<()> {
smol::block_on(async {
let dag_store = make_dag_store().await?;
let dag_trees: Vec<String> =
dag_store.db.tree_names().iter().map(|n| String::from_utf8_lossy(n).into()).collect();
// Should have 2 * DAGS_MAX_NUMBER trees + 1 (the default tree)
assert_eq!(dag_trees.len() as i8, DAGS_MAX_NUMBER * 2 + 1);
for (dag_timestamp, _) in dag_store.header_dags {
assert!(dag_trees.contains(&format!("headers_{dag_timestamp}")));
}
for (dag_timestamp, _) in dag_store.main_dags {
assert!(dag_trees.contains(&dag_timestamp.to_string()));
}
Ok(())
})
}
#[test]
fn genesis_events_or_headers_are_added_to_all_trees_and_utips() -> Result<()> {
smol::block_on(async {
let dag_store = make_dag_store().await?;
for (_, (tree, layer_utips)) in dag_store.header_dags {
let genesis_header = tree.first()?;
// A Genesis Header is found in sled tree
assert!(genesis_header.is_some());
let (genesis_hash, genesis_header) = genesis_header.unwrap();
let genesis_header: Header = deserialize_async(&genesis_header).await?;
let genesis_hash: blake3::Hash = deserialize_async(&genesis_hash).await?;
assert_eq!(genesis_header.layer, 0);
assert!(genesis_header.parents.iter().all(|p| *p == NULL_ID));
// The Genesis Header hash is stored as Unreferenced tip
assert!(layer_utips.contains_key(&0));
assert!(layer_utips.get(&0).unwrap().contains(&genesis_hash));
}
for (_, (tree, layer_utips)) in dag_store.main_dags {
let genesis_event = tree.first()?;
// A Genesis Event is found in sled tree
assert!(genesis_event.is_some());
let (genesis_hash, genesis_event) = genesis_event.unwrap();
let genesis_event: Event = deserialize_async(&genesis_event).await?;
let genesis_hash: blake3::Hash = deserialize_async(&genesis_hash).await?;
assert_eq!(genesis_event.header.layer, 0);
assert!(genesis_event.header.parents.iter().all(|p| *p == NULL_ID));
assert_eq!(genesis_event.content, GENESIS_CONTENTS);
// The Genesis Header hash is stored as Unreferenced tip
assert!(layer_utips.contains_key(&0));
assert!(layer_utips.get(&0).unwrap().contains(&genesis_hash));
}
Ok(())
})
}
#[test]
fn adding_new_dag_removes_oldest_dag_tree() -> Result<()> {
smol::block_on(async {
let mut dag_store = make_dag_store().await?;
let oldest_dag_timestamp = dag_store.main_dags.first_key_value().unwrap().0.to_owned();
// Next dag to add
let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, 1);
let header =
Header { timestamp: next_rotation, parents: [NULL_ID; N_EVENT_PARENTS], layer: 0 };
let next_genesis = Event { header, content: GENESIS_CONTENTS.to_vec() };
dag_store.add_dag(&next_genesis.header.timestamp.to_string(), &next_genesis).await;
// The length of the dags should stay the same after adding
assert_eq!(dag_store.main_dags.len() as i8, DAGS_MAX_NUMBER);
assert_eq!(dag_store.header_dags.len() as i8, DAGS_MAX_NUMBER);
// We should have an entry with the new dag timestamp
assert!(dag_store.main_dags.contains_key(&next_rotation));
assert!(dag_store.header_dags.contains_key(&next_rotation));
// The oldest dag entry should have been removed
assert!(!dag_store.main_dags.contains_key(&oldest_dag_timestamp));
assert!(!dag_store.header_dags.contains_key(&oldest_dag_timestamp));
let dag_trees: Vec<String> =
dag_store.db.tree_names().iter().map(|n| String::from_utf8_lossy(n).into()).collect();
// The number of dag trees should stay the same after adding
assert_eq!(dag_trees.len() as i8, 2 * DAGS_MAX_NUMBER + 1);
// We should have a tree with the new dag timestamp value
assert!(dag_trees.contains(&next_rotation.to_string()));
assert!(dag_trees.contains(&format!("headers_{next_rotation}")));
// The oldest dag sled tree should have been removed
assert!(!dag_trees.contains(&oldest_dag_timestamp.to_string()));
assert!(!dag_trees.contains(&format!("headers_{oldest_dag_timestamp}")));
Ok(())
})
}
#[test]
fn sort_moves_current_dag_to_front() -> Result<()> {
smol::block_on(async {
let dag_store = make_dag_store().await?;
let trees = dag_store.sort_dags().await;
let first_tree_name: String =
String::from_utf8_lossy(&trees.first().unwrap().name()).into();
assert_eq!(
first_tree_name,
dag_store.main_dags.last_key_value().unwrap().0.to_owned().to_string()
);
Ok(())
})
}
#[test]
fn unreferenced_tips_are_found() -> Result<()> {
smol::block_on(async {
let dag_store = make_dag_store().await?;
let current_dag_tree = dag_store.main_dags.last_key_value().unwrap().1 .0.clone();
let current_dag_genesis_hash = *dag_store
.main_dags
.last_key_value()
.unwrap()
.1
.1
.get(&0)
.unwrap()
.iter()
.next()
.unwrap();
let mut parents = [NULL_ID; N_EVENT_PARENTS];
parents[0] = current_dag_genesis_hash;
let event2 = Event {
header: Header {
timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64,
parents,
layer: 1,
},
content: "event2".as_bytes().to_vec(),
};
let event2_hash = event2.id();
current_dag_tree.insert(event2_hash.as_bytes(), serialize_async(&event2).await)?;
let mut parents = [NULL_ID; N_EVENT_PARENTS];
parents[0] = event2_hash;
let event3 = Event {
header: Header {
timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64,
parents,
layer: 2,
},
content: "event3".as_bytes().to_vec(),
};
let event3_hash = event3.id();
current_dag_tree.insert(event3_hash.as_bytes(), serialize_async(&event3).await)?;
let mut parents = [NULL_ID; N_EVENT_PARENTS];
parents[0] = current_dag_genesis_hash;
let event4 = Event {
header: Header {
timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64,
parents,
layer: 2,
},
content: "event4".as_bytes().to_vec(),
};
let event4_hash = event4.id();
current_dag_tree.insert(event4_hash.as_bytes(), serialize_async(&event4).await)?;
let layer_utips = dag_store.find_unreferenced_tips(&current_dag_tree).await;
// We have unreferenced tips only on the 2nd layer
assert_eq!(layer_utips.len(), 1);
// We have two unreferenced tips
let tip_hashes = layer_utips.get(&2).unwrap();
assert_eq!(tip_hashes.len(), 2);
// Event3 and Event4 are the only unreferenced tips
assert!(tip_hashes.contains(&event3_hash));
assert!(tip_hashes.contains(&event4_hash));
Ok(())
})
}
// EventGraph tests
async fn make_event_graph() -> Result<EventGraphPtr> {
let ex = Arc::new(Executor::new());
let p2p = P2p::new(Settings::default(), ex.clone()).await?;
let sled_db = sled::Config::new().temporary(true).open()?;
EventGraph::new(p2p, sled_db, "/tmp".into(), false, false, 1, ex).await
}
#[test]
fn dag_insert_on_invalid_dag_name() -> Result<()> {
smol::block_on(async {
let event_graph = make_event_graph().await?;
let new_event = Event::new("new_event".as_bytes().to_vec(), &event_graph).await;
// Using a dag name that is not a u64 timestamp gives an error
let res = event_graph.dag_insert(&[new_event], "non_timestamp_dag_name").await;
assert!(res.is_err());
let err = res.unwrap_err();
match err {
Error::ParseIntError(_) => {}
_ => panic!("expected parse error"),
}
Ok(())
})
}
#[test]
fn invalid_header_dag_insert() -> Result<()> {
smol::block_on(async {
let event_graph = make_event_graph().await?;
let dag_name = event_graph
.dag_store
.read()
.await
.main_dags
.last_key_value()
.unwrap()
.0
.clone()
.to_string();
let new_event = Event::new("new_event".as_bytes().to_vec(), &event_graph).await;
// Inserting an invalid event gives an error
let mut event_timestamp_too_old = new_event.clone();
event_timestamp_too_old.header.timestamp = 1000;
let res =
event_graph.header_dag_insert(vec![event_timestamp_too_old.header], &dag_name).await;
assert!(res.is_err());
let err = res.unwrap_err();
match err {
Error::HeaderIsInvalid => {}
_ => panic!("expected invalid header error"),
}
Ok(())
})
}
#[test]
fn dag_insert_without_inserting_header() -> Result<()> {
smol::block_on(async {
let event_graph = make_event_graph().await?;
let dag_name = event_graph
.dag_store
.read()
.await
.main_dags
.last_key_value()
.unwrap()
.0
.clone()
.to_string();
let new_event = Event::new("new_event".as_bytes().to_vec(), &event_graph).await;
let res = event_graph.dag_insert(slice::from_ref(&new_event), &dag_name).await;
// Inserting event without inserting its header first gets skipped
assert!(res.is_ok() && res.unwrap().is_empty());
Ok(())
})
}
#[test]
fn dag_insert_duplicate_event() -> Result<()> {
smol::block_on(async {
let event_graph = make_event_graph().await?;
let dag_name = event_graph
.dag_store
.read()
.await
.main_dags
.last_key_value()
.unwrap()
.0
.clone()
.to_string();
let new_event = Event::new("new_event".as_bytes().to_vec(), &event_graph).await;
event_graph.header_dag_insert(vec![new_event.header.clone()], &dag_name).await?;
let res = event_graph.dag_insert(slice::from_ref(&new_event), &dag_name).await;
// Proper insertion
assert!(res.is_ok() && res.unwrap().len() == 1);
// Inserting duplicate event gets skipped
let res = event_graph.dag_insert(&[new_event], &dag_name).await;
assert!(res.is_ok() && res.unwrap().is_empty());
Ok(())
})
}
#[test]
fn dag_insert_valid_event() -> Result<()> {
smol::block_on(async {
let event_graph = make_event_graph().await?;
let dag_name = *event_graph.dag_store.read().await.main_dags.last_key_value().unwrap().0;
let new_event_sub = event_graph.event_pub.clone().subscribe().await;
let new_event = Event::new("new_event".as_bytes().to_vec(), &event_graph).await;
event_graph
.header_dag_insert(vec![new_event.header.clone()], &dag_name.to_string())
.await?;
let res = event_graph.dag_insert(slice::from_ref(&new_event), &dag_name.to_string()).await;
assert!(res.is_ok() && res.unwrap().len() == 1);
// Unreferenced tips is updated
let layer_utips =
event_graph.dag_store.read().await.main_dags.get(&dag_name).unwrap().1.clone();
assert!(layer_utips.get(&1).unwrap().contains(&new_event.id()));
// The new event notification is sent to subscriber
let dur = Duration::from_secs(1);
let Ok(res) = timeout(dur, new_event_sub.receive()).await else {
panic!("Event is not sent to subscriber")
};
assert_eq!(res.id(), new_event.id());
Ok(())
})
}