diff --git a/script/evgrd/bin/evgrd.rs b/script/evgrd/bin/evgrd.rs index 788e8f392..124f35573 100644 --- a/script/evgrd/bin/evgrd.rs +++ b/script/evgrd/bin/evgrd.rs @@ -18,7 +18,7 @@ use darkfi::{ async_daemonize, cli_desc, - event_graph::{self, proto::ProtocolEventGraph, EventGraph, EventGraphPtr}, + event_graph::{proto::ProtocolEventGraph, Event, EventGraph, EventGraphPtr}, net::{ session::SESSION_DEFAULT, settings::SettingsOpt as NetSettingsOpt, @@ -29,24 +29,16 @@ use darkfi::{ jsonrpc::JsonSubscriber, server::{listen_and_serve, RequestHandler}, }, - system::{sleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr}, - util::path::{expand_path, get_config_path}, + system::{sleep, StoppableTask, StoppableTaskPtr}, + util::path::expand_path, Error, Result, }; -use darkfi_serial::{ - async_trait, deserialize_async, serialize_async, AsyncDecodable, AsyncEncodable, Encodable, - SerialDecodable, SerialEncodable, -}; +use darkfi_serial::{AsyncDecodable, AsyncEncodable}; use futures::FutureExt; use log::{debug, error, info}; -use rand::rngs::OsRng; use sled_overlay::sled; use smol::{fs, lock::Mutex, stream::StreamExt, Executor}; -use std::{ - collections::HashSet, - path::PathBuf, - sync::{Arc, Mutex as SyncMutex}, -}; +use std::{collections::HashSet, path::PathBuf, sync::Arc}; use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml}; use url::Url; @@ -102,7 +94,7 @@ struct Args { sync_attempts: u8, /// Number of seconds to wait before trying again if sync fails. - #[structopt(long, default_value = "10")] + #[structopt(long, default_value = "50")] sync_timeout: u8, /// P2P network settings @@ -170,14 +162,12 @@ async fn rpc_serve( } } } - - Ok(()) } async fn handle_connect( mut stream: Box, daemon: Arc, - ex: Arc>, + _ex: Arc>, ) -> Result<()> { let client_version = VersionMessage::decode_async(&mut stream).await?; info!(target: "evgrd", "Client version: {}", client_version.protocol_version); @@ -204,6 +194,8 @@ async fn handle_connect( info!(target: "evgrd", "Fetching events {fetchevs:?}"); let events = daemon.event_graph.fetch_successors_of(fetchevs.unref_tips).await?; + info!("fetched {events:?}"); + for event in events { MSG_EVENT.encode_async(&mut stream).await?; event.encode_async(&mut stream).await?; @@ -234,12 +226,18 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { sled_db.clone(), replay_datastore.clone(), replay_mode, - "darkirc_dag", + "evgrd_dag", 1, ex.clone(), ) .await?; + // Adding some events + // for i in 1..6 { + // let event = Event::new(vec![1, 2, 3, i], &event_graph).await; + // event_graph.dag_insert(&[event.clone()]).await.unwrap(); + // } + let prune_task = event_graph.prune_task.get().unwrap(); info!("Registering EventGraph P2P protocol"); diff --git a/script/evgrd/bin/test.rs b/script/evgrd/bin/test.rs index ded87b7df..ee231a13e 100644 --- a/script/evgrd/bin/test.rs +++ b/script/evgrd/bin/test.rs @@ -17,33 +17,27 @@ */ use darkfi::{ - async_daemonize, cli_desc, - event_graph::{self, proto::ProtocolEventGraph, EventGraph, EventGraphPtr}, - net::{ - session::SESSION_DEFAULT, - settings::SettingsOpt as NetSettingsOpt, - transport::{Dialer, Listener, PtListener, PtStream}, - P2p, P2pPtr, - }, - rpc::{ - jsonrpc::JsonSubscriber, - server::{listen_and_serve, RequestHandler}, - }, - system::{sleep, StoppableTask, StoppableTaskPtr}, - util::path::{expand_path, get_config_path}, + event_graph::{self}, + net::transport::Dialer, + util::path::expand_path, Error, Result, }; -use darkfi_serial::{ - async_trait, deserialize_async, serialize_async, AsyncDecodable, AsyncEncodable, Encodable, - SerialDecodable, SerialEncodable, -}; -use log::{debug, error, info, warn}; +use darkfi_serial::{AsyncDecodable, AsyncEncodable}; +use log::{error, info}; +use sled_overlay::sled; +use smol::fs; use url::Url; use evgrd::{FetchEventsMessage, LocalEventGraph, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS}; async fn amain() -> Result<()> { - let evgr = LocalEventGraph::new(); + info!("Instantiating event DAG"); + let ex = std::sync::Arc::new(smol::Executor::new()); + let datastore = expand_path("~/.local/darkfi/evgrd")?; + fs::create_dir_all(&datastore).await?; + let sled_db = sled::open(datastore)?; + + let evgr = LocalEventGraph::new(sled_db.clone(), "evgrd_testdag", 1, ex.clone()).await?; let endpoint = "tcp://127.0.0.1:5588"; let endpoint = Url::parse(endpoint)?; @@ -51,33 +45,44 @@ async fn amain() -> Result<()> { let dialer = Dialer::new(endpoint, None).await?; let timeout = std::time::Duration::from_secs(60); - info!("Connecting..."); + println!("Connecting..."); let mut stream = dialer.dial(Some(timeout)).await?; - info!("Connected!"); + println!("Connected!"); let version = VersionMessage::new(); version.encode_async(&mut stream).await?; let server_version = VersionMessage::decode_async(&mut stream).await?; - info!("Server version: {}", server_version.protocol_version); + println!("Server version: {}", server_version.protocol_version); - let fetchevs = FetchEventsMessage::new(evgr.unref_tips.clone()); + let unref_tips = evgr.unreferenced_tips.read().await.clone(); + let fetchevs = FetchEventsMessage::new(unref_tips); MSG_FETCHEVENTS.encode_async(&mut stream).await?; fetchevs.encode_async(&mut stream).await?; loop { let msg_type = u8::decode_async(&mut stream).await?; + println!("Received: {msg_type:?}"); if msg_type != MSG_EVENT { error!("Received invalid msg_type: {msg_type}"); return Err(Error::MalformedPacket) } let ev = event_graph::Event::decode_async(&mut stream).await?; - } - Ok(()) + let genesis_timestamp = evgr.current_genesis.read().await.clone().timestamp; + let ev_id = ev.id(); + if !evgr.dag.contains_key(ev_id.as_bytes()).unwrap() && + ev.validate(&evgr.dag, genesis_timestamp, evgr.days_rotation, None).await? + { + println!("got {ev:?}"); + evgr.dag_insert(&[ev]).await.unwrap(); + } else { + println!("Event is invalid!") + } + } } fn main() { - smol::block_on(amain()); + let _ = smol::block_on(amain()); } diff --git a/script/evgrd/src/lib.rs b/script/evgrd/src/lib.rs index 97884ebc7..2a53f6ed3 100644 --- a/script/evgrd/src/lib.rs +++ b/script/evgrd/src/lib.rs @@ -16,21 +16,336 @@ * along with this program. If not, see . */ -use darkfi::event_graph::Event; -use darkfi_serial::{ - async_trait, deserialize_async, serialize_async, Encodable, SerialDecodable, SerialEncodable, +use darkfi::{ + event_graph::{ + util::{generate_genesis, next_rotation_timestamp, seconds_until_next_rotation}, + Event, GENESIS_CONTENTS, INITIAL_GENESIS, NULL_ID, N_EVENT_PARENTS, + }, + system::{sleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr}, + Error, Result, +}; +use darkfi_serial::{ + async_trait, deserialize_async, serialize_async, SerialDecodable, SerialEncodable, +}; +use log::{debug, error, info}; +use sled_overlay::{sled, SledTreeOverlay}; +use smol::{ + lock::{OnceCell, RwLock}, + Executor, +}; +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, }; -use std::sync::Arc; pub const PROTOCOL_VERSION: u32 = 1; +/// Atomic pointer to an [`EventGraph`] instance. +pub type LocalEventGraphPtr = Arc; + pub struct LocalEventGraph { - pub unref_tips: Vec<(u64, blake3::Hash)>, + /// Sled tree containing the DAG + pub dag: sled::Tree, + /// The set of unreferenced DAG tips + pub unreferenced_tips: RwLock>>, + /// A `HashSet` containg event IDs and their 1-level parents. + /// These come from the events we've sent out using `EventPut`. + /// They are used with `EventReq` to decide if we should reply + /// or not. Additionally it is also used when we broadcast the + /// `TipRep` message telling peers about our unreferenced tips. + broadcasted_ids: RwLock>, + /// DAG Pruning Task + pub prune_task: OnceCell, + /// Event publisher, this notifies whenever an event is + /// inserted into the DAG + pub event_pub: PublisherPtr, + /// Current genesis event + pub current_genesis: RwLock, + /// Currently configured DAG rotation, in days + pub days_rotation: u64, + /// Flag signalling DAG has finished initial sync + pub synced: RwLock, + /// Enable graph debugging + pub deg_enabled: RwLock, } impl LocalEventGraph { - pub fn new() -> Self { - Self { unref_tips: vec![] } + pub async fn new( + sled_db: sled::Db, + dag_tree_name: &str, + days_rotation: u64, + ex: Arc>, + ) -> Result { + let dag = sled_db.open_tree(dag_tree_name)?; + let unreferenced_tips = RwLock::new(BTreeMap::new()); + let broadcasted_ids = RwLock::new(HashSet::new()); + let event_pub = Publisher::new(); + + // Create the current genesis event based on the `days_rotation` + let current_genesis = generate_genesis(days_rotation); + let self_ = Arc::new(Self { + dag: dag.clone(), + unreferenced_tips, + broadcasted_ids, + prune_task: OnceCell::new(), + event_pub, + current_genesis: RwLock::new(current_genesis.clone()), + days_rotation, + synced: RwLock::new(false), + deg_enabled: RwLock::new(false), + }); + + // Check if we have it in our DAG. + // If not, we can prune the DAG and insert this new genesis event. + if !dag.contains_key(current_genesis.id().as_bytes())? { + info!( + target: "event_graph::new()", + "[EVENTGRAPH] DAG does not contain current genesis, pruning existing data", + ); + self_.dag_prune(current_genesis).await?; + } + + // Find the unreferenced tips in the current DAG state. + *self_.unreferenced_tips.write().await = self_.find_unreferenced_tips().await; + + // Spawn the DAG pruning task + if days_rotation > 0 { + let prune_task = StoppableTask::new(); + let _ = self_.prune_task.set(prune_task.clone()).await; + + prune_task.clone().start( + self_.clone().dag_prune_task(days_rotation), + |_| async move { + info!(target: "event_graph::_handle_stop()", "[EVENTGRAPH] Prune task stopped, flushing sled") + }, + Error::DetachedTaskStopped, + ex.clone(), + ); + } + + Ok(self_) + } + + async fn dag_prune(&self, genesis_event: Event) -> Result<()> { + debug!(target: "event_graph::dag_prune()", "Pruning DAG..."); + + // Acquire exclusive locks to unreferenced_tips, broadcasted_ids and + // current_genesis while this operation is happening. We do this to + // ensure that during the pruning operation, no other operations are + // able to access the intermediate state which could lead to producing + // the wrong state after pruning. + let mut unreferenced_tips = self.unreferenced_tips.write().await; + let mut broadcasted_ids = self.broadcasted_ids.write().await; + let mut current_genesis = self.current_genesis.write().await; + + // Atomically clear the DAG and write the new genesis event. + let mut batch = sled::Batch::default(); + for key in self.dag.iter().keys() { + batch.remove(key.unwrap()); + } + batch.insert(genesis_event.id().as_bytes(), serialize_async(&genesis_event).await); + + debug!(target: "event_graph::dag_prune()", "Applying batch..."); + if let Err(e) = self.dag.apply_batch(batch) { + panic!("Failed pruning DAG, sled apply_batch error: {}", e); + } + + // Clear unreferenced tips and bcast ids + *unreferenced_tips = BTreeMap::new(); + unreferenced_tips.insert(0, HashSet::from([genesis_event.id()])); + *current_genesis = genesis_event; + *broadcasted_ids = HashSet::new(); + drop(unreferenced_tips); + drop(broadcasted_ids); + drop(current_genesis); + + debug!(target: "event_graph::dag_prune()", "DAG pruned successfully"); + Ok(()) + } + + /// Background task periodically pruning the DAG. + async fn dag_prune_task(self: Arc, days_rotation: u64) -> Result<()> { + // The DAG should periodically be pruned. This can be a configurable + // parameter. By pruning, we should deterministically replace the + // genesis event (can use a deterministic timestamp) and drop everything + // in the DAG, leaving just the new genesis event. + debug!(target: "event_graph::dag_prune_task()", "Spawned background DAG pruning task"); + + loop { + // Find the next rotation timestamp: + let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, days_rotation); + + // Prepare the new genesis event + let current_genesis = Event { + timestamp: next_rotation, + content: GENESIS_CONTENTS.to_vec(), + parents: [NULL_ID; N_EVENT_PARENTS], + layer: 0, + }; + + // Sleep until it's time to rotate. + let s = seconds_until_next_rotation(next_rotation); + + debug!(target: "event_graph::dag_prune_task()", "Sleeping {}s until next DAG prune", s); + sleep(s).await; + debug!(target: "event_graph::dag_prune_task()", "Rotation period reached"); + + // Trigger DAG prune + self.dag_prune(current_genesis).await?; + } + } + + /// Find the unreferenced tips in the current DAG state, mapped by their layers. + async fn find_unreferenced_tips(&self) -> BTreeMap> { + // First get all the event IDs + let mut tips = HashSet::new(); + for iter_elem in self.dag.iter() { + let (id, _) = iter_elem.unwrap(); + let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap()); + tips.insert(id); + } + + // Iterate again to find unreferenced IDs + for iter_elem in self.dag.iter() { + let (_, event) = iter_elem.unwrap(); + let event: Event = deserialize_async(&event).await.unwrap(); + for parent in event.parents.iter() { + tips.remove(parent); + } + } + + // Build the layers map + let mut map: BTreeMap> = BTreeMap::new(); + for tip in tips { + let event = self.dag_get(&tip).await.unwrap().unwrap(); + if let Some(layer_tips) = map.get_mut(&event.layer) { + layer_tips.insert(tip); + } else { + let mut layer_tips = HashSet::new(); + layer_tips.insert(tip); + map.insert(event.layer, layer_tips); + } + } + + map + } + + pub async fn dag_insert(&self, events: &[Event]) -> Result> { + // Sanity check + if events.is_empty() { + return Ok(vec![]) + } + + // Acquire exclusive locks to `unreferenced_tips and broadcasted_ids` + let mut unreferenced_tips = self.unreferenced_tips.write().await; + let mut broadcasted_ids = self.broadcasted_ids.write().await; + + // Here we keep the IDs to return + let mut ids = Vec::with_capacity(events.len()); + + // Create an overlay over the DAG tree + let mut overlay = SledTreeOverlay::new(&self.dag); + + // Grab genesis timestamp + let genesis_timestamp = self.current_genesis.read().await.timestamp; + + // Iterate over given events to validate them and + // write them to the overlay + for event in events { + let event_id = event.id(); + debug!( + target: "event_graph::dag_insert()", + "Inserting event {} into the DAG", event_id, + ); + + if !event + .validate(&self.dag, genesis_timestamp, self.days_rotation, Some(&overlay)) + .await? + { + error!(target: "event_graph::dag_insert()", "Event {} is invalid!", event_id); + return Err(Error::EventIsInvalid) + } + + let event_se = serialize_async(event).await; + + // Add the event to the overlay + overlay.insert(event_id.as_bytes(), &event_se)?; + + // Note down the event ID to return + ids.push(event_id); + } + + // Aggregate changes into a single batch + let batch = overlay.aggregate().unwrap(); + + // Atomically apply the batch. + // Panic if something is corrupted. + if let Err(e) = self.dag.apply_batch(batch) { + panic!("Failed applying dag_insert batch to sled: {}", e); + } + + // Iterate over given events to update references and + // send out notifications about them + for event in events { + let event_id = event.id(); + + // Update the unreferenced DAG tips set + debug!( + target: "event_graph::dag_insert()", + "Event {} parents {:#?}", event_id, event.parents, + ); + for parent_id in event.parents.iter() { + if parent_id != &NULL_ID { + debug!( + target: "event_graph::dag_insert()", + "Removing {} from unreferenced_tips", parent_id, + ); + + // Iterate over unreferenced tips in previous layers + // and remove the parent + // NOTE: this might be too exhaustive, but the + // assumption is that previous layers unreferenced + // tips will be few. + for (layer, tips) in unreferenced_tips.iter_mut() { + if layer >= &event.layer { + continue + } + tips.remove(parent_id); + } + broadcasted_ids.insert(*parent_id); + } + } + unreferenced_tips.retain(|_, tips| !tips.is_empty()); + debug!( + target: "event_graph::dag_insert()", + "Adding {} to unreferenced tips", event_id, + ); + + if let Some(layer_tips) = unreferenced_tips.get_mut(&event.layer) { + layer_tips.insert(event_id); + } else { + let mut layer_tips = HashSet::new(); + layer_tips.insert(event_id); + unreferenced_tips.insert(event.layer, layer_tips); + } + + // Send out notifications about the new event + self.event_pub.notify(event.clone()).await; + } + + // Drop the exclusive locks + drop(unreferenced_tips); + drop(broadcasted_ids); + + Ok(ids) + } + + /// Fetch an event from the DAG + pub async fn dag_get(&self, event_id: &blake3::Hash) -> Result> { + let Some(bytes) = self.dag.get(event_id.as_bytes())? else { return Ok(None) }; + let event: Event = deserialize_async(&bytes).await?; + + Ok(Some(event)) } } @@ -45,13 +360,19 @@ impl VersionMessage { } } +impl Default for VersionMessage { + fn default() -> Self { + Self::new() + } +} + #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FetchEventsMessage { - pub unref_tips: Vec<(u64, blake3::Hash)>, + pub unref_tips: BTreeMap>, } impl FetchEventsMessage { - pub fn new(unref_tips: Vec<(u64, blake3::Hash)>) -> Self { + pub fn new(unref_tips: BTreeMap>) -> Self { Self { unref_tips } } } diff --git a/src/event_graph/event.rs b/src/event_graph/event.rs index 677d83eb2..cc45b524a 100644 --- a/src/event_graph/event.rs +++ b/src/event_graph/event.rs @@ -34,11 +34,11 @@ pub struct Event { /// Timestamp of the event in whole seconds pub timestamp: u64, /// Content of the event - pub(crate) content: Vec, + pub content: Vec, /// Parent nodes in the event DAG - pub(crate) parents: [blake3::Hash; N_EVENT_PARENTS], + pub parents: [blake3::Hash; N_EVENT_PARENTS], /// DAG layer index of the event - pub(crate) layer: u64, + pub layer: u64, } impl Event { diff --git a/src/event_graph/mod.rs b/src/event_graph/mod.rs index 515711fad..e32556266 100644 --- a/src/event_graph/mod.rs +++ b/src/event_graph/mod.rs @@ -60,7 +60,7 @@ pub mod util; use util::{generate_genesis, next_rotation_timestamp}; // Debugging event graph -pub(crate) mod deg; +pub mod deg; use deg::DegEvent; #[cfg(test)] @@ -68,12 +68,12 @@ mod tests; /// Initial genesis timestamp in millis (07 Sep 2023, 00:00:00 UTC) /// Must always be UTC midnight. -const INITIAL_GENESIS: u64 = 1_694_044_800_000; +pub const INITIAL_GENESIS: u64 = 1_694_044_800_000; /// Genesis event contents -const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53]; +pub const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53]; /// The number of parents an event is supposed to have. -const N_EVENT_PARENTS: usize = 5; +pub const N_EVENT_PARENTS: usize = 5; /// Allowed timestamp drift in milliseconds const EVENT_TIME_DRIFT: u64 = 60_000; /// Null event ID @@ -843,10 +843,13 @@ impl EventGraph { /// Fetch all the events that are on a higher layers than the /// provided ones. - pub async fn fetch_successors_of(&self, tips: Vec<(u64, blake3::Hash)>) -> Result> { + pub async fn fetch_successors_of( + &self, + tips: BTreeMap>, + ) -> Result> { debug!( target: "event_graph::fetch_successors_of()", - "fetching successors of c{tips:?}" + "fetching successors of {tips:?}" ); let mut graph = HashMap::new(); @@ -859,17 +862,22 @@ impl EventGraph { let mut result = vec![]; - for tip in tips.iter() { - if !graph.contains_key(&tip.1) { - continue; + 'outer: for tip in tips.iter() { + for i in tip.1.iter() { + if !graph.contains_key(i) { + continue 'outer; + } } + for (_, ev) in graph.iter() { - if ev.layer > tip.0 && !result.contains(ev) { + if ev.layer > *tip.0 && !result.contains(ev) { result.push(ev.clone()) } } } + result.sort_by(|a, b| a.layer.cmp(&b.layer)); + Ok(result) } diff --git a/src/event_graph/util.rs b/src/event_graph/util.rs index d23051b5f..3bdc49ccc 100644 --- a/src/event_graph/util.rs +++ b/src/event_graph/util.rs @@ -69,7 +69,7 @@ pub(super) fn days_since(midnight_ts: u64) -> u64 { } /// Calculate the timestamp of the next DAG rotation. -pub(super) fn next_rotation_timestamp(starting_timestamp: u64, rotation_period: u64) -> u64 { +pub fn next_rotation_timestamp(starting_timestamp: u64, rotation_period: u64) -> u64 { // Prevent division by 0 if rotation_period == 0 { panic!("Rotation period cannot be 0"); @@ -99,7 +99,7 @@ pub(super) fn next_rotation_timestamp(starting_timestamp: u64, rotation_period: /// Calculate the time in milliseconds until the next_rotation, given /// as a timestamp. /// `next_rotation` here represents a timestamp in UNIX epoch format. -pub(super) fn seconds_until_next_rotation(next_rotation: u64) -> u64 { +pub fn seconds_until_next_rotation(next_rotation: u64) -> u64 { // Store `now` in a variable in order to avoid a TOCTOU error. // There may be a drift of one second between this panic check and // the return value if we get unlucky. @@ -111,7 +111,7 @@ pub(super) fn seconds_until_next_rotation(next_rotation: u64) -> u64 { } /// Generate a deterministic genesis event corresponding to the DAG's configuration. -pub(super) fn generate_genesis(days_rotation: u64) -> Event { +pub fn generate_genesis(days_rotation: u64) -> Event { // Days rotation is u64 except zero let timestamp = if days_rotation == 0 { INITIAL_GENESIS