diff --git a/Cargo.toml b/Cargo.toml index 04fdd5b47..f21d0111c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -202,6 +202,7 @@ event-graph = [ "num-bigint", "rand", "sled", + "sled-overlay", "smol", "tinyjson", diff --git a/bin/darkirc/src/irc/client.rs b/bin/darkirc/src/irc/client.rs index de8679513..11bc8bfbe 100644 --- a/bin/darkirc/src/irc/client.rs +++ b/bin/darkirc/src/irc/client.rs @@ -359,11 +359,8 @@ impl Client { self.server.try_encrypt(&mut privmsg).await; // Build a DAG event and return it. - let event = Event::new( - serialize_async(&privmsg).await, - self.server.darkirc.event_graph.clone(), - ) - .await; + let event = + Event::new(serialize_async(&privmsg).await, &self.server.darkirc.event_graph).await; return Ok(Some(event)) } diff --git a/bin/dhtd/dhtd/src/tests.rs b/bin/dhtd/dhtd/src/tests.rs index 21ae7258e..b0d48a4d5 100644 --- a/bin/dhtd/dhtd/src/tests.rs +++ b/bin/dhtd/dhtd/src/tests.rs @@ -33,7 +33,7 @@ use darkfi::{ use rand::{rngs::OsRng, RngCore}; use smol::Executor; use url::Url; -use log::error; +use log::{error, warn}; use super::{proto::ProtocolDht, Dhtd}; @@ -144,12 +144,20 @@ fn dht_remote_get_insert() -> Result<()> { cfg.add_filter_ignore("net::protocol_version".to_string()); cfg.add_filter_ignore("net::protocol_ping".to_string()); - simplelog::TermLogger::init( + // We check this error so we can execute same file tests in parallel, + // otherwise second one fails to init logger here. + if simplelog::TermLogger::init( simplelog::LevelFilter::Info, + //simplelog::LevelFilter::Debug, + //simplelog::LevelFilter::Trace, cfg.build(), simplelog::TerminalMode::Mixed, simplelog::ColorChoice::Auto, - )?; + ) + .is_err() + { + warn!(target: "test_harness", "Logger already initialized"); + } let ex = Arc::new(Executor::new()); let (signal, shutdown) = async_std::channel::unbounded::<()>(); diff --git a/bin/genev/genevd/src/rpc.rs b/bin/genev/genevd/src/rpc.rs index 55560410c..6a18e5015 100644 --- a/bin/genev/genevd/src/rpc.rs +++ b/bin/genev/genevd/src/rpc.rs @@ -105,7 +105,7 @@ impl JsonRpcInterface { let genevent: GenEvent = deserialize(&dec).unwrap(); // Build a DAG event and return it. - let event = Event::new(serialize_async(&genevent).await, self.event_graph.clone()).await; + let event = Event::new(serialize_async(&genevent).await, &self.event_graph).await; if let Err(e) = self.event_graph.dag_insert(&[event.clone()]).await { error!("Failed inserting new event to DAG: {}", e); diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 39fc90395..37d692faf 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -170,7 +170,7 @@ async fn start_sync_loop( // Build a DAG event and return it. let event = Event::new( serialize_async(&encrypted_task).await, - event_graph.clone(), + &event_graph, ) .await; // Update the last sent event. diff --git a/src/error.rs b/src/error.rs index e161850ca..82dc9bb9d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -487,6 +487,9 @@ pub enum Error { #[error("Event is not found in tree: {0}")] EventNotFound(String), + #[error("Event is invalid")] + EventIsInvalid, + // ==================== // Miscellaneous errors // ==================== diff --git a/src/event_graph/event.rs b/src/event_graph/event.rs index 0ff561b2e..99b61bb61 100644 --- a/src/event_graph/event.rs +++ b/src/event_graph/event.rs @@ -18,9 +18,15 @@ use std::{collections::HashSet, time::UNIX_EPOCH}; -use darkfi_serial::{async_trait, Encodable, SerialDecodable, SerialEncodable}; +use darkfi_serial::{async_trait, deserialize_async, Encodable, SerialDecodable, SerialEncodable}; +use sled_overlay::SledTreeOverlay; -use super::{EventGraphPtr, EVENT_TIME_DRIFT, NULL_ID, N_EVENT_PARENTS}; +use crate::Result; + +use super::{ + util::next_rotation_timestamp, EventGraphPtr, EVENT_TIME_DRIFT, INITIAL_GENESIS, NULL_ID, + N_EVENT_PARENTS, +}; /// Representation of an event in the Event Graph #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -31,20 +37,19 @@ pub struct Event { pub(super) content: Vec, /// Parent nodes in the event DAG pub(super) parents: [blake3::Hash; N_EVENT_PARENTS], + /// DAG layer index of the event + pub(super) layer: u64, } impl Event { - /// Create a new event with the given data and an `EventGraph` reference. + /// Create a new event with the given data and an [`EventGraph`] reference. /// The timestamp of the event will be the current time, and the parents /// will be `N_EVENT_PARENTS` from the current event graph unreferenced tips. /// The parents can also include NULL, but this should be handled by the rest /// of the codebase. - pub async fn new(data: Vec, event_graph: EventGraphPtr) -> Self { - Self { - timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(), - content: data, - parents: event_graph.get_unreferenced_tips().await, - } + pub async fn new(data: Vec, event_graph: &EventGraphPtr) -> Self { + let (layer, parents) = event_graph.get_next_layer_with_parents().await; + Self { timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(), content: data, parents, layer } } /// Hash the [`Event`] to retrieve its ID @@ -53,6 +58,7 @@ impl Event { self.timestamp.encode(&mut hasher).unwrap(); self.content.encode(&mut hasher).unwrap(); self.parents.encode(&mut hasher).unwrap(); + self.layer.encode(&mut hasher).unwrap(); hasher.finalize() } @@ -68,9 +74,92 @@ impl Event { } */ + /// Fully validate an event for the correct layout against provided + /// DAG [`sled::Tree`] reference and enforce relevant age, assuming + /// some possibility for a time drift. Optionally, provide an overlay + /// to use that instead of actual referenced DAG. + pub async fn validate( + &self, + dag: &sled::Tree, + genesis_timestamp: u64, + days_rotation: u64, + overlay: Option<&SledTreeOverlay>, + ) -> Result { + // Let's not bother with empty events + if self.content.is_empty() { + return Ok(false) + } + + // Check if the event timestamp is after genesis timestamp + if self.timestamp < genesis_timestamp - EVENT_TIME_DRIFT { + return Ok(false) + } + + // If a rotation has been set, check if the event timestamp + // is after the next genesis timestamp + if days_rotation > 0 { + let next_genesis_timestamp = next_rotation_timestamp(INITIAL_GENESIS, days_rotation); + if self.timestamp > next_genesis_timestamp + EVENT_TIME_DRIFT { + return Ok(false) + } + } + + // Validate the parents. We have to check that at least one parent + // is not NULL, that the parents exist, that no two parents are the + // same, and that the parent exists in previous layers, to prevent + // recursive references(circles). + let mut seen = HashSet::new(); + let self_id = self.id(); + + for parent_id in self.parents.iter() { + if parent_id == &NULL_ID { + continue + } + + if parent_id == &self_id { + return Ok(false) + } + + if seen.contains(parent_id) { + return Ok(false) + } + + let parent_bytes = if let Some(overlay) = overlay { + overlay.get(parent_id.as_bytes())? + } else { + dag.get(parent_id.as_bytes())? + }; + if parent_bytes.is_none() { + return Ok(false) + } + + let parent: Event = deserialize_async(&parent_bytes.unwrap()).await?; + if self.layer <= parent.layer { + return Ok(false) + } + + seen.insert(parent_id); + } + + Ok(!seen.is_empty()) + } + + /// Fully validate an event for the correct layout against provided + /// [`EventGraph`] reference and enforce relevant age, assuming some + /// possibility for a time drift. + pub async fn dag_validate(&self, event_graph: &EventGraphPtr) -> Result { + // Grab genesis timestamp + let genesis_timestamp = event_graph.current_genesis.read().await.timestamp; + + // Perform validation + self.validate(&event_graph.dag, genesis_timestamp, event_graph.days_rotation, None).await + } + /// Validate a new event for the correct layout and enforce relevant age, /// assuming some possibility for a time drift. - pub fn validate(&self) -> bool { + /// Note: This validation does *NOT* check for recursive references(circles), + /// and should be used as a first quick check. + pub fn validate_new(&self) -> bool { // Let's not bother with empty events if self.content.is_empty() { return false @@ -86,8 +175,7 @@ impl Event { } // Validate the parents. We have to check that at least one parent - // is not NULL, that the parent does not recursively reference the - // event, and that no two parents are the same. + // is not NULL and that no two parents are the same. let mut seen = HashSet::new(); let self_id = self.id(); @@ -113,59 +201,77 @@ impl Event { #[cfg(test)] mod tests { + use std::sync::Arc; + + use smol::Executor; + + use crate::{ + event_graph::EventGraph, + net::{P2p, Settings}, + }; + use super::*; - fn make_valid_event() -> Event { - Event { - timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(), - content: vec![1u8], - parents: [ - blake3::hash(b"1"), - blake3::hash(b"2"), - blake3::hash(b"3"), - blake3::hash(b"4"), - blake3::hash(b"5"), - ], - } - } - #[test] - fn event_is_valid() { - // Validate our test Event struct - assert!(make_valid_event().validate()); + async fn make_event_graph() -> Result { + let ex = Arc::new(Executor::new()); + let p2p = P2p::new(Settings::default(), ex.clone()).await; + let sled_db = sled::Config::new().temporary(true).open().unwrap(); + EventGraph::new(p2p, sled_db, "dag", 1, ex).await } #[test] - fn invalid_events() { - // TODO: Not checked: - // - "the parent does not recursively reference the event" - let e = make_valid_event(); + fn event_is_valid() -> Result<()> { + smol::block_on(async { + // Generate a dummy event graph + let event_graph = make_event_graph().await?; - let mut event_empty_content = e.clone(); - event_empty_content.content = vec![]; - assert!(!event_empty_content.validate()); + // Create a new valid event + let valid_event = Event::new(vec![1u8], &event_graph).await; - let mut event_timestamp_too_old = e.clone(); - event_timestamp_too_old.timestamp = 0; - assert!(!event_timestamp_too_old.validate()); + // Validate our test Event struct + assert!(valid_event.dag_validate(&event_graph).await?); - let mut event_timestamp_too_new = e.clone(); - event_timestamp_too_new.timestamp = u64::MAX; - assert!(!event_timestamp_too_new.validate()); + // Thanks for reading + Ok(()) + }) + } - let mut event_duplicated_parents = e.clone(); - let duplicated_parents = [ - blake3::hash(b"1"), - blake3::hash(b"1"), - blake3::hash(b"3"), - blake3::hash(b"4"), - blake3::hash(b"5"), - ]; - event_duplicated_parents.parents = duplicated_parents; - assert!(!event_duplicated_parents.validate()); + #[test] + fn invalid_events() -> Result<()> { + smol::block_on(async { + // Generate a dummy event graph + let event_graph = make_event_graph().await?; - let mut event_null_parents = e.clone(); - let all_null_parents = [NULL_ID, NULL_ID, NULL_ID, NULL_ID, NULL_ID]; - event_null_parents.parents = all_null_parents; - assert!(!event_null_parents.validate()); + // Create a new valid event + let valid_event = Event::new(vec![1u8], &event_graph).await; + + let mut event_empty_content = valid_event.clone(); + event_empty_content.content = vec![]; + assert!(!event_empty_content.dag_validate(&event_graph).await?); + + let mut event_timestamp_too_old = valid_event.clone(); + event_timestamp_too_old.timestamp = 0; + assert!(!event_timestamp_too_old.dag_validate(&event_graph).await?); + + let mut event_timestamp_too_new = valid_event.clone(); + event_timestamp_too_new.timestamp = u64::MAX; + assert!(!event_timestamp_too_new.dag_validate(&event_graph).await?); + + let mut event_duplicated_parents = valid_event.clone(); + event_duplicated_parents.parents[1] = valid_event.parents[0]; + assert!(!event_duplicated_parents.dag_validate(&event_graph).await?); + + let mut event_null_parents = valid_event.clone(); + let all_null_parents = [NULL_ID, NULL_ID, NULL_ID, NULL_ID, NULL_ID]; + event_null_parents.parents = all_null_parents; + assert!(!event_null_parents.dag_validate(&event_graph).await?); + + let mut event_same_layer_as_parents = valid_event.clone(); + event_same_layer_as_parents.layer = 0; + assert!(!event_same_layer_as_parents.dag_validate(&event_graph).await?); + + // Thanks for reading + Ok(()) + }) } } diff --git a/src/event_graph/mod.rs b/src/event_graph/mod.rs index 3760f33a4..15d8537f1 100644 --- a/src/event_graph/mod.rs +++ b/src/event_graph/mod.rs @@ -18,7 +18,7 @@ use std::{ cmp::Ordering, - collections::{HashMap, HashSet, VecDeque}, + collections::{BTreeMap, HashMap, HashSet, VecDeque}, sync::Arc, }; @@ -26,6 +26,7 @@ use async_recursion::async_recursion; use darkfi_serial::{deserialize_async, serialize_async}; use log::{debug, error, info}; use num_bigint::BigUint; +use sled_overlay::SledTreeOverlay; use smol::{ lock::{OnceCell, RwLock}, Executor, @@ -48,7 +49,7 @@ use proto::{EventRep, EventReq, TipRep, TipReq, REPLY_TIMEOUT}; /// Utility functions mod util; -use util::{days_since, next_rotation_timestamp, DAY}; +use util::{generate_genesis, next_rotation_timestamp}; #[cfg(test)] mod tests; @@ -76,7 +77,7 @@ pub struct EventGraph { /// Sled tree containing the DAG dag: sled::Tree, /// The set of unreferenced DAG tips - unreferenced_tips: RwLock>, + unreferenced_tips: RwLock>>, /// A `HashSet` containg event IDs and their 1-level parents. /// These come from the events we've sent out using `EventPut`. /// They are used with `EventReq` to decide if we should reply @@ -88,7 +89,12 @@ pub struct EventGraph { /// Event subscriber, this notifies whenever an event is /// inserted into the DAG pub event_sub: SubscriberPtr, + /// Current genesis event + current_genesis: RwLock, + /// Currently configured DAG rotation, in days days_rotation: u64, + /// Flag signalling DAG has finished initial sync + synced: RwLock, } impl EventGraph { @@ -102,12 +108,12 @@ impl EventGraph { ex: Arc>, ) -> Result { let dag = sled_db.open_tree(dag_tree_name)?; - let unreferenced_tips = RwLock::new(HashSet::new()); + let unreferenced_tips = RwLock::new(BTreeMap::new()); let broadcasted_ids = RwLock::new(HashSet::new()); let event_sub = Subscriber::new(); // Create the current genesis event based on the `days_rotation` - let current_genesis = Self::generate_genesis(days_rotation); + let current_genesis = generate_genesis(days_rotation); let self_ = Arc::new(Self { p2p, dag: dag.clone(), @@ -115,7 +121,9 @@ impl EventGraph { broadcasted_ids, prune_task: OnceCell::new(), event_sub, + current_genesis: RwLock::new(current_genesis.clone()), days_rotation, + synced: RwLock::new(false), }); // Check if we have it in our DAG. @@ -159,29 +167,11 @@ impl EventGraph { sled_db.flush_async().await.unwrap(); } - /// Generate a deterministic genesis event corresponding to the DAG's configuration. - fn generate_genesis(days_rotation: u64) -> Event { - // Days rotation is u64 except zero - let genesis_days_rotation = if days_rotation == 0 { 1 } else { days_rotation }; - - // First check how many days passed since initial genesis. - let days_passed = days_since(INITIAL_GENESIS); - - // Calculate the number of days_rotation intervals since INITIAL_GENESIS - let rotations_since_genesis = days_passed / genesis_days_rotation; - - // Calculate the timestamp of the most recent event - let timestamp = - INITIAL_GENESIS + (rotations_since_genesis * genesis_days_rotation * DAY as u64); - - Event { timestamp, content: GENESIS_CONTENTS.to_vec(), parents: [NULL_ID; N_EVENT_PARENTS] } - } - /// Sync the DAG from connected peers pub async fn dag_sync(&self) -> Result<()> { // We do an optimistic sync where we ask all our connected peers for - // the DAG tips (unreferenced events) and then we accept the ones we - // see the most times. + // the latest layer DAG tips (unreferenced events) and then we accept + // the ones we see the most times. // * Compare received tips with local ones, identify which we are missing. // * Request these from peers // * Recursively request these backward @@ -202,8 +192,8 @@ impl EventGraph { "[EVENTGRAPH] Syncing DAG from {} peers...", communicated_peers, ); - // Here we keep track of the tips and how many time we've seen them. - let mut tips: HashMap = HashMap::new(); + // Here we keep track of the tips, their layers and how many time we've seen them. + let mut tips: HashMap = HashMap::new(); // Let's first ask all of our peers for their tips and collect them // in our hashmap above. @@ -246,11 +236,13 @@ impl EventGraph { let peer_tips = &peer_tips.0; // Note down the seen tips - for tip in peer_tips { - if let Some(seen_tip) = tips.get_mut(tip) { - *seen_tip += 1; - } else { - tips.insert(*tip, 1); + for (layer, layer_tips) in peer_tips { + for tip in layer_tips { + if let Some(seen_tip) = tips.get_mut(tip) { + seen_tip.1 += 1; + } else { + tips.insert(*tip, (*layer, 1)); + } } } } @@ -264,24 +256,25 @@ impl EventGraph { return Err(Error::DagSyncFailed) } - // We know the number of peers we've communicated with. - // Arbitrarily, let's not consider events we only got once. - // TODO: This should be more sensible depending on the peer number. + // We know the number of peers we've communicated with, + // so we will consider events we saw at more that 2/3 of + // of those peers. + let consideration_threshold = communicated_peers * 2 / 3; let mut considered_tips = HashSet::new(); - for (tip, amount) in tips.iter() { - if amount > &1 { + for (tip, (_, amount)) in tips.iter() { + if amount > &consideration_threshold { considered_tips.insert(*tip); } } drop(tips); // Now begin fetching the events backwards. - let mut missing_parents = vec![]; + let mut missing_parents = HashSet::new(); for tip in considered_tips.iter() { assert!(tip != &NULL_ID); if !self.dag.contains_key(tip.as_bytes()).unwrap() { - missing_parents.push(*tip); + missing_parents.insert(*tip); } } @@ -291,7 +284,8 @@ impl EventGraph { } info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events"); - let mut received_events = vec![]; + let mut received_events: BTreeMap> = BTreeMap::new(); + let mut received_events_hashes = HashSet::new(); while !missing_parents.is_empty() { for parent_id in missing_parents.clone().iter() { let mut found_event = false; @@ -363,9 +357,15 @@ impl EventGraph { "Got correct parent event {}", parent_id, ); - received_events.push(parent.clone()); - let pos = missing_parents.iter().position(|id| id == &parent.id()).unwrap(); - missing_parents.remove(pos); + if let Some(layer_events) = received_events.get_mut(&parent.layer) { + layer_events.push(parent.clone()); + } else { + let layer_events = vec![parent.clone()]; + received_events.insert(parent.layer, layer_events); + } + received_events_hashes.insert(*parent_id); + + missing_parents.remove(parent_id); found_event = true; // See if we have the upper parents @@ -374,12 +374,15 @@ impl EventGraph { continue } - if !self.dag.contains_key(upper_parent.as_bytes()).unwrap() { + if !missing_parents.contains(upper_parent) && + !received_events_hashes.contains(upper_parent) && + !self.dag.contains_key(upper_parent.as_bytes()).unwrap() + { debug!( target: "event_graph::dag_sync()", "Found upper missing parent event{}", upper_parent, ); - missing_parents.push(*upper_parent); + missing_parents.insert(*upper_parent); } } @@ -398,10 +401,15 @@ impl EventGraph { // At this point we should've got all the events. // We should add them to the DAG. - // TODO: FIXME: Also validate these events. - // TODO: FIXME: This insert should also be atomic, dag_insert might need a rewrite - let received_events_rev: Vec = received_events.iter().rev().cloned().collect(); - self.dag_insert(&received_events_rev).await.unwrap(); + let mut events = vec![]; + for (_, tips) in received_events { + for tip in tips { + events.push(tip); + } + } + self.dag_insert(&events).await?; + + *self.synced.write().await = true; info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!"); Ok(()) @@ -411,12 +419,14 @@ impl EventGraph { async fn dag_prune(&self, genesis_event: Event) -> Result<()> { debug!(target: "event_graph::dag_prune()", "Pruning DAG..."); - // Acquire exclusive locks to unreferenced_tips and broadcasted_ids while - // this operation is happening. We do this to ensure that during the pruning - // operation, no other operations are able to access the intermediate state - // which could lead to producing the wrong state after pruning. + // Acquire exclusive locks to unreferenced_tips, broadcasted_ids and + // current_genesis while this operation is happening. We do this to + // ensure that during the pruning operation, no other operations are + // able to access the intermediate state which could lead to producing + // the wrong state after pruning. let mut unreferenced_tips = self.unreferenced_tips.write().await; let mut broadcasted_ids = self.broadcasted_ids.write().await; + let mut current_genesis = self.current_genesis.write().await; // Atomically clear the DAG and write the new genesis event. let mut batch = sled::Batch::default(); @@ -431,10 +441,13 @@ impl EventGraph { } // Clear unreferenced tips and bcast ids - *unreferenced_tips = HashSet::from([genesis_event.id()]); + *unreferenced_tips = BTreeMap::new(); + unreferenced_tips.insert(0, HashSet::from([genesis_event.id()])); + *current_genesis = genesis_event; *broadcasted_ids = HashSet::new(); drop(unreferenced_tips); drop(broadcasted_ids); + drop(current_genesis); debug!(target: "event_graph::dag_prune()", "DAG pruned successfully"); Ok(()) @@ -457,6 +470,7 @@ impl EventGraph { timestamp: next_rotation, content: GENESIS_CONTENTS.to_vec(), parents: [NULL_ID; N_EVENT_PARENTS], + layer: 0, }; // Sleep until it's time to rotate. @@ -472,6 +486,9 @@ impl EventGraph { } /// Atomically insert given events into the DAG and return the event IDs. + /// All provided events must be valid. An overlay is used over the DAG tree, + /// temporary writting each event in order. After all events have been + /// validated and inserted successfully, we write the overlay to sled. /// This will append the new events into the unreferenced tips set, and /// remove the events' parents from it. It will also append the events' /// level-1 parents to the `broadcasted_ids` set, so the P2P protocol @@ -479,6 +496,11 @@ impl EventGraph { /// TODO: The `broadcasted_ids` set should periodically be pruned, when /// some sensible time has passed after broadcasting the event. pub async fn dag_insert(&self, events: &[Event]) -> Result> { + // Sanity check + if events.is_empty() { + return Ok(vec![]) + } + // Acquire exclusive locks to `unreferenced_tips and broadcasted_ids` let mut unreferenced_tips = self.unreferenced_tips.write().await; let mut broadcasted_ids = self.broadcasted_ids.write().await; @@ -486,18 +508,52 @@ impl EventGraph { // Here we keep the IDs to return let mut ids = Vec::with_capacity(events.len()); - // Create an atomic batch - let mut batch = sled::Batch::default(); + // Create an overlay over the DAG tree + let mut overlay = SledTreeOverlay::new(&self.dag); - // Iterate over given events + // Grab genesis timestamp + let genesis_timestamp = self.current_genesis.read().await.timestamp; + + // Iterate over given events to validate them and + // write them to the overlay for event in events { let event_id = event.id(); debug!( target: "event_graph::dag_insert()", "Inserting event {} into the DAG", event_id, ); + + if !event + .validate(&self.dag, genesis_timestamp, self.days_rotation, Some(&overlay)) + .await? + { + error!(target: "event_graph::dag_insert()", "Event {} is invalid!", event_id); + return Err(Error::EventIsInvalid) + } + let event_se = serialize_async(event).await; + // Add the event to the overlay + overlay.insert(event_id.as_bytes(), &event_se)?; + + // Note down the event ID to return + ids.push(event_id); + } + + // Aggregate changes into a single batch + let batch = overlay.aggregate().unwrap(); + + // Atomically apply the batch. + // Panic if something is corrupted. + if let Err(e) = self.dag.apply_batch(batch) { + panic!("Failed applying dag_insert batch to sled: {}", e); + } + + // Iterate over given events to update references and + // send out notifications about them + for event in events { + let event_id = event.id(); + // Update the unreferenced DAG tips set debug!( target: "event_graph::dag_insert()", @@ -509,31 +565,36 @@ impl EventGraph { target: "event_graph::dag_insert()", "Removing {} from unreferenced_tips", parent_id, ); - unreferenced_tips.remove(parent_id); + + // Iterate over unreferenced tips in previous layers + // and remove the parent + // NOTE: this might be too exhaustive, but the + // assumption is that previous layers unreferenced + // tips will be few. + for (layer, tips) in unreferenced_tips.iter_mut() { + if layer >= &event.layer { + break + } + tips.remove(parent_id); + } broadcasted_ids.insert(*parent_id); } } + unreferenced_tips.retain(|_, tips| !tips.is_empty()); debug!( target: "event_graph::dag_insert()", "Adding {} to unreferenced tips", event_id, ); - unreferenced_tips.insert(event_id); - // Add the event to the atomic batch - batch.insert(event_id.as_bytes(), event_se); + if let Some(layer_tips) = unreferenced_tips.get_mut(&event.layer) { + layer_tips.insert(event_id); + } else { + let mut layer_tips = HashSet::new(); + layer_tips.insert(event_id); + unreferenced_tips.insert(event.layer, layer_tips); + } - // Note down the event ID to return - ids.push(event_id); - } - - // Atomically apply the batch. - // Panic if something is corrupted. - if let Err(e) = self.dag.apply_batch(batch) { - panic!("Failed applying dag_insert batch to sled: {}", e); - } - - // Send out notifications about the new events - for event in events { + // Send out notifications about the new event self.event_sub.notify(event.clone()).await; } @@ -552,8 +613,33 @@ impl EventGraph { Ok(Some(event)) } - /// Find the unreferenced tips in the current DAG state. - async fn find_unreferenced_tips(&self) -> HashSet { + /// Get next layer along with its N_EVENT_PARENTS from the unreferenced + /// tips of the DAG. Since tips are mapped by their layer, we go backwards + /// until we fill the vector, ensuring we always use latest layers tips as + /// parents. + async fn get_next_layer_with_parents(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) { + let unreferenced_tips = self.unreferenced_tips.read().await; + + let mut parents = [NULL_ID; N_EVENT_PARENTS]; + let mut index = 0; + 'outer: for (_, tips) in unreferenced_tips.iter().rev() { + for tip in tips.iter() { + parents[index] = *tip; + index += 1; + if index >= N_EVENT_PARENTS { + break 'outer + } + } + } + + let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1; + + assert!(parents.iter().any(|x| x != &NULL_ID)); + (next_layer, parents) + } + + /// Find the unreferenced tips in the current DAG state, mapped by their layers. + async fn find_unreferenced_tips(&self) -> BTreeMap> { // First get all the event IDs let mut tips = HashSet::new(); for iter_elem in self.dag.iter() { @@ -562,6 +648,7 @@ impl EventGraph { tips.insert(id); } + // Iterate again to find unreferenced IDs for iter_elem in self.dag.iter() { let (_, event) = iter_elem.unwrap(); let event: Event = deserialize_async(&event).await.unwrap(); @@ -570,26 +657,26 @@ impl EventGraph { } } - tips - } - - /// Get the current set of unreferenced tips in the DAG. - async fn get_unreferenced_tips(&self) -> [blake3::Hash; N_EVENT_PARENTS] { - // TODO: return vec of all instead of N_EVENT_PARENTS - let unreferenced_tips = self.unreferenced_tips.read().await; - - let mut tips = [NULL_ID; N_EVENT_PARENTS]; - for (i, tip) in unreferenced_tips.iter().take(N_EVENT_PARENTS).enumerate() { - tips[i] = *tip + // Build the layers map + let mut map: BTreeMap> = BTreeMap::new(); + for tip in tips { + let bytes = self.dag.get(tip.as_bytes()).unwrap().unwrap(); + let event: Event = deserialize_async(&bytes).await.unwrap(); + if let Some(layer_tips) = map.get_mut(&event.layer) { + layer_tips.insert(tip); + } else { + let mut layer_tips = HashSet::new(); + layer_tips.insert(tip); + map.insert(event.layer, layer_tips); + } } - assert!(tips.iter().any(|x| x != &NULL_ID)); - tips + map } /// Internal function used for DAG sorting. async fn get_unreferenced_tips_sorted(&self) -> [blake3::Hash; N_EVENT_PARENTS] { - let tips = self.get_unreferenced_tips().await; + let (_, tips) = self.get_next_layer_with_parents().await; // Convert the hash to BigUint for sorting let mut sorted: Vec<_> = diff --git a/src/event_graph/proto.rs b/src/event_graph/proto.rs index 44ec75c76..b9fd1cf08 100644 --- a/src/event_graph/proto.rs +++ b/src/event_graph/proto.rs @@ -19,6 +19,7 @@ // TODO: FIXME: Some of the protocols should block operations until DAG is synced. use std::{ + collections::{BTreeMap, HashSet}, sync::{ atomic::{AtomicUsize, Ordering::SeqCst}, Arc, @@ -26,11 +27,11 @@ use std::{ time::Duration, }; -use darkfi_serial::{async_trait, deserialize_async, SerialDecodable, SerialEncodable}; +use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable}; use log::{debug, error, trace, warn}; use smol::Executor; -use super::{Event, EventGraph, EventGraphPtr, NULL_ID}; +use super::{Event, EventGraphPtr, NULL_ID}; use crate::{impl_p2p_message, net::*, system::timeout::timeout, Error, Result}; /// Malicious behaviour threshold. If the threshold is reached, we will @@ -83,7 +84,7 @@ impl_p2p_message!(TipReq, "EventGraph::TipReq"); /// A P2P message representing a reply for the peer's DAG tips #[derive(Clone, SerialEncodable, SerialDecodable)] -pub struct TipRep(pub Vec); +pub struct TipRep(pub BTreeMap>); impl_p2p_message!(TipRep, "EventGraph::TipRep"); #[async_trait] @@ -129,6 +130,26 @@ impl ProtocolEventGraph { })) } + async fn increase_malicious_count(self: Arc) -> Result<()> { + let malicious_count = self.malicious_count.fetch_add(1, SeqCst); + if malicious_count + 1 == MALICIOUS_THRESHOLD { + error!( + target: "event_graph::protocol::handle_event_put()", + "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.", + self.channel.address(), + ); + self.channel.stop().await; + return Err(Error::ChannelStopped) + } + + warn!( + target: "event_graph::protocol::handle_event_put()", + "[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.address(), + ); + + Ok(()) + } + /// Protocol function handling `EventPut`. /// This is triggered whenever someone broadcasts (or relays) a new /// event on the network. @@ -142,50 +163,6 @@ impl ProtocolEventGraph { target: "event_graph::protocol::handle_event_put()", "Got EventPut: {} [{}]", event.id(), self.channel.address(), ); - // Check if the event is older than the genesis event. If so, we should - // not include it in our Dag. - // The genesis event marks the last time the Dag has been pruned of old - // events. The pruning interval is defined by the days_rotation field - // of [`EventGraph`]. - // TODO it would be better to store/cache this instead of calculating - // on every broadcast/relay. - let genesis_timestamp = - EventGraph::generate_genesis(self.event_graph.days_rotation()).timestamp; - if event.timestamp < genesis_timestamp { - debug!( - target: "event_graph::protocol::handle_event_put()", - "Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{}`", - event.id(), event.timestamp, genesis_timestamp - ); - } - - // We received an event. Check if we already have it in our DAG. - // Also check if we have the event's parents. In the case we do - // not have the parents, we'll request them from the peer that has - // sent this event to us. In case they do not reply in time, we drop - // the event. - - // Validate the event first. If we do not consider it valid, we - // will just drop it and stay quiet. If the malicious threshold - // is reached, we will stop the connection. - if !event.validate() { - let malicious_count = self.malicious_count.fetch_add(1, SeqCst); - if malicious_count + 1 == MALICIOUS_THRESHOLD { - error!( - target: "event_graph::protocol::handle_event_put()", - "[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.", - self.channel.address(), - ); - self.channel.stop().await; - return Err(Error::ChannelStopped) - } - - warn!( - target: "event_graph::protocol::handle_event_put()", - "[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.address(), - ); - continue - } // If we have already seen the event, we'll stay quiet. let event_id = event.id(); @@ -197,6 +174,35 @@ impl ProtocolEventGraph { continue } + // We received an event. Check if we already have it in our DAG. + // Check event is not older that current genesis event timestamp. + // Also check if we have the event's parents. In the case we do + // not have the parents, we'll request them from the peer that has + // sent this event to us. In case they do not reply in time, we drop + // the event. + + // Check if the event is older than the genesis event. If so, we should + // not include it in our Dag. + // The genesis event marks the last time the Dag has been pruned of old + // events. The pruning interval is defined by the days_rotation field + // of [`EventGraph`]. + let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp; + if event.timestamp < genesis_timestamp { + debug!( + target: "event_graph::protocol::handle_event_put()", + "Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{}`", + event.id(), event.timestamp, genesis_timestamp + ); + } + + // Validate the new event first. If we do not consider it valid, we + // will just drop it and stay quiet. If the malicious threshold + // is reached, we will stop the connection. + if !event.validate_new() { + self.clone().increase_malicious_count().await?; + continue + } + // At this point, this is a new event to us. Let's see if we // have all of its parents. debug!( @@ -204,16 +210,16 @@ impl ProtocolEventGraph { "Event {} is new", event_id, ); - let mut missing_parents = vec![]; + let mut missing_parents = HashSet::new(); for parent_id in event.parents.iter() { - // `event.validate()` should have already made sure that + // `event.validate_new()` should have already made sure that // not all parents are NULL, and that there are no duplicates. if parent_id == &NULL_ID { continue } if !self.event_graph.dag.contains_key(parent_id.as_bytes()).unwrap() { - missing_parents.push(*parent_id); + missing_parents.insert(*parent_id); } } @@ -221,13 +227,13 @@ impl ProtocolEventGraph { // fetch them from this peer. Do this recursively until we // find all of them. if !missing_parents.is_empty() { - // We track the received events in a vec. If/when we get all - // of them, we need to insert them in reverse so the DAG state - // stays correct and unreferenced tips represent the actual thing - // they should. If we insert them out of order, then we might have - // wrong unreferenced tips. - // TODO: What should we do if at some point the events become too old? - let mut received_events = vec![]; + // We track the received events mapped by their layer. + // If/when we get all of them, we need to insert them in order so + // the DAG state stays correct and unreferenced tips represent the + // actual thing they should. If we insert them out of order, then + // we might have wrong unreferenced tips. + let mut received_events: BTreeMap> = BTreeMap::new(); + let mut received_events_hashes = HashSet::new(); debug!( target: "event_graph::protocol::handle_event_put()", @@ -271,9 +277,15 @@ impl ProtocolEventGraph { "Got correct parent event {}", parent.id(), ); - received_events.push(parent.clone()); - let pos = missing_parents.iter().position(|id| id == &parent.id()).unwrap(); - missing_parents.remove(pos); + if let Some(layer_events) = received_events.get_mut(&parent.layer) { + layer_events.push(parent.clone()); + } else { + let layer_events = vec![parent.clone()]; + received_events.insert(parent.layer, layer_events); + } + received_events_hashes.insert(*parent_id); + + missing_parents.remove(parent_id); // See if we have the upper parents for upper_parent in parent.parents.iter() { @@ -281,13 +293,19 @@ impl ProtocolEventGraph { continue } - if !self.event_graph.dag.contains_key(upper_parent.as_bytes()).unwrap() + if !missing_parents.contains(upper_parent) && + !received_events_hashes.contains(upper_parent) && + !self + .event_graph + .dag + .contains_key(upper_parent.as_bytes()) + .unwrap() { debug!( target: "event_graph::protocol::handle_event_put()", "Found upper missing parent event{}", upper_parent, ); - missing_parents.push(*upper_parent); + missing_parents.insert(*upper_parent); } } } @@ -295,19 +313,29 @@ impl ProtocolEventGraph { // At this point we should've got all the events. // We should add them to the DAG. - // TODO: FIXME: Also validate these events. - let received_events_rev: Vec = - received_events.iter().rev().cloned().collect(); - self.event_graph.dag_insert(&received_events_rev).await.unwrap(); + let mut events = vec![]; + for (_, tips) in received_events { + for tip in tips { + events.push(tip); + } + } + if self.event_graph.dag_insert(&events).await.is_err() { + self.clone().increase_malicious_count().await?; + continue + } } // <-- !missing_parents.is_empty() // If we're here, we have all the parents, and we can now - // add the actual event to the DAG. + // perform a full validation and add the actual event to + // the DAG. debug!( target: "event_graph::protocol::handle_event_put()", "Got all parents necessary for insertion", ); - self.event_graph.dag_insert(&[event.clone()]).await.unwrap(); + if self.event_graph.dag_insert(&[event.clone()]).await.is_err() { + self.clone().increase_malicious_count().await?; + continue + } // Relay the event to other peers. self.event_graph @@ -330,6 +358,15 @@ impl ProtocolEventGraph { "Got EventReq: {} [{}]", event_id, self.channel.address(), ); + // Check if node has finished syncing its DAG + if !*self.event_graph.synced.read().await { + debug!( + target: "event_graph::protocol::handle_event_put", + "DAG is still syncing, skipping..." + ); + continue + } + // We received an event request from somebody. // If we do have it, we will send it back to them as `EventRep`. // Otherwise, we'll stay quiet. An honest node should always have @@ -367,16 +404,12 @@ impl ProtocolEventGraph { target: "event_graph::protocol::handle_event_req()", "Fetching event {} from DAG", event_id, ); - let event = self.event_graph.dag.get(event_id.as_bytes()).unwrap().unwrap(); - let event: Event = deserialize_async(&event).await.unwrap(); + let event = self.event_graph.dag_get(&event_id).await.unwrap().unwrap(); // Check if the event is older than the genesis event. If so, something // has gone wrong. The event should have been pruned during the last // rotation. - // TODO it would be better to store/cache this instead of calculating - // on every broadcast/relay. - let genesis_timestamp = - EventGraph::generate_genesis(self.event_graph.days_rotation()).timestamp; + let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp; if event.timestamp < genesis_timestamp { error!( target: "event_graph::protocol::handle_event_req()", @@ -416,20 +449,29 @@ impl ProtocolEventGraph { "Got TipReq [{}]", self.channel.address(), ); + // Check if node has finished syncing its DAG + if !*self.event_graph.synced.read().await { + debug!( + target: "event_graph::protocol::handle_event_put", + "DAG is still syncing, skipping..." + ); + continue + } + // TODO: Rate limit // We received a tip request. Let's find them, add them to // our bcast ids list, and reply with them. - let mut tips = self.event_graph.get_unreferenced_tips().await.to_vec(); - tips.retain(|x| x != &NULL_ID); - + let layers = self.event_graph.unreferenced_tips.read().await.clone(); let mut bcast_ids = self.event_graph.broadcasted_ids.write().await; - for tip in tips.iter() { - bcast_ids.insert(*tip); + for (_, tips) in layers.iter() { + for tip in tips { + bcast_ids.insert(*tip); + } } drop(bcast_ids); - self.channel.send(&TipRep(tips)).await?; + self.channel.send(&TipRep(layers)).await?; } } } diff --git a/src/event_graph/tests.rs b/src/event_graph/tests.rs index a1c48f771..df2f9abe7 100644 --- a/src/event_graph/tests.rs +++ b/src/event_graph/tests.rs @@ -20,15 +20,15 @@ use std::sync::Arc; -use log::info; -use rand::{prelude::SliceRandom, Rng}; +use log::{info, warn}; +use rand::{prelude::SliceRandom, rngs::ThreadRng}; use smol::{channel, future, Executor}; use url::Url; use crate::{ event_graph::{ proto::{EventPut, ProtocolEventGraph}, - Event, EventGraph, NULL_ID, + Event, EventGraph, }, net::{P2p, Settings, SESSION_ALL}, system::sleep, @@ -40,9 +40,7 @@ const N_CONNS: usize = 2; //const N_NODES: usize = 50; //const N_CONNS: usize = N_NODES / 3; -#[test] -#[ignore] -fn eventgraph_propagation() { +fn init_logger() { let mut cfg = simplelog::ConfigBuilder::new(); cfg.add_filter_ignore("sled".to_string()); cfg.add_filter_ignore("net::protocol_ping".to_string()); @@ -56,81 +54,87 @@ fn eventgraph_propagation() { cfg.add_filter_ignore("net::channel::send()".to_string()); cfg.add_filter_ignore("net::channel::start()".to_string()); cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string()); + cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string()); + cfg.add_filter_ignore("net::tcp".to_string()); - simplelog::TermLogger::init( - //simplelog::LevelFilter::Info, - simplelog::LevelFilter::Debug, + // We check this error so we can execute same file tests in parallel, + // otherwise second one fails to init logger here. + if simplelog::TermLogger::init( + simplelog::LevelFilter::Info, + //simplelog::LevelFilter::Debug, //simplelog::LevelFilter::Trace, cfg.build(), simplelog::TerminalMode::Mixed, simplelog::ColorChoice::Auto, ) - .unwrap(); - - let ex = Arc::new(Executor::new()); - let ex_ = ex.clone(); - let (signal, shutdown) = channel::unbounded::<()>(); - - // Run a thread for each node. - easy_parallel::Parallel::new() - .each(0..N_NODES, |_| future::block_on(ex.run(shutdown.recv()))) - .finish(|| { - future::block_on(async { - eventgraph_propagation_real(ex_).await; - drop(signal); - }) - }); + .is_err() + { + warn!(target: "test_harness", "Logger already initialized"); + } } -async fn eventgraph_propagation_real(ex: Arc>) { - let mut eg_instances = vec![]; - let mut rng = rand::thread_rng(); +async fn spawn_node( + inbound_addrs: Vec, + peers: Vec, + ex: Arc>, +) -> Arc { + let settings = Settings { + localnet: true, + inbound_addrs, + outbound_connections: 0, + outbound_connect_timeout: 2, + inbound_connections: usize::MAX, + peers, + allowed_transports: vec!["tcp".to_string()], + ..Default::default() + }; - let mut genesis_event_id = NULL_ID; + let p2p = P2p::new(settings, ex.clone()).await; + let sled_db = sled::Config::new().temporary(true).open().unwrap(); + let event_graph = EventGraph::new(p2p.clone(), sled_db, "dag", 1, ex.clone()).await.unwrap(); + *event_graph.synced.write().await = true; + let event_graph_ = event_graph.clone(); + + // Register the P2P protocols + let registry = p2p.protocol_registry(); + registry + .register(SESSION_ALL, move |channel, _| { + let event_graph_ = event_graph_.clone(); + async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() } + }) + .await; + + event_graph +} + +async fn bootstrap_nodes( + peer_indexes: &[usize], + starting_port: usize, + rng: &mut ThreadRng, + ex: Arc>, +) -> Vec> { + let mut eg_instances = vec![]; // Initialize the nodes for i in 0..N_NODES { // Everyone will connect to N_CONNS random peers. + let mut peer_indexes_copy = peer_indexes.to_owned(); + peer_indexes_copy.remove(i); + let peer_indexes_to_connect: Vec<_> = + peer_indexes_copy.choose_multiple(rng, N_CONNS).collect(); + let mut peers = vec![]; - for _ in 0..N_CONNS { - let mut port = 13200 + i; - while port == 13200 + i { - port = 13200 + rng.gen_range(0..N_NODES); - } + for peer_index in peer_indexes_to_connect { + let port = starting_port + peer_index; peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap()); } - let settings = Settings { - localnet: true, - inbound_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + i)).unwrap()], - outbound_connections: 0, - outbound_connect_timeout: 2, - inbound_connections: usize::MAX, + let event_graph = spawn_node( + vec![Url::parse(&format!("tcp://127.0.0.1:{}", starting_port + i)).unwrap()], peers, - allowed_transports: vec!["tcp".to_string()], - ..Default::default() - }; - - let p2p = P2p::new(settings, ex.clone()).await; - let sled_db = sled::Config::new().temporary(true).open().unwrap(); - let event_graph = - EventGraph::new(p2p.clone(), sled_db, "dag", 1, ex.clone()).await.unwrap(); - let event_graph_ = event_graph.clone(); - - // Take the last sled item since there's only 1 - if genesis_event_id == NULL_ID { - let (id, _) = event_graph.dag.last().unwrap().unwrap(); - genesis_event_id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap()); - } - - // Register the P2P protocols - let registry = p2p.protocol_registry(); - registry - .register(SESSION_ALL, move |channel, _| { - let event_graph_ = event_graph_.clone(); - async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() } - }) - .await; + ex.clone(), + ) + .await; eg_instances.push(event_graph); } @@ -140,64 +144,121 @@ async fn eventgraph_propagation_real(ex: Arc>) { eg.p2p.clone().start().await.unwrap(); } - info!("Waiting 10s until all peers connect"); - sleep(10).await; + info!("Waiting 5s until all peers connect"); + sleep(5).await; + + eg_instances +} + +async fn assert_dags( + eg_instances: &Vec>, + expected_len: usize, + rng: &mut ThreadRng, +) { + let random_node = eg_instances.choose(rng).unwrap(); + let last_layer_tips = + random_node.unreferenced_tips.read().await.last_key_value().unwrap().1.clone(); + for (i, eg) in eg_instances.iter().enumerate() { + let node_last_layer_tips = + eg.unreferenced_tips.read().await.last_key_value().unwrap().1.clone(); + assert!( + eg.dag.len() == expected_len, + "Node {}, expected {} events, have {}", + i, + expected_len, + eg.dag.len() + ); + assert_eq!( + node_last_layer_tips, last_layer_tips, + "Node {} contains malformed unreferenced tips", + i + ); + } +} + +macro_rules! test_body { + ($real_call:ident) => { + init_logger(); + + let ex = Arc::new(Executor::new()); + let ex_ = ex.clone(); + let (signal, shutdown) = channel::unbounded::<()>(); + + // Run a thread for each node. + easy_parallel::Parallel::new() + .each(0..N_NODES, |_| future::block_on(ex.run(shutdown.recv()))) + .finish(|| { + future::block_on(async { + $real_call(ex_).await; + drop(signal); + }) + }); + }; +} + +#[test] +fn eventgraph_propagation() { + test_body!(eventgraph_propagation_real); +} + +async fn eventgraph_propagation_real(ex: Arc>) { + let mut rng = rand::thread_rng(); + let peer_indexes: Vec = (0..N_NODES).collect(); + + // Bootstrap nodes + let mut eg_instances = bootstrap_nodes(&peer_indexes, 13200, &mut rng, ex.clone()).await; + + // Grab genesis event + let random_node = eg_instances.choose(&mut rng).unwrap(); + let (id, _) = random_node.dag.last().unwrap().unwrap(); + let genesis_event_id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap()); // ========================================= // 1. Assert that everyone's DAG is the same // ========================================= - for (i, eg) in eg_instances.iter().enumerate() { - let tips = eg.unreferenced_tips.read().await; - assert!(eg.dag.len() == 1, "Node {}", i); - assert!(tips.len() == 1, "Node {}", i); - assert!(tips.get(&genesis_event_id).is_some(), "Node {}", i); - } + assert_dags(&eg_instances, 1, &mut rng).await; // ========================================== // 2. Create an event in one node and publish // ========================================== - let random_node = eg_instances.choose(&mut rand::thread_rng()).unwrap(); - let event = Event::new(vec![1, 2, 3, 4], random_node.clone()).await; + let random_node = eg_instances.choose(&mut rng).unwrap(); + let event = Event::new(vec![1, 2, 3, 4], random_node).await; assert!(event.parents.contains(&genesis_event_id)); - // The node adds it to their DAG. + // The node adds it to their DAG, on layer 1. let event_id = random_node.dag_insert(&[event.clone()]).await.unwrap()[0]; - let tips = random_node.unreferenced_tips.read().await; - assert!(tips.len() == 1); - assert!(tips.get(&event_id).is_some()); - drop(tips); + let tips_layers = random_node.unreferenced_tips.read().await; + // Since genesis was referenced, its layer (0) have been removed + assert_eq!(tips_layers.len(), 1); + assert!(tips_layers.last_key_value().unwrap().1.get(&event_id).is_some()); + drop(tips_layers); info!("Broadcasting event {}", event_id); random_node.p2p.broadcast(&EventPut(event)).await; - info!("Waiting 10s for event propagation"); - sleep(10).await; + info!("Waiting 5s for event propagation"); + sleep(5).await; // ==================================================== // 3. Assert that everyone has the new event in the DAG // ==================================================== - for (i, eg) in eg_instances.iter().enumerate() { - let tips = eg.unreferenced_tips.read().await; - assert!(eg.dag.len() == 2, "Node {}", i); - assert!(tips.len() == 1, "Node {}", i); - assert!(tips.get(&event_id).is_some(), "Node {}", i); - } + assert_dags(&eg_instances, 2, &mut rng).await; // ============================================================== // 4. Create multiple events on a node and broadcast the last one // The `EventPut` logic should manage to fetch all of them, // provided that the last one references the earlier ones. // ============================================================== - let random_node = eg_instances.choose(&mut rand::thread_rng()).unwrap(); - let event0 = Event::new(vec![1, 2, 3, 4, 0], random_node.clone()).await; + let random_node = eg_instances.choose(&mut rng).unwrap(); + let event0 = Event::new(vec![1, 2, 3, 4, 0], random_node).await; let event0_id = random_node.dag_insert(&[event0.clone()]).await.unwrap()[0]; - let event1 = Event::new(vec![1, 2, 3, 4, 1], random_node.clone()).await; + let event1 = Event::new(vec![1, 2, 3, 4, 1], random_node).await; let event1_id = random_node.dag_insert(&[event1.clone()]).await.unwrap()[0]; - let event2 = Event::new(vec![1, 2, 3, 4, 2], random_node.clone()).await; + let event2 = Event::new(vec![1, 2, 3, 4, 2], random_node).await; let event2_id = random_node.dag_insert(&[event2.clone()]).await.unwrap()[0]; - // Genesis event + event from 2. + upper 3 events - assert!(random_node.dag.len() == 5); - let tips = random_node.unreferenced_tips.read().await; - assert!(tips.len() == 1); - assert!(tips.get(&event2_id).is_some()); - drop(tips); + // Genesis event + event from 2. + upper 3 events (layer 4) + assert_eq!(random_node.dag.len(), 5); + let tips_layers = random_node.unreferenced_tips.read().await; + assert_eq!(tips_layers.len(), 1); + assert!(tips_layers.get(&4).unwrap().get(&event2_id).is_some()); + drop(tips_layers); let event_chain = vec![(event0_id, event0.parents), (event1_id, event1.parents), (event2_id, event2.parents)]; @@ -205,142 +266,186 @@ async fn eventgraph_propagation_real(ex: Arc>) { info!("Broadcasting event {}", event2_id); info!("Event chain: {:#?}", event_chain); random_node.p2p.broadcast(&EventPut(event2)).await; - info!("Waiting 10s for event propagation"); - sleep(10).await; + info!("Waiting 5s for event propagation"); + sleep(5).await; // ========================================== // 5. Assert that everyone has all the events // ========================================== - for (i, eg) in eg_instances.iter().enumerate() { - let tips = eg.unreferenced_tips.read().await; - assert!(eg.dag.len() == 5, "Node {}, expected 5 events, have {}", i, eg.dag.len()); - assert!(tips.len() == 1, "Node {}, expected 1 tip, have {}", i, tips.len()); - assert!(tips.get(&event2_id).is_some(), "Node {}, expected tip to be {}", i, event2_id); - } + assert_dags(&eg_instances, 5, &mut rng).await; // =========================================== // 6. Create multiple events on multiple nodes // =========================================== // node 1 // ======= - let node1 = eg_instances.choose(&mut rand::thread_rng()).unwrap(); - let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1.clone()).await; - let _ = node1.dag_insert(&[event0_1.clone()]).await.unwrap()[0]; + let node1 = eg_instances.choose(&mut rng).unwrap(); + let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1).await; + node1.dag_insert(&[event0_1.clone()]).await.unwrap(); node1.p2p.broadcast(&EventPut(event0_1)).await; - let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1.clone()).await; - let _ = node1.dag_insert(&[event1_1.clone()]).await.unwrap()[0]; + let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1).await; + node1.dag_insert(&[event1_1.clone()]).await.unwrap(); node1.p2p.broadcast(&EventPut(event1_1)).await; - let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1.clone()).await; - let _ = node1.dag_insert(&[event2_1.clone()]).await.unwrap()[0]; + let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1).await; + node1.dag_insert(&[event2_1.clone()]).await.unwrap(); node1.p2p.broadcast(&EventPut(event2_1)).await; // ======= // node 2 // ======= - let node2 = eg_instances.choose(&mut rand::thread_rng()).unwrap(); - let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2.clone()).await; - let _ = node2.dag_insert(&[event0_2.clone()]).await.unwrap()[0]; + let node2 = eg_instances.choose(&mut rng).unwrap(); + let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2).await; + node2.dag_insert(&[event0_2.clone()]).await.unwrap(); node2.p2p.broadcast(&EventPut(event0_2)).await; - let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2.clone()).await; - let _ = node2.dag_insert(&[event1_2.clone()]).await.unwrap()[0]; + + let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2).await; + node2.dag_insert(&[event1_2.clone()]).await.unwrap(); node2.p2p.broadcast(&EventPut(event1_2)).await; - let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2.clone()).await; - let _ = node2.dag_insert(&[event2_2.clone()]).await.unwrap()[0]; + let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2).await; + node2.dag_insert(&[event2_2.clone()]).await.unwrap(); node2.p2p.broadcast(&EventPut(event2_2)).await; // ======= // node 3 // ======= - let node3 = eg_instances.choose(&mut rand::thread_rng()).unwrap(); - let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3.clone()).await; - let _ = node3.dag_insert(&[event0_3.clone()]).await.unwrap()[0]; + let node3 = eg_instances.choose(&mut rng).unwrap(); + let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3).await; + node3.dag_insert(&[event0_3.clone()]).await.unwrap(); node2.p2p.broadcast(&EventPut(event0_3)).await; - let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3.clone()).await; - let _ = node3.dag_insert(&[event1_3.clone()]).await.unwrap()[0]; + let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3).await; + node3.dag_insert(&[event1_3.clone()]).await.unwrap(); node2.p2p.broadcast(&EventPut(event1_3)).await; - let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3.clone()).await; - let event2_3_id = node3.dag_insert(&[event2_3.clone()]).await.unwrap()[0]; + let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3).await; + node3.dag_insert(&[event2_3.clone()]).await.unwrap(); node3.p2p.broadcast(&EventPut(event2_3)).await; - info!("Waiting 10s for events propagation"); - sleep(10).await; + info!("Waiting 5s for events propagation"); + sleep(5).await; // ========================================== // 7. Assert that everyone has all the events // ========================================== - for (i, eg) in eg_instances.iter().enumerate() { - let tips = eg.unreferenced_tips.read().await; - assert!(eg.dag.len() == 14, "Node {}, expected 14 events, have {}", i, eg.dag.len()); - // 5 events from 2. and 4. + 9 events from 6. = ^ - assert!(tips.get(&event2_3_id).is_some(), "Node {}, expected tip to be {}", i, event2_3_id); - } + // 5 events from 2. and 4. + 9 events from 6. = 14 + assert_dags(&eg_instances, 14, &mut rng).await; // ============================================================ // 8. Start a new node and try to sync the DAG from other peers // ============================================================ { // Connect to N_CONNS random peers. + let peer_indexes_to_connect: Vec<_> = + peer_indexes.choose_multiple(&mut rng, N_CONNS).collect(); + let mut peers = vec![]; - for _ in 0..N_CONNS { - let port = 13200 + rng.gen_range(0..N_NODES); + for peer_index in peer_indexes_to_connect { + let port = 13200 + peer_index; peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap()); } - let settings = Settings { - localnet: true, - inbound_addrs: vec![ - Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + N_NODES + 1)).unwrap() - ], - outbound_connections: 0, - outbound_connect_timeout: 2, - inbound_connections: usize::MAX, + let event_graph = spawn_node( + vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + N_NODES + 1)).unwrap()], peers, - allowed_transports: vec!["tcp".to_string()], - ..Default::default() - }; - - let p2p = P2p::new(settings, ex.clone()).await; - let sled_db = sled::Config::new().temporary(true).open().unwrap(); - let event_graph = - EventGraph::new(p2p.clone(), sled_db, "dag", 1, ex.clone()).await.unwrap(); - let event_graph_ = event_graph.clone(); - - // Register the P2P protocols - let registry = p2p.protocol_registry(); - registry - .register(SESSION_ALL, move |channel, _| { - let event_graph_ = event_graph_.clone(); - async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() } - }) - .await; + ex.clone(), + ) + .await; eg_instances.push(event_graph.clone()); event_graph.p2p.clone().start().await.unwrap(); - info!("Waiting 10s for new node connection"); - sleep(10).await; + info!("Waiting 5s for new node connection"); + sleep(5).await; - event_graph.dag_sync().await.unwrap(); + event_graph.dag_sync().await.unwrap() } - info!("Waiting 10s for things to settle"); - sleep(10).await; - // ============================================================ // 9. Assert the new synced DAG has the same contents as others // ============================================================ - for (i, eg) in eg_instances.iter().enumerate() { - let tips = eg.unreferenced_tips.read().await; - assert!(eg.dag.len() == 14, "Node {}, expected 14 events, have {}", i, eg.dag.len()); - // 5 events from 2. and 4. + 9 events from 6. = ^ - assert!(tips.get(&event2_3_id).is_some(), "Node {}, expected tip to be {}", i, event2_3_id); - } + // 5 events from 2. and 4. + 9 events from 6. = 14 + assert_dags(&eg_instances, 14, &mut rng).await; + + // Stop the P2P network + for eg in eg_instances.iter() { + eg.p2p.clone().stop().await; + } +} + +#[test] +#[ignore] +fn eventgraph_chaotic_propagation() { + test_body!(eventgraph_chaotic_propagation_real); +} + +async fn eventgraph_chaotic_propagation_real(ex: Arc>) { + let mut rng = rand::thread_rng(); + let peer_indexes: Vec = (0..N_NODES).collect(); + let n_events: usize = 100000; + + // Bootstrap nodes + let mut eg_instances = bootstrap_nodes(&peer_indexes, 14200, &mut rng, ex.clone()).await; + + // ========================================= + // 1. Assert that everyone's DAG is the same + // ========================================= + assert_dags(&eg_instances, 1, &mut rng).await; + + // =========================================== + // 2. Create multiple events on multiple nodes + for i in 0..n_events { + let random_node = eg_instances.choose(&mut rng).unwrap(); + let event = Event::new(i.to_be_bytes().to_vec(), random_node).await; + random_node.dag_insert(&[event.clone()]).await.unwrap(); + random_node.p2p.broadcast(&EventPut(event)).await; + } + info!("Waiting 5s for events propagation"); + sleep(5).await; + + // ========================================== + // 3. Assert that everyone has all the events + // ========================================== + assert_dags(&eg_instances, n_events + 1, &mut rng).await; + + // ============================================================ + // 4. Start a new node and try to sync the DAG from other peers + // ============================================================ + { + // Connect to N_CONNS random peers. + let peer_indexes_to_connect: Vec<_> = + peer_indexes.choose_multiple(&mut rng, N_CONNS).collect(); + + let mut peers = vec![]; + for peer_index in peer_indexes_to_connect { + let port = 14200 + peer_index; + peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap()); + } + + let event_graph = spawn_node( + vec![Url::parse(&format!("tcp://127.0.0.1:{}", 14200 + N_NODES + 1)).unwrap()], + peers, + ex.clone(), + ) + .await; + + eg_instances.push(event_graph.clone()); + + event_graph.p2p.clone().start().await.unwrap(); + + info!("Waiting 5s for new node connection"); + sleep(5).await; + + event_graph.dag_sync().await.unwrap() + } + + // ============================================================ + // 5. Assert the new synced DAG has the same contents as others + // ============================================================ + assert_dags(&eg_instances, n_events + 1, &mut rng).await; // Stop the P2P network for eg in eg_instances.iter() { diff --git a/src/event_graph/util.rs b/src/event_graph/util.rs index 34199ebef..2f0969b79 100644 --- a/src/event_graph/util.rs +++ b/src/event_graph/util.rs @@ -18,6 +18,8 @@ use std::time::UNIX_EPOCH; +use crate::event_graph::{Event, GENESIS_CONTENTS, INITIAL_GENESIS, NULL_ID, N_EVENT_PARENTS}; + /// Seconds in a day pub(super) const DAY: i64 = 86400; @@ -89,6 +91,29 @@ pub(super) fn seconds_until_next_rotation(next_rotation: u64) -> u64 { next_rotation - now } +/// Generate a deterministic genesis event corresponding to the DAG's configuration. +pub(super) fn generate_genesis(days_rotation: u64) -> Event { + // Days rotation is u64 except zero + let genesis_days_rotation = if days_rotation == 0 { 1 } else { days_rotation }; + + // First check how many days passed since initial genesis. + let days_passed = days_since(INITIAL_GENESIS); + + // Calculate the number of days_rotation intervals since INITIAL_GENESIS + let rotations_since_genesis = days_passed / genesis_days_rotation; + + // Calculate the timestamp of the most recent event + let timestamp = + INITIAL_GENESIS + (rotations_since_genesis * genesis_days_rotation * DAY as u64); + + Event { + timestamp, + content: GENESIS_CONTENTS.to_vec(), + parents: [NULL_ID; N_EVENT_PARENTS], + layer: 0, + } +} + #[cfg(test)] mod tests { use crate::event_graph::INITIAL_GENESIS; diff --git a/src/system/stoppable_task.rs b/src/system/stoppable_task.rs index f8ef013cd..6470efb45 100644 --- a/src/system/stoppable_task.rs +++ b/src/system/stoppable_task.rs @@ -142,6 +142,7 @@ impl std::cmp::Eq for StoppableTask {} mod tests { use super::*; use crate::{error::Error, system::sleep_forever}; + use log::warn; use smol::Executor; use std::sync::Arc; @@ -150,13 +151,21 @@ mod tests { let mut cfg = simplelog::ConfigBuilder::new(); cfg.add_filter_ignore("async_io".to_string()); cfg.add_filter_ignore("polling".to_string()); - simplelog::TermLogger::init( + + // We check this error so we can execute same file tests in parallel, + // otherwise second one fails to init logger here. + if simplelog::TermLogger::init( + //simplelog::LevelFilter::Info, + //simplelog::LevelFilter::Debug, simplelog::LevelFilter::Trace, cfg.build(), simplelog::TerminalMode::Mixed, simplelog::ColorChoice::Auto, ) - .unwrap(); + .is_err() + { + warn!(target: "test_harness", "Logger already initialized"); + } let executor = Arc::new(Executor::new()); let executor_ = executor.clone(); diff --git a/tests/consensus_prop.rs b/tests/consensus_prop.rs index 3711b5251..fcb846322 100644 --- a/tests/consensus_prop.rs +++ b/tests/consensus_prop.rs @@ -29,7 +29,7 @@ use darkfi_sdk::{ pasta::pallas, }; use halo2_proofs::dev::MockProver; -use log::info; +use log::{info, warn}; use rand::rngs::OsRng; pub const SECRET_KEY_PREFIX: pallas::Base = pallas::Base::from_raw([4, 0, 0, 0]); @@ -46,14 +46,21 @@ pub const HEADSTART: pallas::Base = pallas::Base::from_raw([ #[test] fn consensus_prop() -> Result<()> { - simplelog::TermLogger::init( + let mut cfg = simplelog::ConfigBuilder::new(); + // We check this error so we can execute same file tests in parallel, + // otherwise second one fails to init logger here. + if simplelog::TermLogger::init( simplelog::LevelFilter::Info, + //simplelog::LevelFilter::Debug, //simplelog::LevelFilter::Trace, - simplelog::ConfigBuilder::new().build(), + cfg.build(), simplelog::TerminalMode::Mixed, simplelog::ColorChoice::Auto, ) - .unwrap(); + .is_err() + { + warn!(target: "test_harness", "Logger already initialized"); + } let input_serial = pallas::Base::from(pallas::Base::from(10)); //let input_serial = pallas::Base::from(pallas::Base::from(2));