evgr2: merge multi-dag branch

one is able to sync up to 5 dags each of 24hrs, pruning means dropping the oldest (fifth) dag and prepend a new one.
This commit is contained in:
dasman
2025-09-15 15:36:05 +03:00
parent d85dd658b0
commit 921af416df
15 changed files with 660 additions and 441 deletions

View File

@@ -10,6 +10,11 @@
## TLS secret key path if IRC acceptor uses TLS (optional)
#irc_tls_secret = "/etc/letsencrypt/darkirc/privkey.pem"
## How many DAGs to be synced (currently each DAG represents a 24hr msg
## history counting from UTC midnight), increasing this number means
## you get/sync previous days msg history as well (max. 5)
#dags_count = 1
## Sets Datastore Path
#datastore = "~/.local/share/darkfi/darkirc/darkirc_db"
@@ -42,6 +47,10 @@ autojoin = [
# Set log level. 1 is info (default), 2 is debug, 3 is trace
#verbose = 2
## Running darkirc in header-only mode
## history won't be fetched but DAG sync fast
#fast_mod = false
## JSON-RPC settings
[rpc]
## JSON-RPC listen URL

View File

@@ -194,12 +194,14 @@ impl Client {
// Update the last sent event.
let event_id = event.header.id();
*self.last_sent.write().await = event_id;
let current_genesis = self.server.darkirc.event_graph.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
// If it fails for some reason, for now, we just note it and pass.
if let Err(e) = self.server.darkirc.event_graph.header_dag_insert(vec![event.header.clone()]).await {
if let Err(e) = self.server.darkirc.event_graph.header_dag_insert(vec![event.header.clone()], &dag_name).await {
error!("[IRC CLIENT] Failed inserting new header to Header DAG: {}", e);
}
if let Err(e) = self.server.darkirc.event_graph.dag_insert(&[event.clone()]).await {
if let Err(e) = self.server.darkirc.event_graph.dag_insert(&[event.clone()], &dag_name).await {
error!("[IRC CLIENT] Failed inserting new event to DAG: {}", e);
} else {
// We sent this, so it should be considered seen.

View File

@@ -90,6 +90,10 @@ struct Args {
/// Optional TLS certificate key file path if `irc_listen` uses TLS
irc_tls_secret: Option<String>,
/// How many DAGs to sync.
#[structopt(short, long, default_value = "1")]
dags_count: usize,
#[structopt(short, long, default_value = "~/.local/share/darkfi/darkirc_db")]
/// Datastore (DB) path
datastore: String,
@@ -358,7 +362,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
replay_datastore.clone(),
replay_mode,
fast_mode,
"darkirc_dag",
1,
ex.clone(),
)
@@ -502,7 +505,9 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
}
// Initial DAG sync
if let Err(e) = sync_task(&p2p, &event_graph, args.skip_dag_sync, args.fast_mode).await {
if let Err(e) =
sync_task(&p2p, &event_graph, args.skip_dag_sync, args.fast_mode, args.dags_count).await
{
error!("DAG sync task failed to start: {e}");
return Err(e);
};
@@ -510,7 +515,13 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
// Stoppable task to monitor network and resync on disconnect.
let sync_mon_task = StoppableTask::new();
sync_mon_task.clone().start(
sync_and_monitor(p2p.clone(), event_graph.clone(), args.skip_dag_sync, args.fast_mode),
sync_and_monitor(
p2p.clone(),
event_graph.clone(),
args.skip_dag_sync,
args.fast_mode,
args.dags_count,
),
|res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ }
@@ -557,6 +568,7 @@ async fn sync_task(
event_graph: &EventGraphPtr,
skip_dag_sync: bool,
fast_mode: bool,
dags_count: usize,
) -> Result<()> {
let comms_timeout = p2p.settings().read().await.outbound_connect_timeout;
@@ -566,7 +578,7 @@ async fn sync_task(
// We'll attempt to sync for ever
if !skip_dag_sync {
info!("Syncing event DAG");
match event_graph.dag_sync(fast_mode).await {
match event_graph.sync_selected(dags_count, fast_mode).await {
Ok(()) => break,
Err(e) => {
// TODO: Maybe at this point we should prune or something?
@@ -594,6 +606,7 @@ async fn sync_and_monitor(
event_graph: EventGraphPtr,
skip_dag_sync: bool,
fast_mode: bool,
dags_count: usize,
) -> Result<()> {
loop {
let net_subscription = p2p.hosts().subscribe_disconnect().await;
@@ -606,7 +619,7 @@ async fn sync_and_monitor(
// Sync node again
info!("Network disconnection detected, resyncing...");
*event_graph.synced.write().await = false;
sync_task(&p2p, &event_graph, skip_dag_sync, fast_mode).await?;
sync_task(&p2p, &event_graph, skip_dag_sync, fast_mode, dags_count).await?;
}
Err(e) => return Err(e),
}

View File

@@ -131,7 +131,6 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
replay_datastore,
replay_mode,
fast_mode,
"genevd_dag",
1,
executor.clone(),
)
@@ -158,7 +157,7 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
if !settings.skip_dag_sync {
for i in 1..=6 {
info!("Syncing event DAG (attempt #{})", i);
match event_graph.dag_sync(settings.fast_mode).await {
match event_graph.sync_selected(1, settings.fast_mode).await {
Ok(()) => break,
Err(e) => {
if i == 6 {

View File

@@ -208,7 +208,14 @@ impl JsonRpcInterface {
// Build a DAG event and return it.
let event = Event::new(serialize_async(&genevent).await, &self.event_graph).await;
if let Err(e) = self.event_graph.dag_insert(&[event.clone()]).await {
let current_genesis = self.event_graph.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
if let Err(e) =
self.event_graph.header_dag_insert(vec![event.header.clone()], &dag_name).await
{
error!("Failed inserting new header to Header DAG: {}", e);
}
if let Err(e) = self.event_graph.dag_insert(&[event.clone()], &dag_name).await {
error!("Failed inserting new event to DAG: {}", e);
} else {
// Otherwise, broadcast it

View File

@@ -321,7 +321,12 @@ async fn start_sync_loop(
// If it fails for some reason, for now, we just note it
// and pass.
if let Err(e) = event_graph.dag_insert(&[event.clone()]).await {
let current_genesis = event_graph.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
if let Err(e) = event_graph.header_dag_insert(vec![event.header.clone()], &dag_name).await {
error!(target: "taud", "Failed inserting new header to DAG: {}", e);
}
if let Err(e) = event_graph.dag_insert(&[event.clone()], &dag_name).await {
error!(target: "taud", "Failed inserting new event to DAG: {}", e);
} else {
// Otherwise, broadcast it
@@ -526,7 +531,6 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
replay_datastore,
replay_mode,
fast_mode,
"taud_dag",
0,
executor.clone(),
)
@@ -555,7 +559,7 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
// We'll attempt to sync for ever
if !settings.skip_dag_sync {
info!(target: "taud", "Syncing event DAG");
match event_graph.dag_sync(settings.fast_mode).await {
match event_graph.sync_selected(1, settings.fast_mode).await {
Ok(()) => break,
Err(e) => {
// TODO: Maybe at this point we should prune or something?

View File

@@ -1,6 +1,11 @@
## IRC listen URL
irc_listen = "tcp://127.0.0.1:22022"
## How many DAGs to be synced (currently each DAG represents a 24hr msg
## history counting from UTC midnight), increasing this number means
## you get/sync previous days msg history as well (max. 5)
dags_count = 1
## Sets Datastore Path
datastore = "darkirc1"
@@ -8,10 +13,14 @@ datastore = "darkirc1"
autojoin = ["#dev", "#test"]
# Log to file. Off by default.
#log = "/tmp/darkirc1.log"
log = "/tmp/darkirc1.log"
# Set log level. 1 is info (default), 2 is debug, 3 is trace
#verbose = 2
## Running darkirc in header-only mode
## history won't be fetched but DAG sync fast
#fast_mod = false
## JSON-RPC settings
[rpc]
## JSON-RPC listen URL

View File

@@ -1,6 +1,11 @@
## IRC listen URL
irc_listen = "tcp://127.0.0.1:22023"
## How many DAGs to be synced (currently each DAG represents a 24hr msg
## history counting from UTC midnight), increasing this number means
## you get/sync previous days msg history as well (max. 5)
dags_count = 1
## Sets Datastore Path
datastore = "darkirc2"
@@ -8,10 +13,14 @@ datastore = "darkirc2"
autojoin = ["#dev", "#test"]
# Log to file. Off by default.
#log = "/tmp/darkirc2.log"
log = "/tmp/darkirc2.log"
# Set log level. 1 is info (default), 2 is debug, 3 is trace
#verbose = 2
## Running darkirc in header-only mode
## history won't be fetched but DAG sync fast
#fast_mod = false
## JSON-RPC settings
[rpc]
## JSON-RPC listen URL

View File

@@ -1,6 +1,11 @@
## IRC listen URL
irc_listen = "tcp://127.0.0.1:22024"
## How many DAGs to be synced (currently each DAG represents a 24hr msg
## history counting from UTC midnight), increasing this number means
## you get/sync previous days msg history as well (max. 5)
dags_count = 1
## Sets Datastore Path
datastore = "darkirc3"
@@ -8,10 +13,14 @@ datastore = "darkirc3"
autojoin = ["#dev", "#test"]
# Log to file. Off by default.
#log = "/tmp/darkirc3.log"
log = "/tmp/darkirc3.log"
# Set log level. 1 is info (default), 2 is debug, 3 is trace
#verbose = 2
## Running darkirc in header-only mode
## history won't be fetched but DAG sync fast
#fast_mod = false
## JSON-RPC settings
[rpc]
## JSON-RPC listen URL

View File

@@ -1,6 +1,11 @@
## IRC listen URL
irc_listen = "tcp://127.0.0.1:22025"
## How many DAGs to be synced (currently each DAG represents a 24hr msg
## history counting from UTC midnight), increasing this number means
## you get/sync previous days msg history as well (max. 5)
dags_count = 3
## Sets Datastore Path
datastore = "darkirc4"
@@ -8,10 +13,14 @@ datastore = "darkirc4"
autojoin = ["#dev", "#test"]
# Log to file. Off by default.
#log = "/tmp/darkirc4.log"
log = "/tmp/darkirc4.log"
# Set log level. 1 is info (default), 2 is debug, 3 is trace
#verbose = 2
## Running darkirc in header-only mode
## history won't be fetched but DAG sync fast
#fast_mod = false
## JSON-RPC settings
[rpc]
## JSON-RPC listen URL

View File

@@ -7,7 +7,7 @@ set -e
DARKIRC="../../../darkirc"
WEECHAT="weechat -t -r"
session=darkirc
session=darkirc-local
tmux new-session -d -s $session -n "seed"
tmux send-keys -t $session "$DARKIRC -c seed.toml --skip-dag-sync" Enter
@@ -34,7 +34,7 @@ if [ "$1" ]; then
fi
sleep 1
tmux new-window -t $session -n "node4"
tmux send-keys -t $session "$DARKIRC -c darkirc_full_node4.toml" Enter
tmux send-keys -t $session "$DARKIRC -c darkirc_full_node4.toml --fast-mode" Enter
if [ "$1" ]; then
tmux split-window -t $session -v
tmux send-keys -t $session "$WEECHAT '/server add darkirc_d 127.0.0.1/22025 -notls;/connect darkirc_d;/set irc.server_default.nicks Dave'" Enter

View File

@@ -19,7 +19,6 @@
use std::{collections::HashSet, time::UNIX_EPOCH};
use darkfi_serial::{async_trait, deserialize_async, Encodable, SerialDecodable, SerialEncodable};
use log::info;
use sled_overlay::{sled, SledTreeOverlay};
use crate::Result;
@@ -44,12 +43,14 @@ pub struct Header {
impl Header {
// Create a new Header given EventGraph to retrieve the correct layout
pub async fn new(event_graph: &EventGraph) -> Self {
let (layer, parents) = event_graph.get_next_layer_with_parents().await;
let current_dag_name = event_graph.current_genesis.read().await.id();
let (layer, parents) = event_graph.get_next_layer_with_parents(&current_dag_name).await;
Self { timestamp: UNIX_EPOCH.elapsed().unwrap().as_millis() as u64, parents, layer }
}
pub async fn with_timestamp(timestamp: u64, event_graph: &EventGraph) -> Self {
let (layer, parents) = event_graph.get_next_layer_with_parents().await;
let current_dag_name = event_graph.current_genesis.read().await.id();
let (layer, parents) = event_graph.get_next_layer_with_parents(&current_dag_name).await;
Self { timestamp, parents, layer }
}
@@ -68,23 +69,15 @@ impl Header {
/// to use that instead of actual referenced DAG.
pub async fn validate(
&self,
dag: &sled::Tree,
genesis_timestamp: u64,
header_dag: &sled::Tree,
days_rotation: u64,
overlay: Option<&SledTreeOverlay>,
) -> Result<bool> {
// Check if the event timestamp is after genesis timestamp
if self.timestamp < genesis_timestamp - EVENT_TIME_DRIFT {
info!("timestampe");
return Ok(false)
}
// If a rotation has been set, check if the event timestamp
// is after the next genesis timestamp
if days_rotation > 0 {
let next_genesis_timestamp = next_rotation_timestamp(INITIAL_GENESIS, days_rotation);
if self.timestamp > next_genesis_timestamp + EVENT_TIME_DRIFT {
info!("rotation");
return Ok(false)
}
}
@@ -98,33 +91,28 @@ impl Header {
for parent_id in self.parents.iter() {
if parent_id == &NULL_ID {
info!("null");
continue
}
if parent_id == &self_id {
info!("self");
return Ok(false)
}
if seen.contains(parent_id) {
info!("seen");
return Ok(false)
}
let parent_bytes = if let Some(overlay) = overlay {
overlay.get(parent_id.as_bytes())?
} else {
dag.get(parent_id.as_bytes())?
header_dag.get(parent_id.as_bytes())?
};
if parent_bytes.is_none() {
info!("none");
return Ok(false)
}
let parent: Header = deserialize_async(&parent_bytes.unwrap()).await?;
if self.layer <= parent.layer {
info!("layer");
return Ok(false)
}
@@ -154,6 +142,10 @@ impl Event {
Self { header, content: data }
}
pub fn id(&self) -> blake3::Hash {
self.header.id()
}
/// Same as `Event::new()` but allows specifying the timestamp explicitly.
pub async fn with_timestamp(timestamp: u64, data: Vec<u8>, event_graph: &EventGraph) -> Self {
let header = Header::with_timestamp(timestamp, event_graph).await;
@@ -171,25 +163,19 @@ impl Event {
/// to use that instead of actual referenced DAG.
/// TODO: is this necessary? we validate headers and events should
/// be downloaded into the correct structure.
pub async fn validate(&self, _dag: &sled::Tree) -> Result<bool> {
// Let's not bother with empty events
if self.content.is_empty() {
info!("empty");
return Ok(false)
}
Ok(true)
}
// pub async fn validate(&self) -> Result<bool> {
// Ok(true)
// }
/// Fully validate an event for the correct layout against provided
/// [`EventGraph`] reference and enforce relevant age, assuming some
/// possibility for a time drift.
pub async fn dag_validate(&self, event_graph: &EventGraph) -> Result<bool> {
// Grab genesis timestamp
// let genesis_timestamp = event_graph.current_genesis.read().await.header.timestamp;
pub async fn dag_validate(&self, header_dag: &sled::Tree) -> Result<bool> {
if self.content.is_empty() {
return Ok(false)
}
// Perform validation
self.validate(&event_graph.header_dag).await
self.header.validate(&header_dag, 1, None).await
}
/// Validate a new event for the correct layout and enforce relevant age,
@@ -252,7 +238,7 @@ mod tests {
let ex = Arc::new(Executor::new());
let p2p = P2p::new(Settings::default(), ex.clone()).await?;
let sled_db = sled::Config::new().temporary(true).open().unwrap();
EventGraph::new(p2p, sled_db, "/tmp".into(), false, false, "dag", 1, ex).await
EventGraph::new(p2p, sled_db, "/tmp".into(), false, false, 1, ex).await
}
#[test]
@@ -261,11 +247,15 @@ mod tests {
// Generate a dummy event graph
let event_graph = make_event_graph().await?;
let dag_name = event_graph.current_genesis.read().await.id().to_string();
let hdr_tree_name = format!("headers_{dag_name}");
let header_dag = event_graph.dag_store.read().await.get_dag(&hdr_tree_name);
// Create a new valid event
let valid_event = Event::new(vec![1u8], &event_graph).await;
// Validate our test Event struct
assert!(valid_event.dag_validate(&event_graph).await?);
assert!(valid_event.dag_validate(&header_dag).await?);
// Thanks for reading
Ok(())
@@ -278,33 +268,37 @@ mod tests {
// Generate a dummy event graph
let event_graph = make_event_graph().await?;
let dag_name = event_graph.current_genesis.read().await.id().to_string();
let hdr_tree_name = format!("headers_{dag_name}");
let header_dag = event_graph.dag_store.read().await.get_dag(&hdr_tree_name);
// Create a new valid event
let valid_event = Event::new(vec![1u8], &event_graph).await;
let mut event_empty_content = valid_event.clone();
event_empty_content.content = vec![];
assert!(!event_empty_content.dag_validate(&event_graph).await?);
assert!(!event_empty_content.dag_validate(&header_dag).await?);
let mut event_timestamp_too_old = valid_event.clone();
event_timestamp_too_old.header.timestamp = 0;
assert!(!event_timestamp_too_old.dag_validate(&event_graph).await?);
assert!(!event_timestamp_too_old.dag_validate(&header_dag).await?);
let mut event_timestamp_too_new = valid_event.clone();
event_timestamp_too_new.header.timestamp = u64::MAX;
assert!(!event_timestamp_too_new.dag_validate(&event_graph).await?);
assert!(!event_timestamp_too_new.dag_validate(&header_dag).await?);
let mut event_duplicated_parents = valid_event.clone();
event_duplicated_parents.header.parents[1] = valid_event.header.parents[0];
assert!(!event_duplicated_parents.dag_validate(&event_graph).await?);
assert!(!event_duplicated_parents.dag_validate(&header_dag).await?);
let mut event_null_parents = valid_event.clone();
let all_null_parents = [NULL_ID, NULL_ID, NULL_ID, NULL_ID, NULL_ID];
event_null_parents.header.parents = all_null_parents;
assert!(!event_null_parents.dag_validate(&event_graph).await?);
assert!(!event_null_parents.dag_validate(&header_dag).await?);
let mut event_same_layer_as_parents = valid_event.clone();
event_same_layer_as_parents.header.layer = 0;
assert!(!event_same_layer_as_parents.dag_validate(&event_graph).await?);
assert!(!event_same_layer_as_parents.dag_validate(&header_dag).await?);
// Thanks for reading
Ok(())

View File

@@ -17,23 +17,23 @@
*/
// use async_std::stream::from_iter;
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
path::PathBuf,
str::FromStr,
sync::Arc,
};
// use futures::stream::FuturesOrdered;
use blake3::Hash;
use darkfi_serial::{deserialize_async, serialize_async};
use event::Header;
use futures::{
future::join_all,
// future,
stream::FuturesUnordered,
StreamExt,
};
use rand::{rngs::OsRng, seq::SliceRandom};
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
path::PathBuf,
sync::Arc,
};
use blake3::Hash;
use darkfi_serial::{deserialize_async, serialize_async};
use event::Header;
use log::{debug, error, info, warn};
use num_bigint::BigUint;
use sled_overlay::{sled, SledTreeOverlay};
@@ -44,7 +44,7 @@ use smol::{
use tinyjson::JsonValue::{self};
use crate::{
event_graph::util::replayer_log,
event_graph::util::{midnight_timestamp, replayer_log},
net::{channel::Channel, P2pPtr},
rpc::{
jsonrpc::{JsonResponse, JsonResult},
@@ -87,24 +87,256 @@ const EVENT_TIME_DRIFT: u64 = 60_000;
/// Null event ID
pub const NULL_ID: Hash = Hash::from_bytes([0x00; blake3::OUT_LEN]);
/// Maximum number of DAGs to store, this should be configurable
pub const DAGS_MAX_NUMBER: i8 = 5;
/// Atomic pointer to an [`EventGraph`] instance.
pub type EventGraphPtr = Arc<EventGraph>;
pub type LayerUTips = BTreeMap<u64, HashSet<blake3::Hash>>;
#[derive(Clone)]
pub struct DAGStore {
db: sled::Db,
header_dags: HashMap<Hash, (sled::Tree, LayerUTips)>,
main_dags: HashMap<Hash, (sled::Tree, LayerUTips)>,
}
impl DAGStore {
pub async fn new(&self, sled_db: sled::Db, days_rotation: u64) -> Self {
let mut considered_trees = HashMap::new();
let mut considered_header_trees = HashMap::new();
if days_rotation > 0 {
// Create previous genesises if not existing, since they are deterministic.
for i in 1..=DAGS_MAX_NUMBER {
let i_days_ago = midnight_timestamp((i - DAGS_MAX_NUMBER).into());
let header =
Header { timestamp: i_days_ago, parents: [NULL_ID; N_EVENT_PARENTS], layer: 0 };
let genesis = Event { header, content: GENESIS_CONTENTS.to_vec() };
let tree_name = genesis.id().to_string();
let hdr_tree_name = format!("headers_{tree_name}");
let hdr_dag = sled_db.open_tree(hdr_tree_name).unwrap();
let dag = sled_db.open_tree(tree_name).unwrap();
if hdr_dag.is_empty() {
let mut overlay = SledTreeOverlay::new(&hdr_dag);
let header_se = serialize_async(&genesis.header).await;
// Add the header to the overlay
overlay.insert(genesis.id().as_bytes(), &header_se).unwrap();
// Aggregate changes into a single batch
let batch = overlay.aggregate().unwrap();
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = hdr_dag.apply_batch(batch) {
panic!("Failed applying header_dag_insert batch to sled: {}", e);
}
}
if dag.is_empty() {
let mut overlay = SledTreeOverlay::new(&dag);
let event_se = serialize_async(&genesis).await;
// Add the event to the overlay
overlay.insert(genesis.id().as_bytes(), &event_se).unwrap();
// Aggregate changes into a single batch
let batch = overlay.aggregate().unwrap();
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
}
let utips = self.find_unreferenced_tips(&dag).await;
considered_header_trees.insert(genesis.id(), (hdr_dag, utips.clone()));
considered_trees.insert(genesis.id(), (dag, utips));
}
} else {
let genesis = generate_genesis(0);
let tree_name = genesis.id().to_string();
let hdr_tree_name = format!("headers_{tree_name}");
let hdr_dag = sled_db.open_tree(hdr_tree_name).unwrap();
let dag = sled_db.open_tree(tree_name).unwrap();
if hdr_dag.is_empty() {
let mut overlay = SledTreeOverlay::new(&hdr_dag);
let header_se = serialize_async(&genesis.header).await;
// Add the header to the overlay
overlay.insert(genesis.id().as_bytes(), &header_se).unwrap();
// Aggregate changes into a single batch
let batch = overlay.aggregate().unwrap();
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = hdr_dag.apply_batch(batch) {
panic!("Failed applying header_dag_insert batch to sled: {}", e);
}
}
if dag.is_empty() {
let mut overlay = SledTreeOverlay::new(&dag);
let event_se = serialize_async(&genesis).await;
// Add the event to the overlay
overlay.insert(genesis.id().as_bytes(), &event_se).unwrap();
// Aggregate changes into a single batch
let batch = overlay.aggregate().unwrap();
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
}
let utips = self.find_unreferenced_tips(&dag).await;
considered_header_trees.insert(genesis.id(), (hdr_dag, utips.clone()));
considered_trees.insert(genesis.id(), (dag, utips));
}
Self { db: sled_db, header_dags: considered_header_trees, main_dags: considered_trees }
}
/// Adds a DAG into the set of DAGs and drops the oldest one if exeeding DAGS_MAX_NUMBER,
/// This is called if prune_task activates.
pub async fn add_dag(&mut self, dag_name: &str, genesis_event: &Event) {
debug!("add_dag::dags: {}", self.main_dags.len());
if self.main_dags.len() != self.header_dags.len() {
panic!("main dags length is not the same as header dags")
}
// TODO: sort dags by timestamp and drop the oldest
if self.main_dags.len() > DAGS_MAX_NUMBER.try_into().unwrap() {
while self.main_dags.len() >= DAGS_MAX_NUMBER.try_into().unwrap() {
debug!("[EVENTGRAPH] dropping oldest dag");
let sorted_dags = self.sort_dags().await;
// since dags are sorted in reverse
let oldest_tree = sorted_dags.last().unwrap().name();
let oldest_key = String::from_utf8_lossy(&oldest_tree);
let oldest_key = blake3::Hash::from_str(&oldest_key).unwrap();
let oldest_hdr_tree = self.header_dags.remove(&oldest_key).unwrap();
let oldest_tree = self.main_dags.remove(&oldest_key).unwrap();
self.db.drop_tree(oldest_hdr_tree.0.name()).unwrap();
self.db.drop_tree(oldest_tree.0.name()).unwrap();
}
}
// Insert genesis
let hdr_tree_name = format!("headers_{dag_name}");
let hdr_dag = self.get_dag(&hdr_tree_name);
hdr_dag
.insert(genesis_event.id().as_bytes(), serialize_async(&genesis_event.header).await)
.unwrap();
let dag = self.get_dag(dag_name);
dag.insert(genesis_event.id().as_bytes(), serialize_async(genesis_event).await).unwrap();
let utips = self.find_unreferenced_tips(&dag).await;
self.header_dags.insert(genesis_event.id(), (hdr_dag, utips.clone()));
self.main_dags.insert(genesis_event.id(), (dag, utips));
}
// Get a DAG providing its name.
pub fn get_dag(&self, dag_name: &str) -> sled::Tree {
self.db.open_tree(dag_name).unwrap()
}
/// Get {count} many DAGs.
pub async fn get_dags(&self, count: usize) -> Vec<sled::Tree> {
let sorted_dags = self.sort_dags().await;
sorted_dags.into_iter().take(count).collect()
}
/// Sort DAGs chronologically
async fn sort_dags(&self) -> Vec<sled::Tree> {
let mut vec_dags = vec![];
let dags = self
.main_dags
.iter()
.map(|x| {
let trees = x.1;
trees.0.clone()
})
.collect::<Vec<_>>();
for dag in dags {
let genesis = dag.first().unwrap().unwrap().1;
let genesis_event: Event = deserialize_async(&genesis).await.unwrap();
vec_dags.push((genesis_event.header.timestamp, dag));
}
vec_dags.sort_by_key(|&(ts, _)| ts);
vec_dags.reverse();
vec_dags.into_iter().map(|(_, dag)| dag).collect()
}
/// Find the unreferenced tips in the current DAG state, mapped by their layers.
async fn find_unreferenced_tips(&self, dag: &sled::Tree) -> LayerUTips {
// First get all the event IDs
let mut tips = HashSet::new();
for iter_elem in dag.iter() {
let (id, _) = iter_elem.unwrap();
let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
tips.insert(id);
}
// Iterate again to find unreferenced IDs
for iter_elem in dag.iter() {
let (_, event) = iter_elem.unwrap();
let event: Event = deserialize_async(&event).await.unwrap();
for parent in event.header.parents.iter() {
tips.remove(parent);
}
}
// Build the layers map
let mut map: LayerUTips = BTreeMap::new();
for tip in tips {
let event = self.fetch_event_from_dag(&tip, &dag).await.unwrap().unwrap();
if let Some(layer_tips) = map.get_mut(&event.header.layer) {
layer_tips.insert(tip);
} else {
let mut layer_tips = HashSet::new();
layer_tips.insert(tip);
map.insert(event.header.layer, layer_tips);
}
}
map
}
/// Fetch an event from the DAG
pub async fn fetch_event_from_dag(
&self,
event_id: &blake3::Hash,
dag: &sled::Tree,
) -> Result<Option<Event>> {
let Some(bytes) = dag.get(event_id.as_bytes())? else {
return Ok(None);
};
let event: Event = deserialize_async(&bytes).await?;
return Ok(Some(event))
}
}
/// An Event Graph instance
pub struct EventGraph {
/// Pointer to the P2P network instance
p2p: P2pPtr,
/// Sled tree containing the headers
header_dag: sled::Tree,
/// Main sled tree containing the events
main_dag: sled::Tree,
dag_store: RwLock<DAGStore>,
/// Replay logs path.
datastore: PathBuf,
/// Run in replay_mode where if set we log Sled DB instructions
/// into `datastore`, useful to reacreate a faulty DAG to debug.
replay_mode: bool,
/// The set of unreferenced DAG tips
unreferenced_tips: RwLock<BTreeMap<u64, HashSet<Hash>>>,
/// A `HashSet` containg event IDs and their 1-level parents.
/// These come from the events we've sent out using `EventPut`.
/// They are used with `EventReq` to decide if we should reply
@@ -117,7 +349,7 @@ pub struct EventGraph {
/// inserted into the DAG
pub event_pub: PublisherPtr<Event>,
/// Current genesis event
current_genesis: RwLock<Event>,
pub current_genesis: RwLock<Event>,
/// Currently configured DAG rotation, in days
days_rotation: u64,
/// Flag signalling DAG has finished initial sync
@@ -142,7 +374,6 @@ impl EventGraph {
/// * `datastore` path where we should log db instrucion if run in
/// replay mode.
/// * `replay_mode` set the flag to keep a log of db instructions.
/// * `dag_tree_name` the name of disk-backed tree (or DAG name).
/// * `days_rotation` marks the lifetime of the DAG before it's
/// pruned.
pub async fn new(
@@ -151,27 +382,29 @@ impl EventGraph {
datastore: PathBuf,
replay_mode: bool,
fast_mode: bool,
dag_tree_name: &str,
days_rotation: u64,
ex: Arc<Executor<'_>>,
) -> Result<EventGraphPtr> {
let hdr_tree_name = format!("headers_{dag_tree_name}");
let hdr_dag = sled_db.open_tree(hdr_tree_name)?;
let dag = sled_db.open_tree(dag_tree_name)?;
let unreferenced_tips = RwLock::new(BTreeMap::new());
let broadcasted_ids = RwLock::new(HashSet::new());
let event_pub = Publisher::new();
// Create the current genesis event based on the `days_rotation`
let current_genesis = generate_genesis(days_rotation);
let current_dag_tree_name = current_genesis.id().to_string();
let dag_store = DAGStore {
db: sled_db.clone(),
header_dags: HashMap::default(),
main_dags: HashMap::default(),
}
.new(sled_db, days_rotation)
.await;
let self_ = Arc::new(Self {
p2p,
header_dag: hdr_dag.clone(),
main_dag: dag.clone(),
dag_store: RwLock::new(dag_store.clone()),
datastore,
replay_mode,
fast_mode,
unreferenced_tips,
broadcasted_ids,
prune_task: OnceCell::new(),
event_pub,
@@ -184,7 +417,8 @@ impl EventGraph {
// Check if we have it in our DAG.
// If not, we can prune the DAG and insert this new genesis event.
if !dag.contains_key(current_genesis.header.id().as_bytes())? {
let dag = dag_store.get_dag(&current_dag_tree_name);
if !dag.contains_key(current_genesis.id().as_bytes())? {
info!(
target: "event_graph::new()",
"[EVENTGRAPH] DAG does not contain current genesis, pruning existing data",
@@ -192,9 +426,6 @@ impl EventGraph {
self_.dag_prune(current_genesis).await?;
}
// Find the unreferenced tips in the current DAG state.
*self_.unreferenced_tips.write().await = self_.find_unreferenced_tips().await;
// Spawn the DAG pruning task
if days_rotation > 0 {
let prune_task = StoppableTask::new();
@@ -221,7 +452,7 @@ impl EventGraph {
}
/// Sync the DAG from connected peers
pub async fn dag_sync(&self, fast_mode: bool) -> Result<()> {
pub async fn dag_sync(&self, dag: sled::Tree, fast_mode: bool) -> Result<()> {
// We do an optimistic sync where we ask all our connected peers for
// the latest layer DAG tips (unreferenced events) and then we accept
// the ones we see the most times.
@@ -237,6 +468,8 @@ impl EventGraph {
// amount of iterations, these could be faulty peers and we can try again
// from the beginning
let dag_name = String::from_utf8_lossy(&dag.name()).to_string();
// Get references to all our peers.
let channels = self.p2p.hosts().peers();
let mut communicated_peers = channels.len();
@@ -268,7 +501,7 @@ impl EventGraph {
}
};
if let Err(e) = channel.send(&TipReq {}).await {
if let Err(e) = channel.send(&TipReq(dag_name.clone())).await {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Couldn't contact peer {}, skipping ({})", url, e,
@@ -328,7 +561,7 @@ impl EventGraph {
for tip in considered_tips.iter() {
assert!(tip != &NULL_ID);
if !self.main_dag.contains_key(tip.as_bytes()).unwrap() {
if !dag.contains_key(tip.as_bytes()).unwrap() {
missing_parents.insert(*tip);
}
}
@@ -344,13 +577,15 @@ impl EventGraph {
// TODO: requesting headers should be in a way that we wouldn't
// recieve the same header(s) again, by sending our tip, other
// nodes should send back the ones after it
let hdr_tree_name = format!("headers_{dag_name}");
let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name);
let mut headers_requests = FuturesUnordered::new();
for channel in channels.iter() {
headers_requests.push(request_header(&channel, comms_timeout))
headers_requests.push(request_header(&channel, dag_name.clone(), comms_timeout))
}
while let Some(peer_headers) = headers_requests.next().await {
self.header_dag_insert(peer_headers?).await?
self.header_dag_insert(peer_headers?, &dag_name).await?
}
// start download payload
@@ -358,7 +593,7 @@ impl EventGraph {
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events");
let mut header_sorted = vec![];
for iter_elem in self.header_dag.iter() {
for iter_elem in header_dag.iter() {
let (_, val) = iter_elem.unwrap();
let val: Header = deserialize_async(&val).await.unwrap();
header_sorted.push(val);
@@ -438,59 +673,9 @@ impl EventGraph {
let mut verified_count = 0;
for (_, chunk) in received_events {
verified_count += chunk.len();
self.dag_insert(&chunk).await?;
self.dag_insert(&chunk, &dag_name).await?;
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Verified Events: {}/{}", verified_count, retrieved_count);
}
// 1. Fetch events one by one
// let mut events_requests = FuturesOrdered::new();
// let peer = peer_selection(peers.clone());
// let peer = channels[0].clone();
// for header in header_sorted.iter() {
// let received_events =
// request_event(peer.clone(), vec![header.id()], comms_timeout).await?;
// self.dag_insert(&received_events).await?;
//}
// let mut received_events = vec![];
// while let Some(peer_events) = events_requests.next().await {
// let events = peer_events?;
// for i in events.iter() {
// info!("Received events id: {:?}", i.header.id());
// info!("layer: {}", i.header.layer);
// }
// received_events.extend(events);
// }
// self.dag_insert(&received_events).await?;
// // 2. split sorted headers into chunks and assign them to each connected peer
// let mut responses = vec![];
// for header in header_sorted.chunks_exact(peers.len()) {
// // For each peer, create a future that sends a request
// let pairs = peers.iter().zip(header).collect::<Vec<_>>();
// let pair_stream = from_iter(pairs.iter());
// let requests_stream = pair_stream.map(|(peer, header)| send_request(peer, header));
// // Collect all the responses into a vector
// let x = requests_stream.collect::<Vec<_>>().await;
// info!("len of x: {}", x.len());
// // responses.push(x);
// responses.extend(x);
// }
// // Wait for all the futures to complete
// let x = future::join_all(responses).await;
// let fetched_parents = x.into_iter().map(|f| f.unwrap()).collect::<Vec<_>>().concat();
// info!("fetched parents: {}", fetched_parents.len());
// for i in fetched_parents.iter() {
// info!("layer: {}", i.header.layer)
// }
// // 3. Fetch all events at once (just a POC)
// let peers = channels.clone().into_iter().collect::<Vec<_>>();
// let missing = header_sorted.iter().map(|x| x.id()).collect::<Vec<_>>();
// info!("first missing: {}", missing[0]);
// let parents = send_requests(&peers, &missing).await?.concat();
// info!("fetched parents: {}", parents.len());
}
// <-- end download payload
@@ -500,6 +685,23 @@ impl EventGraph {
Ok(())
}
/// Choose how many dags to sync
pub async fn sync_selected(&self, count: usize, fast_mode: bool) -> Result<()> {
let mut dags_to_sync = self.dag_store.read().await.get_dags(count).await;
// Since get_dags() return sorted dags in reverse
dags_to_sync.reverse();
for dag in dags_to_sync {
match self.dag_sync(dag, fast_mode).await {
Ok(()) => continue,
Err(e) => {
return Err(e);
}
}
}
Ok(())
}
/// Atomically prune the DAG and insert the given event as genesis.
async fn dag_prune(&self, genesis_event: Event) -> Result<()> {
debug!(target: "event_graph::dag_prune()", "Pruning DAG...");
@@ -509,44 +711,15 @@ impl EventGraph {
// ensure that during the pruning operation, no other operations are
// able to access the intermediate state which could lead to producing
// the wrong state after pruning.
let mut unreferenced_tips = self.unreferenced_tips.write().await;
let mut broadcasted_ids = self.broadcasted_ids.write().await;
let mut current_genesis = self.current_genesis.write().await;
// Atomically clear the main and headers DAGs and write the new genesis event.
// Header
let mut batch = sled::Batch::default();
for key in self.header_dag.iter().keys() {
batch.remove(key.unwrap());
}
batch.insert(
genesis_event.header.id().as_bytes(),
serialize_async(&genesis_event.header).await,
);
let dag_name = genesis_event.id().to_string();
self.dag_store.write().await.add_dag(&dag_name, &genesis_event).await;
debug!(target: "event_graph::dag_prune()", "Applying batch...");
if let Err(e) = self.header_dag.apply_batch(batch) {
panic!("Failed pruning header DAG, sled apply_batch error: {}", e);
}
// Main
let mut batch = sled::Batch::default();
for key in self.main_dag.iter().keys() {
batch.remove(key.unwrap());
}
batch.insert(genesis_event.header.id().as_bytes(), serialize_async(&genesis_event).await);
debug!(target: "event_graph::dag_prune()", "Applying batch...");
if let Err(e) = self.main_dag.apply_batch(batch) {
panic!("Failed pruning main DAG, sled apply_batch error: {}", e);
}
// Clear unreferenced tips and bcast ids
*unreferenced_tips = BTreeMap::new();
unreferenced_tips.insert(0, HashSet::from([genesis_event.header.id()]));
// Clear bcast ids
*current_genesis = genesis_event;
*broadcasted_ids = HashSet::new();
drop(unreferenced_tips);
drop(broadcasted_ids);
drop(current_genesis);
@@ -593,21 +766,25 @@ impl EventGraph {
/// knows that any requests for them are actually legitimate.
/// TODO: The `broadcasted_ids` set should periodically be pruned, when
/// some sensible time has passed after broadcasting the event.
pub async fn dag_insert(&self, events: &[Event]) -> Result<Vec<Hash>> {
pub async fn dag_insert(&self, events: &[Event], dag_name: &str) -> Result<Vec<Hash>> {
// Sanity check
if events.is_empty() {
return Ok(vec![])
}
// Acquire exclusive locks to `unreferenced_tips and broadcasted_ids`
let mut unreferenced_tips = self.unreferenced_tips.write().await;
// Acquire exclusive locks to `broadcasted_ids`
let dag_name_hash = blake3::Hash::from_str(dag_name).unwrap();
let mut broadcasted_ids = self.broadcasted_ids.write().await;
let main_dag = self.dag_store.read().await.get_dag(dag_name);
let hdr_tree_name = format!("headers_{dag_name}");
let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name);
// Here we keep the IDs to return
let mut ids = Vec::with_capacity(events.len());
// Create an overlay over the DAG tree
let mut overlay = SledTreeOverlay::new(&self.main_dag);
let mut overlay = SledTreeOverlay::new(&main_dag);
// Grab genesis timestamp
// let genesis_timestamp = self.current_genesis.read().await.header.timestamp;
@@ -615,7 +792,7 @@ impl EventGraph {
// Iterate over given events to validate them and
// write them to the overlay
for event in events {
let event_id = event.header.id();
let event_id = event.id();
if event.header.layer == 0 {
return Ok(vec![])
}
@@ -625,16 +802,16 @@ impl EventGraph {
);
// check if we already have the event
if self.main_dag.contains_key(event_id.as_bytes())? {
if main_dag.contains_key(event_id.as_bytes())? {
continue
}
// check if its header is in header's store
if !self.header_dag.contains_key(event_id.as_bytes())? {
if !header_dag.contains_key(event_id.as_bytes())? {
continue
}
if !event.validate(&self.header_dag).await? {
if !event.dag_validate(&header_dag).await? {
error!(target: "event_graph::dag_insert()", "Event {} is invalid!", event_id);
return Err(Error::EventIsInvalid)
}
@@ -659,14 +836,20 @@ impl EventGraph {
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = self.main_dag.apply_batch(batch) {
if let Err(e) = main_dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
drop(main_dag);
drop(header_dag);
let mut dag_store = self.dag_store.write().await;
let (_, unreferenced_tips) = &mut dag_store.main_dags.get_mut(&dag_name_hash).unwrap();
// Iterate over given events to update references and
// send out notifications about them
for event in events {
let event_id = event.header.id();
let event_id = event.id();
// Update the unreferenced DAG tips set
debug!(
@@ -713,18 +896,23 @@ impl EventGraph {
}
// Drop the exclusive locks
drop(unreferenced_tips);
drop(dag_store);
drop(broadcasted_ids);
let mut dag_store = self.dag_store.write().await;
dag_store.header_dags.get_mut(&dag_name_hash).unwrap().1 =
dag_store.main_dags.get(&dag_name_hash).unwrap().1.clone();
drop(dag_store);
Ok(ids)
}
pub async fn header_dag_insert(&self, headers: Vec<Header>) -> Result<()> {
pub async fn header_dag_insert(&self, headers: Vec<Header>, dag_name: &str) -> Result<()> {
let hdr_tree_name = format!("headers_{dag_name}");
let header_dag = self.dag_store.read().await.get_dag(&hdr_tree_name);
// Create an overlay over the DAG tree
let mut overlay = SledTreeOverlay::new(&self.header_dag);
// Grab genesis timestamp
let genesis_timestamp = self.current_genesis.read().await.header.timestamp;
let mut overlay = SledTreeOverlay::new(&header_dag);
// Acquire exclusive locks to `unreferenced_tips and broadcasted_ids`
// let mut unreferenced_header = self.unreferenced_tips.write().await;
@@ -744,10 +932,7 @@ impl EventGraph {
target: "event_graph::header_dag_insert()",
"Inserting header {} into the DAG", header_id,
);
if !header
.validate(&self.header_dag, genesis_timestamp, self.days_rotation, Some(&overlay))
.await?
{
if !header.validate(&header_dag, self.days_rotation, Some(&overlay)).await? {
error!(target: "event_graph::header_dag_insert()", "Header {} is invalid!", header_id);
return Err(Error::HeaderIsInvalid)
}
@@ -765,27 +950,39 @@ impl EventGraph {
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = self.header_dag.apply_batch(batch) {
if let Err(e) = header_dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
Ok(())
}
/// Fetch an event from the DAG
pub async fn dag_get(&self, event_id: &Hash) -> Result<Option<Event>> {
let Some(bytes) = self.main_dag.get(event_id.as_bytes())? else { return Ok(None) };
let event: Event = deserialize_async(&bytes).await?;
/// Search and fetch an event through all DAGs
pub async fn fetch_event_from_dags(&self, event_id: &blake3::Hash) -> Result<Option<Event>> {
let store = self.dag_store.read().await;
for tree_elem in store.main_dags.clone() {
let dag_name = tree_elem.0.to_string();
let Some(bytes) = store.get_dag(&dag_name).get(event_id.as_bytes())? else {
continue;
};
let event: Event = deserialize_async(&bytes).await?;
Ok(Some(event))
return Ok(Some(event))
}
Ok(None)
}
/// Get next layer along with its N_EVENT_PARENTS from the unreferenced
/// tips of the DAG. Since tips are mapped by their layer, we go backwards
/// until we fill the vector, ensuring we always use latest layers tips as
/// parents.
async fn get_next_layer_with_parents(&self) -> (u64, [Hash; N_EVENT_PARENTS]) {
let unreferenced_tips = self.unreferenced_tips.read().await;
async fn get_next_layer_with_parents(
&self,
dag_name: &Hash,
) -> (u64, [blake3::Hash; N_EVENT_PARENTS]) {
let store = self.dag_store.read().await;
let (_, unreferenced_tips) = store.header_dags.get(dag_name).unwrap();
let mut parents = [NULL_ID; N_EVENT_PARENTS];
let mut index = 0;
@@ -805,83 +1002,53 @@ impl EventGraph {
(next_layer, parents)
}
/// Find the unreferenced tips in the current DAG state, mapped by their layers.
async fn find_unreferenced_tips(&self) -> BTreeMap<u64, HashSet<Hash>> {
// First get all the event IDs
let mut tips = HashSet::new();
for iter_elem in self.main_dag.iter() {
let (id, _) = iter_elem.unwrap();
let id = Hash::from_bytes((&id as &[u8]).try_into().unwrap());
tips.insert(id);
}
// Iterate again to find unreferenced IDs
for iter_elem in self.main_dag.iter() {
let (_, event) = iter_elem.unwrap();
let event: Event = deserialize_async(&event).await.unwrap();
for parent in event.header.parents.iter() {
tips.remove(parent);
}
}
// Build the layers map
let mut map: BTreeMap<u64, HashSet<Hash>> = BTreeMap::new();
for tip in tips {
let event = self.dag_get(&tip).await.unwrap().unwrap();
if let Some(layer_tips) = map.get_mut(&event.header.layer) {
layer_tips.insert(tip);
} else {
let mut layer_tips = HashSet::new();
layer_tips.insert(tip);
map.insert(event.header.layer, layer_tips);
}
}
map
}
/// Internal function used for DAG sorting.
async fn get_unreferenced_tips_sorted(&self) -> [Hash; N_EVENT_PARENTS] {
let (_, tips) = self.get_next_layer_with_parents().await;
// Convert the hash to BigUint for sorting
let mut sorted: Vec<_> =
tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
sorted.sort_unstable();
// Convert back to blake3
async fn get_unreferenced_tips_sorted(&self) -> Vec<[blake3::Hash; N_EVENT_PARENTS]> {
let mut vec_tips = vec![];
let mut tips_sorted = [NULL_ID; N_EVENT_PARENTS];
for (i, id) in sorted.iter().enumerate() {
let mut bytes = id.to_bytes_be();
for (i, _) in self.dag_store.read().await.header_dags.iter() {
let (_, tips) = self.get_next_layer_with_parents(&i).await;
// Convert the hash to BigUint for sorting
let mut sorted: Vec<_> =
tips.iter().map(|x| BigUint::from_bytes_be(x.as_bytes())).collect();
sorted.sort_unstable();
// Ensure we have 32 bytes
while bytes.len() < blake3::OUT_LEN {
bytes.insert(0, 0);
// Convert back to blake3
for (i, id) in sorted.iter().enumerate() {
let mut bytes = id.to_bytes_be();
// Ensure we have 32 bytes
while bytes.len() < blake3::OUT_LEN {
bytes.insert(0, 0);
}
tips_sorted[i] = blake3::Hash::from_bytes(bytes.try_into().unwrap());
}
tips_sorted[i] = Hash::from_bytes(bytes.try_into().unwrap());
vec_tips.push(tips_sorted);
}
tips_sorted
vec_tips
}
// TODO: Fix fetching all events from all dags and then order and retrun them
/// Perform a topological sort of the DAG.
pub async fn order_events(&self) -> Vec<Event> {
let mut ordered_events = VecDeque::new();
let mut visited = HashSet::new();
for tip in self.get_unreferenced_tips_sorted().await {
if !visited.contains(&tip) && tip != NULL_ID {
let tip = self.dag_get(&tip).await.unwrap().unwrap();
ordered_events.extend(self.dfs_topological_sort(tip, &mut visited).await);
for i in self.get_unreferenced_tips_sorted().await {
for tip in i {
if !visited.contains(&tip) && tip != NULL_ID {
let tip = self.fetch_event_from_dags(&tip).await.unwrap().unwrap();
ordered_events.extend(self.dfs_topological_sort(tip, &mut visited).await);
}
}
}
let mut ord_events_vec = ordered_events.make_contiguous().to_vec();
// Order events based on thier layer numbers, or based on timestamp if they are equal
ord_events_vec.sort_unstable_by(|a, b| {
a.0.cmp(&b.0).then(b.1.header.timestamp.cmp(&a.1.header.timestamp))
});
// Order events by timestamp.
ord_events_vec.sort_unstable_by(|a, b| a.1.header.timestamp.cmp(&b.1.header.timestamp));
ord_events_vec.iter().map(|a| a.1.clone()).collect::<Vec<Event>>()
}
@@ -895,13 +1062,13 @@ impl EventGraph {
) -> VecDeque<(u64, Event)> {
let mut ordered_events = VecDeque::new();
let mut stack = VecDeque::new();
let event_id = event.header.id();
let event_id = event.id();
stack.push_back(event_id);
while let Some(event_id) = stack.pop_front() {
if !visited.contains(&event_id) && event_id != NULL_ID {
visited.insert(event_id);
if let Some(event) = self.dag_get(&event_id).await.unwrap() {
if let Some(event) = self.fetch_event_from_dags(&event_id).await.unwrap() {
for parent in event.header.parents.iter() {
stack.push_back(*parent);
}
@@ -937,8 +1104,10 @@ impl EventGraph {
}
pub async fn eventgraph_info(&self, id: u16, _params: JsonValue) -> JsonResult {
let current_genesis = self.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
let mut graph = HashMap::new();
for iter_elem in self.main_dag.iter() {
for iter_elem in self.dag_store.read().await.get_dag(&dag_name).iter() {
let (id, val) = iter_elem.unwrap();
let id = Hash::from_bytes((&id as &[u8]).try_into().unwrap());
let val: Event = deserialize_async(&val).await.unwrap();
@@ -962,17 +1131,16 @@ impl EventGraph {
/// Fetch all the events that are on a higher layers than the
/// provided ones.
pub async fn fetch_successors_of(
&self,
tips: BTreeMap<u64, HashSet<Hash>>,
) -> Result<Vec<Event>> {
pub async fn fetch_successors_of(&self, tips: LayerUTips) -> Result<Vec<Event>> {
debug!(
target: "event_graph::fetch_successors_of()",
"fetching successors of {tips:?}"
);
let current_genesis = self.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
let mut graph = HashMap::new();
for iter_elem in self.main_dag.iter() {
for iter_elem in self.dag_store.read().await.get_dag(&dag_name).iter() {
let (id, val) = iter_elem.unwrap();
let hash = Hash::from_bytes((&id as &[u8]).try_into().unwrap());
let event: Event = deserialize_async(&val).await.unwrap();
@@ -1001,36 +1169,11 @@ impl EventGraph {
}
}
async fn _send_request(peer: &Channel, missing: &Header) -> Result<Vec<Event>> {
info!("in send_request first missing: {}", missing.id());
let url = peer.address();
debug!(target: "event_graph::dag_sync()","Requesting {:?} from {}...", missing, url);
let ev_rep_sub = match peer.subscribe_msg::<EventRep>().await {
Ok(v) => v,
Err(e) => {
error!(target: "event_graph::dag_sync()","[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",url, e);
return Err(Error::Custom("Couldn't subscribe EventRep".to_string()))
}
};
if let Err(e) = peer.send(&EventReq(vec![missing.id()])).await {
error!(target: "event_graph::dag_sync()","[EVENTGRAPH] Sync: Failed communicating EventReq({:?}) to {}: {}",missing, url, e);
return Err(Error::Custom("Failed communicating EventReq".to_string()))
}
let Ok(parent) = ev_rep_sub.receive_with_timeout(15).await else {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}",
missing, url,
);
return Err(().into())
};
Ok(parent.0.clone())
}
async fn request_header(peer: &Channel, comms_timeout: u64) -> Result<Vec<Header>> {
async fn request_header(
peer: &Channel,
tree_name: String,
comms_timeout: u64,
) -> Result<Vec<Header>> {
let url = peer.address();
let hdr_rep_sub = match peer.subscribe_msg::<HeaderRep>().await {
@@ -1045,7 +1188,7 @@ async fn request_header(peer: &Channel, comms_timeout: u64) -> Result<Vec<Header
}
};
if let Err(e) = peer.send(&HeaderReq {}).await {
if let Err(e) = peer.send(&HeaderReq(tree_name)).await {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Couldn't contact peer {}, skipping ({})", url, e,
@@ -1113,53 +1256,3 @@ async fn request_event(
Ok(event.0.clone())
}
fn _peer_selection(peers: Vec<Arc<Channel>>) -> Arc<Channel> {
peers.choose(&mut OsRng).unwrap().clone()
}
// async fn send_request(peer: &Channel, missing: &[Hash]) -> Result<Vec<Event>> {
// info!("in send_request first missing: {}", missing[0]);
// let url = peer.address();
// debug!(target: "event_graph::dag_sync()","Requesting {:?} from {}...", missing, url);
// let ev_rep_sub = match peer.subscribe_msg::<EventRep>().await {
// Ok(v) => v,
// Err(e) => {
// error!(target: "event_graph::dag_sync()","[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",url, e);
// return Err(Error::Custom("Couldn't subscribe EventRep".to_string()))
// }
// };
// if let Err(e) = peer.send(&EventReq(missing.to_vec())).await {
// error!(target: "event_graph::dag_sync()","[EVENTGRAPH] Sync: Failed communicating EventReq({:?}) to {}: {}",missing, url, e);
// return Err(Error::Custom("Failed communicating EventReq".to_string()))
// }
// let Ok(parent) = ev_rep_sub.receive_with_timeout(15).await else {
// error!(
// target: "event_graph::dag_sync()",
// "[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}",
// missing, url,
// );
// return Err(().into())
// };
// Ok(parent.0.clone())
// }
// // A function that sends requests to multiple peers concurrently
// async fn send_requests(peers: &[Arc<Channel>], missing: &[Hash]) -> Result<Vec<Vec<Event>>> {
// info!("in send_requests first missing: {}", missing[0]);
// let chunk_size = (missing.len() as f64 / peers.len() as f64).ceil() as usize;
// let pairs = peers.iter().zip(missing.chunks(chunk_size)).collect::<Vec<_>>();
// // For each peer, create a future that sends a request
// let pair_stream = from_iter(pairs.iter());
// let requests_stream = pair_stream.map(|(peer, missing)| send_request(peer, missing));
// // Collect all the responses into a vector
// let responses = requests_stream.collect::<Vec<_>>().await;
// // Wait for all the futures to complete
// future::try_join_all(responses).await
// }

View File

@@ -18,6 +18,7 @@
use std::{
collections::{BTreeMap, HashSet, VecDeque},
str::FromStr,
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Arc,
@@ -28,7 +29,7 @@ use darkfi_serial::{async_trait, deserialize_async, SerialDecodable, SerialEncod
use log::{debug, error, trace, warn};
use smol::Executor;
use super::{event::Header, Event, EventGraphPtr, NULL_ID};
use super::{event::Header, Event, EventGraphPtr, LayerUTips, NULL_ID};
use crate::{
impl_p2p_message,
net::{
@@ -151,7 +152,7 @@ impl_p2p_message!(HeaderPut, "EventGraph::HeaderPut", 0, 0, DEFAULT_METERING_CON
/// A P2P message representing a header request
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct HeaderReq {}
pub struct HeaderReq(pub String);
impl_p2p_message!(HeaderReq, "EventGraph::HeaderReq", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// A P2P message representing a header reply
@@ -161,12 +162,12 @@ impl_p2p_message!(HeaderRep, "EventGraph::HeaderRep", 0, 0, DEFAULT_METERING_CON
/// A P2P message representing a request for a peer's DAG tips
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipReq {}
pub struct TipReq(pub String);
impl_p2p_message!(TipReq, "EventGraph::TipReq", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// A P2P message representing a reply for the peer's DAG tips
#[derive(Clone, SerialEncodable, SerialDecodable)]
pub struct TipRep(pub BTreeMap<u64, HashSet<blake3::Hash>>);
pub struct TipRep(pub LayerUTips);
impl_p2p_message!(TipRep, "EventGraph::TipRep", 0, 0, DEFAULT_METERING_CONFIGURATION);
#[async_trait]
@@ -177,7 +178,7 @@ impl ProtocolBase for ProtocolEventGraph {
self.jobsman.clone().spawn(self.clone().handle_event_req(), ex.clone()).await;
// self.jobsman.clone().spawn(self.clone().handle_header_put(), ex.clone()).await;
// self.jobsman.clone().spawn(self.clone().handle_header_req(), ex.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_header_rep(), ex.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_header_req(), ex.clone()).await;
self.jobsman.clone().spawn(self.clone().handle_tip_req(), ex.clone()).await;
self.jobsman.clone().spawn(self.clone().broadcast_rate_limiter(), ex.clone()).await;
Ok(())
@@ -263,7 +264,7 @@ impl ProtocolEventGraph {
};
trace!(
target: "event_graph::protocol::handle_event_put()",
"Got EventPut: {} [{}]", event.header.id(), self.channel.address(),
"Got EventPut: {} [{}]", event.id(), self.channel.address(),
);
// Check if node has finished syncing its DAG
@@ -276,8 +277,19 @@ impl ProtocolEventGraph {
}
// If we have already seen the event, we'll stay quiet.
let event_id = event.header.id();
if self.event_graph.main_dag.contains_key(event_id.as_bytes()).unwrap() {
let current_genesis = self.event_graph.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
let hdr_tree_name = format!("headers_{dag_name}");
let event_id = event.id();
if self
.event_graph
.dag_store
.read()
.await
.get_dag(&hdr_tree_name)
.contains_key(event_id.as_bytes())
.unwrap()
{
debug!(
target: "event_graph::protocol::handle_event_put()",
"Event {} is already known", event_id,
@@ -311,7 +323,7 @@ impl ProtocolEventGraph {
debug!(
target: "event_graph::protocol::handle_event_put()",
"Event {} is older than genesis. Event timestamp: `{}`. Genesis timestamp: `{}`",
event.header.id(), event.header.timestamp, genesis_timestamp
event.id(), event.header.timestamp, genesis_timestamp
);
}
@@ -341,7 +353,15 @@ impl ProtocolEventGraph {
continue
}
if !self.event_graph.main_dag.contains_key(parent_id.as_bytes()).unwrap() {
if !self
.event_graph
.dag_store
.read()
.await
.get_dag(&hdr_tree_name)
.contains_key(parent_id.as_bytes())
.unwrap()
{
missing_parents.insert(*parent_id);
}
}
@@ -363,6 +383,10 @@ impl ProtocolEventGraph {
"Event has {} missing parents. Requesting...", missing_parents.len(),
);
let current_genesis = self.event_graph.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
let hdr_tree_name = format!("headers_{dag_name}");
while !missing_parents.is_empty() {
// for parent_id in missing_parents.clone().iter() {
debug!(
@@ -394,12 +418,12 @@ impl ProtocolEventGraph {
let parents = parents.0.clone();
for parent in parents {
let parent_id = parent.header.id();
let parent_id = parent.id();
if !missing_parents.contains(&parent_id) {
error!(
target: "event_graph::protocol::handle_event_put()",
"[EVENTGRAPH] Peer {} replied with a wrong event: {}",
self.channel.address(), parent.header.id(),
self.channel.address(), parent.id(),
);
self.channel.stop().await;
return Err(Error::ChannelStopped)
@@ -407,7 +431,7 @@ impl ProtocolEventGraph {
debug!(
target: "event_graph::protocol::handle_event_put()",
"Got correct parent event {}", parent.header.id(),
"Got correct parent event {}", parent.id(),
);
if let Some(layer_events) = received_events.get_mut(&parent.header.layer) {
@@ -430,7 +454,10 @@ impl ProtocolEventGraph {
!received_events_hashes.contains(upper_parent) &&
!self
.event_graph
.main_dag
.dag_store
.read()
.await
.get_dag(&hdr_tree_name)
.contains_key(upper_parent.as_bytes())
.unwrap()
{
@@ -453,13 +480,13 @@ impl ProtocolEventGraph {
}
}
let headers = events.iter().map(|x| x.header.clone()).collect();
if self.event_graph.header_dag_insert(headers).await.is_err() {
if self.event_graph.header_dag_insert(headers, &dag_name).await.is_err() {
self.clone().increase_malicious_count().await?;
continue
}
// FIXME
if !self.event_graph.fast_mode {
if self.event_graph.dag_insert(&events).await.is_err() {
if self.event_graph.dag_insert(&events, &dag_name).await.is_err() {
self.clone().increase_malicious_count().await?;
continue
}
@@ -473,12 +500,17 @@ impl ProtocolEventGraph {
target: "event_graph::protocol::handle_event_put()",
"Got all parents necessary for insertion",
);
if self.event_graph.header_dag_insert(vec![event.header.clone()]).await.is_err() {
if self
.event_graph
.header_dag_insert(vec![event.header.clone()], &dag_name)
.await
.is_err()
{
self.clone().increase_malicious_count().await?;
continue
}
if self.event_graph.dag_insert(&[event.clone()]).await.is_err() {
if self.event_graph.dag_insert(&[event.clone()], &dag_name).await.is_err() {
self.clone().increase_malicious_count().await?;
continue
}
@@ -522,7 +554,20 @@ impl ProtocolEventGraph {
// reading our db and steal our bandwidth.
let mut events = vec![];
for event_id in event_ids.iter() {
if !self.event_graph.header_dag.contains_key(event_id.as_bytes())? {
if let Ok(event) = self
.event_graph
.fetch_event_from_dags(event_id)
.await?
.ok_or(Error::EventNotFound("The requested event is not found".to_owned()))
{
// At this point we should have it in our DAG.
// This code panics if this is not the case.
debug!(
target: "event_graph::protocol::handle_event_req()",
"Fetching event {:?} from DAG", event_id,
);
events.push(event);
} else {
let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
if malicious_count + 1 == MALICIOUS_THRESHOLD {
error!(
@@ -541,20 +586,6 @@ impl ProtocolEventGraph {
);
continue
}
// At this point we should have it in our DAG.
// This code panics if this is not the case.
debug!(
target: "event_graph::protocol::handle_event_req()",
"Fetching event {:?} from DAG", event_id,
);
events.push(
self.event_graph
.dag_get(event_id)
.await?
.ok_or(Error::EventNotFound("Event Not Found in DAG".to_owned()))?,
);
}
// Check if the incoming event is older than the genesis event. If so, something
@@ -569,7 +600,7 @@ impl ProtocolEventGraph {
target: "event_graph::protocol::handle_event_req()",
"Requested event by peer {} is older than previous rotation period. It should have been pruned.
Event timestamp: `{}`. Genesis timestamp: `{}`",
event.header.id(), event.header.timestamp, genesis_timestamp
event.id(), event.header.timestamp, genesis_timestamp
);
}
@@ -591,16 +622,15 @@ impl ProtocolEventGraph {
}
}
// async fn handle_header_req(self: Arc<Self>) -> Result<()> {
// Ok(())
// }
/// Protocol function handling `HeaderReq`.
/// This is triggered whenever someone requests syncing headers by
/// sending their current headers.
async fn handle_header_rep(self: Arc<Self>) -> Result<()> {
async fn handle_header_req(self: Arc<Self>) -> Result<()> {
loop {
self.hdr_req_sub.receive().await?;
let dag_name = match self.hdr_req_sub.receive().await {
Ok(v) => v.0.clone(),
Err(_) => continue,
};
trace!(
target: "event_graph::protocol::handle_tip_req()",
"Got TipReq [{}]", self.channel.address(),
@@ -619,8 +649,9 @@ impl ProtocolEventGraph {
// We received header request. Let's find them, add them to
// our bcast ids list, and reply with them.
let main_dag = self.event_graph.dag_store.read().await.get_dag(&dag_name);
let mut headers = vec![];
for item in self.event_graph.main_dag.iter() {
for item in main_dag.iter() {
let (_, event) = item.unwrap();
let event: Event = deserialize_async(&event).await.unwrap();
if !headers.contains(&event.header) || event.header.layer != 0 {
@@ -645,7 +676,10 @@ impl ProtocolEventGraph {
/// tips of our DAG.
async fn handle_tip_req(self: Arc<Self>) -> Result<()> {
loop {
self.tip_req_sub.receive().await?;
let dag_name = match self.tip_req_sub.receive().await {
Ok(v) => v.0.clone(),
Err(_) => continue,
};
trace!(
target: "event_graph::protocol::handle_tip_req()",
"Got TipReq [{}]", self.channel.address(),
@@ -664,7 +698,13 @@ impl ProtocolEventGraph {
// We received a tip request. Let's find them, add them to
// our bcast ids list, and reply with them.
let layers = self.event_graph.unreferenced_tips.read().await.clone();
let dag_name_hash = blake3::Hash::from_str(&dag_name).unwrap();
let store = self.event_graph.dag_store.read().await;
let (_, layers) = match store.header_dags.get(&dag_name_hash) {
Some(v) => v,
None => continue,
};
// let layers = self.event_graph.dag_store.read().await.find_unreferenced_tips(&dag_name).await;
let mut bcast_ids = self.event_graph.broadcasted_ids.write().await;
for (_, tips) in layers.iter() {
for tip in tips {
@@ -673,7 +713,7 @@ impl ProtocolEventGraph {
}
drop(bcast_ids);
self.channel.send(&TipRep(layers)).await?;
self.channel.send(&TipRep(layers.clone())).await?;
}
}

View File

@@ -93,7 +93,7 @@ async fn spawn_node(
let p2p = P2p::new(settings, ex.clone()).await.unwrap();
let sled_db = sled::Config::new().temporary(true).open().unwrap();
let event_graph =
EventGraph::new(p2p.clone(), sled_db, "/tmp".into(), false, false, "dag", 1, ex.clone())
EventGraph::new(p2p.clone(), sled_db, "/tmp".into(), false, false, 1, ex.clone())
.await
.unwrap();
*event_graph.synced.write().await = true;
@@ -156,17 +156,22 @@ async fn bootstrap_nodes(
async fn assert_dags(eg_instances: &[Arc<EventGraph>], expected_len: usize, rng: &mut ThreadRng) {
let random_node = eg_instances.choose(rng).unwrap();
let last_layer_tips =
random_node.unreferenced_tips.read().await.last_key_value().unwrap().1.clone();
let random_node_genesis = random_node.current_genesis.read().await.id();
let store = random_node.dag_store.read().await;
let (_, unreferenced_tips) = store.main_dags.get(&random_node_genesis).unwrap();
let last_layer_tips = unreferenced_tips.last_key_value().unwrap().1.clone();
for (i, eg) in eg_instances.iter().enumerate() {
let node_last_layer_tips =
eg.unreferenced_tips.read().await.last_key_value().unwrap().1.clone();
let current_genesis = eg.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
let dag = eg.dag_store.read().await.get_dag(&dag_name);
let unreferenced_tips = eg.dag_store.read().await.find_unreferenced_tips(&dag).await;
let node_last_layer_tips = unreferenced_tips.last_key_value().unwrap().1.clone();
assert!(
eg.main_dag.len() == expected_len,
dag.len() == expected_len,
"Node {}, expected {} events, have {}",
i,
expected_len,
eg.main_dag.len()
dag.len()
);
assert_eq!(
node_last_layer_tips, last_layer_tips,
@@ -210,9 +215,13 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
// Grab genesis event
let random_node = eg_instances.choose(&mut rng).unwrap();
let (id, _) = random_node.main_dag.last().unwrap().unwrap();
let current_genesis = random_node.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
let (id, _) = random_node.dag_store.read().await.get_dag(&dag_name).last().unwrap().unwrap();
let genesis_event_id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
drop(current_genesis);
// =========================================
// 1. Assert that everyone's DAG is the same
// =========================================
@@ -222,16 +231,22 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
// 2. Create an event in one node and publish
// ==========================================
let random_node = eg_instances.choose(&mut rng).unwrap();
let current_genesis = random_node.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
let event = Event::new(vec![1, 2, 3, 4], random_node).await;
assert!(event.header.parents.contains(&genesis_event_id));
// The node adds it to their DAG, on layer 1.
random_node.header_dag_insert(vec![event.header.clone()]).await.unwrap();
let event_id = random_node.dag_insert(&[event.clone()]).await.unwrap()[0];
let tips_layers = random_node.unreferenced_tips.read().await;
random_node.header_dag_insert(vec![event.header.clone()], &dag_name).await.unwrap();
let event_id = random_node.dag_insert(&[event.clone()], &dag_name).await.unwrap()[0];
let store = random_node.dag_store.read().await;
let (_, tips_layers) = store.header_dags.get(&current_genesis.id()).unwrap();
// Since genesis was referenced, its layer (0) have been removed
assert_eq!(tips_layers.len(), 1);
assert!(tips_layers.last_key_value().unwrap().1.get(&event_id).is_some());
drop(tips_layers);
drop(store);
drop(current_genesis);
info!("Broadcasting event {}", event_id);
random_node.p2p.broadcast(&EventPut(event)).await;
info!("Waiting 5s for event propagation");
@@ -249,20 +264,25 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
// ==============================================================
let random_node = eg_instances.choose(&mut rng).unwrap();
let event0 = Event::new(vec![1, 2, 3, 4, 0], random_node).await;
random_node.header_dag_insert(vec![event0.header.clone()]).await.unwrap();
let event0_id = random_node.dag_insert(&[event0.clone()]).await.unwrap()[0];
random_node.header_dag_insert(vec![event0.header.clone()], &dag_name).await.unwrap();
let event0_id = random_node.dag_insert(&[event0.clone()], &dag_name).await.unwrap()[0];
let event1 = Event::new(vec![1, 2, 3, 4, 1], random_node).await;
random_node.header_dag_insert(vec![event1.header.clone()]).await.unwrap();
let event1_id = random_node.dag_insert(&[event1.clone()]).await.unwrap()[0];
random_node.header_dag_insert(vec![event1.header.clone()], &dag_name).await.unwrap();
let event1_id = random_node.dag_insert(&[event1.clone()], &dag_name).await.unwrap()[0];
let event2 = Event::new(vec![1, 2, 3, 4, 2], random_node).await;
random_node.header_dag_insert(vec![event2.header.clone()]).await.unwrap();
let event2_id = random_node.dag_insert(&[event2.clone()]).await.unwrap()[0];
random_node.header_dag_insert(vec![event2.header.clone()], &dag_name).await.unwrap();
let event2_id = random_node.dag_insert(&[event2.clone()], &dag_name).await.unwrap()[0];
// Genesis event + event from 2. + upper 3 events (layer 4)
assert_eq!(random_node.main_dag.len(), 5);
let tips_layers = random_node.unreferenced_tips.read().await;
let current_genesis = random_node.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
assert_eq!(random_node.dag_store.read().await.get_dag(&dag_name).len(), 5);
let random_node_genesis = random_node.current_genesis.read().await.id();
let store = random_node.dag_store.read().await;
let (_, tips_layers) = store.header_dags.get(&random_node_genesis).unwrap();
assert_eq!(tips_layers.len(), 1);
assert!(tips_layers.get(&4).unwrap().get(&event2_id).is_some());
drop(tips_layers);
drop(current_genesis);
drop(store);
let event_chain = vec![
(event0_id, event0.header.parents),
@@ -288,20 +308,20 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
// =======
let node1 = eg_instances.choose(&mut rng).unwrap();
let event0_1 = Event::new(vec![1, 2, 3, 4, 3], node1).await;
node1.header_dag_insert(vec![event0_1.header.clone()]).await.unwrap();
node1.dag_insert(&[event0_1.clone()]).await.unwrap();
node1.header_dag_insert(vec![event0_1.header.clone()], &dag_name).await.unwrap();
node1.dag_insert(&[event0_1.clone()], &dag_name).await.unwrap();
node1.p2p.broadcast(&EventPut(event0_1)).await;
msleep(300).await;
let event1_1 = Event::new(vec![1, 2, 3, 4, 4], node1).await;
node1.header_dag_insert(vec![event1_1.header.clone()]).await.unwrap();
node1.dag_insert(&[event1_1.clone()]).await.unwrap();
node1.header_dag_insert(vec![event1_1.header.clone()], &dag_name).await.unwrap();
node1.dag_insert(&[event1_1.clone()], &dag_name).await.unwrap();
node1.p2p.broadcast(&EventPut(event1_1)).await;
msleep(300).await;
let event2_1 = Event::new(vec![1, 2, 3, 4, 5], node1).await;
node1.header_dag_insert(vec![event2_1.header.clone()]).await.unwrap();
node1.dag_insert(&[event2_1.clone()]).await.unwrap();
node1.header_dag_insert(vec![event2_1.header.clone()], &dag_name).await.unwrap();
node1.dag_insert(&[event2_1.clone()], &dag_name).await.unwrap();
node1.p2p.broadcast(&EventPut(event2_1)).await;
msleep(300).await;
@@ -310,20 +330,20 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
// =======
let node2 = eg_instances.choose(&mut rng).unwrap();
let event0_2 = Event::new(vec![1, 2, 3, 4, 6], node2).await;
node2.header_dag_insert(vec![event0_2.header.clone()]).await.unwrap();
node2.dag_insert(&[event0_2.clone()]).await.unwrap();
node2.header_dag_insert(vec![event0_2.header.clone()], &dag_name).await.unwrap();
node2.dag_insert(&[event0_2.clone()], &dag_name).await.unwrap();
node2.p2p.broadcast(&EventPut(event0_2)).await;
msleep(300).await;
let event1_2 = Event::new(vec![1, 2, 3, 4, 7], node2).await;
node2.header_dag_insert(vec![event1_2.header.clone()]).await.unwrap();
node2.dag_insert(&[event1_2.clone()]).await.unwrap();
node2.header_dag_insert(vec![event1_2.header.clone()], &dag_name).await.unwrap();
node2.dag_insert(&[event1_2.clone()], &dag_name).await.unwrap();
node2.p2p.broadcast(&EventPut(event1_2)).await;
msleep(300).await;
let event2_2 = Event::new(vec![1, 2, 3, 4, 8], node2).await;
node2.header_dag_insert(vec![event2_2.header.clone()]).await.unwrap();
node2.dag_insert(&[event2_2.clone()]).await.unwrap();
node2.header_dag_insert(vec![event2_2.header.clone()], &dag_name).await.unwrap();
node2.dag_insert(&[event2_2.clone()], &dag_name).await.unwrap();
node2.p2p.broadcast(&EventPut(event2_2)).await;
msleep(300).await;
@@ -332,20 +352,20 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
// =======
let node3 = eg_instances.choose(&mut rng).unwrap();
let event0_3 = Event::new(vec![1, 2, 3, 4, 9], node3).await;
node3.header_dag_insert(vec![event0_3.header.clone()]).await.unwrap();
node3.dag_insert(&[event0_3.clone()]).await.unwrap();
node3.header_dag_insert(vec![event0_3.header.clone()], &dag_name).await.unwrap();
node3.dag_insert(&[event0_3.clone()], &dag_name).await.unwrap();
node3.p2p.broadcast(&EventPut(event0_3)).await;
msleep(300).await;
let event1_3 = Event::new(vec![1, 2, 3, 4, 10], node3).await;
node3.header_dag_insert(vec![event1_3.header.clone()]).await.unwrap();
node3.dag_insert(&[event1_3.clone()]).await.unwrap();
node3.header_dag_insert(vec![event1_3.header.clone()], &dag_name).await.unwrap();
node3.dag_insert(&[event1_3.clone()], &dag_name).await.unwrap();
node3.p2p.broadcast(&EventPut(event1_3)).await;
msleep(300).await;
let event2_3 = Event::new(vec![1, 2, 3, 4, 11], node3).await;
node3.header_dag_insert(vec![event2_3.header.clone()]).await.unwrap();
node3.dag_insert(&[event2_3.clone()]).await.unwrap();
node3.header_dag_insert(vec![event2_3.header.clone()], &dag_name).await.unwrap();
node3.dag_insert(&[event2_3.clone()], &dag_name).await.unwrap();
node3.p2p.broadcast(&EventPut(event2_3)).await;
msleep(300).await;
@@ -401,7 +421,7 @@ async fn eventgraph_propagation_real(ex: Arc<Executor<'static>>) {
info!("Waiting 5s for new node connection");
sleep(5).await;
event_graph.dag_sync(false).await.unwrap()
event_graph.sync_selected(1, false).await.unwrap();
}
// ============================================================
@@ -440,8 +460,10 @@ async fn eventgraph_chaotic_propagation_real(ex: Arc<Executor<'static>>) {
for i in 0..n_events {
let random_node = eg_instances.choose(&mut rng).unwrap();
let event = Event::new(i.to_be_bytes().to_vec(), random_node).await;
random_node.header_dag_insert(vec![event.header.clone()]).await.unwrap();
random_node.dag_insert(&[event.clone()]).await.unwrap();
let current_genesis = random_node.current_genesis.read().await;
let dag_name = current_genesis.id().to_string();
random_node.header_dag_insert(vec![event.header.clone()], &dag_name).await.unwrap();
random_node.dag_insert(&[event.clone()], &dag_name).await.unwrap();
random_node.p2p.broadcast(&EventPut(event)).await;
}
info!("Waiting 5s for events propagation");
@@ -480,7 +502,7 @@ async fn eventgraph_chaotic_propagation_real(ex: Arc<Executor<'static>>) {
info!("Waiting 5s for new node connection");
sleep(5).await;
event_graph.dag_sync(false).await.unwrap()
event_graph.sync_selected(2, false).await.unwrap()
}
// ============================================================