From 7015af9d7cf58486cd13a31d5db8fe73eb021406 Mon Sep 17 00:00:00 2001 From: parazyd Date: Wed, 6 Sep 2023 14:56:39 +0200 Subject: [PATCH] event-graph: Make DAG sorting more deterministic. --- Cargo.lock | 1 + Cargo.toml | 2 + src/event_graph2/mod.rs | 88 ++++++++++++++++++++++++++++++++------- src/event_graph2/tests.rs | 8 ++-- 4 files changed, 80 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e47e4100..802158e21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1431,6 +1431,7 @@ dependencies = [ "libc", "libsqlite3-sys", "log", + "num-bigint", "pin-project-lite", "plotters", "prettytable-rs", diff --git a/Cargo.toml b/Cargo.toml index 33caba291..123ea2769 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,7 @@ structopt-toml = {version= "0.5.1", optional = true} toml = {version = "0.7.6", optional = true} # Big float high precision arithmetics dashu = {version = "0.3.1", optional = true} +num-bigint = {version = "0.4.4", optional = true} # Utilities # TODO: check chrono usage and impl our own @@ -191,6 +192,7 @@ event-graph = [ "async-trait", "async-recursion", "blake3", + "num-bigint", "rand", "sled", "smol", diff --git a/src/event_graph2/mod.rs b/src/event_graph2/mod.rs index 38a50059f..108ccb7b8 100644 --- a/src/event_graph2/mod.rs +++ b/src/event_graph2/mod.rs @@ -23,6 +23,7 @@ use std::{ use async_recursion::async_recursion; use darkfi_serial::{deserialize_async, serialize_async}; +use num_bigint::BigUint; use smol::lock::RwLock; use crate::{net::P2pPtr, Result}; @@ -136,31 +137,50 @@ impl EventGraph { Ok(event_id) } - /// Get a set of unreferenced tips used to produce a new [`Event`] + /// Get the current set of unreferenced tips in the DAG. async fn get_unreferenced_tips(&self) -> [blake3::Hash; N_EVENT_PARENTS] { - let mut tips = [NULL_ID; N_EVENT_PARENTS]; let unreferenced_tips = self.unreferenced_tips.read().await; - for (i, tip) in unreferenced_tips.iter().enumerate() { - if i == N_EVENT_PARENTS - 1 { - break - } - - tips[i] = *tip; + let mut tips = [NULL_ID; N_EVENT_PARENTS]; + for (i, tip) in unreferenced_tips.iter().take(N_EVENT_PARENTS).enumerate() { + tips[i] = *tip } assert!(tips.iter().any(|x| x != &NULL_ID)); tips } + /// Internal function used for DAG sorting. + async fn get_unreferenced_tips_sorted(&self) -> [blake3::Hash; N_EVENT_PARENTS] { + let tips = self.get_unreferenced_tips().await; + + // Convert the hash to BigUint for sorting + let mut sorted: Vec<_> = + tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect(); + sorted.sort_unstable(); + + // Convert back to blake3 + let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS]; + for (i, id) in sorted.iter().enumerate() { + let mut bytes = id.to_bytes_be(); + + // Ensure we have 32 bytes + while bytes.len() < blake3::OUT_LEN { + bytes.insert(0, 0); + } + + tips_sorted[i] = blake3::Hash::from_bytes(bytes.try_into().unwrap()); + } + + tips_sorted + } + /// Perform a topological sort of the DAG. - /// Once the events are sorted topologically, the user can also additionally - /// sort them by timestamp after fetching the actual events from the database. pub async fn order_events(&self) -> Vec { let mut ordered_events = VecDeque::new(); let mut visited = HashSet::new(); - for tip in self.get_unreferenced_tips().await { + for tip in self.get_unreferenced_tips_sorted().await { if !visited.contains(&tip) && tip != NULL_ID { let tip = self.dag.get(tip.as_bytes()).unwrap().unwrap(); let tip = deserialize_async(&tip).await.unwrap(); @@ -171,7 +191,8 @@ impl EventGraph { ordered_events.make_contiguous().to_vec() } - /// + /// We do a DFS (), and + /// additionally we consider the timestamps. #[async_recursion] async fn dfs_topological_sort( &self, @@ -190,8 +211,45 @@ impl EventGraph { } } - // Once all the parents are visited, add the current event - // to the start of the list - ordered_events.push_front(event_id); + // Before inserting, check timestamps to determine the correct position. + let mut pos = ordered_events.len(); + for (idx, existing_id) in ordered_events.iter().enumerate().rev() { + assert!(existing_id != &NULL_ID); + if self.share_same_parents(&event_id, existing_id).await { + let existing_event = self.dag.get(existing_id.as_bytes()).unwrap().unwrap(); + let existing_event: Event = deserialize_async(&existing_event).await.unwrap(); + + // Sort by timestamp + if event.timestamp.0 < existing_event.timestamp.0 { + pos = idx; + } else if event.timestamp.0 == existing_event.timestamp.0 { + // In case of a tie-breaker, use the event ID + let a = BigUint::from_bytes_be(event_id.as_bytes()); + let b = BigUint::from_bytes_be(existing_id.as_bytes()); + if a < b { + pos = idx; + } + } + } + } + + ordered_events.insert(pos, event_id); + } + + /// Check if two events have the same parents + async fn share_same_parents(&self, event_id1: &blake3::Hash, event_id2: &blake3::Hash) -> bool { + let event1 = self.dag.get(event_id1.as_bytes()).unwrap().unwrap(); + let event1: Event = deserialize_async(&event1).await.unwrap(); + let mut parents1: Vec<_> = + event1.parents.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect(); + parents1.sort_unstable(); + + let event2 = self.dag.get(event_id2.as_bytes()).unwrap().unwrap(); + let event2: Event = deserialize_async(&event2).await.unwrap(); + let mut parents2: Vec<_> = + event2.parents.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect(); + parents2.sort_unstable(); + + parents1 == parents2 } } diff --git a/src/event_graph2/tests.rs b/src/event_graph2/tests.rs index c738129fb..9c8b884b7 100644 --- a/src/event_graph2/tests.rs +++ b/src/event_graph2/tests.rs @@ -54,8 +54,8 @@ fn eventgraph_propagation() { cfg.add_filter_ignore("net::channel::send()".to_string()); simplelog::TermLogger::init( - //simplelog::LevelFilter::Info, - simplelog::LevelFilter::Debug, + simplelog::LevelFilter::Info, + //simplelog::LevelFilter::Debug, //simplelog::LevelFilter::Trace, cfg.build(), simplelog::TerminalMode::Mixed, @@ -182,8 +182,8 @@ async fn eventgraph_propagation_real(ex: Arc>) { random_node.p2p.broadcast(&EventPut(event0)).await; } - info!("Waiting 10s until the p2p broadcasts settle"); - sleep(10).await; + info!("Waiting 20s until the p2p broadcasts settle"); + sleep(20).await; // Assert that everyone has the same DAG let mut contents = HashMap::new();