event_graph: introduce layers to the DAG

Each event corresponds to a specific layer(height) in the dag, making identifying and preventing cycles way easier, as all parents must exist in previous layers. Additionally, propagation and sync gremlins have been eliminated, and proper validations added
This commit is contained in:
aggstam
2023-11-24 15:35:35 +02:00
parent 59d7ed09fc
commit c6fa3ba99b
13 changed files with 804 additions and 414 deletions

View File

@@ -202,6 +202,7 @@ event-graph = [
"num-bigint",
"rand",
"sled",
"sled-overlay",
"smol",
"tinyjson",

View File

@@ -359,11 +359,8 @@ impl Client {
self.server.try_encrypt(&mut privmsg).await;
// Build a DAG event and return it.
let event = Event::new(
serialize_async(&privmsg).await,
self.server.darkirc.event_graph.clone(),
)
.await;
let event =
Event::new(serialize_async(&privmsg).await, &self.server.darkirc.event_graph).await;
return Ok(Some(event))
}

View File

@@ -33,7 +33,7 @@ use darkfi::{
use rand::{rngs::OsRng, RngCore};
use smol::Executor;
use url::Url;
use log::error;
use log::{error, warn};
use super::{proto::ProtocolDht, Dhtd};
@@ -144,12 +144,20 @@ fn dht_remote_get_insert() -> Result<()> {
cfg.add_filter_ignore("net::protocol_version".to_string());
cfg.add_filter_ignore("net::protocol_ping".to_string());
simplelog::TermLogger::init(
// We check this error so we can execute same file tests in parallel,
// otherwise second one fails to init logger here.
if simplelog::TermLogger::init(
simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace,
cfg.build(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
)?;
)
.is_err()
{
warn!(target: "test_harness", "Logger already initialized");
}
let ex = Arc::new(Executor::new());
let (signal, shutdown) = async_std::channel::unbounded::<()>();

View File

@@ -105,7 +105,7 @@ impl JsonRpcInterface {
let genevent: GenEvent = deserialize(&dec).unwrap();
// Build a DAG event and return it.
let event = Event::new(serialize_async(&genevent).await, self.event_graph.clone()).await;
let event = Event::new(serialize_async(&genevent).await, &self.event_graph).await;
if let Err(e) = self.event_graph.dag_insert(&[event.clone()]).await {
error!("Failed inserting new event to DAG: {}", e);

View File

@@ -170,7 +170,7 @@ async fn start_sync_loop(
// Build a DAG event and return it.
let event = Event::new(
serialize_async(&encrypted_task).await,
event_graph.clone(),
&event_graph,
)
.await;
// Update the last sent event.

View File

@@ -487,6 +487,9 @@ pub enum Error {
#[error("Event is not found in tree: {0}")]
EventNotFound(String),
#[error("Event is invalid")]
EventIsInvalid,
// ====================
// Miscellaneous errors
// ====================

View File

@@ -18,9 +18,15 @@
use std::{collections::HashSet, time::UNIX_EPOCH};
use darkfi_serial::{async_trait, Encodable, SerialDecodable, SerialEncodable};
use darkfi_serial::{async_trait, deserialize_async, Encodable, SerialDecodable, SerialEncodable};
use sled_overlay::SledTreeOverlay;
use super::{EventGraphPtr, EVENT_TIME_DRIFT, NULL_ID, N_EVENT_PARENTS};
use crate::Result;
use super::{
util::next_rotation_timestamp, EventGraphPtr, EVENT_TIME_DRIFT, INITIAL_GENESIS, NULL_ID,
N_EVENT_PARENTS,
};
/// Representation of an event in the Event Graph
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -31,20 +37,19 @@ pub struct Event {
pub(super) content: Vec<u8>,
/// Parent nodes in the event DAG
pub(super) parents: [blake3::Hash; N_EVENT_PARENTS],
/// DAG layer index of the event
pub(super) layer: u64,
}
impl Event {
/// Create a new event with the given data and an `EventGraph` reference.
/// Create a new event with the given data and an [`EventGraph`] reference.
/// The timestamp of the event will be the current time, and the parents
/// will be `N_EVENT_PARENTS` from the current event graph unreferenced tips.
/// The parents can also include NULL, but this should be handled by the rest
/// of the codebase.
pub async fn new(data: Vec<u8>, event_graph: EventGraphPtr) -> Self {
Self {
timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(),
content: data,
parents: event_graph.get_unreferenced_tips().await,
}
pub async fn new(data: Vec<u8>, event_graph: &EventGraphPtr) -> Self {
let (layer, parents) = event_graph.get_next_layer_with_parents().await;
Self { timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(), content: data, parents, layer }
}
/// Hash the [`Event`] to retrieve its ID
@@ -53,6 +58,7 @@ impl Event {
self.timestamp.encode(&mut hasher).unwrap();
self.content.encode(&mut hasher).unwrap();
self.parents.encode(&mut hasher).unwrap();
self.layer.encode(&mut hasher).unwrap();
hasher.finalize()
}
@@ -68,9 +74,92 @@ impl Event {
}
*/
/// Fully validate an event for the correct layout against provided
/// DAG [`sled::Tree`] reference and enforce relevant age, assuming
/// some possibility for a time drift. Optionally, provide an overlay
/// to use that instead of actual referenced DAG.
pub async fn validate(
&self,
dag: &sled::Tree,
genesis_timestamp: u64,
days_rotation: u64,
overlay: Option<&SledTreeOverlay>,
) -> Result<bool> {
// Let's not bother with empty events
if self.content.is_empty() {
return Ok(false)
}
// Check if the event timestamp is after genesis timestamp
if self.timestamp < genesis_timestamp - EVENT_TIME_DRIFT {
return Ok(false)
}
// If a rotation has been set, check if the event timestamp
// is after the next genesis timestamp
if days_rotation > 0 {
let next_genesis_timestamp = next_rotation_timestamp(INITIAL_GENESIS, days_rotation);
if self.timestamp > next_genesis_timestamp + EVENT_TIME_DRIFT {
return Ok(false)
}
}
// Validate the parents. We have to check that at least one parent
// is not NULL, that the parents exist, that no two parents are the
// same, and that the parent exists in previous layers, to prevent
// recursive references(circles).
let mut seen = HashSet::new();
let self_id = self.id();
for parent_id in self.parents.iter() {
if parent_id == &NULL_ID {
continue
}
if parent_id == &self_id {
return Ok(false)
}
if seen.contains(parent_id) {
return Ok(false)
}
let parent_bytes = if let Some(overlay) = overlay {
overlay.get(parent_id.as_bytes())?
} else {
dag.get(parent_id.as_bytes())?
};
if parent_bytes.is_none() {
return Ok(false)
}
let parent: Event = deserialize_async(&parent_bytes.unwrap()).await?;
if self.layer <= parent.layer {
return Ok(false)
}
seen.insert(parent_id);
}
Ok(!seen.is_empty())
}
/// Fully validate an event for the correct layout against provided
/// [`EventGraph`] reference and enforce relevant age, assuming some
/// possibility for a time drift.
pub async fn dag_validate(&self, event_graph: &EventGraphPtr) -> Result<bool> {
// Grab genesis timestamp
let genesis_timestamp = event_graph.current_genesis.read().await.timestamp;
// Perform validation
self.validate(&event_graph.dag, genesis_timestamp, event_graph.days_rotation, None).await
}
/// Validate a new event for the correct layout and enforce relevant age,
/// assuming some possibility for a time drift.
pub fn validate(&self) -> bool {
/// Note: This validation does *NOT* check for recursive references(circles),
/// and should be used as a first quick check.
pub fn validate_new(&self) -> bool {
// Let's not bother with empty events
if self.content.is_empty() {
return false
@@ -86,8 +175,7 @@ impl Event {
}
// Validate the parents. We have to check that at least one parent
// is not NULL, that the parent does not recursively reference the
// event, and that no two parents are the same.
// is not NULL and that no two parents are the same.
let mut seen = HashSet::new();
let self_id = self.id();
@@ -113,59 +201,77 @@ impl Event {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use smol::Executor;
use crate::{
event_graph::EventGraph,
net::{P2p, Settings},
};
use super::*;
fn make_valid_event() -> Event {
Event {
timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs(),
content: vec![1u8],
parents: [
blake3::hash(b"1"),
blake3::hash(b"2"),
blake3::hash(b"3"),
blake3::hash(b"4"),
blake3::hash(b"5"),
],
}
}
#[test]
fn event_is_valid() {
// Validate our test Event struct
assert!(make_valid_event().validate());
async fn make_event_graph() -> Result<EventGraphPtr> {
let ex = Arc::new(Executor::new());
let p2p = P2p::new(Settings::default(), ex.clone()).await;
let sled_db = sled::Config::new().temporary(true).open().unwrap();
EventGraph::new(p2p, sled_db, "dag", 1, ex).await
}
#[test]
fn invalid_events() {
// TODO: Not checked:
// - "the parent does not recursively reference the event"
let e = make_valid_event();
fn event_is_valid() -> Result<()> {
smol::block_on(async {
// Generate a dummy event graph
let event_graph = make_event_graph().await?;
let mut event_empty_content = e.clone();
event_empty_content.content = vec![];
assert!(!event_empty_content.validate());
// Create a new valid event
let valid_event = Event::new(vec![1u8], &event_graph).await;
let mut event_timestamp_too_old = e.clone();
event_timestamp_too_old.timestamp = 0;
assert!(!event_timestamp_too_old.validate());
// Validate our test Event struct
assert!(valid_event.dag_validate(&event_graph).await?);
let mut event_timestamp_too_new = e.clone();
event_timestamp_too_new.timestamp = u64::MAX;
assert!(!event_timestamp_too_new.validate());
// Thanks for reading
Ok(())
})
}
let mut event_duplicated_parents = e.clone();
let duplicated_parents = [
blake3::hash(b"1"),
blake3::hash(b"1"),
blake3::hash(b"3"),
blake3::hash(b"4"),
blake3::hash(b"5"),
];
event_duplicated_parents.parents = duplicated_parents;
assert!(!event_duplicated_parents.validate());
#[test]
fn invalid_events() -> Result<()> {
smol::block_on(async {
// Generate a dummy event graph
let event_graph = make_event_graph().await?;
let mut event_null_parents = e.clone();
let all_null_parents = [NULL_ID, NULL_ID, NULL_ID, NULL_ID, NULL_ID];
event_null_parents.parents = all_null_parents;
assert!(!event_null_parents.validate());
// Create a new valid event
let valid_event = Event::new(vec![1u8], &event_graph).await;
let mut event_empty_content = valid_event.clone();
event_empty_content.content = vec![];
assert!(!event_empty_content.dag_validate(&event_graph).await?);
let mut event_timestamp_too_old = valid_event.clone();
event_timestamp_too_old.timestamp = 0;
assert!(!event_timestamp_too_old.dag_validate(&event_graph).await?);
let mut event_timestamp_too_new = valid_event.clone();
event_timestamp_too_new.timestamp = u64::MAX;
assert!(!event_timestamp_too_new.dag_validate(&event_graph).await?);
let mut event_duplicated_parents = valid_event.clone();
event_duplicated_parents.parents[1] = valid_event.parents[0];
assert!(!event_duplicated_parents.dag_validate(&event_graph).await?);
let mut event_null_parents = valid_event.clone();
let all_null_parents = [NULL_ID, NULL_ID, NULL_ID, NULL_ID, NULL_ID];
event_null_parents.parents = all_null_parents;
assert!(!event_null_parents.dag_validate(&event_graph).await?);
let mut event_same_layer_as_parents = valid_event.clone();
event_same_layer_as_parents.layer = 0;
assert!(!event_same_layer_as_parents.dag_validate(&event_graph).await?);
// Thanks for reading
Ok(())
})
}
}

View File

@@ -18,7 +18,7 @@
use std::{
cmp::Ordering,
collections::{HashMap, HashSet, VecDeque},
collections::{BTreeMap, HashMap, HashSet, VecDeque},
sync::Arc,
};
@@ -26,6 +26,7 @@ use async_recursion::async_recursion;
use darkfi_serial::{deserialize_async, serialize_async};
use log::{debug, error, info};
use num_bigint::BigUint;
use sled_overlay::SledTreeOverlay;
use smol::{
lock::{OnceCell, RwLock},
Executor,
@@ -48,7 +49,7 @@ use proto::{EventRep, EventReq, TipRep, TipReq, REPLY_TIMEOUT};
/// Utility functions
mod util;
use util::{days_since, next_rotation_timestamp, DAY};
use util::{generate_genesis, next_rotation_timestamp};
#[cfg(test)]
mod tests;
@@ -76,7 +77,7 @@ pub struct EventGraph {
/// Sled tree containing the DAG
dag: sled::Tree,
/// The set of unreferenced DAG tips
unreferenced_tips: RwLock<HashSet<blake3::Hash>>,
unreferenced_tips: RwLock<BTreeMap<u64, HashSet<blake3::Hash>>>,
/// A `HashSet` containg event IDs and their 1-level parents.
/// These come from the events we've sent out using `EventPut`.
/// They are used with `EventReq` to decide if we should reply
@@ -88,7 +89,12 @@ pub struct EventGraph {
/// Event subscriber, this notifies whenever an event is
/// inserted into the DAG
pub event_sub: SubscriberPtr<Event>,
/// Current genesis event
current_genesis: RwLock<Event>,
/// Currently configured DAG rotation, in days
days_rotation: u64,
/// Flag signalling DAG has finished initial sync
synced: RwLock<bool>,
}
impl EventGraph {
@@ -102,12 +108,12 @@ impl EventGraph {
ex: Arc<Executor<'_>>,
) -> Result<EventGraphPtr> {
let dag = sled_db.open_tree(dag_tree_name)?;
let unreferenced_tips = RwLock::new(HashSet::new());
let unreferenced_tips = RwLock::new(BTreeMap::new());
let broadcasted_ids = RwLock::new(HashSet::new());
let event_sub = Subscriber::new();
// Create the current genesis event based on the `days_rotation`
let current_genesis = Self::generate_genesis(days_rotation);
let current_genesis = generate_genesis(days_rotation);
let self_ = Arc::new(Self {
p2p,
dag: dag.clone(),
@@ -115,7 +121,9 @@ impl EventGraph {
broadcasted_ids,
prune_task: OnceCell::new(),
event_sub,
current_genesis: RwLock::new(current_genesis.clone()),
days_rotation,
synced: RwLock::new(false),
});
// Check if we have it in our DAG.
@@ -159,29 +167,11 @@ impl EventGraph {
sled_db.flush_async().await.unwrap();
}
/// Generate a deterministic genesis event corresponding to the DAG's configuration.
fn generate_genesis(days_rotation: u64) -> Event {
// Days rotation is u64 except zero
let genesis_days_rotation = if days_rotation == 0 { 1 } else { days_rotation };
// First check how many days passed since initial genesis.
let days_passed = days_since(INITIAL_GENESIS);
// Calculate the number of days_rotation intervals since INITIAL_GENESIS
let rotations_since_genesis = days_passed / genesis_days_rotation;
// Calculate the timestamp of the most recent event
let timestamp =
INITIAL_GENESIS + (rotations_since_genesis * genesis_days_rotation * DAY as u64);
Event { timestamp, content: GENESIS_CONTENTS.to_vec(), parents: [NULL_ID; N_EVENT_PARENTS] }
}
/// Sync the DAG from connected peers
pub async fn dag_sync(&self) -> Result<()> {
// We do an optimistic sync where we ask all our connected peers for
// the DAG tips (unreferenced events) and then we accept the ones we
// see the most times.
// the latest layer DAG tips (unreferenced events) and then we accept
// the ones we see the most times.
// * Compare received tips with local ones, identify which we are missing.
// * Request these from peers
// * Recursively request these backward
@@ -202,8 +192,8 @@ impl EventGraph {
"[EVENTGRAPH] Syncing DAG from {} peers...", communicated_peers,
);
// Here we keep track of the tips and how many time we've seen them.
let mut tips: HashMap<blake3::Hash, usize> = HashMap::new();
// Here we keep track of the tips, their layers and how many time we've seen them.
let mut tips: HashMap<blake3::Hash, (u64, usize)> = HashMap::new();
// Let's first ask all of our peers for their tips and collect them
// in our hashmap above.
@@ -246,11 +236,13 @@ impl EventGraph {
let peer_tips = &peer_tips.0;
// Note down the seen tips
for tip in peer_tips {
if let Some(seen_tip) = tips.get_mut(tip) {
*seen_tip += 1;
} else {
tips.insert(*tip, 1);
for (layer, layer_tips) in peer_tips {
for tip in layer_tips {
if let Some(seen_tip) = tips.get_mut(tip) {
seen_tip.1 += 1;
} else {
tips.insert(*tip, (*layer, 1));
}
}
}
}
@@ -264,24 +256,25 @@ impl EventGraph {
return Err(Error::DagSyncFailed)
}
// We know the number of peers we've communicated with.
// Arbitrarily, let's not consider events we only got once.
// TODO: This should be more sensible depending on the peer number.
// We know the number of peers we've communicated with,
// so we will consider events we saw at more that 2/3 of
// of those peers.
let consideration_threshold = communicated_peers * 2 / 3;
let mut considered_tips = HashSet::new();
for (tip, amount) in tips.iter() {
if amount > &1 {
for (tip, (_, amount)) in tips.iter() {
if amount > &consideration_threshold {
considered_tips.insert(*tip);
}
}
drop(tips);
// Now begin fetching the events backwards.
let mut missing_parents = vec![];
let mut missing_parents = HashSet::new();
for tip in considered_tips.iter() {
assert!(tip != &NULL_ID);
if !self.dag.contains_key(tip.as_bytes()).unwrap() {
missing_parents.push(*tip);
missing_parents.insert(*tip);
}
}
@@ -291,7 +284,8 @@ impl EventGraph {
}
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events");
let mut received_events = vec![];
let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
let mut received_events_hashes = HashSet::new();
while !missing_parents.is_empty() {
for parent_id in missing_parents.clone().iter() {
let mut found_event = false;
@@ -363,9 +357,15 @@ impl EventGraph {
"Got correct parent event {}", parent_id,
);
received_events.push(parent.clone());
let pos = missing_parents.iter().position(|id| id == &parent.id()).unwrap();
missing_parents.remove(pos);
if let Some(layer_events) = received_events.get_mut(&parent.layer) {
layer_events.push(parent.clone());
} else {
let layer_events = vec![parent.clone()];
received_events.insert(parent.layer, layer_events);
}
received_events_hashes.insert(*parent_id);
missing_parents.remove(parent_id);
found_event = true;
// See if we have the upper parents
@@ -374,12 +374,15 @@ impl EventGraph {
continue
}
if !self.dag.contains_key(upper_parent.as_bytes()).unwrap() {
if !missing_parents.contains(upper_parent) &&
!received_events_hashes.contains(upper_parent) &&
!self.dag.contains_key(upper_parent.as_bytes()).unwrap()
{
debug!(
target: "event_graph::dag_sync()",
"Found upper missing parent event{}", upper_parent,
);
missing_parents.push(*upper_parent);
missing_parents.insert(*upper_parent);
}
}
@@ -398,10 +401,15 @@ impl EventGraph {
// At this point we should've got all the events.
// We should add them to the DAG.
// TODO: FIXME: Also validate these events.
// TODO: FIXME: This insert should also be atomic, dag_insert might need a rewrite
let received_events_rev: Vec<Event> = received_events.iter().rev().cloned().collect();
self.dag_insert(&received_events_rev).await.unwrap();
let mut events = vec![];
for (_, tips) in received_events {
for tip in tips {
events.push(tip);
}
}
self.dag_insert(&events).await?;
*self.synced.write().await = true;
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] DAG synced successfully!");
Ok(())
@@ -411,12 +419,14 @@ impl EventGraph {
async fn dag_prune(&self, genesis_event: Event) -> Result<()> {
debug!(target: "event_graph::dag_prune()", "Pruning DAG...");
// Acquire exclusive locks to unreferenced_tips and broadcasted_ids while
// this operation is happening. We do this to ensure that during the pruning
// operation, no other operations are able to access the intermediate state
// which could lead to producing the wrong state after pruning.
// Acquire exclusive locks to unreferenced_tips, broadcasted_ids and
// current_genesis while this operation is happening. We do this to
// ensure that during the pruning operation, no other operations are
// able to access the intermediate state which could lead to producing
// the wrong state after pruning.
let mut unreferenced_tips = self.unreferenced_tips.write().await;
let mut broadcasted_ids = self.broadcasted_ids.write().await;
let mut current_genesis = self.current_genesis.write().await;
// Atomically clear the DAG and write the new genesis event.
let mut batch = sled::Batch::default();
@@ -431,10 +441,13 @@ impl EventGraph {
}
// Clear unreferenced tips and bcast ids
*unreferenced_tips = HashSet::from([genesis_event.id()]);
*unreferenced_tips = BTreeMap::new();
unreferenced_tips.insert(0, HashSet::from([genesis_event.id()]));
*current_genesis = genesis_event;
*broadcasted_ids = HashSet::new();
drop(unreferenced_tips);
drop(broadcasted_ids);
drop(current_genesis);
debug!(target: "event_graph::dag_prune()", "DAG pruned successfully");
Ok(())
@@ -457,6 +470,7 @@ impl EventGraph {
timestamp: next_rotation,
content: GENESIS_CONTENTS.to_vec(),
parents: [NULL_ID; N_EVENT_PARENTS],
layer: 0,
};
// Sleep until it's time to rotate.
@@ -472,6 +486,9 @@ impl EventGraph {
}
/// Atomically insert given events into the DAG and return the event IDs.
/// All provided events must be valid. An overlay is used over the DAG tree,
/// temporary writting each event in order. After all events have been
/// validated and inserted successfully, we write the overlay to sled.
/// This will append the new events into the unreferenced tips set, and
/// remove the events' parents from it. It will also append the events'
/// level-1 parents to the `broadcasted_ids` set, so the P2P protocol
@@ -479,6 +496,11 @@ impl EventGraph {
/// TODO: The `broadcasted_ids` set should periodically be pruned, when
/// some sensible time has passed after broadcasting the event.
pub async fn dag_insert(&self, events: &[Event]) -> Result<Vec<blake3::Hash>> {
// Sanity check
if events.is_empty() {
return Ok(vec![])
}
// Acquire exclusive locks to `unreferenced_tips and broadcasted_ids`
let mut unreferenced_tips = self.unreferenced_tips.write().await;
let mut broadcasted_ids = self.broadcasted_ids.write().await;
@@ -486,18 +508,52 @@ impl EventGraph {
// Here we keep the IDs to return
let mut ids = Vec::with_capacity(events.len());
// Create an atomic batch
let mut batch = sled::Batch::default();
// Create an overlay over the DAG tree
let mut overlay = SledTreeOverlay::new(&self.dag);
// Iterate over given events
// Grab genesis timestamp
let genesis_timestamp = self.current_genesis.read().await.timestamp;
// Iterate over given events to validate them and
// write them to the overlay
for event in events {
let event_id = event.id();
debug!(
target: "event_graph::dag_insert()",
"Inserting event {} into the DAG", event_id,
);
if !event
.validate(&self.dag, genesis_timestamp, self.days_rotation, Some(&overlay))
.await?
{
error!(target: "event_graph::dag_insert()", "Event {} is invalid!", event_id);
return Err(Error::EventIsInvalid)
}
let event_se = serialize_async(event).await;
// Add the event to the overlay
overlay.insert(event_id.as_bytes(), &event_se)?;
// Note down the event ID to return
ids.push(event_id);
}
// Aggregate changes into a single batch
let batch = overlay.aggregate().unwrap();
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = self.dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
// Iterate over given events to update references and
// send out notifications about them
for event in events {
let event_id = event.id();
// Update the unreferenced DAG tips set
debug!(
target: "event_graph::dag_insert()",
@@ -509,31 +565,36 @@ impl EventGraph {
target: "event_graph::dag_insert()",
"Removing {} from unreferenced_tips", parent_id,
);
unreferenced_tips.remove(parent_id);
// Iterate over unreferenced tips in previous layers
// and remove the parent
// NOTE: this might be too exhaustive, but the
// assumption is that previous layers unreferenced
// tips will be few.
for (layer, tips) in unreferenced_tips.iter_mut() {
if layer >= &event.layer {
break
}
tips.remove(parent_id);
}
broadcasted_ids.insert(*parent_id);
}
}
unreferenced_tips.retain(|_, tips| !tips.is_empty());
debug!(
target: "event_graph::dag_insert()",
"Adding {} to unreferenced tips", event_id,
);
unreferenced_tips.insert(event_id);
// Add the event to the atomic batch
batch.insert(event_id.as_bytes(), event_se);
if let Some(layer_tips) = unreferenced_tips.get_mut(&event.layer) {
layer_tips.insert(event_id);
} else {
let mut layer_tips = HashSet::new();
layer_tips.insert(event_id);
unreferenced_tips.insert(event.layer, layer_tips);
}
// Note down the event ID to return
ids.push(event_id);
}
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = self.dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
// Send out notifications about the new events
for event in events {
// Send out notifications about the new event
self.event_sub.notify(event.clone()).await;
}
@@ -552,8 +613,33 @@ impl EventGraph {
Ok(Some(event))
}
/// Find the unreferenced tips in the current DAG state.
async fn find_unreferenced_tips(&self) -> HashSet<blake3::Hash> {
/// Get next layer along with its N_EVENT_PARENTS from the unreferenced
/// tips of the DAG. Since tips are mapped by their layer, we go backwards
/// until we fill the vector, ensuring we always use latest layers tips as
/// parents.
async fn get_next_layer_with_parents(&self) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
let unreferenced_tips = self.unreferenced_tips.read().await;
let mut parents = [NULL_ID; N_EVENT_PARENTS];
let mut index = 0;
'outer: for (_, tips) in unreferenced_tips.iter().rev() {
for tip in tips.iter() {
parents[index] = *tip;
index += 1;
if index >= N_EVENT_PARENTS {
break 'outer
}
}
}
let next_layer = unreferenced_tips.last_key_value().unwrap().0 + 1;
assert!(parents.iter().any(|x| x != &NULL_ID));
(next_layer, parents)
}
/// Find the unreferenced tips in the current DAG state, mapped by their layers.
async fn find_unreferenced_tips(&self) -> BTreeMap<u64, HashSet<blake3::Hash>> {
// First get all the event IDs
let mut tips = HashSet::new();
for iter_elem in self.dag.iter() {
@@ -562,6 +648,7 @@ impl EventGraph {
tips.insert(id);
}
// Iterate again to find unreferenced IDs
for iter_elem in self.dag.iter() {
let (_, event) = iter_elem.unwrap();
let event: Event = deserialize_async(&event).await.unwrap();
@@ -570,26 +657,26 @@ impl EventGraph {
}
}
tips
}
/// Get the current set of unreferenced tips in the DAG.
async fn get_unreferenced_tips(&self) -> [blake3::Hash; N_EVENT_PARENTS] {
// TODO: return vec of all instead of N_EVENT_PARENTS
let unreferenced_tips = self.unreferenced_tips.read().await;
let mut tips = [NULL_ID; N_EVENT_PARENTS];
for (i, tip) in unreferenced_tips.iter().take(N_EVENT_PARENTS).enumerate() {
tips[i] = *tip
// Build the layers map
let mut map: BTreeMap<u64, HashSet<blake3::Hash>> = BTreeMap::new();
for tip in tips {
let bytes = self.dag.get(tip.as_bytes()).unwrap().unwrap();
let event: Event = deserialize_async(&bytes).await.unwrap();
if let Some(layer_tips) = map.get_mut(&event.layer) {
layer_tips.insert(tip);
} else {
let mut layer_tips = HashSet::new();
layer_tips.insert(tip);
map.insert(event.layer, layer_tips);
}
}
assert!(tips.iter().any(|x| x != &NULL_ID));
tips
map
}
/// Internal function used for DAG sorting.
async fn get_unreferenced_tips_sorted(&self) -> [blake3::Hash; N_EVENT_PARENTS] {
let tips = self.get_unreferenced_tips().await;
let (_, tips) = self.get_next_layer_with_parents().await;
// Convert the hash to BigUint for sorting
let mut sorted: Vec<_> =

View File

@@ -19,6 +19,7 @@
// TODO: FIXME: Some of the protocols should block operations until DAG is synced.
use std::{
collections::{BTreeMap, HashSet},
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Arc,
@@ -26,11 +27,11 @@ use std::{
time::Duration,
};
use darkfi_serial::{async_trait, deserialize_async, SerialDecodable, SerialEncodable};
use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
use log::{debug, error, trace, warn};
use smol::Executor;
use super::{Event, EventGraph, EventGraphPtr, NULL_ID};
use super::{Event, EventGraphPtr, NULL_ID};
use crate::{impl_p2p_message, net::*, system::timeout::timeout, Error, Result};
/// Malicious behaviour threshold. If the threshold is reached, we will
@@ -83,7 +84,7 @@ impl_p2p_message!(TipReq, "EventGraph::TipReq");
/// A P2P message representing a reply for the peer's DAG tips
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipRep(pub Vec<blake3::Hash>);
pub struct TipRep(pub BTreeMap<u64, HashSet<blake3::Hash>>);
impl_p2p_message!(TipRep, "EventGraph::TipRep");
#[async_trait]
@@ -129,6 +130,26 @@ impl ProtocolEventGraph {
}))
}
async fn increase_malicious_count(self: Arc<Self>) -> Result<()> {
let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
if malicious_count + 1 == MALICIOUS_THRESHOLD {
error!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
self.channel.address(),
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
}
warn!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.address(),
);
Ok(())
}
/// Protocol function handling `EventPut`.
/// This is triggered whenever someone broadcasts (or relays) a new
/// event on the network.
@@ -142,50 +163,6 @@ impl ProtocolEventGraph {
target: "event_graph::protocol::handle_event_put()",
"Got EventPut: {} [{}]", event.id(), self.channel.address(),
);
// Check if the event is older than the genesis event. If so, we should
// not include it in our Dag.
// The genesis event marks the last time the Dag has been pruned of old
// events. The pruning interval is defined by the days_rotation field
// of [`EventGraph`].
// TODO it would be better to store/cache this instead of calculating
// on every broadcast/relay.
let genesis_timestamp =
EventGraph::generate_genesis(self.event_graph.days_rotation()).timestamp;
if event.timestamp < genesis_timestamp {
debug!(
target: "event_graph::protocol::handle_event_put()",
"Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{}`",
event.id(), event.timestamp, genesis_timestamp
);
}
// We received an event. Check if we already have it in our DAG.
// Also check if we have the event's parents. In the case we do
// not have the parents, we'll request them from the peer that has
// sent this event to us. In case they do not reply in time, we drop
// the event.
// Validate the event first. If we do not consider it valid, we
// will just drop it and stay quiet. If the malicious threshold
// is reached, we will stop the connection.
if !event.validate() {
let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
if malicious_count + 1 == MALICIOUS_THRESHOLD {
error!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} reached malicious threshold. Dropping connection.",
self.channel.address(),
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
}
warn!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} sent us a malicious event", self.channel.address(),
);
continue
}
// If we have already seen the event, we'll stay quiet.
let event_id = event.id();
@@ -197,6 +174,35 @@ impl ProtocolEventGraph {
continue
}
// We received an event. Check if we already have it in our DAG.
// Check event is not older that current genesis event timestamp.
// Also check if we have the event's parents. In the case we do
// not have the parents, we'll request them from the peer that has
// sent this event to us. In case they do not reply in time, we drop
// the event.
// Check if the event is older than the genesis event. If so, we should
// not include it in our Dag.
// The genesis event marks the last time the Dag has been pruned of old
// events. The pruning interval is defined by the days_rotation field
// of [`EventGraph`].
let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp;
if event.timestamp < genesis_timestamp {
debug!(
target: "event_graph::protocol::handle_event_put()",
"Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{}`",
event.id(), event.timestamp, genesis_timestamp
);
}
// Validate the new event first. If we do not consider it valid, we
// will just drop it and stay quiet. If the malicious threshold
// is reached, we will stop the connection.
if !event.validate_new() {
self.clone().increase_malicious_count().await?;
continue
}
// At this point, this is a new event to us. Let's see if we
// have all of its parents.
debug!(
@@ -204,16 +210,16 @@ impl ProtocolEventGraph {
"Event {} is new", event_id,
);
let mut missing_parents = vec![];
let mut missing_parents = HashSet::new();
for parent_id in event.parents.iter() {
// `event.validate()` should have already made sure that
// `event.validate_new()` should have already made sure that
// not all parents are NULL, and that there are no duplicates.
if parent_id == &NULL_ID {
continue
}
if !self.event_graph.dag.contains_key(parent_id.as_bytes()).unwrap() {
missing_parents.push(*parent_id);
missing_parents.insert(*parent_id);
}
}
@@ -221,13 +227,13 @@ impl ProtocolEventGraph {
// fetch them from this peer. Do this recursively until we
// find all of them.
if !missing_parents.is_empty() {
// We track the received events in a vec. If/when we get all
// of them, we need to insert them in reverse so the DAG state
// stays correct and unreferenced tips represent the actual thing
// they should. If we insert them out of order, then we might have
// wrong unreferenced tips.
// TODO: What should we do if at some point the events become too old?
let mut received_events = vec![];
// We track the received events mapped by their layer.
// If/when we get all of them, we need to insert them in order so
// the DAG state stays correct and unreferenced tips represent the
// actual thing they should. If we insert them out of order, then
// we might have wrong unreferenced tips.
let mut received_events: BTreeMap<u64, Vec<Event>> = BTreeMap::new();
let mut received_events_hashes = HashSet::new();
debug!(
target: "event_graph::protocol::handle_event_put()",
@@ -271,9 +277,15 @@ impl ProtocolEventGraph {
"Got correct parent event {}", parent.id(),
);
received_events.push(parent.clone());
let pos = missing_parents.iter().position(|id| id == &parent.id()).unwrap();
missing_parents.remove(pos);
if let Some(layer_events) = received_events.get_mut(&parent.layer) {
layer_events.push(parent.clone());
} else {
let layer_events = vec![parent.clone()];
received_events.insert(parent.layer, layer_events);
}
received_events_hashes.insert(*parent_id);
missing_parents.remove(parent_id);
// See if we have the upper parents
for upper_parent in parent.parents.iter() {
@@ -281,13 +293,19 @@ impl ProtocolEventGraph {
continue
}
if !self.event_graph.dag.contains_key(upper_parent.as_bytes()).unwrap()
if !missing_parents.contains(upper_parent) &&
!received_events_hashes.contains(upper_parent) &&
!self
.event_graph
.dag
.contains_key(upper_parent.as_bytes())
.unwrap()
{
debug!(
target: "event_graph::protocol::handle_event_put()",
"Found upper missing parent event{}", upper_parent,
);
missing_parents.push(*upper_parent);
missing_parents.insert(*upper_parent);
}
}
}
@@ -295,19 +313,29 @@ impl ProtocolEventGraph {
// At this point we should've got all the events.
// We should add them to the DAG.
// TODO: FIXME: Also validate these events.
let received_events_rev: Vec<Event> =
received_events.iter().rev().cloned().collect();
self.event_graph.dag_insert(&received_events_rev).await.unwrap();
let mut events = vec![];
for (_, tips) in received_events {
for tip in tips {
events.push(tip);
}
}
if self.event_graph.dag_insert(&events).await.is_err() {
self.clone().increase_malicious_count().await?;
continue
}
} // <-- !missing_parents.is_empty()
// If we're here, we have all the parents, and we can now
// add the actual event to the DAG.
// perform a full validation and add the actual event to
// the DAG.
debug!(
target: "event_graph::protocol::handle_event_put()",
"Got all parents necessary for insertion",
);
self.event_graph.dag_insert(&[event.clone()]).await.unwrap();
if self.event_graph.dag_insert(&[event.clone()]).await.is_err() {
self.clone().increase_malicious_count().await?;
continue
}
// Relay the event to other peers.
self.event_graph
@@ -330,6 +358,15 @@ impl ProtocolEventGraph {
"Got EventReq: {} [{}]", event_id, self.channel.address(),
);
// Check if node has finished syncing its DAG
if !*self.event_graph.synced.read().await {
debug!(
target: "event_graph::protocol::handle_event_put",
"DAG is still syncing, skipping..."
);
continue
}
// We received an event request from somebody.
// If we do have it, we will send it back to them as `EventRep`.
// Otherwise, we'll stay quiet. An honest node should always have
@@ -367,16 +404,12 @@ impl ProtocolEventGraph {
target: "event_graph::protocol::handle_event_req()",
"Fetching event {} from DAG", event_id,
);
let event = self.event_graph.dag.get(event_id.as_bytes()).unwrap().unwrap();
let event: Event = deserialize_async(&event).await.unwrap();
let event = self.event_graph.dag_get(&event_id).await.unwrap().unwrap();
// Check if the event is older than the genesis event. If so, something
// has gone wrong. The event should have been pruned during the last
// rotation.
// TODO it would be better to store/cache this instead of calculating
// on every broadcast/relay.
let genesis_timestamp =
EventGraph::generate_genesis(self.event_graph.days_rotation()).timestamp;
let genesis_timestamp = self.event_graph.current_genesis.read().await.timestamp;
if event.timestamp < genesis_timestamp {
error!(
target: "event_graph::protocol::handle_event_req()",
@@ -416,20 +449,29 @@ impl ProtocolEventGraph {
"Got TipReq [{}]", self.channel.address(),
);
// Check if node has finished syncing its DAG
if !*self.event_graph.synced.read().await {
debug!(
target: "event_graph::protocol::handle_event_put",
"DAG is still syncing, skipping..."
);
continue
}
// TODO: Rate limit
// We received a tip request. Let's find them, add them to
// our bcast ids list, and reply with them.
let mut tips = self.event_graph.get_unreferenced_tips().await.to_vec();
tips.retain(|x| x != &NULL_ID);
let layers = self.event_graph.unreferenced_tips.read().await.clone();
let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
for tip in tips.iter() {
bcast_ids.insert(*tip);
for (_, tips) in layers.iter() {
for tip in tips {
bcast_ids.insert(*tip);
}
}
drop(bcast_ids);
self.channel.send(&TipRep(tips)).await?;
self.channel.send(&TipRep(layers)).await?;
}
}
}

View File

@@ -20,15 +20,15 @@
use std::sync::Arc;
use log::info;
use rand::{prelude::SliceRandom, Rng};
use log::{info, warn};
use rand::{prelude::SliceRandom, rngs::ThreadRng};
use smol::{channel, future, Executor};
use url::Url;
use crate::{
event_graph::{
proto::{EventPut, ProtocolEventGraph},
Event, EventGraph, NULL_ID,
Event, EventGraph,
},
net::{P2p, Settings, SESSION_ALL},
system::sleep,
@@ -40,9 +40,7 @@ const N_CONNS: usize = 2;
//const N_NODES: usize = 50;
//const N_CONNS: usize = N_NODES / 3;
#[test]
#[ignore]
fn eventgraph_propagation() {
fn init_logger() {
let mut cfg = simplelog::ConfigBuilder::new();
cfg.add_filter_ignore("sled".to_string());
cfg.add_filter_ignore("net::protocol_ping".to_string());
@@ -56,81 +54,87 @@ fn eventgraph_propagation() {
cfg.add_filter_ignore("net::channel::send()".to_string());
cfg.add_filter_ignore("net::channel::start()".to_string());
cfg.add_filter_ignore("net::channel::subscribe_msg()".to_string());
cfg.add_filter_ignore("net::channel::main_receive_loop()".to_string());
cfg.add_filter_ignore("net::tcp".to_string());
simplelog::TermLogger::init(
//simplelog::LevelFilter::Info,
simplelog::LevelFilter::Debug,
// We check this error so we can execute same file tests in parallel,
// otherwise second one fails to init logger here.
if simplelog::TermLogger::init(
simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace,
cfg.build(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
)
.unwrap();
let ex = Arc::new(Executor::new());
let ex_ = ex.clone();
let (signal, shutdown) = channel::unbounded::<()>();
// Run a thread for each node.
easy_parallel::Parallel::new()
.each(0..N_NODES, |_| future::block_on(ex.run(shutdown.recv())))
.finish(|| {
future::block_on(async {
eventgraph_propagation_real(ex_).await;
drop(signal);
})
});
.is_err()
{
warn!(target: "test_harness", "Logger already initialized");
}
}
async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
let mut eg_instances = vec![];
let mut rng = rand::thread_rng();
async fn spawn_node(
inbound_addrs: Vec<Url>,
peers: Vec<Url>,
ex: Arc<Executor<'static>>,
) -> Arc<EventGraph> {
let settings = Settings {
localnet: true,
inbound_addrs,
outbound_connections: 0,
outbound_connect_timeout: 2,
inbound_connections: usize::MAX,
peers,
allowed_transports: vec!["tcp".to_string()],
..Default::default()
};
let mut genesis_event_id = NULL_ID;
let p2p = P2p::new(settings, ex.clone()).await;
let sled_db = sled::Config::new().temporary(true).open().unwrap();
let event_graph = EventGraph::new(p2p.clone(), sled_db, "dag", 1, ex.clone()).await.unwrap();
*event_graph.synced.write().await = true;
let event_graph_ = event_graph.clone();
// Register the P2P protocols
let registry = p2p.protocol_registry();
registry
.register(SESSION_ALL, move |channel, _| {
let event_graph_ = event_graph_.clone();
async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
})
.await;
event_graph
}
async fn bootstrap_nodes(
peer_indexes: &[usize],
starting_port: usize,
rng: &mut ThreadRng,
ex: Arc<Executor<'static>>,
) -> Vec<Arc<EventGraph>> {
let mut eg_instances = vec![];
// Initialize the nodes
for i in 0..N_NODES {
// Everyone will connect to N_CONNS random peers.
let mut peer_indexes_copy = peer_indexes.to_owned();
peer_indexes_copy.remove(i);
let peer_indexes_to_connect: Vec<_> =
peer_indexes_copy.choose_multiple(rng, N_CONNS).collect();
let mut peers = vec![];
for _ in 0..N_CONNS {
let mut port = 13200 + i;
while port == 13200 + i {
port = 13200 + rng.gen_range(0..N_NODES);
}
for peer_index in peer_indexes_to_connect {
let port = starting_port + peer_index;
peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
}
let settings = Settings {
localnet: true,
inbound_addrs: vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + i)).unwrap()],
outbound_connections: 0,
outbound_connect_timeout: 2,
inbound_connections: usize::MAX,
let event_graph = spawn_node(
vec![Url::parse(&format!("tcp://127.0.0.1:{}", starting_port + i)).unwrap()],
peers,
allowed_transports: vec!["tcp".to_string()],
..Default::default()
};
let p2p = P2p::new(settings, ex.clone()).await;
let sled_db = sled::Config::new().temporary(true).open().unwrap();
let event_graph =
EventGraph::new(p2p.clone(), sled_db, "dag", 1, ex.clone()).await.unwrap();
let event_graph_ = event_graph.clone();
// Take the last sled item since there's only 1
if genesis_event_id == NULL_ID {
let (id, _) = event_graph.dag.last().unwrap().unwrap();
genesis_event_id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
}
// Register the P2P protocols
let registry = p2p.protocol_registry();
registry
.register(SESSION_ALL, move |channel, _| {
let event_graph_ = event_graph_.clone();
async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
})
.await;
ex.clone(),
)
.await;
eg_instances.push(event_graph);
}
@@ -140,64 +144,121 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
eg.p2p.clone().start().await.unwrap();
}
info!("Waiting 10s until all peers connect");
sleep(10).await;
info!("Waiting 5s until all peers connect");
sleep(5).await;
eg_instances
}
async fn assert_dags(
eg_instances: &Vec<Arc<EventGraph>>,
expected_len: usize,
rng: &mut ThreadRng,
) {
let random_node = eg_instances.choose(rng).unwrap();
let last_layer_tips =
random_node.unreferenced_tips.read().await.last_key_value().unwrap().1.clone();
for (i, eg) in eg_instances.iter().enumerate() {
let node_last_layer_tips =
eg.unreferenced_tips.read().await.last_key_value().unwrap().1.clone();
assert!(
eg.dag.len() == expected_len,
"Node {}, expected {} events, have {}",
i,
expected_len,
eg.dag.len()
);
assert_eq!(
node_last_layer_tips, last_layer_tips,
"Node {} contains malformed unreferenced tips",
i
);
}
}
macro_rules! test_body {
($real_call:ident) => {
init_logger();
let ex = Arc::new(Executor::new());
let ex_ = ex.clone();
let (signal, shutdown) = channel::unbounded::<()>();
// Run a thread for each node.
easy_parallel::Parallel::new()
.each(0..N_NODES, |_| future::block_on(ex.run(shutdown.recv())))
.finish(|| {
future::block_on(async {
$real_call(ex_).await;
drop(signal);
})
});
};
}
#[test]
fn eventgraph_propagation() {
test_body!(eventgraph_propagation_real);
}
async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
let mut rng = rand::thread_rng();
let peer_indexes: Vec<usize> = (0..N_NODES).collect();
// Bootstrap nodes
let mut eg_instances = bootstrap_nodes(&peer_indexes, 13200, &mut rng, ex.clone()).await;
// Grab genesis event
let random_node = eg_instances.choose(&mut rng).unwrap();
let (id, _) = random_node.dag.last().unwrap().unwrap();
let genesis_event_id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
// =========================================
// 1. Assert that everyone's DAG is the same
// =========================================
for (i, eg) in eg_instances.iter().enumerate() {
let tips = eg.unreferenced_tips.read().await;
assert!(eg.dag.len() == 1, "Node {}", i);
assert!(tips.len() == 1, "Node {}", i);
assert!(tips.get(&genesis_event_id).is_some(), "Node {}", i);
}
assert_dags(&eg_instances, 1, &mut rng).await;
// ==========================================
// 2. Create an event in one node and publish
// ==========================================
let random_node = eg_instances.choose(&mut rand::thread_rng()).unwrap();
let event = Event::new(vec![1, 2, 3, 4], random_node.clone()).await;
let random_node = eg_instances.choose(&mut rng).unwrap();
let event = Event::new(vec![1, 2, 3, 4], random_node).await;
assert!(event.parents.contains(&genesis_event_id));
// The node adds it to their DAG.
// The node adds it to their DAG, on layer 1.
let event_id = random_node.dag_insert(&[event.clone()]).await.unwrap()[0];
let tips = random_node.unreferenced_tips.read().await;
assert!(tips.len() == 1);
assert!(tips.get(&event_id).is_some());
drop(tips);
let tips_layers = random_node.unreferenced_tips.read().await;
// Since genesis was referenced, its layer (0) have been removed
assert_eq!(tips_layers.len(), 1);
assert!(tips_layers.last_key_value().unwrap().1.get(&event_id).is_some());
drop(tips_layers);
info!("Broadcasting event {}", event_id);
random_node.p2p.broadcast(&EventPut(event)).await;
info!("Waiting 10s for event propagation");
sleep(10).await;
info!("Waiting 5s for event propagation");
sleep(5).await;
// ====================================================
// 3. Assert that everyone has the new event in the DAG
// ====================================================
for (i, eg) in eg_instances.iter().enumerate() {
let tips = eg.unreferenced_tips.read().await;
assert!(eg.dag.len() == 2, "Node {}", i);
assert!(tips.len() == 1, "Node {}", i);
assert!(tips.get(&event_id).is_some(), "Node {}", i);
}
assert_dags(&eg_instances, 2, &mut rng).await;
// ==============================================================
// 4. Create multiple events on a node and broadcast the last one
// The `EventPut` logic should manage to fetch all of them,
// provided that the last one references the earlier ones.
// ==============================================================
let random_node = eg_instances.choose(&mut rand::thread_rng()).unwrap();
let event0 = Event::new(vec![1, 2, 3, 4, 0], random_node.clone()).await;
let random_node = eg_instances.choose(&mut rng).unwrap();
let event0 = Event::new(vec![1, 2, 3, 4, 0], random_node).await;
let event0_id = random_node.dag_insert(&[event0.clone()]).await.unwrap()[0];
let event1 = Event::new(vec![1, 2, 3, 4, 1], random_node.clone()).await;
let event1 = Event::new(vec![1, 2, 3, 4, 1], random_node).await;
let event1_id = random_node.dag_insert(&[event1.clone()]).await.unwrap()[0];
let event2 = Event::new(vec![1, 2, 3, 4, 2], random_node.clone()).await;
let event2 = Event::new(vec![1, 2, 3, 4, 2], random_node).await;
let event2_id = random_node.dag_insert(&[event2.clone()]).await.unwrap()[0];
// Genesis event + event from 2. + upper 3 events
assert!(random_node.dag.len() == 5);
let tips = random_node.unreferenced_tips.read().await;
assert!(tips.len() == 1);
assert!(tips.get(&event2_id).is_some());
drop(tips);
// Genesis event + event from 2. + upper 3 events (layer 4)
assert_eq!(random_node.dag.len(), 5);
let tips_layers = random_node.unreferenced_tips.read().await;
assert_eq!(tips_layers.len(), 1);
assert!(tips_layers.get(&4).unwrap().get(&event2_id).is_some());
drop(tips_layers);
let event_chain =
vec![(event0_id, event0.parents), (event1_id, event1.parents), (event2_id, event2.parents)];
@@ -205,142 +266,186 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
info!("Broadcasting event {}", event2_id);
info!("Event chain: {:#?}", event_chain);
random_node.p2p.broadcast(&EventPut(event2)).await;
info!("Waiting 10s for event propagation");
sleep(10).await;
info!("Waiting 5s for event propagation");
sleep(5).await;
// ==========================================
// 5. Assert that everyone has all the events
// ==========================================
for (i, eg) in eg_instances.iter().enumerate() {
let tips = eg.unreferenced_tips.read().await;
assert!(eg.dag.len() == 5, "Node {}, expected 5 events, have {}", i, eg.dag.len());
assert!(tips.len() == 1, "Node {}, expected 1 tip, have {}", i, tips.len());
assert!(tips.get(&event2_id).is_some(), "Node {}, expected tip to be {}", i, event2_id);
}
assert_dags(&eg_instances, 5, &mut rng).await;
// ===========================================
// 6. Create multiple events on multiple nodes
// ===========================================
// node 1
// =======
let node1 = eg_instances.choose(&mut rand::thread_rng()).unwrap();
let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1.clone()).await;
let _ = node1.dag_insert(&[event0_1.clone()]).await.unwrap()[0];
let node1 = eg_instances.choose(&mut rng).unwrap();
let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1).await;
node1.dag_insert(&[event0_1.clone()]).await.unwrap();
node1.p2p.broadcast(&EventPut(event0_1)).await;
let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1.clone()).await;
let _ = node1.dag_insert(&[event1_1.clone()]).await.unwrap()[0];
let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1).await;
node1.dag_insert(&[event1_1.clone()]).await.unwrap();
node1.p2p.broadcast(&EventPut(event1_1)).await;
let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1.clone()).await;
let _ = node1.dag_insert(&[event2_1.clone()]).await.unwrap()[0];
let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1).await;
node1.dag_insert(&[event2_1.clone()]).await.unwrap();
node1.p2p.broadcast(&EventPut(event2_1)).await;
// =======
// node 2
// =======
let node2 = eg_instances.choose(&mut rand::thread_rng()).unwrap();
let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2.clone()).await;
let _ = node2.dag_insert(&[event0_2.clone()]).await.unwrap()[0];
let node2 = eg_instances.choose(&mut rng).unwrap();
let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2).await;
node2.dag_insert(&[event0_2.clone()]).await.unwrap();
node2.p2p.broadcast(&EventPut(event0_2)).await;
let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2.clone()).await;
let _ = node2.dag_insert(&[event1_2.clone()]).await.unwrap()[0];
let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2).await;
node2.dag_insert(&[event1_2.clone()]).await.unwrap();
node2.p2p.broadcast(&EventPut(event1_2)).await;
let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2.clone()).await;
let _ = node2.dag_insert(&[event2_2.clone()]).await.unwrap()[0];
let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2).await;
node2.dag_insert(&[event2_2.clone()]).await.unwrap();
node2.p2p.broadcast(&EventPut(event2_2)).await;
// =======
// node 3
// =======
let node3 = eg_instances.choose(&mut rand::thread_rng()).unwrap();
let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3.clone()).await;
let _ = node3.dag_insert(&[event0_3.clone()]).await.unwrap()[0];
let node3 = eg_instances.choose(&mut rng).unwrap();
let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3).await;
node3.dag_insert(&[event0_3.clone()]).await.unwrap();
node2.p2p.broadcast(&EventPut(event0_3)).await;
let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3.clone()).await;
let _ = node3.dag_insert(&[event1_3.clone()]).await.unwrap()[0];
let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3).await;
node3.dag_insert(&[event1_3.clone()]).await.unwrap();
node2.p2p.broadcast(&EventPut(event1_3)).await;
let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3.clone()).await;
let event2_3_id = node3.dag_insert(&[event2_3.clone()]).await.unwrap()[0];
let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3).await;
node3.dag_insert(&[event2_3.clone()]).await.unwrap();
node3.p2p.broadcast(&EventPut(event2_3)).await;
info!("Waiting 10s for events propagation");
sleep(10).await;
info!("Waiting 5s for events propagation");
sleep(5).await;
// ==========================================
// 7. Assert that everyone has all the events
// ==========================================
for (i, eg) in eg_instances.iter().enumerate() {
let tips = eg.unreferenced_tips.read().await;
assert!(eg.dag.len() == 14, "Node {}, expected 14 events, have {}", i, eg.dag.len());
// 5 events from 2. and 4. + 9 events from 6. = ^
assert!(tips.get(&event2_3_id).is_some(), "Node {}, expected tip to be {}", i, event2_3_id);
}
// 5 events from 2. and 4. + 9 events from 6. = 14
assert_dags(&eg_instances, 14, &mut rng).await;
// ============================================================
// 8. Start a new node and try to sync the DAG from other peers
// ============================================================
{
// Connect to N_CONNS random peers.
let peer_indexes_to_connect: Vec<_> =
peer_indexes.choose_multiple(&mut rng, N_CONNS).collect();
let mut peers = vec![];
for _ in 0..N_CONNS {
let port = 13200 + rng.gen_range(0..N_NODES);
for peer_index in peer_indexes_to_connect {
let port = 13200 + peer_index;
peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
}
let settings = Settings {
localnet: true,
inbound_addrs: vec![
Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + N_NODES + 1)).unwrap()
],
outbound_connections: 0,
outbound_connect_timeout: 2,
inbound_connections: usize::MAX,
let event_graph = spawn_node(
vec![Url::parse(&format!("tcp://127.0.0.1:{}", 13200 + N_NODES + 1)).unwrap()],
peers,
allowed_transports: vec!["tcp".to_string()],
..Default::default()
};
let p2p = P2p::new(settings, ex.clone()).await;
let sled_db = sled::Config::new().temporary(true).open().unwrap();
let event_graph =
EventGraph::new(p2p.clone(), sled_db, "dag", 1, ex.clone()).await.unwrap();
let event_graph_ = event_graph.clone();
// Register the P2P protocols
let registry = p2p.protocol_registry();
registry
.register(SESSION_ALL, move |channel, _| {
let event_graph_ = event_graph_.clone();
async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
})
.await;
ex.clone(),
)
.await;
eg_instances.push(event_graph.clone());
event_graph.p2p.clone().start().await.unwrap();
info!("Waiting 10s for new node connection");
sleep(10).await;
info!("Waiting 5s for new node connection");
sleep(5).await;
event_graph.dag_sync().await.unwrap();
event_graph.dag_sync().await.unwrap()
}
info!("Waiting 10s for things to settle");
sleep(10).await;
// ============================================================
// 9. Assert the new synced DAG has the same contents as others
// ============================================================
for (i, eg) in eg_instances.iter().enumerate() {
let tips = eg.unreferenced_tips.read().await;
assert!(eg.dag.len() == 14, "Node {}, expected 14 events, have {}", i, eg.dag.len());
// 5 events from 2. and 4. + 9 events from 6. = ^
assert!(tips.get(&event2_3_id).is_some(), "Node {}, expected tip to be {}", i, event2_3_id);
}
// 5 events from 2. and 4. + 9 events from 6. = 14
assert_dags(&eg_instances, 14, &mut rng).await;
// Stop the P2P network
for eg in eg_instances.iter() {
eg.p2p.clone().stop().await;
}
}
#[test]
#[ignore]
fn eventgraph_chaotic_propagation() {
test_body!(eventgraph_chaotic_propagation_real);
}
async fn eventgraph_chaotic_propagation_real(ex: Arc<Executor<'static>>) {
let mut rng = rand::thread_rng();
let peer_indexes: Vec<usize> = (0..N_NODES).collect();
let n_events: usize = 100000;
// Bootstrap nodes
let mut eg_instances = bootstrap_nodes(&peer_indexes, 14200, &mut rng, ex.clone()).await;
// =========================================
// 1. Assert that everyone's DAG is the same
// =========================================
assert_dags(&eg_instances, 1, &mut rng).await;
// ===========================================
// 2. Create multiple events on multiple nodes
for i in 0..n_events {
let random_node = eg_instances.choose(&mut rng).unwrap();
let event = Event::new(i.to_be_bytes().to_vec(), random_node).await;
random_node.dag_insert(&[event.clone()]).await.unwrap();
random_node.p2p.broadcast(&EventPut(event)).await;
}
info!("Waiting 5s for events propagation");
sleep(5).await;
// ==========================================
// 3. Assert that everyone has all the events
// ==========================================
assert_dags(&eg_instances, n_events + 1, &mut rng).await;
// ============================================================
// 4. Start a new node and try to sync the DAG from other peers
// ============================================================
{
// Connect to N_CONNS random peers.
let peer_indexes_to_connect: Vec<_> =
peer_indexes.choose_multiple(&mut rng, N_CONNS).collect();
let mut peers = vec![];
for peer_index in peer_indexes_to_connect {
let port = 14200 + peer_index;
peers.push(Url::parse(&format!("tcp://127.0.0.1:{}", port)).unwrap());
}
let event_graph = spawn_node(
vec![Url::parse(&format!("tcp://127.0.0.1:{}", 14200 + N_NODES + 1)).unwrap()],
peers,
ex.clone(),
)
.await;
eg_instances.push(event_graph.clone());
event_graph.p2p.clone().start().await.unwrap();
info!("Waiting 5s for new node connection");
sleep(5).await;
event_graph.dag_sync().await.unwrap()
}
// ============================================================
// 5. Assert the new synced DAG has the same contents as others
// ============================================================
assert_dags(&eg_instances, n_events + 1, &mut rng).await;
// Stop the P2P network
for eg in eg_instances.iter() {

View File

@@ -18,6 +18,8 @@
use std::time::UNIX_EPOCH;
use crate::event_graph::{Event, GENESIS_CONTENTS, INITIAL_GENESIS, NULL_ID, N_EVENT_PARENTS};
/// Seconds in a day
pub(super) const DAY: i64 = 86400;
@@ -89,6 +91,29 @@ pub(super) fn seconds_until_next_rotation(next_rotation: u64) -> u64 {
next_rotation - now
}
/// Generate a deterministic genesis event corresponding to the DAG's configuration.
pub(super) fn generate_genesis(days_rotation: u64) -> Event {
// Days rotation is u64 except zero
let genesis_days_rotation = if days_rotation == 0 { 1 } else { days_rotation };
// First check how many days passed since initial genesis.
let days_passed = days_since(INITIAL_GENESIS);
// Calculate the number of days_rotation intervals since INITIAL_GENESIS
let rotations_since_genesis = days_passed / genesis_days_rotation;
// Calculate the timestamp of the most recent event
let timestamp =
INITIAL_GENESIS + (rotations_since_genesis * genesis_days_rotation * DAY as u64);
Event {
timestamp,
content: GENESIS_CONTENTS.to_vec(),
parents: [NULL_ID; N_EVENT_PARENTS],
layer: 0,
}
}
#[cfg(test)]
mod tests {
use crate::event_graph::INITIAL_GENESIS;

View File

@@ -142,6 +142,7 @@ impl std::cmp::Eq for StoppableTask {}
mod tests {
use super::*;
use crate::{error::Error, system::sleep_forever};
use log::warn;
use smol::Executor;
use std::sync::Arc;
@@ -150,13 +151,21 @@ mod tests {
let mut cfg = simplelog::ConfigBuilder::new();
cfg.add_filter_ignore("async_io".to_string());
cfg.add_filter_ignore("polling".to_string());
simplelog::TermLogger::init(
// We check this error so we can execute same file tests in parallel,
// otherwise second one fails to init logger here.
if simplelog::TermLogger::init(
//simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
simplelog::LevelFilter::Trace,
cfg.build(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
)
.unwrap();
.is_err()
{
warn!(target: "test_harness", "Logger already initialized");
}
let executor = Arc::new(Executor::new());
let executor_ = executor.clone();

View File

@@ -29,7 +29,7 @@ use darkfi_sdk::{
pasta::pallas,
};
use halo2_proofs::dev::MockProver;
use log::info;
use log::{info, warn};
use rand::rngs::OsRng;
pub const SECRET_KEY_PREFIX: pallas::Base = pallas::Base::from_raw([4, 0, 0, 0]);
@@ -46,14 +46,21 @@ pub const HEADSTART: pallas::Base = pallas::Base::from_raw([
#[test]
fn consensus_prop() -> Result<()> {
simplelog::TermLogger::init(
let mut cfg = simplelog::ConfigBuilder::new();
// We check this error so we can execute same file tests in parallel,
// otherwise second one fails to init logger here.
if simplelog::TermLogger::init(
simplelog::LevelFilter::Info,
//simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace,
simplelog::ConfigBuilder::new().build(),
cfg.build(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
)
.unwrap();
.is_err()
{
warn!(target: "test_harness", "Logger already initialized");
}
let input_serial = pallas::Base::from(pallas::Base::from(10));
//let input_serial = pallas::Base::from(pallas::Base::from(2));