event-graph: Make DAG sorting more deterministic.

This commit is contained in:
parazyd
2023-09-06 14:56:39 +02:00
parent 5ac3af091c
commit 7015af9d7c
4 changed files with 80 additions and 19 deletions

1
Cargo.lock generated
View File

@@ -1431,6 +1431,7 @@ dependencies = [
"libc",
"libsqlite3-sys",
"log",
"num-bigint",
"pin-project-lite",
"plotters",
"prettytable-rs",

View File

@@ -87,6 +87,7 @@ structopt-toml = {version= "0.5.1", optional = true}
toml = {version = "0.7.6", optional = true}
# Big float high precision arithmetics
dashu = {version = "0.3.1", optional = true}
num-bigint = {version = "0.4.4", optional = true}
# Utilities
# TODO: check chrono usage and impl our own
@@ -191,6 +192,7 @@ event-graph = [
"async-trait",
"async-recursion",
"blake3",
"num-bigint",
"rand",
"sled",
"smol",

View File

@@ -23,6 +23,7 @@ use std::{
use async_recursion::async_recursion;
use darkfi_serial::{deserialize_async, serialize_async};
use num_bigint::BigUint;
use smol::lock::RwLock;
use crate::{net::P2pPtr, Result};
@@ -136,31 +137,50 @@ impl EventGraph {
Ok(event_id)
}
/// Get a set of unreferenced tips used to produce a new [`Event`]
/// Get the current set of unreferenced tips in the DAG.
async fn get_unreferenced_tips(&self) -> [blake3::Hash; N_EVENT_PARENTS] {
let mut tips = [NULL_ID; N_EVENT_PARENTS];
let unreferenced_tips = self.unreferenced_tips.read().await;
for (i, tip) in unreferenced_tips.iter().enumerate() {
if i == N_EVENT_PARENTS - 1 {
break
}
tips[i] = *tip;
let mut tips = [NULL_ID; N_EVENT_PARENTS];
for (i, tip) in unreferenced_tips.iter().take(N_EVENT_PARENTS).enumerate() {
tips[i] = *tip
}
assert!(tips.iter().any(|x| x != &NULL_ID));
tips
}
/// Internal function used for DAG sorting.
async fn get_unreferenced_tips_sorted(&self) -> [blake3::Hash; N_EVENT_PARENTS] {
let tips = self.get_unreferenced_tips().await;
// Convert the hash to BigUint for sorting
let mut sorted: Vec<_> =
tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
sorted.sort_unstable();
// Convert back to blake3
let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS];
for (i, id) in sorted.iter().enumerate() {
let mut bytes = id.to_bytes_be();
// Ensure we have 32 bytes
while bytes.len() < blake3::OUT_LEN {
bytes.insert(0, 0);
}
tips_sorted[i] = blake3::Hash::from_bytes(bytes.try_into().unwrap());
}
tips_sorted
}
/// Perform a topological sort of the DAG.
/// Once the events are sorted topologically, the user can also additionally
/// sort them by timestamp after fetching the actual events from the database.
pub async fn order_events(&self) -> Vec<blake3::Hash> {
let mut ordered_events = VecDeque::new();
let mut visited = HashSet::new();
for tip in self.get_unreferenced_tips().await {
for tip in self.get_unreferenced_tips_sorted().await {
if !visited.contains(&tip) && tip != NULL_ID {
let tip = self.dag.get(tip.as_bytes()).unwrap().unwrap();
let tip = deserialize_async(&tip).await.unwrap();
@@ -171,7 +191,8 @@ impl EventGraph {
ordered_events.make_contiguous().to_vec()
}
/// <https://en.wikipedia.org/wiki/Depth-first_search>
/// We do a DFS (<https://en.wikipedia.org/wiki/Depth-first_search>), and
/// additionally we consider the timestamps.
#[async_recursion]
async fn dfs_topological_sort(
&self,
@@ -190,8 +211,45 @@ impl EventGraph {
}
}
// Once all the parents are visited, add the current event
// to the start of the list
ordered_events.push_front(event_id);
// Before inserting, check timestamps to determine the correct position.
let mut pos = ordered_events.len();
for (idx, existing_id) in ordered_events.iter().enumerate().rev() {
assert!(existing_id != &NULL_ID);
if self.share_same_parents(&event_id, existing_id).await {
let existing_event = self.dag.get(existing_id.as_bytes()).unwrap().unwrap();
let existing_event: Event = deserialize_async(&existing_event).await.unwrap();
// Sort by timestamp
if event.timestamp.0 < existing_event.timestamp.0 {
pos = idx;
} else if event.timestamp.0 == existing_event.timestamp.0 {
// In case of a tie-breaker, use the event ID
let a = BigUint::from_bytes_be(event_id.as_bytes());
let b = BigUint::from_bytes_be(existing_id.as_bytes());
if a < b {
pos = idx;
}
}
}
}
ordered_events.insert(pos, event_id);
}
/// Check if two events have the same parents
async fn share_same_parents(&self, event_id1: &blake3::Hash, event_id2: &blake3::Hash) -> bool {
let event1 = self.dag.get(event_id1.as_bytes()).unwrap().unwrap();
let event1: Event = deserialize_async(&event1).await.unwrap();
let mut parents1: Vec<_> =
event1.parents.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
parents1.sort_unstable();
let event2 = self.dag.get(event_id2.as_bytes()).unwrap().unwrap();
let event2: Event = deserialize_async(&event2).await.unwrap();
let mut parents2: Vec<_> =
event2.parents.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
parents2.sort_unstable();
parents1 == parents2
}
}

View File

@@ -54,8 +54,8 @@ fn eventgraph_propagation() {
cfg.add_filter_ignore("net::channel::send()".to_string());
simplelog::TermLogger::init(
//simplelog::LevelFilter::Info,
simplelog::LevelFilter::Debug,
simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace,
cfg.build(),
simplelog::TerminalMode::Mixed,
@@ -182,8 +182,8 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
random_node.p2p.broadcast(&EventPut(event0)).await;
}
info!("Waiting 10s until the p2p broadcasts settle");
sleep(10).await;
info!("Waiting 20s until the p2p broadcasts settle");
sleep(20).await;
// Assert that everyone has the same DAG
let mut contents = HashMap::new();