event_graph: introduce layers to the DAG

Each event now corresponds to a specific layer (height) in the DAG, which makes identifying and preventing cycles much easier, since all parents must exist in previous layers. Additionally, propagation and sync bugs have been eliminated, and proper validations have been added.
aggstam
2023-11-24 15:35:35 +02:00
committed by parazyd
parent 01c883b0d4
commit cdaddfbfe2
13 changed files with 804 additions and 414 deletions
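The core rule this commit introduces is easy to state on its own: an event is only valid if its layer is strictly greater than the layer of every non-null parent, which rules out reference cycles by construction. A minimal standalone sketch of that rule (a simplified stand-in, not the actual darkfi `Event` type):

```rust
/// Minimal sketch of the layer rule; NOT the darkfi `Event` type.
/// Parents are represented here only by their layer numbers.
struct SketchEvent {
    layer: u64,
    parent_layers: Vec<u64>,
}

impl SketchEvent {
    /// A child must sit on a strictly higher layer than every parent.
    /// Any cycle would need at least one edge pointing to an equal or
    /// higher layer, so this check rules cycles out by construction.
    fn layers_are_valid(&self) -> bool {
        self.parent_layers.iter().all(|p| *p < self.layer)
    }
}

fn main() {
    let ok = SketchEvent { layer: 2, parent_layers: vec![0, 1] };
    let bad = SketchEvent { layer: 1, parent_layers: vec![1] };
    assert!(ok.layers_are_valid());
    assert!(!bad.layers_are_valid()); // same-layer parent rejected
}
```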


@@ -203,6 +203,7 @@ event-graph = [
"num-bigint", "num-bigint",
"rand", "rand",
"sled", "sled",
"sled-overlay",
"smol", "smol",
"tinyjson", "tinyjson",


@@ -359,11 +359,8 @@ impl Client {
self.server.try_encrypt(&mut privmsg).await; self.server.try_encrypt(&mut privmsg).await;
// Build a DAG event and return it. // Build a DAG event and return it.
let event = Event::new( let event =
serialize_async(&privmsg).await, Event::new(serialize_async(&privmsg).await, &self.server.darkirc.event_graph).await;
self.server.darkirc.event_graph.clone(),
)
.await;
return Ok(Some(event)) return Ok(Some(event))
} }


@@ -33,7 +33,7 @@ use darkfi::{
use rand::{rngs::OsRng, RngCore}; use rand::{rngs::OsRng, RngCore};
use smol::Executor; use smol::Executor;
use url::Url; use url::Url;
use log::error; use log::{error, warn};
use super::{proto::ProtocolDht, Dhtd}; use super::{proto::ProtocolDht, Dhtd};
@@ -144,12 +144,20 @@ fn dht_remote_get_insert() -> Result<()> {
cfg.add_filter_ignore("net::protocol_version".to_string()); cfg.add_filter_ignore("net::protocol_version".to_string());
cfg.add_filter_ignore("net::protocol_ping".to_string()); cfg.add_filter_ignore("net::protocol_ping".to_string());
simplelog::TermLogger::init( // We check this error so we can execute same-file tests in parallel;
// otherwise the second one fails to init the logger here.
if simplelog::TermLogger::init(
simplelog::LevelFilter::Info, simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace,
cfg.build(), cfg.build(),
simplelog::TerminalMode::Mixed, simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto, simplelog::ColorChoice::Auto,
)?; )
.is_err()
{
warn!(target: "test_harness", "Logger already initialized");
}
let ex = Arc::new(Executor::new()); let ex = Arc::new(Executor::new());
let (signal, shutdown) = async_std::channel::unbounded::<()>(); let (signal, shutdown) = async_std::channel::unbounded::<()>();
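The logger change in this test (mirrored in the propagation tests further down) amounts to tolerating double initialization so several tests in the same file can run in parallel. A self-contained sketch of the pattern, assuming the `simplelog` and `log` crates:

```rust
use log::warn;

/// Sketch of a test-harness logger init that tolerates double
/// initialization. The first test to call it wins; later calls just
/// log a warning instead of returning an error.
fn init_test_logger() {
    if simplelog::TermLogger::init(
        simplelog::LevelFilter::Info,
        simplelog::ConfigBuilder::new().build(),
        simplelog::TerminalMode::Mixed,
        simplelog::ColorChoice::Auto,
    )
    .is_err()
    {
        warn!(target: "test_harness", "Logger already initialized");
    }
}
```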


@@ -105,7 +105,7 @@ impl JsonRpcInterface {
let genevent: GenEvent = deserialize(&dec).unwrap(); let genevent: GenEvent = deserialize(&dec).unwrap();
// Build a DAG event and return it. // Build a DAG event and return it.
let event = Event::new(serialize_async(&genevent).await, self.event_graph.clone()).await; let event = Event::new(serialize_async(&genevent).await, &self.event_graph).await;
if let Err(e) = self.event_graph.dag_insert(&[event.clone()]).await { if let Err(e) = self.event_graph.dag_insert(&[event.clone()]).await {
error!("Failed inserting new event to DAG: {}", e); error!("Failed inserting new event to DAG: {}", e);


@@ -170,7 +170,7 @@ async fn start_sync_loop(
// Build a DAG event and return it. // Build a DAG event and return it.
let event = Event::new( let event = Event::new(
serialize_async(&encrypted_task).await, serialize_async(&encrypted_task).await,
event_graph.clone(), &event_graph,
) )
.await; .await;
// Update the last sent event. // Update the last sent event.


@@ -491,6 +491,9 @@ pub enum Error {
#[error("Event is not found in tree: {0}")] #[error("Event is not found in tree: {0}")]
EventNotFound(String), EventNotFound(String),
#[error("Event is invalid")]
EventIsInvalid,
// ==================== // ====================
// Miscellaneous errors // Miscellaneous errors
// ==================== // ====================
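The new `EventIsInvalid` variant lets callers of `dag_insert` distinguish validation failures from storage errors. A hedged caller-side sketch (the module paths here are assumptions for illustration; only `dag_insert` and `Error::EventIsInvalid` come from the diff):

```rust
// Assumed paths, for illustration only.
use darkfi::{
    event_graph::{Event, EventGraphPtr},
    Error, Result,
};

/// Insert an event, silently dropping it if validation fails but
/// propagating any other (e.g. storage) error.
async fn insert_or_drop(event_graph: &EventGraphPtr, event: Event) -> Result<()> {
    match event_graph.dag_insert(&[event]).await {
        Ok(_ids) => Ok(()),
        Err(Error::EventIsInvalid) => {
            log::warn!("Dropping invalid event");
            Ok(())
        }
        Err(e) => Err(e),
    }
}
```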


@@ -18,9 +18,15 @@
use std::{collections::HashSet, time::UNIX_EPOCH}; use std::{collections::HashSet, time::UNIX_EPOCH};
use darkfi_serial::{async_trait, Encodable, SerialDecodable, SerialEncodable}; use darkfi_serial::{async_trait, deserialize_async, Encodable, SerialDecodable, SerialEncodable};
use sled_overlay::SledTreeOverlay;
use super::{EventGraphPtr, EVENT_TIME_DRIFT, NULL_ID, N_EVENT_PARENTS}; use crate::Result;
use super::{
util::next_rotation_timestamp, EventGraphPtr, EVENT_TIME_DRIFT, INITIAL_GENESIS, NULL_ID,
N_EVENT_PARENTS,
};
/// Representation of an event in the Event Graph /// Representation of an event in the Event Graph
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] #[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -31,20 +37,19 @@ pub struct Event {
pub(super) content: Vec<u8>, pub(super) content: Vec<u8>,
/// Parent nodes in the event DAG /// Parent nodes in the event DAG
pub(super) parents: [blake3::Hash; N_EVENT_PARENTS], pub(super) parents: [blake3::Hash; N_EVENT_PARENTS],
/// DAG layer index of the event
pub(super) layer: u64,
} }
impl Event { impl Event {
/// Create a new event with the given data and an `EventGraph` reference. /// Create a new event with the given data and an [`EventGraph`] reference.
/// The timestamp of the event will be the current time, and the parents /// The timestamp of the event will be the current time, and the parents
/// will be `N_EVENT_PARENTS` from the current event graph unreferenced tips. /// will be `N_EVENT_PARENTS` from the current event graph unreferenced tips.
/// The parents can also include NULL, but this should be handled by the rest /// The parents can also include NULL, but this should be handled by the rest
/// of the codebase. /// of the codebase.
pub async fn new(data: Vec<u8>, event_graph: EventGraphPtr) -> Self { pub async fn new(data: Vec<u8>, event_graph: &EventGraphPtr) -> Self {
Self { let (layer, parents) = event_graph.get_next_layer_with_parents().await;
timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(), Self { timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(), content: data, parents, layer }
content: data,
parents: event_graph.get_unreferenced_tips().await,
}
} }
/// Hash the [`Event`] to retrieve its ID /// Hash the [`Event`] to retrieve its ID
@@ -53,6 +58,7 @@ impl Event {
self.timestamp.encode(&mut hasher).unwrap(); self.timestamp.encode(&mut hasher).unwrap();
self.content.encode(&mut hasher).unwrap(); self.content.encode(&mut hasher).unwrap();
self.parents.encode(&mut hasher).unwrap(); self.parents.encode(&mut hasher).unwrap();
self.layer.encode(&mut hasher).unwrap();
hasher.finalize() hasher.finalize()
} }
@@ -68,9 +74,92 @@ impl Event {
} }
*/ */
/// Fully validate an event for the correct layout against a provided
/// DAG [`sled::Tree`] reference and enforce relevant age, assuming
/// some possibility of time drift. Optionally, provide an overlay
/// to use instead of the actual referenced DAG.
pub async fn validate(
&self,
dag: &sled::Tree,
genesis_timestamp: u64,
days_rotation: u64,
overlay: Option<&SledTreeOverlay>,
) -> Result<bool> {
// Let's not bother with empty events
if self.content.is_empty() {
return Ok(false)
}
// Check if the event timestamp is after genesis timestamp
if self.timestamp < genesis_timestamp - EVENT_TIME_DRIFT {
return Ok(false)
}
// If a rotation has been set, check if the event timestamp
// is after the next genesis timestamp
if days_rotation > 0 {
let next_genesis_timestamp = next_rotation_timestamp(INITIAL_GENESIS, days_rotation);
if self.timestamp > next_genesis_timestamp + EVENT_TIME_DRIFT {
return Ok(false)
}
}
// Validate the parents. We have to check that at least one parent
// is not NULL, that the parents exist, that no two parents are the
// same, and that each parent exists in a previous layer, to prevent
// recursive references (cycles).
let mut seen = HashSet::new();
let self_id = self.id();
for parent_id in self.parents.iter() {
if parent_id == &NULL_ID {
continue
}
if parent_id == &self_id {
return Ok(false)
}
if seen.contains(parent_id) {
return Ok(false)
}
let parent_bytes = if let Some(overlay) = overlay {
overlay.get(parent_id.as_bytes())?
} else {
dag.get(parent_id.as_bytes())?
};
if parent_bytes.is_none() {
return Ok(false)
}
let parent: Event = deserialize_async(&parent_bytes.unwrap()).await?;
if self.layer <= parent.layer {
return Ok(false)
}
seen.insert(parent_id);
}
Ok(!seen.is_empty())
}
/// Fully validate an event for the correct layout against a provided
/// [`EventGraph`] reference and enforce relevant age, assuming some
/// possibility of time drift.
pub async fn dag_validate(&self, event_graph: &EventGraphPtr) -> Result<bool> {
// Grab genesis timestamp
let genesis_timestamp = event_graph.current_genesis.read().await.timestamp;
// Perform validation
self.validate(&event_graph.dag, genesis_timestamp, event_graph.days_rotation, None).await
}
/// Validate a new event for the correct layout and enforce relevant age, /// Validate a new event for the correct layout and enforce relevant age,
/// assuming some possibility for a time drift. /// assuming some possibility for a time drift.
pub fn validate(&self) -> bool { /// Note: This validation does *NOT* check for recursive references (cycles),
/// and should be used as a first quick check.
pub fn validate_new(&self) -> bool {
// Let's not bother with empty events // Let's not bother with empty events
if self.content.is_empty() { if self.content.is_empty() {
return false return false
@@ -86,8 +175,7 @@ impl Event {
} }
// Validate the parents. We have to check that at least one parent // Validate the parents. We have to check that at least one parent
// is not NULL, that the parent does not recursively reference the // is not NULL and that no two parents are the same.
// event, and that no two parents are the same.
let mut seen = HashSet::new(); let mut seen = HashSet::new();
let self_id = self.id(); let self_id = self.id();
@@ -113,59 +201,77 @@ impl Event {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc;
use smol::Executor;
use crate::{
event_graph::EventGraph,
net::{P2p, Settings},
};
use super::*; use super::*;
fn make_valid_event() -> Event { async fn make_event_graph() -> Result<EventGraphPtr> {
Event { let ex = Arc::new(Executor::new());
timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(), let p2p = P2p::new(Settings::default(), ex.clone()).await;
content: vec![1u8], let sled_db = sled::Config::new().temporary(true).open().unwrap();
parents: [ EventGraph::new(p2p, sled_db, "dag", 1, ex).await
blake3::hash(b"1"),
blake3::hash(b"2"),
blake3::hash(b"3"),
blake3::hash(b"4"),
blake3::hash(b"5"),
],
}
}
#[test]
fn event_is_valid() {
// Validate our test Event struct
assert!(make_valid_event().validate());
} }
#[test] #[test]
fn invalid_events() { fn event_is_valid() -> Result<()> {
// TODO: Not checked: smol::block_on(async {
// - "the parent does not recursively reference the event" // Generate a dummy event graph
let e = make_valid_event(); let event_graph = make_event_graph().await?;
let mut event_empty_content = e.clone(); // Create a new valid event
event_empty_content.content = vec![]; let valid_event = Event::new(vec![1u8], &event_graph).await;
assert!(!event_empty_content.validate());
let mut event_timestamp_too_old = e.clone(); // Validate our test Event struct
event_timestamp_too_old.timestamp = 0; assert!(valid_event.dag_validate(&event_graph).await?);
assert!(!event_timestamp_too_old.validate());
let mut event_timestamp_too_new = e.clone(); // Thanks for reading
event_timestamp_too_new.timestamp = u64::MAX; Ok(())
assert!(!event_timestamp_too_new.validate()); })
}
let mut event_duplicated_parents = e.clone(); #[test]
let duplicated_parents = [ fn invalid_events() -> Result<()> {
blake3::hash(b"1"), smol::block_on(async {
blake3::hash(b"1"), // Generate a dummy event graph
blake3::hash(b"3"), let event_graph = make_event_graph().await?;
blake3::hash(b"4"),
blake3::hash(b"5"),
];
event_duplicated_parents.parents = duplicated_parents;
assert!(!event_duplicated_parents.validate());
let mut event_null_parents = e.clone(); // Create a new valid event
let all_null_parents = [NULL_ID, NULL_ID, NULL_ID, NULL_ID, NULL_ID]; let valid_event = Event::new(vec![1u8], &event_graph).await;
event_null_parents.parents = all_null_parents;
assert!(!event_null_parents.validate()); let mut event_empty_content = valid_event.clone();
event_empty_content.content = vec![];
assert!(!event_empty_content.dag_validate(&event_graph).await?);
let mut event_timestamp_too_old = valid_event.clone();
event_timestamp_too_old.timestamp = 0;
assert!(!event_timestamp_too_old.dag_validate(&event_graph).await?);
let mut event_timestamp_too_new = valid_event.clone();
event_timestamp_too_new.timestamp = u64::MAX;
assert!(!event_timestamp_too_new.dag_validate(&event_graph).await?);
let mut event_duplicated_parents = valid_event.clone();
event_duplicated_parents.parents[1] = valid_event.parents[0];
assert!(!event_duplicated_parents.dag_validate(&event_graph).await?);
let mut event_null_parents = valid_event.clone();
let all_null_parents = [NULL_ID, NULL_ID, NULL_ID, NULL_ID, NULL_ID];
event_null_parents.parents = all_null_parents;
assert!(!event_null_parents.dag_validate(&event_graph).await?);
let mut event_same_layer_as_parents = valid_event.clone();
event_same_layer_as_parents.layer = 0;
assert!(!event_same_layer_as_parents.dag_validate(&event_graph).await?);
// Thanks for reading
Ok(())
})
} }
} }
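Since `layer` is now part of the material hashed in `Event::id`, an otherwise identical event on a different layer gets a different ID, so a tampered layer also changes the event's identity. A toy illustration of that property (plain `blake3` calls rather than darkfi's serialization):

```rust
/// Toy event ID: hash timestamp, content and layer together, in the
/// same spirit as `Event::id` (but not byte-compatible with it).
fn toy_id(timestamp: u64, content: &[u8], layer: u64) -> blake3::Hash {
    let mut hasher = blake3::Hasher::new();
    hasher.update(&timestamp.to_le_bytes());
    hasher.update(content);
    hasher.update(&layer.to_le_bytes());
    hasher.finalize()
}

fn main() {
    let a = toy_id(1_700_000_000, b"hello", 1);
    let b = toy_id(1_700_000_000, b"hello", 2);
    assert_ne!(a, b); // changing only the layer changes the ID
}
```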


@@ -18,7 +18,7 @@
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
collections::{HashMap, HashSet, VecDeque}, collections::{BTreeMap, HashMap, HashSet, VecDeque},
sync::Arc, sync::Arc,
}; };
@@ -26,6 +26,7 @@ use async_recursion::async_recursion;
use darkfi_serial::{deserialize_async, serialize_async}; use darkfi_serial::{deserialize_async, serialize_async};
use log::{debug, error, info}; use log::{debug, error, info};
use num_bigint::BigUint; use num_bigint::BigUint;
use sled_overlay::SledTreeOverlay;
use smol::{ use smol::{
lock::{OnceCell, RwLock}, lock::{OnceCell, RwLock},
Executor, Executor,
@@ -48,7 +49,7 @@ use proto::{EventRep, EventReq, TipRep, TipReq, REPLY_TIMEOUT};
/// Utility functions /// Utility functions
mod util; mod util;
use util::{days_since, next_rotation_timestamp, DAY}; use util::{generate_genesis, next_rotation_timestamp};
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@@ -76,7 +77,7 @@ pub struct EventGraph {
/// Sled tree containing the DAG /// Sled tree containing the DAG
dag: sled::Tree, dag: sled::Tree,
/// The set of unreferenced DAG tips /// The set of unreferenced DAG tips
unreferenced_tips: RwLock<HashSet<blake3::Hash>>, unreferenced_tips: RwLock<BTreeMap<u64, HashSet<blake3::Hash>>>,
/// A `HashSet` containing event IDs and their 1-level parents. /// A `HashSet` containing event IDs and their 1-level parents.
/// These come from the events we've sent out using `EventPut`. /// These come from the events we've sent out using `EventPut`.
/// They are used with `EventReq` to decide if we should reply /// They are used with `EventReq` to decide if we should reply
@@ -88,7 +89,12 @@ pub struct EventGraph {
/// Event subscriber, this notifies whenever an event is /// Event subscriber, this notifies whenever an event is
/// inserted into the DAG /// inserted into the DAG
pub event_sub: SubscriberPtr<Event>, pub event_sub: SubscriberPtr<Event>,
/// Current genesis event
current_genesis: RwLock<Event>,
/// Currently configured DAG rotation, in days
days_rotation: u64, days_rotation: u64,
/// Flag signalling DAG has finished initial sync
synced: RwLock<bool>,
} }
impl EventGraph { impl EventGraph {
@@ -102,12 +108,12 @@ impl EventGraph {
ex: Arc<Executor<'_>>, ex: Arc<Executor<'_>>,
) -> Result<EventGraphPtr> { ) -> Result<EventGraphPtr> {
let dag = sled_db.open_tree(dag_tree_name)?; let dag = sled_db.open_tree(dag_tree_name)?;
let unreferenced_tips = RwLock::new(HashSet::new()); let unreferenced_tips = RwLock::new(BTreeMap::new());
let broadcasted_ids = RwLock::new(HashSet::new()); let broadcasted_ids = RwLock::new(HashSet::new());
let event_sub = Subscriber::new(); let event_sub = Subscriber::new();
// Create the current genesis event based on the `days_rotation` // Create the current genesis event based on the `days_rotation`
let current_genesis = Self::generate_genesis(days_rotation); let current_genesis = generate_genesis(days_rotation);
let self_ = Arc::new(Self { let self_ = Arc::new(Self {
p2p, p2p,
dag: dag.clone(), dag: dag.clone(),
@@ -115,7 +121,9 @@ impl EventGraph {
broadcasted_ids, broadcasted_ids,
prune_task: OnceCell::new(), prune_task: OnceCell::new(),
event_sub, event_sub,
current_genesis: RwLock::new(current_genesis.clone()),
days_rotation, days_rotation,
synced: RwLock::new(false),
}); });
// Check if we have it in our DAG. // Check if we have it in our DAG.
@@ -159,29 +167,11 @@ impl EventGraph {
sled_db.flush_async().await.unwrap(); sled_db.flush_async().await.unwrap();
} }
/// Generate a deterministic genesis event corresponding to the DAG's configuration.
fn generate_genesis(days_rotation: u64) -> Event {
// Days rotation is u64 except zero
let genesis_days_rotation = if days_rotation == 0 { 1 } else { days_rotation };
// First check how many days passed since initial genesis.
let days_passed = days_since(INITIAL_GENESIS);
// Calculate the number of days_rotation intervals since INITIAL_GENESIS
let rotations_since_genesis = days_passed / genesis_days_rotation;
// Calculate the timestamp of the most recent event
let timestamp =
INITIAL_GENESIS + (rotations_since_genesis * genesis_days_rotation * DAY as u64);
Event { timestamp, content: GENESIS_CONTENTS.to_vec(), parents: [NULL_ID; N_EVENT_PARENTS] }
}
/// Sync the DAG from connected peers /// Sync the DAG from connected peers
pub async fn dag_sync(&self) -> Result<()> { pub async fn dag_sync(&self) -> Result<()> {
// We do an optimistic sync where we ask all our connected peers for // We do an optimistic sync where we ask all our connected peers for
// the DAG tips (unreferenced events) and then we accept the ones we // the latest layer DAG tips (unreferenced events) and then we accept
// see the most times. // the ones we see the most times.
// * Compare received tips with local ones, identify which we are missing. // * Compare received tips with local ones, identify which we are missing.
// * Request these from peers // * Request these from peers
// * Recursively request these backward // * Recursively request these backward
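The removed `generate_genesis` (now `util::generate_genesis`) floors the days elapsed since the initial genesis down to the last rotation boundary. A standalone sketch of the arithmetic, with a placeholder `INITIAL_GENESIS` value:

```rust
const DAY: u64 = 86_400; // seconds per day
// Placeholder; darkfi defines its own INITIAL_GENESIS timestamp.
const INITIAL_GENESIS: u64 = 1_690_000_000;

/// Timestamp of the most recent rotation boundary at or before `now`.
fn current_genesis_timestamp(now: u64, days_rotation: u64) -> u64 {
    // A days_rotation of zero is treated as rotating daily, as above.
    let rotation = if days_rotation == 0 { 1 } else { days_rotation };
    let days_passed = (now - INITIAL_GENESIS) / DAY;
    let rotations_since_genesis = days_passed / rotation;
    INITIAL_GENESIS + rotations_since_genesis * rotation * DAY
}

fn main() {
    // 10.5 days after initial genesis, with a 7-day rotation:
    let now = INITIAL_GENESIS + 10 * DAY + DAY / 2;
    assert_eq!(current_genesis_timestamp(now, 7), INITIAL_GENESIS + 7 * DAY);
}
```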
@@ -202,8 +192,8 @@ impl EventGraph {
"[EVENTGRAPH] Syncing DAG from {} peers...", communicated_peers, "[EVENTGRAPH] Syncing DAG from {} peers...", communicated_peers,
); );
// Here we keep track of the tips and how many times we've seen them. // Here we keep track of the tips, their layers and how many times we've seen them.
let mut tips: HashMap<blake3::Hash, usize> = HashMap::new(); let mut tips: HashMap<blake3::Hash, (u64, usize)> = HashMap::new();
// Let's first ask all of our peers for their tips and collect them // Let's first ask all of our peers for their tips and collect them
// in our hashmap above. // in our hashmap above.
@@ -246,11 +236,13 @@ impl EventGraph {
let peer_tips = &peer_tips.0; let peer_tips = &peer_tips.0;
// Note down the seen tips // Note down the seen tips
for tip in peer_tips { for (layer, layer_tips) in peer_tips {
if let Some(seen_tip) = tips.get_mut(tip) { for tip in layer_tips {
*seen_tip += 1; if let Some(seen_tip) = tips.get_mut(tip) {
} else { seen_tip.1 += 1;
tips.insert(*tip, 1); } else {
tips.insert(*tip, (*layer, 1));
}
} }
} }
} }
@@ -264,24 +256,25 @@ impl EventGraph {
return Err(Error::DagSyncFailed) return Err(Error::DagSyncFailed)
} }
// We know the number of peers we've communicated with. // We know the number of peers we've communicated with,
// Arbitrarily, let's not consider events we only got once. // so we will consider events we saw at more than 2/3 of
// TODO: This should be more sensible depending on the peer number. // those peers.
let consideration_threshold = communicated_peers * 2 / 3;
let mut considered_tips = HashSet::new(); let mut considered_tips = HashSet::new();
for (tip, amount) in tips.iter() { for (tip, (_, amount)) in tips.iter() {
if amount > &1 { if amount > &consideration_threshold {
considered_tips.insert(*tip); considered_tips.insert(*tip);
} }
} }
drop(tips); drop(tips);
// Now begin fetching the events backwards. // Now begin fetching the events backwards.
let mut missing_parents = vec![]; let mut missing_parents = HashSet::new();
for tip in considered_tips.iter() { for tip in considered_tips.iter() {
assert!(tip != &NULL_ID); assert!(tip != &NULL_ID);
if !self.dag.contains_key(tip.as_bytes()).unwrap() { if !self.dag.contains_key(tip.as_bytes()).unwrap() {
missing_parents.push(*tip); missing_parents.insert(*tip);
} }
} }
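The consideration threshold above is a simple supermajority filter over the tip counts. A sketch with toy string IDs standing in for `blake3::Hash`:

```rust
use std::collections::{HashMap, HashSet};

/// Keep only tips reported by more than 2/3 of the communicated
/// peers, mirroring the threshold used in `dag_sync`.
fn considered_tips(
    tips: &HashMap<&'static str, (u64, usize)>, // id -> (layer, seen)
    communicated_peers: usize,
) -> HashSet<&'static str> {
    let threshold = communicated_peers * 2 / 3;
    tips.iter()
        .filter(|(_, (_, seen))| *seen > threshold)
        .map(|(id, _)| *id)
        .collect()
}

fn main() {
    let mut tips = HashMap::new();
    tips.insert("tip_a", (4u64, 9usize)); // seen at 9 of 10 peers
    tips.insert("tip_b", (4, 2)); // seen at only 2 peers
    let kept = considered_tips(&tips, 10);
    assert!(kept.contains("tip_a") && !kept.contains("tip_b"));
}
```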
@@ -291,7 +284,8 @@ impl EventGraph {
} }
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events"); info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events");
let mut received_events = vec![]; let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
let mut received_events_hashes = HashSet::new();
while !missing_parents.is_empty() { while !missing_parents.is_empty() {
for parent_id in missing_parents.clone().iter() { for parent_id in missing_parents.clone().iter() {
let mut found_event = false; let mut found_event = false;
@@ -363,9 +357,15 @@ impl EventGraph {
"Got correct parent event {}", parent_id, "Got correct parent event {}", parent_id,
); );
received_events.push(parent.clone()); if let Some(layer_events) = received_events.get_mut(&parent.layer) {
let pos = missing_parents.iter().position(|id| id == &parent.id()).unwrap(); layer_events.push(parent.clone());
missing_parents.remove(pos); } else {
let layer_events = vec![parent.clone()];
received_events.insert(parent.layer, layer_events);
}
received_events_hashes.insert(*parent_id);
missing_parents.remove(parent_id);
found_event = true; found_event = true;
// See if we have the upper parents // See if we have the upper parents
@@ -374,12 +374,15 @@ impl EventGraph {
continue continue
} }
if !self.dag.contains_key(upper_parent.as_bytes()).unwrap() { if !missing_parents.contains(upper_parent) &&
!received_events_hashes.contains(upper_parent) &&
!self.dag.contains_key(upper_parent.as_bytes()).unwrap()
{
debug!( debug!(
target: "event_graph::dag_sync()", target: "event_graph::dag_sync()",
"Found upper missing parent event{}", upper_parent, "Found upper missing parent event{}", upper_parent,
); );
missing_parents.push(*upper_parent); missing_parents.insert(*upper_parent);
} }
} }
@@ -398,10 +401,15 @@ impl EventGraph {
// At this point we should've got all the events. // At this point we should've got all the events.
// We should add them to the DAG. // We should add them to the DAG.
// TODO: FIXME: Also validate these events. let mut events = vec![];
// TODO: FIXME: This insert should also be atomic, dag_insert might need a rewrite for (_, tips) in received_events {
let received_events_rev: Vec<Event> = received_events.iter().rev().cloned().collect(); for tip in tips {
self.dag_insert(&received_events_rev).await.unwrap(); events.push(tip);
}
}
self.dag_insert(&events).await?;
*self.synced.write().await = true;
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!"); info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!");
Ok(()) Ok(())
@@ -411,12 +419,14 @@ impl EventGraph {
async fn dag_prune(&self, genesis_event: Event) -> Result<()> { async fn dag_prune(&self, genesis_event: Event) -> Result<()> {
debug!(target: "event_graph::dag_prune()", "Pruning DAG..."); debug!(target: "event_graph::dag_prune()", "Pruning DAG...");
// Acquire exclusive locks to unreferenced_tips and broadcasted_ids while // Acquire exclusive locks to unreferenced_tips, broadcasted_ids and
// this operation is happening. We do this to ensure that during the pruning // current_genesis while this operation is happening. We do this to
// operation, no other operations are able to access the intermediate state // ensure that during the pruning operation, no other operations are
// which could lead to producing the wrong state after pruning. // able to access the intermediate state which could lead to producing
// the wrong state after pruning.
let mut unreferenced_tips = self.unreferenced_tips.write().await; let mut unreferenced_tips = self.unreferenced_tips.write().await;
let mut broadcasted_ids = self.broadcasted_ids.write().await; let mut broadcasted_ids = self.broadcasted_ids.write().await;
let mut current_genesis = self.current_genesis.write().await;
// Atomically clear the DAG and write the new genesis event. // Atomically clear the DAG and write the new genesis event.
let mut batch = sled::Batch::default(); let mut batch = sled::Batch::default();
@@ -431,10 +441,13 @@ impl EventGraph {
} }
// Clear unreferenced tips and bcast ids // Clear unreferenced tips and bcast ids
*unreferenced_tips = HashSet::from([genesis_event.id()]); *unreferenced_tips = BTreeMap::new();
unreferenced_tips.insert(0, HashSet::from([genesis_event.id()]));
*current_genesis = genesis_event;
*broadcasted_ids = HashSet::new(); *broadcasted_ids = HashSet::new();
drop(unreferenced_tips); drop(unreferenced_tips);
drop(broadcasted_ids); drop(broadcasted_ids);
drop(current_genesis);
debug!(target: "event_graph::dag_prune()", "DAG pruned successfully"); debug!(target: "event_graph::dag_prune()", "DAG pruned successfully");
Ok(()) Ok(())
@@ -457,6 +470,7 @@ impl EventGraph {
timestamp: next_rotation, timestamp: next_rotation,
content: GENESIS_CONTENTS.to_vec(), content: GENESIS_CONTENTS.to_vec(),
parents: [NULL_ID; N_EVENT_PARENTS], parents: [NULL_ID; N_EVENT_PARENTS],
layer: 0,
}; };
// Sleep until it's time to rotate. // Sleep until it's time to rotate.
@@ -472,6 +486,9 @@ impl EventGraph {
} }
/// Atomically insert given events into the DAG and return the event IDs. /// Atomically insert given events into the DAG and return the event IDs.
/// All provided events must be valid. An overlay is used over the DAG tree,
/// temporarily writing each event in order. After all events have been
/// validated and inserted successfully, we write the overlay to sled.
/// This will append the new events into the unreferenced tips set, and /// This will append the new events into the unreferenced tips set, and
/// remove the events' parents from it. It will also append the events' /// remove the events' parents from it. It will also append the events'
/// level-1 parents to the `broadcasted_ids` set, so the P2P protocol /// level-1 parents to the `broadcasted_ids` set, so the P2P protocol
@@ -479,6 +496,11 @@ impl EventGraph {
/// TODO: The `broadcasted_ids` set should periodically be pruned, when /// TODO: The `broadcasted_ids` set should periodically be pruned, when
/// some sensible time has passed after broadcasting the event. /// some sensible time has passed after broadcasting the event.
pub async fn dag_insert(&self, events: &[Event]) -> Result<Vec<blake3::Hash>> { pub async fn dag_insert(&self, events: &[Event]) -> Result<Vec<blake3::Hash>> {
// Sanity check
if events.is_empty() {
return Ok(vec![])
}
// Acquire exclusive locks to `unreferenced_tips and broadcasted_ids` // Acquire exclusive locks to `unreferenced_tips and broadcasted_ids`
let mut unreferenced_tips = self.unreferenced_tips.write().await; let mut unreferenced_tips = self.unreferenced_tips.write().await;
let mut broadcasted_ids = self.broadcasted_ids.write().await; let mut broadcasted_ids = self.broadcasted_ids.write().await;
@@ -486,18 +508,52 @@ impl EventGraph {
// Here we keep the IDs to return // Here we keep the IDs to return
let mut ids = Vec::with_capacity(events.len()); let mut ids = Vec::with_capacity(events.len());
// Create an atomic batch // Create an overlay over the DAG tree
let mut batch = sled::Batch::default(); let mut overlay = SledTreeOverlay::new(&self.dag);
// Iterate over given events // Grab genesis timestamp
let genesis_timestamp = self.current_genesis.read().await.timestamp;
// Iterate over given events to validate them and
// write them to the overlay
for event in events { for event in events {
let event_id = event.id(); let event_id = event.id();
debug!( debug!(
target: "event_graph::dag_insert()", target: "event_graph::dag_insert()",
"Inserting event {} into the DAG", event_id, "Inserting event {} into the DAG", event_id,
); );
if !event
.validate(&self.dag, genesis_timestamp, self.days_rotation, Some(&overlay))
.await?
{
error!(target: "event_graph::dag_insert()", "Event {} is invalid!", event_id);
return Err(Error::EventIsInvalid)
}
let event_se = serialize_async(event).await; let event_se = serialize_async(event).await;
// Add the event to the overlay
overlay.insert(event_id.as_bytes(), &event_se)?;
// Note down the event ID to return
ids.push(event_id);
}
// Aggregate changes into a single batch
let batch = overlay.aggregate().unwrap();
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = self.dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
// Iterate over given events to update references and
// send out notifications about them
for event in events {
let event_id = event.id();
// Update the unreferenced DAG tips set // Update the unreferenced DAG tips set
debug!( debug!(
target: "event_graph::dag_insert()", target: "event_graph::dag_insert()",
@@ -509,31 +565,36 @@ impl EventGraph {
target: "event_graph::dag_insert()", target: "event_graph::dag_insert()",
"Removing {} from unreferenced_tips", parent_id, "Removing {} from unreferenced_tips", parent_id,
); );
unreferenced_tips.remove(parent_id);
// Iterate over unreferenced tips in previous layers
// and remove the parent
// NOTE: this might be too exhaustive, but the
// assumption is that previous layers' unreferenced
// tips will be few.
for (layer, tips) in unreferenced_tips.iter_mut() {
if layer >= &event.layer {
break
}
tips.remove(parent_id);
}
broadcasted_ids.insert(*parent_id); broadcasted_ids.insert(*parent_id);
} }
} }
unreferenced_tips.retain(|_, tips| !tips.is_empty());
debug!( debug!(
target: "event_graph::dag_insert()", target: "event_graph::dag_insert()",
"Adding {} to unreferenced tips", event_id, "Adding {} to unreferenced tips", event_id,
); );
unreferenced_tips.insert(event_id);
// Add the event to the atomic batch if let Some(layer_tips) = unreferenced_tips.get_mut(&event.layer) {
batch.insert(event_id.as_bytes(), event_se); layer_tips.insert(event_id);
} else {
let mut layer_tips = HashSet::new();
layer_tips.insert(event_id);
unreferenced_tips.insert(event.layer, layer_tips);
}
// Note down the event ID to return // Send out notifications about the new event
ids.push(event_id);
}
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = self.dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
// Send out notifications about the new events
for event in events {
self.event_sub.notify(event.clone()).await; self.event_sub.notify(event.clone()).await;
} }
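The stage-validate-commit flow of the rewritten `dag_insert` can be exercised in isolation; a minimal sketch assuming the `sled-overlay` API exactly as it is used above (toy keys and values rather than serialized events):

```rust
use sled_overlay::SledTreeOverlay;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = sled::Config::new().temporary(true).open()?;
    let tree = db.open_tree("dag")?;

    // Stage writes in an overlay; nothing touches sled yet.
    let mut overlay = SledTreeOverlay::new(&tree);
    overlay.insert(b"event_1", b"payload_1")?;
    overlay.insert(b"event_2", b"payload_2")?;

    // Reads through the overlay see the staged writes, which is how
    // later events can validate against parents staged earlier in the
    // same batch.
    assert!(overlay.get(b"event_1")?.is_some());
    assert!(tree.get(b"event_1")?.is_none());

    // Aggregate into a single batch and apply it atomically.
    let batch = overlay.aggregate().unwrap();
    tree.apply_batch(batch)?;
    assert!(tree.get(b"event_1")?.is_some());
    Ok(())
}
```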
@@ -552,8 +613,33 @@ impl EventGraph {
Ok(Some(event)) Ok(Some(event))
} }
/// Find the unreferenced tips in the current DAG state. /// Get next layer along with its N_EVENT_PARENTS from the unreferenced
async fn find_unreferenced_tips(&self) -> HashSet<blake3::Hash> { /// tips of the DAG. Since tips are mapped by their layer, we go backwards
/// until we fill the array, ensuring we always use the latest layers' tips as
/// parents.
async fn get_next_layer_with_parents(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
let unreferenced_tips = self.unreferenced_tips.read().await;
let mut parents = [NULL_ID; N_EVENT_PARENTS];
let mut index = 0;
'outer: for (_, tips) in unreferenced_tips.iter().rev() {
for tip in tips.iter() {
parents[index] = *tip;
index += 1;
if index >= N_EVENT_PARENTS {
break 'outer
}
}
}
let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;
assert!(parents.iter().any(|x| x != &NULL_ID));
(next_layer, parents)
}
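A standalone sketch of this selection walk, with integers standing in for event hashes:

```rust
use std::collections::{BTreeMap, HashSet};

const N_EVENT_PARENTS: usize = 5;
const NULL_ID: u64 = 0; // stand-in for the null hash

/// Pick up to N_EVENT_PARENTS tips, newest layers first, and return
/// the next layer index, mirroring `get_next_layer_with_parents`.
fn next_layer_with_parents(
    unreferenced_tips: &BTreeMap<u64, HashSet<u64>>,
) -> (u64, [u64; N_EVENT_PARENTS]) {
    let mut parents = [NULL_ID; N_EVENT_PARENTS];
    let mut index = 0;
    'outer: for (_, tips) in unreferenced_tips.iter().rev() {
        for tip in tips {
            parents[index] = *tip;
            index += 1;
            if index >= N_EVENT_PARENTS {
                break 'outer;
            }
        }
    }
    // The new event always lands one layer above the newest tip.
    let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;
    (next_layer, parents)
}

fn main() {
    let mut tips = BTreeMap::new();
    tips.insert(3u64, HashSet::from([11u64, 12]));
    tips.insert(7, HashSet::from([21]));
    let (layer, parents) = next_layer_with_parents(&tips);
    assert_eq!(layer, 8);
    assert!(parents.contains(&21)); // newest-layer tip picked first
}
```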
/// Find the unreferenced tips in the current DAG state, mapped by their layers.
async fn find_unreferenced_tips(&self) -> BTreeMap<u64, HashSet<blake3::Hash>> {
// First get all the event IDs // First get all the event IDs
let mut tips = HashSet::new(); let mut tips = HashSet::new();
for iter_elem in self.dag.iter() { for iter_elem in self.dag.iter() {
@@ -562,6 +648,7 @@ impl EventGraph {
tips.insert(id); tips.insert(id);
} }
// Iterate again to find unreferenced IDs
for iter_elem in self.dag.iter() { for iter_elem in self.dag.iter() {
let (_, event) = iter_elem.unwrap(); let (_, event) = iter_elem.unwrap();
let event: Event = deserialize_async(&event).await.unwrap(); let event: Event = deserialize_async(&event).await.unwrap();
@@ -570,26 +657,26 @@ impl EventGraph {
} }
} }
tips // Build the layers map
} let mut map: BTreeMap<u64, HashSet<blake3::Hash>> = BTreeMap::new();
for tip in tips {
/// Get the current set of unreferenced tips in the DAG. let bytes = self.dag.get(tip.as_bytes()).unwrap().unwrap();
async fn get_unreferenced_tips(&self) -> [blake3::Hash; N_EVENT_PARENTS] { let event: Event = deserialize_async(&bytes).await.unwrap();
// TODO: return vec of all instead of N_EVENT_PARENTS if let Some(layer_tips) = map.get_mut(&event.layer) {
let unreferenced_tips = self.unreferenced_tips.read().await; layer_tips.insert(tip);
} else {
let mut tips = [NULL_ID; N_EVENT_PARENTS]; let mut layer_tips = HashSet::new();
for (i, tip) in unreferenced_tips.iter().take(N_EVENT_PARENTS).enumerate() { layer_tips.insert(tip);
tips[i] = *tip map.insert(event.layer, layer_tips);
}
} }
assert!(tips.iter().any(|x| x != &NULL_ID)); map
tips
} }
/// Internal function used for DAG sorting. /// Internal function used for DAG sorting.
async fn get_unreferenced_tips_sorted(&self) -> [blake3::Hash; N_EVENT_PARENTS] { async fn get_unreferenced_tips_sorted(&self) -> [blake3::Hash; N_EVENT_PARENTS] {
let tips = self.get_unreferenced_tips().await; let (_, tips) = self.get_next_layer_with_parents().await;
// Convert the hash to BigUint for sorting // Convert the hash to BigUint for sorting
let mut sorted: Vec<_> = let mut sorted: Vec<_> =


@@ -19,6 +19,7 @@
// TODO: FIXME: Some of the protocols should block operations until DAG is synced. // TODO: FIXME: Some of the protocols should block operations until DAG is synced.
use std::{ use std::{
collections::{BTreeMap, HashSet},
sync::{ sync::{
atomic::{AtomicUsize, Ordering::SeqCst}, atomic::{AtomicUsize, Ordering::SeqCst},
Arc, Arc,
@@ -26,11 +27,11 @@ use std::{
time::Duration, time::Duration,
}; };
use darkfi_serial::{async_trait, deserialize_async, SerialDecodable, SerialEncodable}; use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
use log::{debug, error, trace, warn}; use log::{debug, error, trace, warn};
use smol::Executor; use smol::Executor;
use super::{Event, EventGraph, EventGraphPtr, NULL_ID}; use super::{Event, EventGraphPtr, NULL_ID};
use crate::{impl_p2p_message, net::*, system::timeout::timeout, Error, Result}; use crate::{impl_p2p_message, net::*, system::timeout::timeout, Error, Result};
/// Malicious behaviour threshold. If the threshold is reached, we will /// Malicious behaviour threshold. If the threshold is reached, we will
@@ -83,7 +84,7 @@ impl_p2p_message!(TipReq, "EventGraph::TipReq");
/// A P2P message representing a reply for the peer's DAG tips /// A P2P message representing a reply for the peer's DAG tips
#[derive(Clone, SerialEncodable, SerialDecodable)] #[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipRep(pub Vec<blake3::Hash>); pub struct TipRep(pub BTreeMap<u64, HashSet<blake3::Hash>>);
impl_p2p_message!(TipRep, "EventGraph::TipRep"); impl_p2p_message!(TipRep, "EventGraph::TipRep");
#[async_trait] #[async_trait]
@@ -129,6 +130,26 @@ impl ProtocolEventGraph {
})) }))
} }
async fn increase_malicious_count(self: Arc<Self>) -> Result<()> {
let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
if malicious_count + 1 == MALICIOUS_THRESHOLD {
error!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
self.channel.address(),
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
}
warn!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.address(),
);
Ok(())
}
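The counting logic in the new `increase_malicious_count` helper reduces to an atomic fetch-and-check; a sketch with a placeholder threshold (channel teardown and logging omitted):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

const MALICIOUS_THRESHOLD: usize = 5; // placeholder value

/// Returns true when this increment crosses the threshold and the
/// peer should be dropped, mirroring `increase_malicious_count`.
fn note_malicious(count: &AtomicUsize) -> bool {
    // fetch_add returns the previous value, hence the `+ 1`.
    let prev = count.fetch_add(1, SeqCst);
    prev + 1 == MALICIOUS_THRESHOLD
}

fn main() {
    let count = AtomicUsize::new(0);
    for i in 1..=MALICIOUS_THRESHOLD {
        assert_eq!(note_malicious(&count), i == MALICIOUS_THRESHOLD);
    }
}
```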
/// Protocol function handling `EventPut`. /// Protocol function handling `EventPut`.
/// This is triggered whenever someone broadcasts (or relays) a new /// This is triggered whenever someone broadcasts (or relays) a new
/// event on the network. /// event on the network.
@@ -142,50 +163,6 @@ impl ProtocolEventGraph {
target: "event_graph::protocol::handle_event_put()", target: "event_graph::protocol::handle_event_put()",
"Got EventPut: {} [{}]", event.id(), self.channel.address(), "Got EventPut: {} [{}]", event.id(), self.channel.address(),
); );
// Check if the event is older than the genesis event. If so, we should
// not include it in our Dag.
// The genesis event marks the last time the Dag has been pruned of old
// events. The pruning interval is defined by the days_rotation field
// of [`EventGraph`].
// TODO it would be better to store/cache this instead of calculating
// on every broadcast/relay.
let genesis_timestamp =
EventGraph::generate_genesis(self.event_graph.days_rotation()).timestamp;
if event.timestamp < genesis_timestamp {
debug!(
target: "event_graph::protocol::handle_event_put()",
"Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{}`",
event.id(), event.timestamp, genesis_timestamp
);
}
// We received an event. Check if we already have it in our DAG.
// Also check if we have the event's parents. In the case we do
// not have the parents, we'll request them from the peer that has
// sent this event to us. In case they do not reply in time, we drop
// the event.
// Validate the event first. If we do not consider it valid, we
// will just drop it and stay quiet. If the malicious threshold
// is reached, we will stop the connection.
if !event.validate() {
let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
if malicious_count + 1 == MALICIOUS_THRESHOLD {
error!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
self.channel.address(),
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
}
warn!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.address(),
);
continue
}
// If we have already seen the event, we'll stay quiet. // If we have already seen the event, we'll stay quiet.
let event_id = event.id(); let event_id = event.id();
@@ -197,6 +174,35 @@ impl ProtocolEventGraph {
continue continue
} }
// We received an event. Check if we already have it in our DAG.
// Check the event is not older than the current genesis event timestamp.
// Also check if we have the event's parents. In the case we do
// not have the parents, we'll request them from the peer that has
// sent this event to us. In case they do not reply in time, we drop
// the event.
// Check if the event is older than the genesis event. If so, we should
// not include it in our Dag.
// The genesis event marks the last time the Dag has been pruned of old
// events. The pruning interval is defined by the days_rotation field
// of [`EventGraph`].
let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp;
if event.timestamp < genesis_timestamp {
debug!(
target: "event_graph::protocol::handle_event_put()",
"Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{}`",
event.id(), event.timestamp, genesis_timestamp
);
}
// Validate the new event first. If we do not consider it valid, we
// will just drop it and stay quiet. If the malicious threshold
// is reached, we will stop the connection.
if !event.validate_new() {
self.clone().increase_malicious_count().await?;
continue
}
// At this point, this is a new event to us. Let's see if we // At this point, this is a new event to us. Let's see if we
// have all of its parents. // have all of its parents.
debug!( debug!(
@@ -204,16 +210,16 @@ impl ProtocolEventGraph {
"Event {} is new", event_id, "Event {} is new", event_id,
); );
let mut missing_parents = vec![]; let mut missing_parents = HashSet::new();
for parent_id in event.parents.iter() { for parent_id in event.parents.iter() {
// `event.validate()` should have already made sure that // `event.validate_new()` should have already made sure that
// not all parents are NULL, and that there are no duplicates. // not all parents are NULL, and that there are no duplicates.
if parent_id == &NULL_ID { if parent_id == &NULL_ID {
continue continue
} }
if !self.event_graph.dag.contains_key(parent_id.as_bytes()).unwrap() { if !self.event_graph.dag.contains_key(parent_id.as_bytes()).unwrap() {
missing_parents.push(*parent_id); missing_parents.insert(*parent_id);
} }
} }
@@ -221,13 +227,13 @@ impl ProtocolEventGraph {
// fetch them from this peer. Do this recursively until we // fetch them from this peer. Do this recursively until we
// find all of them. // find all of them.
if !missing_parents.is_empty() { if !missing_parents.is_empty() {
// We track the received events in a vec. If/when we get all // We track the received events mapped by their layer.
// of them, we need to insert them in reverse so the DAG state // If/when we get all of them, we need to insert them in order so
// stays correct and unreferenced tips represent the actual thing // the DAG state stays correct and unreferenced tips represent the
// they should. If we insert them out of order, then we might have // actual thing they should. If we insert them out of order, then
// wrong unreferenced tips. // we might have wrong unreferenced tips.
// TODO: What should we do if at some point the events become too old? let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
let mut received_events = vec![]; let mut received_events_hashes = HashSet::new();
debug!( debug!(
target: "event_graph::protocol::handle_event_put()", target: "event_graph::protocol::handle_event_put()",
@@ -271,9 +277,15 @@ impl ProtocolEventGraph {
"Got correct parent event {}", parent.id(), "Got correct parent event {}", parent.id(),
); );
received_events.push(parent.clone()); if let Some(layer_events) = received_events.get_mut(&parent.layer) {
let pos = missing_parents.iter().position(|id| id == &parent.id()).unwrap(); layer_events.push(parent.clone());
missing_parents.remove(pos); } else {
let layer_events = vec![parent.clone()];
received_events.insert(parent.layer, layer_events);
}
received_events_hashes.insert(*parent_id);
missing_parents.remove(parent_id);
// See if we have the upper parents // See if we have the upper parents
for upper_parent in parent.parents.iter() { for upper_parent in parent.parents.iter() {
@@ -281,13 +293,19 @@ impl ProtocolEventGraph {
continue continue
} }
if !self.event_graph.dag.contains_key(upper_parent.as_bytes()).unwrap() if !missing_parents.contains(upper_parent) &&
!received_events_hashes.contains(upper_parent) &&
!self
.event_graph
.dag
.contains_key(upper_parent.as_bytes())
.unwrap()
{ {
debug!( debug!(
target: "event_graph::protocol::handle_event_put()", target: "event_graph::protocol::handle_event_put()",
"Found upper missing parent event{}", upper_parent, "Found upper missing parent event{}", upper_parent,
); );
missing_parents.push(*upper_parent); missing_parents.insert(*upper_parent);
} }
} }
} }
@@ -295,19 +313,29 @@ impl ProtocolEventGraph {
// At this point we should've got all the events. // At this point we should've got all the events.
// We should add them to the DAG. // We should add them to the DAG.
// TODO: FIXME: Also validate these events. let mut events = vec![];
let received_events_rev: Vec<Event> = for (_, tips) in received_events {
received_events.iter().rev().cloned().collect(); for tip in tips {
self.event_graph.dag_insert(&received_events_rev).await.unwrap(); events.push(tip);
}
}
if self.event_graph.dag_insert(&events).await.is_err() {
self.clone().increase_malicious_count().await?;
continue
}
} // <-- !missing_parents.is_empty() } // <-- !missing_parents.is_empty()
// If we're here, we have all the parents, and we can now // If we're here, we have all the parents, and we can now
// add the actual event to the DAG. // perform a full validation and add the actual event to
// the DAG.
debug!( debug!(
target: "event_graph::protocol::handle_event_put()", target: "event_graph::protocol::handle_event_put()",
"Got all parents necessary for insertion", "Got all parents necessary for insertion",
); );
self.event_graph.dag_insert(&[event.clone()]).await.unwrap(); if self.event_graph.dag_insert(&[event.clone()]).await.is_err() {
self.clone().increase_malicious_count().await?;
continue
}
// Relay the event to other peers. // Relay the event to other peers.
self.event_graph self.event_graph
@@ -330,6 +358,15 @@ impl ProtocolEventGraph {
"Got EventReq: {} [{}]", event_id, self.channel.address(), "Got EventReq: {} [{}]", event_id, self.channel.address(),
); );
// Check if the node has finished syncing its DAG
if !*self.event_graph.synced.read().await {
debug!(
target: "event_graph::protocol::handle_event_req",
"DAG is still syncing, skipping..."
);
continue
}
// We received an event request from somebody. // We received an event request from somebody.
// If we do have it, we will send it back to them as `EventRep`. // If we do have it, we will send it back to them as `EventRep`.
// Otherwise, we'll stay quiet. An honest node should always have // Otherwise, we'll stay quiet. An honest node should always have
@@ -367,16 +404,12 @@ impl ProtocolEventGraph {
target: "event_graph::protocol::handle_event_req()", target: "event_graph::protocol::handle_event_req()",
"Fetching event {} from DAG", event_id, "Fetching event {} from DAG", event_id,
); );
let event = self.event_graph.dag.get(event_id.as_bytes()).unwrap().unwrap(); let event = self.event_graph.dag_get(&event_id).await.unwrap().unwrap();
let event: Event = deserialize_async(&event).await.unwrap();
// Check if the incoming event is older than the genesis event. If so, something // Check if the incoming event is older than the genesis event. If so, something
// has gone wrong. The event should have been pruned during the last // has gone wrong. The event should have been pruned during the last
// rotation. // rotation.
// TODO it would be better to store/cache this instead of calculating let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp;
// on every broadcast/relay.
let genesis_timestamp =
EventGraph::generate_genesis(self.event_graph.days_rotation()).timestamp;
if event.timestamp < genesis_timestamp { if event.timestamp < genesis_timestamp {
error!( error!(
target: "event_graph::protocol::handle_event_req()", target: "event_graph::protocol::handle_event_req()",
@@ -416,20 +449,29 @@ impl ProtocolEventGraph {
"Got TipReq [{}]", self.channel.address(), "Got TipReq [{}]", self.channel.address(),
); );
// Check if the node has finished syncing its DAG
if !*self.event_graph.synced.read().await {
debug!(
target: "event_graph::protocol::handle_tip_req",
"DAG is still syncing, skipping..."
);
continue
}
// TODO: Rate limit // TODO: Rate limit
// We received a tip request. Let's find them, add them to // We received a tip request. Let's find them, add them to
// our bcast ids list, and reply with them. // our bcast ids list, and reply with them.
let mut tips = self.event_graph.get_unreferenced_tips().await.to_vec(); let layers = self.event_graph.unreferenced_tips.read().await.clone();
tips.retain(|x| x != &NULL_ID);
let mut bcast_ids = self.event_graph.broadcasted_ids.write().await; let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
for tip in tips.iter() { for (_, tips) in layers.iter() {
bcast_ids.insert(*tip); for tip in tips {
bcast_ids.insert(*tip);
}
} }
drop(bcast_ids); drop(bcast_ids);
self.channel.send(&TipRep(tips)).await?; self.channel.send(&TipRep(layers)).await?;
} }
} }
} }


@@ -20,15 +20,15 @@
use std::sync::Arc; use std::sync::Arc;
use log::info; use log::{info, warn};
use rand::{prelude::SliceRandom, Rng}; use rand::{prelude::SliceRandom, rngs::ThreadRng};
use smol::{channel, future, Executor}; use smol::{channel, future, Executor};
use url::Url; use url::Url;
use crate::{ use crate::{
event_graph::{ event_graph::{
proto::{EventPut, ProtocolEventGraph}, proto::{EventPut, ProtocolEventGraph},
Event, EventGraph, NULL_ID, Event, EventGraph,
}, },
net::{P2p, Settings, SESSION_ALL}, net::{P2p, Settings, SESSION_ALL},
system::sleep, system::sleep,
@@ -40,9 +40,7 @@ const N_CONNS: usize = 2;
//const N_NODES: usize = 50; //const N_NODES: usize = 50;
//const N_CONNS: usize = N_NODES / 3; //const N_CONNS: usize = N_NODES / 3;
#[test] fn init_logger() {
#[ignore]
fn eventgraph_propagation() {
let mut cfg = simplelog::ConfigBuilder::new(); let mut cfg = simplelog::ConfigBuilder::new();
cfg.add_filter_ignore("sled".to_string()); cfg.add_filter_ignore("sled".to_string());
cfg.add_filter_ignore("net::protocol_ping".to_string()); cfg.add_filter_ignore("net::protocol_ping".to_string());
@@ -56,81 +54,87 @@ fn eventgraph_propagation() {
cfg.add_filter_ignore("net::channel::send()".to_string()); cfg.add_filter_ignore("net::channel::send()".to_string());
cfg.add_filter_ignore("net::channel::start()".to_string()); cfg.add_filter_ignore("net::channel::start()".to_string());
cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string()); cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string());
cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string());
cfg.add_filter_ignore("net::tcp".to_string());
simplelog::TermLogger::init( // We check this error so we can execute same-file tests in parallel;
//simplelog::LevelFilter::Info, // otherwise the second one fails to init the logger here.
simplelog::LevelFilter::Debug, if simplelog::TermLogger::init(
simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace, //simplelog::LevelFilter::Trace,
cfg.build(), cfg.build(),
simplelog::TerminalMode::Mixed, simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto, simplelog::ColorChoice::Auto,
) )
.unwrap(); .is_err()
{
let ex = Arc::new(Executor::new()); warn!(target: "test_harness", "Logger already initialized");
let ex_ = ex.clone(); }
let (signal, shutdown) = channel::unbounded::<()>();
// Run a thread for each node.
easy_parallel::Parallel::new()
.each(0..N_NODES, |_| future::block_on(ex.run(shutdown.recv())))
.finish(|| {
future::block_on(async {
eventgraph_propagation_real(ex_).await;
drop(signal);
})
});
} }
async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) { async fn spawn_node(
let mut eg_instances = vec![]; inbound_addrs: Vec<Url>,
let mut rng = rand::thread_rng(); peers: Vec<Url>,
ex: Arc<Executor<'static>>,
) -> Arc<EventGraph> {
let settings = Settings {
localnet: true,
inbound_addrs,
outbound_connections: 0,
outbound_connect_timeout: 2,
inbound_connections: usize::MAX,
peers,
allowed_transports: vec!["tcp".to_string()],
..Default::default()
};
let mut genesis_event_id = NULL_ID; let p2p = P2p::new(settings, ex.clone()).await;
let sled_db = sled::Config::new().temporary(true).open().unwrap();
let event_graph = EventGraph::new(p2p.clone(), sled_db, "dag", 1, ex.clone()).await.unwrap();
*event_graph.synced.write().await = true;
let event_graph_ = event_graph.clone();
// Register the P2P protocols
let registry = p2p.protocol_registry();
registry
.register(SESSION_ALL, move |channel, _| {
let event_graph_ = event_graph_.clone();
async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
})
.await;
event_graph
}
async fn bootstrap_nodes(
peer_indexes: &[usize],
starting_port: usize,
rng: &mut ThreadRng,
ex: Arc<Executor<'static>>,
) -> Vec<Arc<EventGraph>> {
let mut eg_instances = vec![];
// Initialize the nodes // Initialize the nodes
for i in 0..N_NODES { for i in 0..N_NODES {
// Everyone will connect to N_CONNS random peers. // Everyone will connect to N_CONNS random peers.
let mut peer_indexes_copy = peer_indexes.to_owned();
peer_indexes_copy.remove(i);
let peer_indexes_to_connect: Vec<_> =
peer_indexes_copy.choose_multiple(rng, N_CONNS).collect();
let mut peers = vec![]; let mut peers = vec![];
for _ in 0..N_CONNS { for peer_index in peer_indexes_to_connect {
let mut port = 13200 + i; let port = starting_port + peer_index;
while port == 13200 + i {
port = 13200 + rng.gen_range(0..N_NODES);
}
peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap()); peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
} }
let settings = Settings { let event_graph = spawn_node(
localnet: true, vec![Url::parse(&format!("tcp://127.0.0.1:{}", starting_port + i)).unwrap()],
inbound_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + i)).unwrap()],
outbound_connections: 0,
outbound_connect_timeout: 2,
inbound_connections: usize::MAX,
peers, peers,
allowed_transports: vec!["tcp".to_string()], ex.clone(),
..Default::default() )
}; .await;
let p2p = P2p::new(settings, ex.clone()).await;
let sled_db = sled::Config::new().temporary(true).open().unwrap();
let event_graph =
EventGraph::new(p2p.clone(), sled_db, "dag", 1, ex.clone()).await.unwrap();
let event_graph_ = event_graph.clone();
// Take the last sled item since there's only 1
if genesis_event_id == NULL_ID {
let (id, _) = event_graph.dag.last().unwrap().unwrap();
genesis_event_id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
}
// Register the P2P protocols
let registry = p2p.protocol_registry();
registry
.register(SESSION_ALL, move |channel, _| {
let event_graph_ = event_graph_.clone();
async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
})
.await;
eg_instances.push(event_graph); eg_instances.push(event_graph);
} }
@@ -140,64 +144,121 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
eg.p2p.clone().start().await.unwrap(); eg.p2p.clone().start().await.unwrap();
} }
info!("Waiting 10s until all peers connect"); info!("Waiting 5s until all peers connect");
sleep(10).await; sleep(5).await;
eg_instances
}
async fn assert_dags(
eg_instances: &Vec<Arc<EventGraph>>,
expected_len: usize,
rng: &mut ThreadRng,
) {
let random_node = eg_instances.choose(rng).unwrap();
let last_layer_tips =
random_node.unreferenced_tips.read().await.last_key_value().unwrap().1.clone();
for (i, eg) in eg_instances.iter().enumerate() {
let node_last_layer_tips =
eg.unreferenced_tips.read().await.last_key_value().unwrap().1.clone();
assert!(
eg.dag.len() == expected_len,
"Node {}, expected {} events, have {}",
i,
expected_len,
eg.dag.len()
);
assert_eq!(
node_last_layer_tips, last_layer_tips,
"Node {} contains malformed unreferenced tips",
i
);
}
}
macro_rules! test_body {
($real_call:ident) => {
init_logger();
let ex = Arc::new(Executor::new());
let ex_ = ex.clone();
let (signal, shutdown) = channel::unbounded::<()>();
// Run a thread for each node.
easy_parallel::Parallel::new()
.each(0..N_NODES, |_| future::block_on(ex.run(shutdown.recv())))
.finish(|| {
future::block_on(async {
$real_call(ex_).await;
drop(signal);
})
});
};
}
#[test]
fn eventgraph_propagation() {
test_body!(eventgraph_propagation_real);
}
async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
let mut rng = rand::thread_rng();
let peer_indexes: Vec<usize> = (0..N_NODES).collect();
// Bootstrap nodes
let mut eg_instances = bootstrap_nodes(&peer_indexes, 13200, &mut rng, ex.clone()).await;
// Grab genesis event
let random_node = eg_instances.choose(&mut rng).unwrap();
let (id, _) = random_node.dag.last().unwrap().unwrap();
let genesis_event_id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
// ========================================= // =========================================
// 1. Assert that everyone's DAG is the same // 1. Assert that everyone's DAG is the same
// ========================================= // =========================================
for (i, eg) in eg_instances.iter().enumerate() { assert_dags(&eg_instances, 1, &mut rng).await;
let tips = eg.unreferenced_tips.read().await;
assert!(eg.dag.len() == 1, "Node {}", i);
assert!(tips.len() == 1, "Node {}", i);
assert!(tips.get(&genesis_event_id).is_some(), "Node {}", i);
}
// ==========================================
// 2. Create an event in one node and publish
// ==========================================
let random_node = eg_instances.choose(&mut rng).unwrap();
let event = Event::new(vec![1, 2, 3, 4], random_node).await;
assert!(event.parents.contains(&genesis_event_id));

// The node adds it to their DAG, on layer 1.
let event_id = random_node.dag_insert(&[event.clone()]).await.unwrap()[0];
let tips_layers = random_node.unreferenced_tips.read().await;
// Since genesis was referenced, its layer (0) has been removed
assert_eq!(tips_layers.len(), 1);
assert!(tips_layers.last_key_value().unwrap().1.get(&event_id).is_some());
drop(tips_layers);
info!("Broadcasting event {}", event_id); info!("Broadcasting event {}", event_id);
random_node.p2p.broadcast(&EventPut(event)).await; random_node.p2p.broadcast(&EventPut(event)).await;
info!("Waiting 10s for event propagation"); info!("Waiting 5s for event propagation");
sleep(10).await; sleep(5).await;
// ====================================================
// 3. Assert that everyone has the new event in the DAG
// ====================================================
assert_dags(&eg_instances, 2, &mut rng).await;
// ==============================================================
// 4. Create multiple events on a node and broadcast the last one
//    The `EventPut` logic should manage to fetch all of them,
//    provided that the last one references the earlier ones.
// ==============================================================
let random_node = eg_instances.choose(&mut rng).unwrap();

let event0 = Event::new(vec![1, 2, 3, 4, 0], random_node).await;
let event0_id = random_node.dag_insert(&[event0.clone()]).await.unwrap()[0];
let event1 = Event::new(vec![1, 2, 3, 4, 1], random_node).await;
let event1_id = random_node.dag_insert(&[event1.clone()]).await.unwrap()[0];
let event2 = Event::new(vec![1, 2, 3, 4, 2], random_node).await;
let event2_id = random_node.dag_insert(&[event2.clone()]).await.unwrap()[0];
// Genesis event + event from 2. + the 3 events above (last one at layer 4)
assert_eq!(random_node.dag.len(), 5);
let tips_layers = random_node.unreferenced_tips.read().await;
assert_eq!(tips_layers.len(), 1);
assert!(tips_layers.get(&4).unwrap().get(&event2_id).is_some());
drop(tips_layers);
let event_chain =
vec![(event0_id, event0.parents), (event1_id, event1.parents), (event2_id, event2.parents)];
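The layer assertions above follow from the commit's layering rule: all parents of an event must exist in previous layers, so a child presumably lands one layer above its highest parent. A toy illustration of that rule — the helper below is hypothetical, not from the codebase:

// Hypothetical helper illustrating the layering rule: every parent sits on
// a lower layer, so a child lands one layer above its highest parent.
fn child_layer(parent_layers: &[u64]) -> u64 {
    parent_layers.iter().max().copied().unwrap_or(0) + 1
}

assert_eq!(child_layer(&[0]), 1); // step 2: event on top of genesis
assert_eq!(child_layer(&[3]), 4); // step 4: event2 referencing event1 ends on layer 4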
@@ -205,142 +266,186 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
info!("Broadcasting event {}", event2_id); info!("Broadcasting event {}", event2_id);
info!("Event chain: {:#?}", event_chain); info!("Event chain: {:#?}", event_chain);
random_node.p2p.broadcast(&EventPut(event2)).await; random_node.p2p.broadcast(&EventPut(event2)).await;
info!("Waiting 10s for event propagation"); info!("Waiting 5s for event propagation");
sleep(10).await; sleep(5).await;
// ==========================================
// 5. Assert that everyone has all the events
// ==========================================
assert_dags(&eg_instances, 5, &mut rng).await;
// ===========================================
// 6. Create multiple events on multiple nodes
// ===========================================

// =======
// node 1
// =======
let node1 = eg_instances.choose(&mut rng).unwrap();

let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1).await;
node1.dag_insert(&[event0_1.clone()]).await.unwrap();
node1.p2p.broadcast(&EventPut(event0_1)).await;
let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1).await;
node1.dag_insert(&[event1_1.clone()]).await.unwrap();
node1.p2p.broadcast(&EventPut(event1_1)).await;
let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1).await;
node1.dag_insert(&[event2_1.clone()]).await.unwrap();
node1.p2p.broadcast(&EventPut(event2_1)).await;
// =======
// node 2
// =======
let node2 = eg_instances.choose(&mut rng).unwrap();

let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2).await;
node2.dag_insert(&[event0_2.clone()]).await.unwrap();
node2.p2p.broadcast(&EventPut(event0_2)).await;
let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2).await;
node2.dag_insert(&[event1_2.clone()]).await.unwrap();
node2.p2p.broadcast(&EventPut(event1_2)).await;
let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2).await;
node2.dag_insert(&[event2_2.clone()]).await.unwrap();
node2.p2p.broadcast(&EventPut(event2_2)).await;
// =======
// node 3
// =======
let node3 = eg_instances.choose(&mut rng).unwrap();

let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3).await;
node3.dag_insert(&[event0_3.clone()]).await.unwrap();
node2.p2p.broadcast(&EventPut(event0_3)).await;
let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3).await;
node3.dag_insert(&[event1_3.clone()]).await.unwrap();
node2.p2p.broadcast(&EventPut(event1_3)).await;
let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3).await;
node3.dag_insert(&[event2_3.clone()]).await.unwrap();
node3.p2p.broadcast(&EventPut(event2_3)).await;

info!("Waiting 5s for events propagation");
sleep(5).await;
// ==========================================
// 7. Assert that everyone has all the events
// ==========================================
// 5 events from 2. and 4. + 9 events from 6. = 14
assert_dags(&eg_instances, 14, &mut rng).await;
// ============================================================
// 8. Start a new node and try to sync the DAG from other peers
// ============================================================
{
// Connect to N_CONNS random peers.
let peer_indexes_to_connect: Vec<_> =
peer_indexes.choose_multiple(&mut rng, N_CONNS).collect();
let mut peers = vec![];
for peer_index in peer_indexes_to_connect {
let port = 13200 + peer_index;
peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
}

let event_graph = spawn_node(
vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + N_NODES + 1)).unwrap()],
peers,
ex.clone(),
)
.await;

eg_instances.push(event_graph.clone());
event_graph.p2p.clone().start().await.unwrap();

info!("Waiting 5s for new node connection");
sleep(5).await;

event_graph.dag_sync().await.unwrap()
}
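spawn_node replaces the manual per-node setup that previously lived in this hunk. A rough reconstruction of what such a helper plausibly wraps, pieced together from the inline code this commit removes — the exact signature and field set are assumptions:

// Rough sketch of the assumed spawn_node helper, reconstructed from the
// inline setup this hunk removes; signatures are approximate.
async fn spawn_node(
    inbound_addrs: Vec<Url>,
    peers: Vec<Url>,
    ex: Arc<Executor<'static>>,
) -> Arc<EventGraph> {
    let settings = Settings {
        localnet: true,
        inbound_addrs,
        peers,
        outbound_connections: 0,
        inbound_connections: usize::MAX,
        allowed_transports: vec!["tcp".to_string()],
        ..Default::default()
    };
    let p2p = P2p::new(settings, ex.clone()).await;
    let sled_db = sled::Config::new().temporary(true).open().unwrap();
    let event_graph = EventGraph::new(p2p.clone(), sled_db, "dag", 1, ex.clone()).await.unwrap();

    // Register the P2P protocols on every new channel.
    let event_graph_ = event_graph.clone();
    let registry = p2p.protocol_registry();
    registry
        .register(SESSION_ALL, move |channel, _| {
            let event_graph_ = event_graph_.clone();
            async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
        })
        .await;

    event_graph
}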
// ============================================================
// 9. Assert the new synced DAG has the same contents as others
// ============================================================
// 5 events from 2. and 4. + 9 events from 6. = 14
assert_dags(&eg_instances, 14, &mut rng).await;

// Stop the P2P network
for eg in eg_instances.iter() {
eg.p2p.clone().stop().await;
}
}
#[test]
#[ignore]
fn eventgraph_chaotic_propagation() {
test_body!(eventgraph_chaotic_propagation_real);
}
async fn eventgraph_chaotic_propagation_real(ex: Arc<Executor<'static>>) {
let mut rng = rand::thread_rng();
let peer_indexes: Vec<usize> = (0..N_NODES).collect();
let n_events: usize = 100000;
// Bootstrap nodes
let mut eg_instances = bootstrap_nodes(&peer_indexes, 14200, &mut rng, ex.clone()).await;
// =========================================
// 1. Assert that everyone's DAG is the same
// =========================================
assert_dags(&eg_instances, 1, &mut rng).await;
// ===========================================
// 2. Create multiple events on multiple nodes
// ===========================================
for i in 0..n_events {
let random_node = eg_instances.choose(&mut rng).unwrap();
let event = Event::new(i.to_be_bytes().to_vec(), random_node).await;
random_node.dag_insert(&[event.clone()]).await.unwrap();
random_node.p2p.broadcast(&EventPut(event)).await;
}
info!("Waiting 5s for events propagation");
sleep(5).await;
// ==========================================
// 3. Assert that everyone has all the events
// ==========================================
assert_dags(&eg_instances, n_events + 1, &mut rng).await;
// ============================================================
// 4. Start a new node and try to sync the DAG from other peers
// ============================================================
{
// Connect to N_CONNS random peers.
let peer_indexes_to_connect: Vec<_> =
peer_indexes.choose_multiple(&mut rng, N_CONNS).collect();
let mut peers = vec![];
for peer_index in peer_indexes_to_connect {
let port = 14200 + peer_index;
peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
}
let event_graph = spawn_node(
vec![Url::parse(&format!("tcp://127.0.0.1:{}", 14200 + N_NODES + 1)).unwrap()],
peers,
ex.clone(),
)
.await;
eg_instances.push(event_graph.clone());
event_graph.p2p.clone().start().await.unwrap();
info!("Waiting 5s for new node connection");
sleep(5).await;
event_graph.dag_sync().await.unwrap()
}
// ============================================================
// 5. Assert the new synced DAG has the same contents as others
// ============================================================
assert_dags(&eg_instances, n_events + 1, &mut rng).await;
// Stop the P2P network
for eg in eg_instances.iter() {
eg.p2p.clone().stop().await;
}
}
View File
@@ -18,6 +18,8 @@
use std::time::UNIX_EPOCH;

use crate::event_graph::{Event, GENESIS_CONTENTS, INITIAL_GENESIS, NULL_ID, N_EVENT_PARENTS};

/// Seconds in a day
pub(super) const DAY: i64 = 86400;
@@ -89,6 +91,29 @@ pub(super) fn seconds_until_next_rotation(next_rotation: u64) -> u64 {
next_rotation - now
}
/// Generate a deterministic genesis event corresponding to the DAG's configuration.
pub(super) fn generate_genesis(days_rotation: u64) -> Event {
// days_rotation can be any u64; zero is treated as one day here
let genesis_days_rotation = if days_rotation == 0 { 1 } else { days_rotation };
// First, check how many days have passed since the initial genesis.
let days_passed = days_since(INITIAL_GENESIS);
// Calculate the number of days_rotation intervals since INITIAL_GENESIS
let rotations_since_genesis = days_passed / genesis_days_rotation;
// Calculate the timestamp of the most recent genesis rotation
let timestamp =
INITIAL_GENESIS + (rotations_since_genesis * genesis_days_rotation * DAY as u64);
Event {
timestamp,
content: GENESIS_CONTENTS.to_vec(),
parents: [NULL_ID; N_EVENT_PARENTS],
layer: 0,
}
}
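A quick worked example of the rotation arithmetic above, with illustrative numbers that are not from the source:

// Illustrative numbers only: assume days_rotation = 7 and that
// days_since(INITIAL_GENESIS) returned 100.
let days_rotation: u64 = 7;
let days_passed: u64 = 100;
let rotations = days_passed / days_rotation; // 14 (integer division)
let offset = rotations * days_rotation * DAY as u64; // 8_467_200 seconds
// timestamp = INITIAL_GENESIS + offset -- identical on every node, which is
// what makes the generated genesis event deterministic per rotation window.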
#[cfg(test)]
mod tests {
use crate::event_graph::INITIAL_GENESIS;
View File
@@ -142,6 +142,7 @@ impl std::cmp::Eq for StoppableTask {}
mod tests {
use super::*;
use crate::{error::Error, system::sleep_forever};
use log::warn;
use smol::Executor;
use std::sync::Arc;
@@ -150,13 +151,21 @@ mod tests {
let mut cfg = simplelog::ConfigBuilder::new();
cfg.add_filter_ignore("async_io".to_string());
cfg.add_filter_ignore("polling".to_string());
// We check this error so we can execute tests from the same file in
// parallel, otherwise the second one fails to init the logger here.
if simplelog::TermLogger::init(
//simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
simplelog::LevelFilter::Trace,
cfg.build(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
)
.is_err()
{
warn!(target: "test_harness", "Logger already initialized");
}
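This init-or-warn pattern recurs in every test file touched by this commit. A hypothetical alternative, not part of this diff, is to route setup through std::sync::Once so the fallible call can only ever run once per process:

use std::sync::Once;

static LOGGER_INIT: Once = Once::new();

// Hypothetical helper: call_once guarantees the closure runs at most once
// per process, so parallel tests never race on TermLogger::init().
fn init_test_logger() {
    LOGGER_INIT.call_once(|| {
        simplelog::TermLogger::init(
            simplelog::LevelFilter::Trace,
            simplelog::ConfigBuilder::new().build(),
            simplelog::TerminalMode::Mixed,
            simplelog::ColorChoice::Auto,
        )
        .expect("first and only logger init");
    });
}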
let executor = Arc::new(Executor::new());
let executor_ = executor.clone();
View File
@@ -29,7 +29,7 @@ use darkfi_sdk::{
pasta::pallas,
};
use halo2_proofs::dev::MockProver;
use log::{info, warn};
use rand::rngs::OsRng;

pub const SECRET_KEY_PREFIX: pallas::Base = pallas::Base::from_raw([4, 0, 0, 0]);
@@ -46,14 +46,21 @@ pub const HEADSTART: pallas::Base = pallas::Base::from_raw([
#[test]
fn consensus_prop() -> Result<()> {
let mut cfg = simplelog::ConfigBuilder::new();
// We check this error so we can execute tests from the same file in
// parallel, otherwise the second one fails to init the logger here.
if simplelog::TermLogger::init(
simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace,
cfg.build(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
)
.is_err()
{
warn!(target: "test_harness", "Logger already initialized");
}

let input_serial = pallas::Base::from(pallas::Base::from(10));
//let input_serial = pallas::Base::from(pallas::Base::from(2));