From 921af416df6f8d899837d25cfb537b81690898e5 Mon Sep 17 00:00:00 2001 From: dasman Date: Mon, 15 Sep 2025 15:36:05 +0300 Subject: [PATCH] evgr2: merge multi-dag branch one is able to sync up to 5 dags each of 24hrs, pruning means dropping the oldest (fifth) dag and prepend a new one. --- bin/darkirc/darkirc_config.toml | 9 + bin/darkirc/src/irc/client.rs | 6 +- bin/darkirc/src/main.rs | 23 +- bin/genev/genevd/src/main.rs | 3 +- bin/genev/genevd/src/rpc.rs | 9 +- bin/tau/taud/src/main.rs | 10 +- .../darkirc_full_node1.toml | 11 +- .../darkirc_full_node2.toml | 11 +- .../darkirc_full_node3.toml | 11 +- .../darkirc_full_node4.toml | 11 +- .../darkirc-four-nodes/tmux_sessions.sh | 4 +- src/event_graph/event.rs | 74 +- src/event_graph/mod.rs | 683 ++++++++++-------- src/event_graph/proto.rs | 128 ++-- src/event_graph/tests.rs | 108 +-- 15 files changed, 660 insertions(+), 441 deletions(-) diff --git a/bin/darkirc/darkirc_config.toml b/bin/darkirc/darkirc_config.toml index 25cf6e3d4..467d7d2c0 100644 --- a/bin/darkirc/darkirc_config.toml +++ b/bin/darkirc/darkirc_config.toml @@ -10,6 +10,11 @@ ## TLS secret key path if IRC acceptor uses TLS (optional) #irc_tls_secret = "/etc/letsencrypt/darkirc/privkey.pem" +## How many DAGs to be synced (currently each DAG represents a 24hr msg +## history counting from UTC midnight), increasing this number means +## you get/sync previous days msg history as well (max. 5) +#dags_count = 1 + ## Sets Datastore Path #datastore = "~/.local/share/darkfi/darkirc/darkirc_db" @@ -42,6 +47,10 @@ autojoin = [ # Set log level. 1 is info (default), 2 is debug, 3 is trace #verbose = 2 +## Running darkirc in header-only mode +## history won't be fetched but DAG sync fast +#fast_mod = false + ## JSON-RPC settings [rpc] ## JSON-RPC listen URL diff --git a/bin/darkirc/src/irc/client.rs b/bin/darkirc/src/irc/client.rs index c5c085209..50fe6f1e3 100644 --- a/bin/darkirc/src/irc/client.rs +++ b/bin/darkirc/src/irc/client.rs @@ -194,12 +194,14 @@ impl Client { // Update the last sent event. let event_id = event.header.id(); *self.last_sent.write().await = event_id; + let current_genesis = self.server.darkirc.event_graph.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); // If it fails for some reason, for now, we just note it and pass. - if let Err(e) = self.server.darkirc.event_graph.header_dag_insert(vec![event.header.clone()]).await { + if let Err(e) = self.server.darkirc.event_graph.header_dag_insert(vec![event.header.clone()], &dag_name).await { error!("[IRC CLIENT] Failed inserting new header to Header DAG: {}", e); } - if let Err(e) = self.server.darkirc.event_graph.dag_insert(&[event.clone()]).await { + if let Err(e) = self.server.darkirc.event_graph.dag_insert(&[event.clone()], &dag_name).await { error!("[IRC CLIENT] Failed inserting new event to DAG: {}", e); } else { // We sent this, so it should be considered seen. diff --git a/bin/darkirc/src/main.rs b/bin/darkirc/src/main.rs index 9f757b518..10bc7eea7 100644 --- a/bin/darkirc/src/main.rs +++ b/bin/darkirc/src/main.rs @@ -90,6 +90,10 @@ struct Args { /// Optional TLS certificate key file path if `irc_listen` uses TLS irc_tls_secret: Option, + /// How many DAGs to sync. + #[structopt(short, long, default_value = "1")] + dags_count: usize, + #[structopt(short, long, default_value = "~/.local/share/darkfi/darkirc_db")] /// Datastore (DB) path datastore: String, @@ -358,7 +362,6 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { replay_datastore.clone(), replay_mode, fast_mode, - "darkirc_dag", 1, ex.clone(), ) @@ -502,7 +505,9 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { } // Initial DAG sync - if let Err(e) = sync_task(&p2p, &event_graph, args.skip_dag_sync, args.fast_mode).await { + if let Err(e) = + sync_task(&p2p, &event_graph, args.skip_dag_sync, args.fast_mode, args.dags_count).await + { error!("DAG sync task failed to start: {e}"); return Err(e); }; @@ -510,7 +515,13 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { // Stoppable task to monitor network and resync on disconnect. let sync_mon_task = StoppableTask::new(); sync_mon_task.clone().start( - sync_and_monitor(p2p.clone(), event_graph.clone(), args.skip_dag_sync, args.fast_mode), + sync_and_monitor( + p2p.clone(), + event_graph.clone(), + args.skip_dag_sync, + args.fast_mode, + args.dags_count, + ), |res| async move { match res { Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ } @@ -557,6 +568,7 @@ async fn sync_task( event_graph: &EventGraphPtr, skip_dag_sync: bool, fast_mode: bool, + dags_count: usize, ) -> Result<()> { let comms_timeout = p2p.settings().read().await.outbound_connect_timeout; @@ -566,7 +578,7 @@ async fn sync_task( // We'll attempt to sync for ever if !skip_dag_sync { info!("Syncing event DAG"); - match event_graph.dag_sync(fast_mode).await { + match event_graph.sync_selected(dags_count, fast_mode).await { Ok(()) => break, Err(e) => { // TODO: Maybe at this point we should prune or something? @@ -594,6 +606,7 @@ async fn sync_and_monitor( event_graph: EventGraphPtr, skip_dag_sync: bool, fast_mode: bool, + dags_count: usize, ) -> Result<()> { loop { let net_subscription = p2p.hosts().subscribe_disconnect().await; @@ -606,7 +619,7 @@ async fn sync_and_monitor( // Sync node again info!("Network disconnection detected, resyncing..."); *event_graph.synced.write().await = false; - sync_task(&p2p, &event_graph, skip_dag_sync, fast_mode).await?; + sync_task(&p2p, &event_graph, skip_dag_sync, fast_mode, dags_count).await?; } Err(e) => return Err(e), } diff --git a/bin/genev/genevd/src/main.rs b/bin/genev/genevd/src/main.rs index c0e14d705..492a59e15 100644 --- a/bin/genev/genevd/src/main.rs +++ b/bin/genev/genevd/src/main.rs @@ -131,7 +131,6 @@ async fn realmain(settings: Args, executor: Arc>) -> Res replay_datastore, replay_mode, fast_mode, - "genevd_dag", 1, executor.clone(), ) @@ -158,7 +157,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Res if !settings.skip_dag_sync { for i in 1..=6 { info!("Syncing event DAG (attempt #{})", i); - match event_graph.dag_sync(settings.fast_mode).await { + match event_graph.sync_selected(1, settings.fast_mode).await { Ok(()) => break, Err(e) => { if i == 6 { diff --git a/bin/genev/genevd/src/rpc.rs b/bin/genev/genevd/src/rpc.rs index 45da9b9dd..10ba2ff48 100644 --- a/bin/genev/genevd/src/rpc.rs +++ b/bin/genev/genevd/src/rpc.rs @@ -208,7 +208,14 @@ impl JsonRpcInterface { // Build a DAG event and return it. let event = Event::new(serialize_async(&genevent).await, &self.event_graph).await; - if let Err(e) = self.event_graph.dag_insert(&[event.clone()]).await { + let current_genesis = self.event_graph.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); + if let Err(e) = + self.event_graph.header_dag_insert(vec![event.header.clone()], &dag_name).await + { + error!("Failed inserting new header to Header DAG: {}", e); + } + if let Err(e) = self.event_graph.dag_insert(&[event.clone()], &dag_name).await { error!("Failed inserting new event to DAG: {}", e); } else { // Otherwise, broadcast it diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index ad265b3cd..43319763d 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -321,7 +321,12 @@ async fn start_sync_loop( // If it fails for some reason, for now, we just note it // and pass. - if let Err(e) = event_graph.dag_insert(&[event.clone()]).await { + let current_genesis = event_graph.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); + if let Err(e) = event_graph.header_dag_insert(vec![event.header.clone()], &dag_name).await { + error!(target: "taud", "Failed inserting new header to DAG: {}", e); + } + if let Err(e) = event_graph.dag_insert(&[event.clone()], &dag_name).await { error!(target: "taud", "Failed inserting new event to DAG: {}", e); } else { // Otherwise, broadcast it @@ -526,7 +531,6 @@ async fn realmain(settings: Args, executor: Arc>) -> Res replay_datastore, replay_mode, fast_mode, - "taud_dag", 0, executor.clone(), ) @@ -555,7 +559,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Res // We'll attempt to sync for ever if !settings.skip_dag_sync { info!(target: "taud", "Syncing event DAG"); - match event_graph.dag_sync(settings.fast_mode).await { + match event_graph.sync_selected(1, settings.fast_mode).await { Ok(()) => break, Err(e) => { // TODO: Maybe at this point we should prune or something? diff --git a/contrib/localnet/darkirc-four-nodes/darkirc_full_node1.toml b/contrib/localnet/darkirc-four-nodes/darkirc_full_node1.toml index 0044340c1..10067d4c8 100644 --- a/contrib/localnet/darkirc-four-nodes/darkirc_full_node1.toml +++ b/contrib/localnet/darkirc-four-nodes/darkirc_full_node1.toml @@ -1,6 +1,11 @@ ## IRC listen URL irc_listen = "tcp://127.0.0.1:22022" +## How many DAGs to be synced (currently each DAG represents a 24hr msg +## history counting from UTC midnight), increasing this number means +## you get/sync previous days msg history as well (max. 5) +dags_count = 1 + ## Sets Datastore Path datastore = "darkirc1" @@ -8,10 +13,14 @@ datastore = "darkirc1" autojoin = ["#dev", "#test"] # Log to file. Off by default. -#log = "/tmp/darkirc1.log" +log = "/tmp/darkirc1.log" # Set log level. 1 is info (default), 2 is debug, 3 is trace #verbose = 2 +## Running darkirc in header-only mode +## history won't be fetched but DAG sync fast +#fast_mod = false + ## JSON-RPC settings [rpc] ## JSON-RPC listen URL diff --git a/contrib/localnet/darkirc-four-nodes/darkirc_full_node2.toml b/contrib/localnet/darkirc-four-nodes/darkirc_full_node2.toml index c58304425..385f8d33e 100644 --- a/contrib/localnet/darkirc-four-nodes/darkirc_full_node2.toml +++ b/contrib/localnet/darkirc-four-nodes/darkirc_full_node2.toml @@ -1,6 +1,11 @@ ## IRC listen URL irc_listen = "tcp://127.0.0.1:22023" +## How many DAGs to be synced (currently each DAG represents a 24hr msg +## history counting from UTC midnight), increasing this number means +## you get/sync previous days msg history as well (max. 5) +dags_count = 1 + ## Sets Datastore Path datastore = "darkirc2" @@ -8,10 +13,14 @@ datastore = "darkirc2" autojoin = ["#dev", "#test"] # Log to file. Off by default. -#log = "/tmp/darkirc2.log" +log = "/tmp/darkirc2.log" # Set log level. 1 is info (default), 2 is debug, 3 is trace #verbose = 2 +## Running darkirc in header-only mode +## history won't be fetched but DAG sync fast +#fast_mod = false + ## JSON-RPC settings [rpc] ## JSON-RPC listen URL diff --git a/contrib/localnet/darkirc-four-nodes/darkirc_full_node3.toml b/contrib/localnet/darkirc-four-nodes/darkirc_full_node3.toml index f5d1cfda4..e164c9be2 100644 --- a/contrib/localnet/darkirc-four-nodes/darkirc_full_node3.toml +++ b/contrib/localnet/darkirc-four-nodes/darkirc_full_node3.toml @@ -1,6 +1,11 @@ ## IRC listen URL irc_listen = "tcp://127.0.0.1:22024" +## How many DAGs to be synced (currently each DAG represents a 24hr msg +## history counting from UTC midnight), increasing this number means +## you get/sync previous days msg history as well (max. 5) +dags_count = 1 + ## Sets Datastore Path datastore = "darkirc3" @@ -8,10 +13,14 @@ datastore = "darkirc3" autojoin = ["#dev", "#test"] # Log to file. Off by default. -#log = "/tmp/darkirc3.log" +log = "/tmp/darkirc3.log" # Set log level. 1 is info (default), 2 is debug, 3 is trace #verbose = 2 +## Running darkirc in header-only mode +## history won't be fetched but DAG sync fast +#fast_mod = false + ## JSON-RPC settings [rpc] ## JSON-RPC listen URL diff --git a/contrib/localnet/darkirc-four-nodes/darkirc_full_node4.toml b/contrib/localnet/darkirc-four-nodes/darkirc_full_node4.toml index 9e425286c..fa0139bfd 100644 --- a/contrib/localnet/darkirc-four-nodes/darkirc_full_node4.toml +++ b/contrib/localnet/darkirc-four-nodes/darkirc_full_node4.toml @@ -1,6 +1,11 @@ ## IRC listen URL irc_listen = "tcp://127.0.0.1:22025" +## How many DAGs to be synced (currently each DAG represents a 24hr msg +## history counting from UTC midnight), increasing this number means +## you get/sync previous days msg history as well (max. 5) +dags_count = 3 + ## Sets Datastore Path datastore = "darkirc4" @@ -8,10 +13,14 @@ datastore = "darkirc4" autojoin = ["#dev", "#test"] # Log to file. Off by default. -#log = "/tmp/darkirc4.log" +log = "/tmp/darkirc4.log" # Set log level. 1 is info (default), 2 is debug, 3 is trace #verbose = 2 +## Running darkirc in header-only mode +## history won't be fetched but DAG sync fast +#fast_mod = false + ## JSON-RPC settings [rpc] ## JSON-RPC listen URL diff --git a/contrib/localnet/darkirc-four-nodes/tmux_sessions.sh b/contrib/localnet/darkirc-four-nodes/tmux_sessions.sh index 2cad22863..a36dba39c 100755 --- a/contrib/localnet/darkirc-four-nodes/tmux_sessions.sh +++ b/contrib/localnet/darkirc-four-nodes/tmux_sessions.sh @@ -7,7 +7,7 @@ set -e DARKIRC="../../../darkirc" WEECHAT="weechat -t -r" -session=darkirc +session=darkirc-local tmux new-session -d -s $session -n "seed" tmux send-keys -t $session "$DARKIRC -c seed.toml --skip-dag-sync" Enter @@ -34,7 +34,7 @@ if [ "$1" ]; then fi sleep 1 tmux new-window -t $session -n "node4" -tmux send-keys -t $session "$DARKIRC -c darkirc_full_node4.toml" Enter +tmux send-keys -t $session "$DARKIRC -c darkirc_full_node4.toml --fast-mode" Enter if [ "$1" ]; then tmux split-window -t $session -v tmux send-keys -t $session "$WEECHAT '/server add darkirc_d 127.0.0.1/22025 -notls;/connect darkirc_d;/set irc.server_default.nicks Dave'" Enter diff --git a/src/event_graph/event.rs b/src/event_graph/event.rs index 398dda228..3548fea55 100644 --- a/src/event_graph/event.rs +++ b/src/event_graph/event.rs @@ -19,7 +19,6 @@ use std::{collections::HashSet, time::UNIX_EPOCH}; use darkfi_serial::{async_trait, deserialize_async, Encodable, SerialDecodable, SerialEncodable}; -use log::info; use sled_overlay::{sled, SledTreeOverlay}; use crate::Result; @@ -44,12 +43,14 @@ pub struct Header { impl Header { // Create a new Header given EventGraph to retrieve the correct layout pub async fn new(event_graph: &EventGraph) -> Self { - let (layer, parents) = event_graph.get_next_layer_with_parents().await; + let current_dag_name = event_graph.current_genesis.read().await.id(); + let (layer, parents) = event_graph.get_next_layer_with_parents(¤t_dag_name).await; Self { timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64, parents, layer } } pub async fn with_timestamp(timestamp: u64, event_graph: &EventGraph) -> Self { - let (layer, parents) = event_graph.get_next_layer_with_parents().await; + let current_dag_name = event_graph.current_genesis.read().await.id(); + let (layer, parents) = event_graph.get_next_layer_with_parents(¤t_dag_name).await; Self { timestamp, parents, layer } } @@ -68,23 +69,15 @@ impl Header { /// to use that instead of actual referenced DAG. pub async fn validate( &self, - dag: &sled::Tree, - genesis_timestamp: u64, + header_dag: &sled::Tree, days_rotation: u64, overlay: Option<&SledTreeOverlay>, ) -> Result { - // Check if the event timestamp is after genesis timestamp - if self.timestamp < genesis_timestamp - EVENT_TIME_DRIFT { - info!("timestampe"); - return Ok(false) - } - // If a rotation has been set, check if the event timestamp // is after the next genesis timestamp if days_rotation > 0 { let next_genesis_timestamp = next_rotation_timestamp(INITIAL_GENESIS, days_rotation); if self.timestamp > next_genesis_timestamp + EVENT_TIME_DRIFT { - info!("rotation"); return Ok(false) } } @@ -98,33 +91,28 @@ impl Header { for parent_id in self.parents.iter() { if parent_id == &NULL_ID { - info!("null"); continue } if parent_id == &self_id { - info!("self"); return Ok(false) } if seen.contains(parent_id) { - info!("seen"); return Ok(false) } let parent_bytes = if let Some(overlay) = overlay { overlay.get(parent_id.as_bytes())? } else { - dag.get(parent_id.as_bytes())? + header_dag.get(parent_id.as_bytes())? }; if parent_bytes.is_none() { - info!("none"); return Ok(false) } let parent: Header = deserialize_async(&parent_bytes.unwrap()).await?; if self.layer <= parent.layer { - info!("layer"); return Ok(false) } @@ -154,6 +142,10 @@ impl Event { Self { header, content: data } } + pub fn id(&self) -> blake3::Hash { + self.header.id() + } + /// Same as `Event::new()` but allows specifying the timestamp explicitly. pub async fn with_timestamp(timestamp: u64, data: Vec, event_graph: &EventGraph) -> Self { let header = Header::with_timestamp(timestamp, event_graph).await; @@ -171,25 +163,19 @@ impl Event { /// to use that instead of actual referenced DAG. /// TODO: is this necessary? we validate headers and events should /// be downloaded into the correct structure. - pub async fn validate(&self, _dag: &sled::Tree) -> Result { - // Let's not bother with empty events - if self.content.is_empty() { - info!("empty"); - return Ok(false) - } - - Ok(true) - } + // pub async fn validate(&self) -> Result { + // Ok(true) + // } /// Fully validate an event for the correct layout against provided /// [`EventGraph`] reference and enforce relevant age, assuming some /// possibility for a time drift. - pub async fn dag_validate(&self, event_graph: &EventGraph) -> Result { - // Grab genesis timestamp - // let genesis_timestamp = event_graph.current_genesis.read().await.header.timestamp; - + pub async fn dag_validate(&self, header_dag: &sled::Tree) -> Result { + if self.content.is_empty() { + return Ok(false) + } // Perform validation - self.validate(&event_graph.header_dag).await + self.header.validate(&header_dag, 1, None).await } /// Validate a new event for the correct layout and enforce relevant age, @@ -252,7 +238,7 @@ mod tests { let ex = Arc::new(Executor::new()); let p2p = P2p::new(Settings::default(), ex.clone()).await?; let sled_db = sled::Config::new().temporary(true).open().unwrap(); - EventGraph::new(p2p, sled_db, "/tmp".into(), false, false, "dag", 1, ex).await + EventGraph::new(p2p, sled_db, "/tmp".into(), false, false, 1, ex).await } #[test] @@ -261,11 +247,15 @@ mod tests { // Generate a dummy event graph let event_graph = make_event_graph().await?; + let dag_name = event_graph.current_genesis.read().await.id().to_string(); + let hdr_tree_name = format!("headers_{dag_name}"); + let header_dag = event_graph.dag_store.read().await.get_dag(&hdr_tree_name); + // Create a new valid event let valid_event = Event::new(vec![1u8], &event_graph).await; // Validate our test Event struct - assert!(valid_event.dag_validate(&event_graph).await?); + assert!(valid_event.dag_validate(&header_dag).await?); // Thanks for reading Ok(()) @@ -278,33 +268,37 @@ mod tests { // Generate a dummy event graph let event_graph = make_event_graph().await?; + let dag_name = event_graph.current_genesis.read().await.id().to_string(); + let hdr_tree_name = format!("headers_{dag_name}"); + let header_dag = event_graph.dag_store.read().await.get_dag(&hdr_tree_name); + // Create a new valid event let valid_event = Event::new(vec![1u8], &event_graph).await; let mut event_empty_content = valid_event.clone(); event_empty_content.content = vec![]; - assert!(!event_empty_content.dag_validate(&event_graph).await?); + assert!(!event_empty_content.dag_validate(&header_dag).await?); let mut event_timestamp_too_old = valid_event.clone(); event_timestamp_too_old.header.timestamp = 0; - assert!(!event_timestamp_too_old.dag_validate(&event_graph).await?); + assert!(!event_timestamp_too_old.dag_validate(&header_dag).await?); let mut event_timestamp_too_new = valid_event.clone(); event_timestamp_too_new.header.timestamp = u64::MAX; - assert!(!event_timestamp_too_new.dag_validate(&event_graph).await?); + assert!(!event_timestamp_too_new.dag_validate(&header_dag).await?); let mut event_duplicated_parents = valid_event.clone(); event_duplicated_parents.header.parents[1] = valid_event.header.parents[0]; - assert!(!event_duplicated_parents.dag_validate(&event_graph).await?); + assert!(!event_duplicated_parents.dag_validate(&header_dag).await?); let mut event_null_parents = valid_event.clone(); let all_null_parents = [NULL_ID, NULL_ID, NULL_ID, NULL_ID, NULL_ID]; event_null_parents.header.parents = all_null_parents; - assert!(!event_null_parents.dag_validate(&event_graph).await?); + assert!(!event_null_parents.dag_validate(&header_dag).await?); let mut event_same_layer_as_parents = valid_event.clone(); event_same_layer_as_parents.header.layer = 0; - assert!(!event_same_layer_as_parents.dag_validate(&event_graph).await?); + assert!(!event_same_layer_as_parents.dag_validate(&header_dag).await?); // Thanks for reading Ok(()) diff --git a/src/event_graph/mod.rs b/src/event_graph/mod.rs index 338d52f7e..93b30da74 100644 --- a/src/event_graph/mod.rs +++ b/src/event_graph/mod.rs @@ -17,23 +17,23 @@ */ // use async_std::stream::from_iter; +use std::{ + collections::{BTreeMap, HashMap, HashSet, VecDeque}, + path::PathBuf, + str::FromStr, + sync::Arc, +}; + // use futures::stream::FuturesOrdered; +use blake3::Hash; +use darkfi_serial::{deserialize_async, serialize_async}; +use event::Header; use futures::{ future::join_all, // future, stream::FuturesUnordered, StreamExt, }; -use rand::{rngs::OsRng, seq::SliceRandom}; -use std::{ - collections::{BTreeMap, HashMap, HashSet, VecDeque}, - path::PathBuf, - sync::Arc, -}; - -use blake3::Hash; -use darkfi_serial::{deserialize_async, serialize_async}; -use event::Header; use log::{debug, error, info, warn}; use num_bigint::BigUint; use sled_overlay::{sled, SledTreeOverlay}; @@ -44,7 +44,7 @@ use smol::{ use tinyjson::JsonValue::{self}; use crate::{ - event_graph::util::replayer_log, + event_graph::util::{midnight_timestamp, replayer_log}, net::{channel::Channel, P2pPtr}, rpc::{ jsonrpc::{JsonResponse, JsonResult}, @@ -87,24 +87,256 @@ const EVENT_TIME_DRIFT: u64 = 60_000; /// Null event ID pub const NULL_ID: Hash = Hash::from_bytes([0x00; blake3::OUT_LEN]); +/// Maximum number of DAGs to store, this should be configurable +pub const DAGS_MAX_NUMBER: i8 = 5; + /// Atomic pointer to an [`EventGraph`] instance. pub type EventGraphPtr = Arc; +pub type LayerUTips = BTreeMap>; + +#[derive(Clone)] +pub struct DAGStore { + db: sled::Db, + header_dags: HashMap, + main_dags: HashMap, +} + +impl DAGStore { + pub async fn new(&self, sled_db: sled::Db, days_rotation: u64) -> Self { + let mut considered_trees = HashMap::new(); + let mut considered_header_trees = HashMap::new(); + if days_rotation > 0 { + // Create previous genesises if not existing, since they are deterministic. + for i in 1..=DAGS_MAX_NUMBER { + let i_days_ago = midnight_timestamp((i - DAGS_MAX_NUMBER).into()); + let header = + Header { timestamp: i_days_ago, parents: [NULL_ID; N_EVENT_PARENTS], layer: 0 }; + let genesis = Event { header, content: GENESIS_CONTENTS.to_vec() }; + + let tree_name = genesis.id().to_string(); + let hdr_tree_name = format!("headers_{tree_name}"); + let hdr_dag = sled_db.open_tree(hdr_tree_name).unwrap(); + let dag = sled_db.open_tree(tree_name).unwrap(); + + if hdr_dag.is_empty() { + let mut overlay = SledTreeOverlay::new(&hdr_dag); + + let header_se = serialize_async(&genesis.header).await; + + // Add the header to the overlay + overlay.insert(genesis.id().as_bytes(), &header_se).unwrap(); + + // Aggregate changes into a single batch + let batch = overlay.aggregate().unwrap(); + + // Atomically apply the batch. + // Panic if something is corrupted. + if let Err(e) = hdr_dag.apply_batch(batch) { + panic!("Failed applying header_dag_insert batch to sled: {}", e); + } + } + if dag.is_empty() { + let mut overlay = SledTreeOverlay::new(&dag); + + let event_se = serialize_async(&genesis).await; + + // Add the event to the overlay + overlay.insert(genesis.id().as_bytes(), &event_se).unwrap(); + + // Aggregate changes into a single batch + let batch = overlay.aggregate().unwrap(); + + // Atomically apply the batch. + // Panic if something is corrupted. + if let Err(e) = dag.apply_batch(batch) { + panic!("Failed applying dag_insert batch to sled: {}", e); + } + } + let utips = self.find_unreferenced_tips(&dag).await; + considered_header_trees.insert(genesis.id(), (hdr_dag, utips.clone())); + considered_trees.insert(genesis.id(), (dag, utips)); + } + } else { + let genesis = generate_genesis(0); + let tree_name = genesis.id().to_string(); + let hdr_tree_name = format!("headers_{tree_name}"); + let hdr_dag = sled_db.open_tree(hdr_tree_name).unwrap(); + let dag = sled_db.open_tree(tree_name).unwrap(); + if hdr_dag.is_empty() { + let mut overlay = SledTreeOverlay::new(&hdr_dag); + + let header_se = serialize_async(&genesis.header).await; + + // Add the header to the overlay + overlay.insert(genesis.id().as_bytes(), &header_se).unwrap(); + + // Aggregate changes into a single batch + let batch = overlay.aggregate().unwrap(); + + // Atomically apply the batch. + // Panic if something is corrupted. + if let Err(e) = hdr_dag.apply_batch(batch) { + panic!("Failed applying header_dag_insert batch to sled: {}", e); + } + } + if dag.is_empty() { + let mut overlay = SledTreeOverlay::new(&dag); + + let event_se = serialize_async(&genesis).await; + + // Add the event to the overlay + overlay.insert(genesis.id().as_bytes(), &event_se).unwrap(); + + // Aggregate changes into a single batch + let batch = overlay.aggregate().unwrap(); + + // Atomically apply the batch. + // Panic if something is corrupted. + if let Err(e) = dag.apply_batch(batch) { + panic!("Failed applying dag_insert batch to sled: {}", e); + } + } + let utips = self.find_unreferenced_tips(&dag).await; + considered_header_trees.insert(genesis.id(), (hdr_dag, utips.clone())); + considered_trees.insert(genesis.id(), (dag, utips)); + } + + Self { db: sled_db, header_dags: considered_header_trees, main_dags: considered_trees } + } + + /// Adds a DAG into the set of DAGs and drops the oldest one if exeeding DAGS_MAX_NUMBER, + /// This is called if prune_task activates. + pub async fn add_dag(&mut self, dag_name: &str, genesis_event: &Event) { + debug!("add_dag::dags: {}", self.main_dags.len()); + if self.main_dags.len() != self.header_dags.len() { + panic!("main dags length is not the same as header dags") + } + // TODO: sort dags by timestamp and drop the oldest + if self.main_dags.len() > DAGS_MAX_NUMBER.try_into().unwrap() { + while self.main_dags.len() >= DAGS_MAX_NUMBER.try_into().unwrap() { + debug!("[EVENTGRAPH] dropping oldest dag"); + let sorted_dags = self.sort_dags().await; + // since dags are sorted in reverse + let oldest_tree = sorted_dags.last().unwrap().name(); + let oldest_key = String::from_utf8_lossy(&oldest_tree); + let oldest_key = blake3::Hash::from_str(&oldest_key).unwrap(); + + let oldest_hdr_tree = self.header_dags.remove(&oldest_key).unwrap(); + let oldest_tree = self.main_dags.remove(&oldest_key).unwrap(); + self.db.drop_tree(oldest_hdr_tree.0.name()).unwrap(); + self.db.drop_tree(oldest_tree.0.name()).unwrap(); + } + } + + // Insert genesis + let hdr_tree_name = format!("headers_{dag_name}"); + let hdr_dag = self.get_dag(&hdr_tree_name); + hdr_dag + .insert(genesis_event.id().as_bytes(), serialize_async(&genesis_event.header).await) + .unwrap(); + + let dag = self.get_dag(dag_name); + dag.insert(genesis_event.id().as_bytes(), serialize_async(genesis_event).await).unwrap(); + let utips = self.find_unreferenced_tips(&dag).await; + self.header_dags.insert(genesis_event.id(), (hdr_dag, utips.clone())); + self.main_dags.insert(genesis_event.id(), (dag, utips)); + } + + // Get a DAG providing its name. + pub fn get_dag(&self, dag_name: &str) -> sled::Tree { + self.db.open_tree(dag_name).unwrap() + } + + /// Get {count} many DAGs. + pub async fn get_dags(&self, count: usize) -> Vec { + let sorted_dags = self.sort_dags().await; + sorted_dags.into_iter().take(count).collect() + } + + /// Sort DAGs chronologically + async fn sort_dags(&self) -> Vec { + let mut vec_dags = vec![]; + + let dags = self + .main_dags + .iter() + .map(|x| { + let trees = x.1; + trees.0.clone() + }) + .collect::>(); + + for dag in dags { + let genesis = dag.first().unwrap().unwrap().1; + let genesis_event: Event = deserialize_async(&genesis).await.unwrap(); + vec_dags.push((genesis_event.header.timestamp, dag)); + } + + vec_dags.sort_by_key(|&(ts, _)| ts); + vec_dags.reverse(); + + vec_dags.into_iter().map(|(_, dag)| dag).collect() + } + + /// Find the unreferenced tips in the current DAG state, mapped by their layers. + async fn find_unreferenced_tips(&self, dag: &sled::Tree) -> LayerUTips { + // First get all the event IDs + let mut tips = HashSet::new(); + for iter_elem in dag.iter() { + let (id, _) = iter_elem.unwrap(); + let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap()); + tips.insert(id); + } + // Iterate again to find unreferenced IDs + for iter_elem in dag.iter() { + let (_, event) = iter_elem.unwrap(); + let event: Event = deserialize_async(&event).await.unwrap(); + for parent in event.header.parents.iter() { + tips.remove(parent); + } + } + // Build the layers map + let mut map: LayerUTips = BTreeMap::new(); + for tip in tips { + let event = self.fetch_event_from_dag(&tip, &dag).await.unwrap().unwrap(); + if let Some(layer_tips) = map.get_mut(&event.header.layer) { + layer_tips.insert(tip); + } else { + let mut layer_tips = HashSet::new(); + layer_tips.insert(tip); + map.insert(event.header.layer, layer_tips); + } + } + + map + } + + /// Fetch an event from the DAG + pub async fn fetch_event_from_dag( + &self, + event_id: &blake3::Hash, + dag: &sled::Tree, + ) -> Result> { + let Some(bytes) = dag.get(event_id.as_bytes())? else { + return Ok(None); + }; + let event: Event = deserialize_async(&bytes).await?; + + return Ok(Some(event)) + } +} /// An Event Graph instance pub struct EventGraph { /// Pointer to the P2P network instance p2p: P2pPtr, /// Sled tree containing the headers - header_dag: sled::Tree, - /// Main sled tree containing the events - main_dag: sled::Tree, + dag_store: RwLock, /// Replay logs path. datastore: PathBuf, /// Run in replay_mode where if set we log Sled DB instructions /// into `datastore`, useful to reacreate a faulty DAG to debug. replay_mode: bool, - /// The set of unreferenced DAG tips - unreferenced_tips: RwLock>>, /// A `HashSet` containg event IDs and their 1-level parents. /// These come from the events we've sent out using `EventPut`. /// They are used with `EventReq` to decide if we should reply @@ -117,7 +349,7 @@ pub struct EventGraph { /// inserted into the DAG pub event_pub: PublisherPtr, /// Current genesis event - current_genesis: RwLock, + pub current_genesis: RwLock, /// Currently configured DAG rotation, in days days_rotation: u64, /// Flag signalling DAG has finished initial sync @@ -142,7 +374,6 @@ impl EventGraph { /// * `datastore` path where we should log db instrucion if run in /// replay mode. /// * `replay_mode` set the flag to keep a log of db instructions. - /// * `dag_tree_name` the name of disk-backed tree (or DAG name). /// * `days_rotation` marks the lifetime of the DAG before it's /// pruned. pub async fn new( @@ -151,27 +382,29 @@ impl EventGraph { datastore: PathBuf, replay_mode: bool, fast_mode: bool, - dag_tree_name: &str, days_rotation: u64, ex: Arc>, ) -> Result { - let hdr_tree_name = format!("headers_{dag_tree_name}"); - let hdr_dag = sled_db.open_tree(hdr_tree_name)?; - let dag = sled_db.open_tree(dag_tree_name)?; - let unreferenced_tips = RwLock::new(BTreeMap::new()); let broadcasted_ids = RwLock::new(HashSet::new()); let event_pub = Publisher::new(); // Create the current genesis event based on the `days_rotation` let current_genesis = generate_genesis(days_rotation); + let current_dag_tree_name = current_genesis.id().to_string(); + let dag_store = DAGStore { + db: sled_db.clone(), + header_dags: HashMap::default(), + main_dags: HashMap::default(), + } + .new(sled_db, days_rotation) + .await; + let self_ = Arc::new(Self { p2p, - header_dag: hdr_dag.clone(), - main_dag: dag.clone(), + dag_store: RwLock::new(dag_store.clone()), datastore, replay_mode, fast_mode, - unreferenced_tips, broadcasted_ids, prune_task: OnceCell::new(), event_pub, @@ -184,7 +417,8 @@ impl EventGraph { // Check if we have it in our DAG. // If not, we can prune the DAG and insert this new genesis event. - if !dag.contains_key(current_genesis.header.id().as_bytes())? { + let dag = dag_store.get_dag(¤t_dag_tree_name); + if !dag.contains_key(current_genesis.id().as_bytes())? { info!( target: "event_graph::new()", "[EVENTGRAPH] DAG does not contain current genesis, pruning existing data", @@ -192,9 +426,6 @@ impl EventGraph { self_.dag_prune(current_genesis).await?; } - // Find the unreferenced tips in the current DAG state. - *self_.unreferenced_tips.write().await = self_.find_unreferenced_tips().await; - // Spawn the DAG pruning task if days_rotation > 0 { let prune_task = StoppableTask::new(); @@ -221,7 +452,7 @@ impl EventGraph { } /// Sync the DAG from connected peers - pub async fn dag_sync(&self, fast_mode: bool) -> Result<()> { + pub async fn dag_sync(&self, dag: sled::Tree, fast_mode: bool) -> Result<()> { // We do an optimistic sync where we ask all our connected peers for // the latest layer DAG tips (unreferenced events) and then we accept // the ones we see the most times. @@ -237,6 +468,8 @@ impl EventGraph { // amount of iterations, these could be faulty peers and we can try again // from the beginning + let dag_name = String::from_utf8_lossy(&dag.name()).to_string(); + // Get references to all our peers. let channels = self.p2p.hosts().peers(); let mut communicated_peers = channels.len(); @@ -268,7 +501,7 @@ impl EventGraph { } }; - if let Err(e) = channel.send(&TipReq {}).await { + if let Err(e) = channel.send(&TipReq(dag_name.clone())).await { error!( target: "event_graph::dag_sync()", "[EVENTGRAPH] Sync: Couldn't contact peer {}, skipping ({})", url, e, @@ -328,7 +561,7 @@ impl EventGraph { for tip in considered_tips.iter() { assert!(tip != &NULL_ID); - if !self.main_dag.contains_key(tip.as_bytes()).unwrap() { + if !dag.contains_key(tip.as_bytes()).unwrap() { missing_parents.insert(*tip); } } @@ -344,13 +577,15 @@ impl EventGraph { // TODO: requesting headers should be in a way that we wouldn't // recieve the same header(s) again, by sending our tip, other // nodes should send back the ones after it + let hdr_tree_name = format!("headers_{dag_name}"); + let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name); let mut headers_requests = FuturesUnordered::new(); for channel in channels.iter() { - headers_requests.push(request_header(&channel, comms_timeout)) + headers_requests.push(request_header(&channel, dag_name.clone(), comms_timeout)) } while let Some(peer_headers) = headers_requests.next().await { - self.header_dag_insert(peer_headers?).await? + self.header_dag_insert(peer_headers?, &dag_name).await? } // start download payload @@ -358,7 +593,7 @@ impl EventGraph { info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events"); let mut header_sorted = vec![]; - for iter_elem in self.header_dag.iter() { + for iter_elem in header_dag.iter() { let (_, val) = iter_elem.unwrap(); let val: Header = deserialize_async(&val).await.unwrap(); header_sorted.push(val); @@ -438,59 +673,9 @@ impl EventGraph { let mut verified_count = 0; for (_, chunk) in received_events { verified_count += chunk.len(); - self.dag_insert(&chunk).await?; + self.dag_insert(&chunk, &dag_name).await?; info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Verified Events: {}/{}", verified_count, retrieved_count); } - - // 1. Fetch events one by one - // let mut events_requests = FuturesOrdered::new(); - // let peer = peer_selection(peers.clone()); - // let peer = channels[0].clone(); - // for header in header_sorted.iter() { - // let received_events = - // request_event(peer.clone(), vec![header.id()], comms_timeout).await?; - // self.dag_insert(&received_events).await?; - //} - - // let mut received_events = vec![]; - // while let Some(peer_events) = events_requests.next().await { - // let events = peer_events?; - // for i in events.iter() { - // info!("Received events id: {:?}", i.header.id()); - // info!("layer: {}", i.header.layer); - // } - // received_events.extend(events); - // } - - // self.dag_insert(&received_events).await?; - - // // 2. split sorted headers into chunks and assign them to each connected peer - // let mut responses = vec![]; - // for header in header_sorted.chunks_exact(peers.len()) { - // // For each peer, create a future that sends a request - // let pairs = peers.iter().zip(header).collect::>(); - // let pair_stream = from_iter(pairs.iter()); - // let requests_stream = pair_stream.map(|(peer, header)| send_request(peer, header)); - // // Collect all the responses into a vector - // let x = requests_stream.collect::>().await; - // info!("len of x: {}", x.len()); - // // responses.push(x); - // responses.extend(x); - // } - // // Wait for all the futures to complete - // let x = future::join_all(responses).await; - // let fetched_parents = x.into_iter().map(|f| f.unwrap()).collect::>().concat(); - // info!("fetched parents: {}", fetched_parents.len()); - // for i in fetched_parents.iter() { - // info!("layer: {}", i.header.layer) - // } - - // // 3. Fetch all events at once (just a POC) - // let peers = channels.clone().into_iter().collect::>(); - // let missing = header_sorted.iter().map(|x| x.id()).collect::>(); - // info!("first missing: {}", missing[0]); - // let parents = send_requests(&peers, &missing).await?.concat(); - // info!("fetched parents: {}", parents.len()); } // <-- end download payload @@ -500,6 +685,23 @@ impl EventGraph { Ok(()) } + /// Choose how many dags to sync + pub async fn sync_selected(&self, count: usize, fast_mode: bool) -> Result<()> { + let mut dags_to_sync = self.dag_store.read().await.get_dags(count).await; + // Since get_dags() return sorted dags in reverse + dags_to_sync.reverse(); + for dag in dags_to_sync { + match self.dag_sync(dag, fast_mode).await { + Ok(()) => continue, + Err(e) => { + return Err(e); + } + } + } + + Ok(()) + } + /// Atomically prune the DAG and insert the given event as genesis. async fn dag_prune(&self, genesis_event: Event) -> Result<()> { debug!(target: "event_graph::dag_prune()", "Pruning DAG..."); @@ -509,44 +711,15 @@ impl EventGraph { // ensure that during the pruning operation, no other operations are // able to access the intermediate state which could lead to producing // the wrong state after pruning. - let mut unreferenced_tips = self.unreferenced_tips.write().await; let mut broadcasted_ids = self.broadcasted_ids.write().await; let mut current_genesis = self.current_genesis.write().await; - // Atomically clear the main and headers DAGs and write the new genesis event. - // Header - let mut batch = sled::Batch::default(); - for key in self.header_dag.iter().keys() { - batch.remove(key.unwrap()); - } - batch.insert( - genesis_event.header.id().as_bytes(), - serialize_async(&genesis_event.header).await, - ); + let dag_name = genesis_event.id().to_string(); + self.dag_store.write().await.add_dag(&dag_name, &genesis_event).await; - debug!(target: "event_graph::dag_prune()", "Applying batch..."); - if let Err(e) = self.header_dag.apply_batch(batch) { - panic!("Failed pruning header DAG, sled apply_batch error: {}", e); - } - - // Main - let mut batch = sled::Batch::default(); - for key in self.main_dag.iter().keys() { - batch.remove(key.unwrap()); - } - batch.insert(genesis_event.header.id().as_bytes(), serialize_async(&genesis_event).await); - - debug!(target: "event_graph::dag_prune()", "Applying batch..."); - if let Err(e) = self.main_dag.apply_batch(batch) { - panic!("Failed pruning main DAG, sled apply_batch error: {}", e); - } - - // Clear unreferenced tips and bcast ids - *unreferenced_tips = BTreeMap::new(); - unreferenced_tips.insert(0, HashSet::from([genesis_event.header.id()])); + // Clear bcast ids *current_genesis = genesis_event; *broadcasted_ids = HashSet::new(); - drop(unreferenced_tips); drop(broadcasted_ids); drop(current_genesis); @@ -593,21 +766,25 @@ impl EventGraph { /// knows that any requests for them are actually legitimate. /// TODO: The `broadcasted_ids` set should periodically be pruned, when /// some sensible time has passed after broadcasting the event. - pub async fn dag_insert(&self, events: &[Event]) -> Result> { + pub async fn dag_insert(&self, events: &[Event], dag_name: &str) -> Result> { // Sanity check if events.is_empty() { return Ok(vec![]) } - // Acquire exclusive locks to `unreferenced_tips and broadcasted_ids` - let mut unreferenced_tips = self.unreferenced_tips.write().await; + // Acquire exclusive locks to `broadcasted_ids` + let dag_name_hash = blake3::Hash::from_str(dag_name).unwrap(); let mut broadcasted_ids = self.broadcasted_ids.write().await; + let main_dag = self.dag_store.read().await.get_dag(dag_name); + let hdr_tree_name = format!("headers_{dag_name}"); + let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name); + // Here we keep the IDs to return let mut ids = Vec::with_capacity(events.len()); // Create an overlay over the DAG tree - let mut overlay = SledTreeOverlay::new(&self.main_dag); + let mut overlay = SledTreeOverlay::new(&main_dag); // Grab genesis timestamp // let genesis_timestamp = self.current_genesis.read().await.header.timestamp; @@ -615,7 +792,7 @@ impl EventGraph { // Iterate over given events to validate them and // write them to the overlay for event in events { - let event_id = event.header.id(); + let event_id = event.id(); if event.header.layer == 0 { return Ok(vec![]) } @@ -625,16 +802,16 @@ impl EventGraph { ); // check if we already have the event - if self.main_dag.contains_key(event_id.as_bytes())? { + if main_dag.contains_key(event_id.as_bytes())? { continue } // check if its header is in header's store - if !self.header_dag.contains_key(event_id.as_bytes())? { + if !header_dag.contains_key(event_id.as_bytes())? { continue } - if !event.validate(&self.header_dag).await? { + if !event.dag_validate(&header_dag).await? { error!(target: "event_graph::dag_insert()", "Event {} is invalid!", event_id); return Err(Error::EventIsInvalid) } @@ -659,14 +836,20 @@ impl EventGraph { // Atomically apply the batch. // Panic if something is corrupted. - if let Err(e) = self.main_dag.apply_batch(batch) { + if let Err(e) = main_dag.apply_batch(batch) { panic!("Failed applying dag_insert batch to sled: {}", e); } + drop(main_dag); + drop(header_dag); + + let mut dag_store = self.dag_store.write().await; + let (_, unreferenced_tips) = &mut dag_store.main_dags.get_mut(&dag_name_hash).unwrap(); + // Iterate over given events to update references and // send out notifications about them for event in events { - let event_id = event.header.id(); + let event_id = event.id(); // Update the unreferenced DAG tips set debug!( @@ -713,18 +896,23 @@ impl EventGraph { } // Drop the exclusive locks - drop(unreferenced_tips); + drop(dag_store); drop(broadcasted_ids); + let mut dag_store = self.dag_store.write().await; + dag_store.header_dags.get_mut(&dag_name_hash).unwrap().1 = + dag_store.main_dags.get(&dag_name_hash).unwrap().1.clone(); + + drop(dag_store); + Ok(ids) } - pub async fn header_dag_insert(&self, headers: Vec
) -> Result<()> { + pub async fn header_dag_insert(&self, headers: Vec
, dag_name: &str) -> Result<()> { + let hdr_tree_name = format!("headers_{dag_name}"); + let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name); // Create an overlay over the DAG tree - let mut overlay = SledTreeOverlay::new(&self.header_dag); - - // Grab genesis timestamp - let genesis_timestamp = self.current_genesis.read().await.header.timestamp; + let mut overlay = SledTreeOverlay::new(&header_dag); // Acquire exclusive locks to `unreferenced_tips and broadcasted_ids` // let mut unreferenced_header = self.unreferenced_tips.write().await; @@ -744,10 +932,7 @@ impl EventGraph { target: "event_graph::header_dag_insert()", "Inserting header {} into the DAG", header_id, ); - if !header - .validate(&self.header_dag, genesis_timestamp, self.days_rotation, Some(&overlay)) - .await? - { + if !header.validate(&header_dag, self.days_rotation, Some(&overlay)).await? { error!(target: "event_graph::header_dag_insert()", "Header {} is invalid!", header_id); return Err(Error::HeaderIsInvalid) } @@ -765,27 +950,39 @@ impl EventGraph { // Atomically apply the batch. // Panic if something is corrupted. - if let Err(e) = self.header_dag.apply_batch(batch) { + if let Err(e) = header_dag.apply_batch(batch) { panic!("Failed applying dag_insert batch to sled: {}", e); } Ok(()) } - /// Fetch an event from the DAG - pub async fn dag_get(&self, event_id: &Hash) -> Result> { - let Some(bytes) = self.main_dag.get(event_id.as_bytes())? else { return Ok(None) }; - let event: Event = deserialize_async(&bytes).await?; + /// Search and fetch an event through all DAGs + pub async fn fetch_event_from_dags(&self, event_id: &blake3::Hash) -> Result> { + let store = self.dag_store.read().await; + for tree_elem in store.main_dags.clone() { + let dag_name = tree_elem.0.to_string(); + let Some(bytes) = store.get_dag(&dag_name).get(event_id.as_bytes())? else { + continue; + }; + let event: Event = deserialize_async(&bytes).await?; - Ok(Some(event)) + return Ok(Some(event)) + } + + Ok(None) } /// Get next layer along with its N_EVENT_PARENTS from the unreferenced /// tips of the DAG. Since tips are mapped by their layer, we go backwards /// until we fill the vector, ensuring we always use latest layers tips as /// parents. - async fn get_next_layer_with_parents(&self) -> (u64, [Hash; N_EVENT_PARENTS]) { - let unreferenced_tips = self.unreferenced_tips.read().await; + async fn get_next_layer_with_parents( + &self, + dag_name: &Hash, + ) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) { + let store = self.dag_store.read().await; + let (_, unreferenced_tips) = store.header_dags.get(dag_name).unwrap(); let mut parents = [NULL_ID; N_EVENT_PARENTS]; let mut index = 0; @@ -805,83 +1002,53 @@ impl EventGraph { (next_layer, parents) } - /// Find the unreferenced tips in the current DAG state, mapped by their layers. - async fn find_unreferenced_tips(&self) -> BTreeMap> { - // First get all the event IDs - let mut tips = HashSet::new(); - for iter_elem in self.main_dag.iter() { - let (id, _) = iter_elem.unwrap(); - let id = Hash::from_bytes((&id as &[u8]).try_into().unwrap()); - tips.insert(id); - } - - // Iterate again to find unreferenced IDs - for iter_elem in self.main_dag.iter() { - let (_, event) = iter_elem.unwrap(); - let event: Event = deserialize_async(&event).await.unwrap(); - for parent in event.header.parents.iter() { - tips.remove(parent); - } - } - - // Build the layers map - let mut map: BTreeMap> = BTreeMap::new(); - for tip in tips { - let event = self.dag_get(&tip).await.unwrap().unwrap(); - if let Some(layer_tips) = map.get_mut(&event.header.layer) { - layer_tips.insert(tip); - } else { - let mut layer_tips = HashSet::new(); - layer_tips.insert(tip); - map.insert(event.header.layer, layer_tips); - } - } - - map - } - /// Internal function used for DAG sorting. - async fn get_unreferenced_tips_sorted(&self) -> [Hash; N_EVENT_PARENTS] { - let (_, tips) = self.get_next_layer_with_parents().await; - - // Convert the hash to BigUint for sorting - let mut sorted: Vec<_> = - tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect(); - sorted.sort_unstable(); - - // Convert back to blake3 + async fn get_unreferenced_tips_sorted(&self) -> Vec<[blake3::Hash; N_EVENT_PARENTS]> { + let mut vec_tips = vec![]; let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS]; - for (i, id) in sorted.iter().enumerate() { - let mut bytes = id.to_bytes_be(); + for (i, _) in self.dag_store.read().await.header_dags.iter() { + let (_, tips) = self.get_next_layer_with_parents(&i).await; + // Convert the hash to BigUint for sorting + let mut sorted: Vec<_> = + tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect(); + sorted.sort_unstable(); - // Ensure we have 32 bytes - while bytes.len() < blake3::OUT_LEN { - bytes.insert(0, 0); + // Convert back to blake3 + for (i, id) in sorted.iter().enumerate() { + let mut bytes = id.to_bytes_be(); + + // Ensure we have 32 bytes + while bytes.len() < blake3::OUT_LEN { + bytes.insert(0, 0); + } + + tips_sorted[i] = blake3::Hash::from_bytes(bytes.try_into().unwrap()); } - tips_sorted[i] = Hash::from_bytes(bytes.try_into().unwrap()); + vec_tips.push(tips_sorted); } - tips_sorted + vec_tips } + // TODO: Fix fetching all events from all dags and then order and retrun them /// Perform a topological sort of the DAG. pub async fn order_events(&self) -> Vec { let mut ordered_events = VecDeque::new(); let mut visited = HashSet::new(); - for tip in self.get_unreferenced_tips_sorted().await { - if !visited.contains(&tip) && tip != NULL_ID { - let tip = self.dag_get(&tip).await.unwrap().unwrap(); - ordered_events.extend(self.dfs_topological_sort(tip, &mut visited).await); + for i in self.get_unreferenced_tips_sorted().await { + for tip in i { + if !visited.contains(&tip) && tip != NULL_ID { + let tip = self.fetch_event_from_dags(&tip).await.unwrap().unwrap(); + ordered_events.extend(self.dfs_topological_sort(tip, &mut visited).await); + } } } let mut ord_events_vec = ordered_events.make_contiguous().to_vec(); - // Order events based on thier layer numbers, or based on timestamp if they are equal - ord_events_vec.sort_unstable_by(|a, b| { - a.0.cmp(&b.0).then(b.1.header.timestamp.cmp(&a.1.header.timestamp)) - }); + // Order events by timestamp. + ord_events_vec.sort_unstable_by(|a, b| a.1.header.timestamp.cmp(&b.1.header.timestamp)); ord_events_vec.iter().map(|a| a.1.clone()).collect::>() } @@ -895,13 +1062,13 @@ impl EventGraph { ) -> VecDeque<(u64, Event)> { let mut ordered_events = VecDeque::new(); let mut stack = VecDeque::new(); - let event_id = event.header.id(); + let event_id = event.id(); stack.push_back(event_id); while let Some(event_id) = stack.pop_front() { if !visited.contains(&event_id) && event_id != NULL_ID { visited.insert(event_id); - if let Some(event) = self.dag_get(&event_id).await.unwrap() { + if let Some(event) = self.fetch_event_from_dags(&event_id).await.unwrap() { for parent in event.header.parents.iter() { stack.push_back(*parent); } @@ -937,8 +1104,10 @@ impl EventGraph { } pub async fn eventgraph_info(&self, id: u16, _params: JsonValue) -> JsonResult { + let current_genesis = self.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); let mut graph = HashMap::new(); - for iter_elem in self.main_dag.iter() { + for iter_elem in self.dag_store.read().await.get_dag(&dag_name).iter() { let (id, val) = iter_elem.unwrap(); let id = Hash::from_bytes((&id as &[u8]).try_into().unwrap()); let val: Event = deserialize_async(&val).await.unwrap(); @@ -962,17 +1131,16 @@ impl EventGraph { /// Fetch all the events that are on a higher layers than the /// provided ones. - pub async fn fetch_successors_of( - &self, - tips: BTreeMap>, - ) -> Result> { + pub async fn fetch_successors_of(&self, tips: LayerUTips) -> Result> { debug!( target: "event_graph::fetch_successors_of()", "fetching successors of {tips:?}" ); + let current_genesis = self.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); let mut graph = HashMap::new(); - for iter_elem in self.main_dag.iter() { + for iter_elem in self.dag_store.read().await.get_dag(&dag_name).iter() { let (id, val) = iter_elem.unwrap(); let hash = Hash::from_bytes((&id as &[u8]).try_into().unwrap()); let event: Event = deserialize_async(&val).await.unwrap(); @@ -1001,36 +1169,11 @@ impl EventGraph { } } -async fn _send_request(peer: &Channel, missing: &Header) -> Result> { - info!("in send_request first missing: {}", missing.id()); - let url = peer.address(); - debug!(target: "event_graph::dag_sync()","Requesting {:?} from {}...", missing, url); - let ev_rep_sub = match peer.subscribe_msg::().await { - Ok(v) => v, - Err(e) => { - error!(target: "event_graph::dag_sync()","[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",url, e); - return Err(Error::Custom("Couldn't subscribe EventRep".to_string())) - } - }; - - if let Err(e) = peer.send(&EventReq(vec![missing.id()])).await { - error!(target: "event_graph::dag_sync()","[EVENTGRAPH] Sync: Failed communicating EventReq({:?}) to {}: {}",missing, url, e); - return Err(Error::Custom("Failed communicating EventReq".to_string())) - } - - let Ok(parent) = ev_rep_sub.receive_with_timeout(15).await else { - error!( - target: "event_graph::dag_sync()", - "[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}", - missing, url, - ); - return Err(().into()) - }; - - Ok(parent.0.clone()) -} - -async fn request_header(peer: &Channel, comms_timeout: u64) -> Result> { +async fn request_header( + peer: &Channel, + tree_name: String, + comms_timeout: u64, +) -> Result> { let url = peer.address(); let hdr_rep_sub = match peer.subscribe_msg::().await { @@ -1045,7 +1188,7 @@ async fn request_header(peer: &Channel, comms_timeout: u64) -> Result>) -> Arc { - peers.choose(&mut OsRng).unwrap().clone() -} - -// async fn send_request(peer: &Channel, missing: &[Hash]) -> Result> { -// info!("in send_request first missing: {}", missing[0]); -// let url = peer.address(); -// debug!(target: "event_graph::dag_sync()","Requesting {:?} from {}...", missing, url); -// let ev_rep_sub = match peer.subscribe_msg::().await { -// Ok(v) => v, -// Err(e) => { -// error!(target: "event_graph::dag_sync()","[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",url, e); -// return Err(Error::Custom("Couldn't subscribe EventRep".to_string())) -// } -// }; - -// if let Err(e) = peer.send(&EventReq(missing.to_vec())).await { -// error!(target: "event_graph::dag_sync()","[EVENTGRAPH] Sync: Failed communicating EventReq({:?}) to {}: {}",missing, url, e); -// return Err(Error::Custom("Failed communicating EventReq".to_string())) -// } - -// let Ok(parent) = ev_rep_sub.receive_with_timeout(15).await else { -// error!( -// target: "event_graph::dag_sync()", -// "[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}", -// missing, url, -// ); -// return Err(().into()) -// }; - -// Ok(parent.0.clone()) -// } - -// // A function that sends requests to multiple peers concurrently -// async fn send_requests(peers: &[Arc], missing: &[Hash]) -> Result>> { -// info!("in send_requests first missing: {}", missing[0]); -// let chunk_size = (missing.len() as f64 / peers.len() as f64).ceil() as usize; -// let pairs = peers.iter().zip(missing.chunks(chunk_size)).collect::>(); - -// // For each peer, create a future that sends a request -// let pair_stream = from_iter(pairs.iter()); -// let requests_stream = pair_stream.map(|(peer, missing)| send_request(peer, missing)); - -// // Collect all the responses into a vector -// let responses = requests_stream.collect::>().await; - -// // Wait for all the futures to complete -// future::try_join_all(responses).await -// } diff --git a/src/event_graph/proto.rs b/src/event_graph/proto.rs index a6e71b9a6..2b4371c2f 100644 --- a/src/event_graph/proto.rs +++ b/src/event_graph/proto.rs @@ -18,6 +18,7 @@ use std::{ collections::{BTreeMap, HashSet, VecDeque}, + str::FromStr, sync::{ atomic::{AtomicUsize, Ordering::SeqCst}, Arc, @@ -28,7 +29,7 @@ use darkfi_serial::{async_trait, deserialize_async, SerialDecodable, SerialEncod use log::{debug, error, trace, warn}; use smol::Executor; -use super::{event::Header, Event, EventGraphPtr, NULL_ID}; +use super::{event::Header, Event, EventGraphPtr, LayerUTips, NULL_ID}; use crate::{ impl_p2p_message, net::{ @@ -151,7 +152,7 @@ impl_p2p_message!(HeaderPut, "EventGraph::HeaderPut", 0, 0, DEFAULT_METERING_CON /// A P2P message representing a header request #[derive(Clone, SerialEncodable, SerialDecodable)] -pub struct HeaderReq {} +pub struct HeaderReq(pub String); impl_p2p_message!(HeaderReq, "EventGraph::HeaderReq", 0, 0, DEFAULT_METERING_CONFIGURATION); /// A P2P message representing a header reply @@ -161,12 +162,12 @@ impl_p2p_message!(HeaderRep, "EventGraph::HeaderRep", 0, 0, DEFAULT_METERING_CON /// A P2P message representing a request for a peer's DAG tips #[derive(Clone, SerialEncodable, SerialDecodable)] -pub struct TipReq {} +pub struct TipReq(pub String); impl_p2p_message!(TipReq, "EventGraph::TipReq", 0, 0, DEFAULT_METERING_CONFIGURATION); /// A P2P message representing a reply for the peer's DAG tips #[derive(Clone, SerialEncodable, SerialDecodable)] -pub struct TipRep(pub BTreeMap>); +pub struct TipRep(pub LayerUTips); impl_p2p_message!(TipRep, "EventGraph::TipRep", 0, 0, DEFAULT_METERING_CONFIGURATION); #[async_trait] @@ -177,7 +178,7 @@ impl ProtocolBase for ProtocolEventGraph { self.jobsman.clone().spawn(self.clone().handle_event_req(), ex.clone()).await; // self.jobsman.clone().spawn(self.clone().handle_header_put(), ex.clone()).await; // self.jobsman.clone().spawn(self.clone().handle_header_req(), ex.clone()).await; - self.jobsman.clone().spawn(self.clone().handle_header_rep(), ex.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_header_req(), ex.clone()).await; self.jobsman.clone().spawn(self.clone().handle_tip_req(), ex.clone()).await; self.jobsman.clone().spawn(self.clone().broadcast_rate_limiter(), ex.clone()).await; Ok(()) @@ -263,7 +264,7 @@ impl ProtocolEventGraph { }; trace!( target: "event_graph::protocol::handle_event_put()", - "Got EventPut: {} [{}]", event.header.id(), self.channel.address(), + "Got EventPut: {} [{}]", event.id(), self.channel.address(), ); // Check if node has finished syncing its DAG @@ -276,8 +277,19 @@ impl ProtocolEventGraph { } // If we have already seen the event, we'll stay quiet. - let event_id = event.header.id(); - if self.event_graph.main_dag.contains_key(event_id.as_bytes()).unwrap() { + let current_genesis = self.event_graph.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); + let hdr_tree_name = format!("headers_{dag_name}"); + let event_id = event.id(); + if self + .event_graph + .dag_store + .read() + .await + .get_dag(&hdr_tree_name) + .contains_key(event_id.as_bytes()) + .unwrap() + { debug!( target: "event_graph::protocol::handle_event_put()", "Event {} is already known", event_id, @@ -311,7 +323,7 @@ impl ProtocolEventGraph { debug!( target: "event_graph::protocol::handle_event_put()", "Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{}`", - event.header.id(), event.header.timestamp, genesis_timestamp + event.id(), event.header.timestamp, genesis_timestamp ); } @@ -341,7 +353,15 @@ impl ProtocolEventGraph { continue } - if !self.event_graph.main_dag.contains_key(parent_id.as_bytes()).unwrap() { + if !self + .event_graph + .dag_store + .read() + .await + .get_dag(&hdr_tree_name) + .contains_key(parent_id.as_bytes()) + .unwrap() + { missing_parents.insert(*parent_id); } } @@ -363,6 +383,10 @@ impl ProtocolEventGraph { "Event has {} missing parents. Requesting...", missing_parents.len(), ); + let current_genesis = self.event_graph.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); + let hdr_tree_name = format!("headers_{dag_name}"); + while !missing_parents.is_empty() { // for parent_id in missing_parents.clone().iter() { debug!( @@ -394,12 +418,12 @@ impl ProtocolEventGraph { let parents = parents.0.clone(); for parent in parents { - let parent_id = parent.header.id(); + let parent_id = parent.id(); if !missing_parents.contains(&parent_id) { error!( target: "event_graph::protocol::handle_event_put()", "[EVENTGRAPH] Peer {} replied with a wrong event: {}", - self.channel.address(), parent.header.id(), + self.channel.address(), parent.id(), ); self.channel.stop().await; return Err(Error::ChannelStopped) @@ -407,7 +431,7 @@ impl ProtocolEventGraph { debug!( target: "event_graph::protocol::handle_event_put()", - "Got correct parent event {}", parent.header.id(), + "Got correct parent event {}", parent.id(), ); if let Some(layer_events) = received_events.get_mut(&parent.header.layer) { @@ -430,7 +454,10 @@ impl ProtocolEventGraph { !received_events_hashes.contains(upper_parent) && !self .event_graph - .main_dag + .dag_store + .read() + .await + .get_dag(&hdr_tree_name) .contains_key(upper_parent.as_bytes()) .unwrap() { @@ -453,13 +480,13 @@ impl ProtocolEventGraph { } } let headers = events.iter().map(|x| x.header.clone()).collect(); - if self.event_graph.header_dag_insert(headers).await.is_err() { + if self.event_graph.header_dag_insert(headers, &dag_name).await.is_err() { self.clone().increase_malicious_count().await?; continue } // FIXME if !self.event_graph.fast_mode { - if self.event_graph.dag_insert(&events).await.is_err() { + if self.event_graph.dag_insert(&events, &dag_name).await.is_err() { self.clone().increase_malicious_count().await?; continue } @@ -473,12 +500,17 @@ impl ProtocolEventGraph { target: "event_graph::protocol::handle_event_put()", "Got all parents necessary for insertion", ); - if self.event_graph.header_dag_insert(vec![event.header.clone()]).await.is_err() { + if self + .event_graph + .header_dag_insert(vec![event.header.clone()], &dag_name) + .await + .is_err() + { self.clone().increase_malicious_count().await?; continue } - if self.event_graph.dag_insert(&[event.clone()]).await.is_err() { + if self.event_graph.dag_insert(&[event.clone()], &dag_name).await.is_err() { self.clone().increase_malicious_count().await?; continue } @@ -522,7 +554,20 @@ impl ProtocolEventGraph { // reading our db and steal our bandwidth. let mut events = vec![]; for event_id in event_ids.iter() { - if !self.event_graph.header_dag.contains_key(event_id.as_bytes())? { + if let Ok(event) = self + .event_graph + .fetch_event_from_dags(event_id) + .await? + .ok_or(Error::EventNotFound("The requested event is not found".to_owned())) + { + // At this point we should have it in our DAG. + // This code panics if this is not the case. + debug!( + target: "event_graph::protocol::handle_event_req()", + "Fetching event {:?} from DAG", event_id, + ); + events.push(event); + } else { let malicious_count = self.malicious_count.fetch_add(1, SeqCst); if malicious_count + 1 == MALICIOUS_THRESHOLD { error!( @@ -541,20 +586,6 @@ impl ProtocolEventGraph { ); continue } - - // At this point we should have it in our DAG. - // This code panics if this is not the case. - debug!( - target: "event_graph::protocol::handle_event_req()", - "Fetching event {:?} from DAG", event_id, - ); - - events.push( - self.event_graph - .dag_get(event_id) - .await? - .ok_or(Error::EventNotFound("Event Not Found in DAG".to_owned()))?, - ); } // Check if the incoming event is older than the genesis event. If so, something @@ -569,7 +600,7 @@ impl ProtocolEventGraph { target: "event_graph::protocol::handle_event_req()", "Requested event by peer {} is older than previous rotation period. It should have been pruned. Event timestamp: `{}`. Genesis timestamp: `{}`", - event.header.id(), event.header.timestamp, genesis_timestamp + event.id(), event.header.timestamp, genesis_timestamp ); } @@ -591,16 +622,15 @@ impl ProtocolEventGraph { } } - // async fn handle_header_req(self: Arc) -> Result<()> { - // Ok(()) - // } - /// Protocol function handling `HeaderReq`. /// This is triggered whenever someone requests syncing headers by /// sending their current headers. - async fn handle_header_rep(self: Arc) -> Result<()> { + async fn handle_header_req(self: Arc) -> Result<()> { loop { - self.hdr_req_sub.receive().await?; + let dag_name = match self.hdr_req_sub.receive().await { + Ok(v) => v.0.clone(), + Err(_) => continue, + }; trace!( target: "event_graph::protocol::handle_tip_req()", "Got TipReq [{}]", self.channel.address(), @@ -619,8 +649,9 @@ impl ProtocolEventGraph { // We received header request. Let's find them, add them to // our bcast ids list, and reply with them. + let main_dag = self.event_graph.dag_store.read().await.get_dag(&dag_name); let mut headers = vec![]; - for item in self.event_graph.main_dag.iter() { + for item in main_dag.iter() { let (_, event) = item.unwrap(); let event: Event = deserialize_async(&event).await.unwrap(); if !headers.contains(&event.header) || event.header.layer != 0 { @@ -645,7 +676,10 @@ impl ProtocolEventGraph { /// tips of our DAG. async fn handle_tip_req(self: Arc) -> Result<()> { loop { - self.tip_req_sub.receive().await?; + let dag_name = match self.tip_req_sub.receive().await { + Ok(v) => v.0.clone(), + Err(_) => continue, + }; trace!( target: "event_graph::protocol::handle_tip_req()", "Got TipReq [{}]", self.channel.address(), @@ -664,7 +698,13 @@ impl ProtocolEventGraph { // We received a tip request. Let's find them, add them to // our bcast ids list, and reply with them. - let layers = self.event_graph.unreferenced_tips.read().await.clone(); + let dag_name_hash = blake3::Hash::from_str(&dag_name).unwrap(); + let store = self.event_graph.dag_store.read().await; + let (_, layers) = match store.header_dags.get(&dag_name_hash) { + Some(v) => v, + None => continue, + }; + // let layers = self.event_graph.dag_store.read().await.find_unreferenced_tips(&dag_name).await; let mut bcast_ids = self.event_graph.broadcasted_ids.write().await; for (_, tips) in layers.iter() { for tip in tips { @@ -673,7 +713,7 @@ impl ProtocolEventGraph { } drop(bcast_ids); - self.channel.send(&TipRep(layers)).await?; + self.channel.send(&TipRep(layers.clone())).await?; } } diff --git a/src/event_graph/tests.rs b/src/event_graph/tests.rs index 6363dbc73..d76874ab9 100644 --- a/src/event_graph/tests.rs +++ b/src/event_graph/tests.rs @@ -93,7 +93,7 @@ async fn spawn_node( let p2p = P2p::new(settings, ex.clone()).await.unwrap(); let sled_db = sled::Config::new().temporary(true).open().unwrap(); let event_graph = - EventGraph::new(p2p.clone(), sled_db, "/tmp".into(), false, false, "dag", 1, ex.clone()) + EventGraph::new(p2p.clone(), sled_db, "/tmp".into(), false, false, 1, ex.clone()) .await .unwrap(); *event_graph.synced.write().await = true; @@ -156,17 +156,22 @@ async fn bootstrap_nodes( async fn assert_dags(eg_instances: &[Arc], expected_len: usize, rng: &mut ThreadRng) { let random_node = eg_instances.choose(rng).unwrap(); - let last_layer_tips = - random_node.unreferenced_tips.read().await.last_key_value().unwrap().1.clone(); + let random_node_genesis = random_node.current_genesis.read().await.id(); + let store = random_node.dag_store.read().await; + let (_, unreferenced_tips) = store.main_dags.get(&random_node_genesis).unwrap(); + let last_layer_tips = unreferenced_tips.last_key_value().unwrap().1.clone(); for (i, eg) in eg_instances.iter().enumerate() { - let node_last_layer_tips = - eg.unreferenced_tips.read().await.last_key_value().unwrap().1.clone(); + let current_genesis = eg.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); + let dag = eg.dag_store.read().await.get_dag(&dag_name); + let unreferenced_tips = eg.dag_store.read().await.find_unreferenced_tips(&dag).await; + let node_last_layer_tips = unreferenced_tips.last_key_value().unwrap().1.clone(); assert!( - eg.main_dag.len() == expected_len, + dag.len() == expected_len, "Node {}, expected {} events, have {}", i, expected_len, - eg.main_dag.len() + dag.len() ); assert_eq!( node_last_layer_tips, last_layer_tips, @@ -210,9 +215,13 @@ async fn eventgraph_propagation_real(ex: Arc>) { // Grab genesis event let random_node = eg_instances.choose(&mut rng).unwrap(); - let (id, _) = random_node.main_dag.last().unwrap().unwrap(); + let current_genesis = random_node.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); + let (id, _) = random_node.dag_store.read().await.get_dag(&dag_name).last().unwrap().unwrap(); let genesis_event_id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap()); + drop(current_genesis); + // ========================================= // 1. Assert that everyone's DAG is the same // ========================================= @@ -222,16 +231,22 @@ async fn eventgraph_propagation_real(ex: Arc>) { // 2. Create an event in one node and publish // ========================================== let random_node = eg_instances.choose(&mut rng).unwrap(); + let current_genesis = random_node.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); let event = Event::new(vec![1, 2, 3, 4], random_node).await; assert!(event.header.parents.contains(&genesis_event_id)); // The node adds it to their DAG, on layer 1. - random_node.header_dag_insert(vec![event.header.clone()]).await.unwrap(); - let event_id = random_node.dag_insert(&[event.clone()]).await.unwrap()[0]; - let tips_layers = random_node.unreferenced_tips.read().await; + random_node.header_dag_insert(vec![event.header.clone()], &dag_name).await.unwrap(); + let event_id = random_node.dag_insert(&[event.clone()], &dag_name).await.unwrap()[0]; + + let store = random_node.dag_store.read().await; + let (_, tips_layers) = store.header_dags.get(¤t_genesis.id()).unwrap(); + // Since genesis was referenced, its layer (0) have been removed assert_eq!(tips_layers.len(), 1); assert!(tips_layers.last_key_value().unwrap().1.get(&event_id).is_some()); - drop(tips_layers); + drop(store); + drop(current_genesis); info!("Broadcasting event {}", event_id); random_node.p2p.broadcast(&EventPut(event)).await; info!("Waiting 5s for event propagation"); @@ -249,20 +264,25 @@ async fn eventgraph_propagation_real(ex: Arc>) { // ============================================================== let random_node = eg_instances.choose(&mut rng).unwrap(); let event0 = Event::new(vec![1, 2, 3, 4, 0], random_node).await; - random_node.header_dag_insert(vec![event0.header.clone()]).await.unwrap(); - let event0_id = random_node.dag_insert(&[event0.clone()]).await.unwrap()[0]; + random_node.header_dag_insert(vec![event0.header.clone()], &dag_name).await.unwrap(); + let event0_id = random_node.dag_insert(&[event0.clone()], &dag_name).await.unwrap()[0]; let event1 = Event::new(vec![1, 2, 3, 4, 1], random_node).await; - random_node.header_dag_insert(vec![event1.header.clone()]).await.unwrap(); - let event1_id = random_node.dag_insert(&[event1.clone()]).await.unwrap()[0]; + random_node.header_dag_insert(vec![event1.header.clone()], &dag_name).await.unwrap(); + let event1_id = random_node.dag_insert(&[event1.clone()], &dag_name).await.unwrap()[0]; let event2 = Event::new(vec![1, 2, 3, 4, 2], random_node).await; - random_node.header_dag_insert(vec![event2.header.clone()]).await.unwrap(); - let event2_id = random_node.dag_insert(&[event2.clone()]).await.unwrap()[0]; + random_node.header_dag_insert(vec![event2.header.clone()], &dag_name).await.unwrap(); + let event2_id = random_node.dag_insert(&[event2.clone()], &dag_name).await.unwrap()[0]; // Genesis event + event from 2. + upper 3 events (layer 4) - assert_eq!(random_node.main_dag.len(), 5); - let tips_layers = random_node.unreferenced_tips.read().await; + let current_genesis = random_node.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); + assert_eq!(random_node.dag_store.read().await.get_dag(&dag_name).len(), 5); + let random_node_genesis = random_node.current_genesis.read().await.id(); + let store = random_node.dag_store.read().await; + let (_, tips_layers) = store.header_dags.get(&random_node_genesis).unwrap(); assert_eq!(tips_layers.len(), 1); assert!(tips_layers.get(&4).unwrap().get(&event2_id).is_some()); - drop(tips_layers); + drop(current_genesis); + drop(store); let event_chain = vec![ (event0_id, event0.header.parents), @@ -288,20 +308,20 @@ async fn eventgraph_propagation_real(ex: Arc>) { // ======= let node1 = eg_instances.choose(&mut rng).unwrap(); let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1).await; - node1.header_dag_insert(vec![event0_1.header.clone()]).await.unwrap(); - node1.dag_insert(&[event0_1.clone()]).await.unwrap(); + node1.header_dag_insert(vec![event0_1.header.clone()], &dag_name).await.unwrap(); + node1.dag_insert(&[event0_1.clone()], &dag_name).await.unwrap(); node1.p2p.broadcast(&EventPut(event0_1)).await; msleep(300).await; let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1).await; - node1.header_dag_insert(vec![event1_1.header.clone()]).await.unwrap(); - node1.dag_insert(&[event1_1.clone()]).await.unwrap(); + node1.header_dag_insert(vec![event1_1.header.clone()], &dag_name).await.unwrap(); + node1.dag_insert(&[event1_1.clone()], &dag_name).await.unwrap(); node1.p2p.broadcast(&EventPut(event1_1)).await; msleep(300).await; let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1).await; - node1.header_dag_insert(vec![event2_1.header.clone()]).await.unwrap(); - node1.dag_insert(&[event2_1.clone()]).await.unwrap(); + node1.header_dag_insert(vec![event2_1.header.clone()], &dag_name).await.unwrap(); + node1.dag_insert(&[event2_1.clone()], &dag_name).await.unwrap(); node1.p2p.broadcast(&EventPut(event2_1)).await; msleep(300).await; @@ -310,20 +330,20 @@ async fn eventgraph_propagation_real(ex: Arc>) { // ======= let node2 = eg_instances.choose(&mut rng).unwrap(); let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2).await; - node2.header_dag_insert(vec![event0_2.header.clone()]).await.unwrap(); - node2.dag_insert(&[event0_2.clone()]).await.unwrap(); + node2.header_dag_insert(vec![event0_2.header.clone()], &dag_name).await.unwrap(); + node2.dag_insert(&[event0_2.clone()], &dag_name).await.unwrap(); node2.p2p.broadcast(&EventPut(event0_2)).await; msleep(300).await; let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2).await; - node2.header_dag_insert(vec![event1_2.header.clone()]).await.unwrap(); - node2.dag_insert(&[event1_2.clone()]).await.unwrap(); + node2.header_dag_insert(vec![event1_2.header.clone()], &dag_name).await.unwrap(); + node2.dag_insert(&[event1_2.clone()], &dag_name).await.unwrap(); node2.p2p.broadcast(&EventPut(event1_2)).await; msleep(300).await; let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2).await; - node2.header_dag_insert(vec![event2_2.header.clone()]).await.unwrap(); - node2.dag_insert(&[event2_2.clone()]).await.unwrap(); + node2.header_dag_insert(vec![event2_2.header.clone()], &dag_name).await.unwrap(); + node2.dag_insert(&[event2_2.clone()], &dag_name).await.unwrap(); node2.p2p.broadcast(&EventPut(event2_2)).await; msleep(300).await; @@ -332,20 +352,20 @@ async fn eventgraph_propagation_real(ex: Arc>) { // ======= let node3 = eg_instances.choose(&mut rng).unwrap(); let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3).await; - node3.header_dag_insert(vec![event0_3.header.clone()]).await.unwrap(); - node3.dag_insert(&[event0_3.clone()]).await.unwrap(); + node3.header_dag_insert(vec![event0_3.header.clone()], &dag_name).await.unwrap(); + node3.dag_insert(&[event0_3.clone()], &dag_name).await.unwrap(); node3.p2p.broadcast(&EventPut(event0_3)).await; msleep(300).await; let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3).await; - node3.header_dag_insert(vec![event1_3.header.clone()]).await.unwrap(); - node3.dag_insert(&[event1_3.clone()]).await.unwrap(); + node3.header_dag_insert(vec![event1_3.header.clone()], &dag_name).await.unwrap(); + node3.dag_insert(&[event1_3.clone()], &dag_name).await.unwrap(); node3.p2p.broadcast(&EventPut(event1_3)).await; msleep(300).await; let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3).await; - node3.header_dag_insert(vec![event2_3.header.clone()]).await.unwrap(); - node3.dag_insert(&[event2_3.clone()]).await.unwrap(); + node3.header_dag_insert(vec![event2_3.header.clone()], &dag_name).await.unwrap(); + node3.dag_insert(&[event2_3.clone()], &dag_name).await.unwrap(); node3.p2p.broadcast(&EventPut(event2_3)).await; msleep(300).await; @@ -401,7 +421,7 @@ async fn eventgraph_propagation_real(ex: Arc>) { info!("Waiting 5s for new node connection"); sleep(5).await; - event_graph.dag_sync(false).await.unwrap() + event_graph.sync_selected(1, false).await.unwrap(); } // ============================================================ @@ -440,8 +460,10 @@ async fn eventgraph_chaotic_propagation_real(ex: Arc>) { for i in 0..n_events { let random_node = eg_instances.choose(&mut rng).unwrap(); let event = Event::new(i.to_be_bytes().to_vec(), random_node).await; - random_node.header_dag_insert(vec![event.header.clone()]).await.unwrap(); - random_node.dag_insert(&[event.clone()]).await.unwrap(); + let current_genesis = random_node.current_genesis.read().await; + let dag_name = current_genesis.id().to_string(); + random_node.header_dag_insert(vec![event.header.clone()], &dag_name).await.unwrap(); + random_node.dag_insert(&[event.clone()], &dag_name).await.unwrap(); random_node.p2p.broadcast(&EventPut(event)).await; } info!("Waiting 5s for events propagation"); @@ -480,7 +502,7 @@ async fn eventgraph_chaotic_propagation_real(ex: Arc>) { info!("Waiting 5s for new node connection"); sleep(5).await; - event_graph.dag_sync(false).await.unwrap() + event_graph.sync_selected(2, false).await.unwrap() } // ============================================================