script/evgrd: fill in required LocalEventGraph fields and methods, and present a working test module

dasman
2024-09-19 18:23:49 +03:00
parent 3dead803ad
commit a63d628ec5
6 changed files with 402 additions and 70 deletions

View File

@@ -18,7 +18,7 @@
use darkfi::{
async_daemonize, cli_desc,
event_graph::{self, proto::ProtocolEventGraph, EventGraph, EventGraphPtr},
event_graph::{proto::ProtocolEventGraph, Event, EventGraph, EventGraphPtr},
net::{
session::SESSION_DEFAULT,
settings::SettingsOpt as NetSettingsOpt,
@@ -29,24 +29,16 @@ use darkfi::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
system::{sleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr},
util::path::{expand_path, get_config_path},
system::{sleep, StoppableTask, StoppableTaskPtr},
util::path::expand_path,
Error, Result,
};
use darkfi_serial::{
async_trait, deserialize_async, serialize_async, AsyncDecodable, AsyncEncodable, Encodable,
SerialDecodable, SerialEncodable,
};
use darkfi_serial::{AsyncDecodable, AsyncEncodable};
use futures::FutureExt;
use log::{debug, error, info};
use rand::rngs::OsRng;
use sled_overlay::sled;
use smol::{fs, lock::Mutex, stream::StreamExt, Executor};
use std::{
collections::HashSet,
path::PathBuf,
sync::{Arc, Mutex as SyncMutex},
};
use std::{collections::HashSet, path::PathBuf, sync::Arc};
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use url::Url;
@@ -102,7 +94,7 @@ struct Args {
sync_attempts: u8,
/// Number of seconds to wait before trying again if sync fails.
#[structopt(long, default_value = "10")]
#[structopt(long, default_value = "50")]
sync_timeout: u8,
/// P2P network settings
@@ -170,14 +162,12 @@ async fn rpc_serve(
}
}
}
Ok(())
}
async fn handle_connect(
mut stream: Box<dyn PtStream>,
daemon: Arc<Daemon>,
ex: Arc<Executor<'_>>,
_ex: Arc<Executor<'_>>,
) -> Result<()> {
let client_version = VersionMessage::decode_async(&mut stream).await?;
info!(target: "evgrd", "Client version: {}", client_version.protocol_version);
@@ -204,6 +194,8 @@ async fn handle_connect(
info!(target: "evgrd", "Fetching events {fetchevs:?}");
let events = daemon.event_graph.fetch_successors_of(fetchevs.unref_tips).await?;
info!("fetched {events:?}");
for event in events {
MSG_EVENT.encode_async(&mut stream).await?;
event.encode_async(&mut stream).await?;
@@ -234,12 +226,18 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
sled_db.clone(),
replay_datastore.clone(),
replay_mode,
"darkirc_dag",
"evgrd_dag",
1,
ex.clone(),
)
.await?;
// Adding some events
// for i in 1..6 {
// let event = Event::new(vec![1, 2, 3, i], &event_graph).await;
// event_graph.dag_insert(&[event.clone()]).await.unwrap();
// }
let prune_task = event_graph.prune_task.get().unwrap();
info!("Registering EventGraph P2P protocol");

View File

@@ -17,33 +17,27 @@
*/
use darkfi::{
async_daemonize, cli_desc,
event_graph::{self, proto::ProtocolEventGraph, EventGraph, EventGraphPtr},
net::{
session::SESSION_DEFAULT,
settings::SettingsOpt as NetSettingsOpt,
transport::{Dialer, Listener, PtListener, PtStream},
P2p, P2pPtr,
},
rpc::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
system::{sleep, StoppableTask, StoppableTaskPtr},
util::path::{expand_path, get_config_path},
event_graph::{self},
net::transport::Dialer,
util::path::expand_path,
Error, Result,
};
use darkfi_serial::{
async_trait, deserialize_async, serialize_async, AsyncDecodable, AsyncEncodable, Encodable,
SerialDecodable, SerialEncodable,
};
use log::{debug, error, info, warn};
use darkfi_serial::{AsyncDecodable, AsyncEncodable};
use log::{error, info};
use sled_overlay::sled;
use smol::fs;
use url::Url;
use evgrd::{FetchEventsMessage, LocalEventGraph, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS};
async fn amain() -> Result<()> {
let evgr = LocalEventGraph::new();
info!("Instantiating event DAG");
let ex = std::sync::Arc::new(smol::Executor::new());
let datastore = expand_path("~/.local/darkfi/evgrd")?;
fs::create_dir_all(&datastore).await?;
let sled_db = sled::open(datastore)?;
let evgr = LocalEventGraph::new(sled_db.clone(), "evgrd_testdag", 1, ex.clone()).await?;
let endpoint = "tcp://127.0.0.1:5588";
let endpoint = Url::parse(endpoint)?;
@@ -51,33 +45,44 @@ async fn amain() -> Result<()> {
let dialer = Dialer::new(endpoint, None).await?;
let timeout = std::time::Duration::from_secs(60);
info!("Connecting...");
println!("Connecting...");
let mut stream = dialer.dial(Some(timeout)).await?;
info!("Connected!");
println!("Connected!");
let version = VersionMessage::new();
version.encode_async(&mut stream).await?;
let server_version = VersionMessage::decode_async(&mut stream).await?;
info!("Server version: {}", server_version.protocol_version);
println!("Server version: {}", server_version.protocol_version);
let fetchevs = FetchEventsMessage::new(evgr.unref_tips.clone());
let unref_tips = evgr.unreferenced_tips.read().await.clone();
let fetchevs = FetchEventsMessage::new(unref_tips);
MSG_FETCHEVENTS.encode_async(&mut stream).await?;
fetchevs.encode_async(&mut stream).await?;
loop {
let msg_type = u8::decode_async(&mut stream).await?;
println!("Received: {msg_type:?}");
if msg_type != MSG_EVENT {
error!("Received invalid msg_type: {msg_type}");
return Err(Error::MalformedPacket)
}
let ev = event_graph::Event::decode_async(&mut stream).await?;
}
Ok(())
let genesis_timestamp = evgr.current_genesis.read().await.clone().timestamp;
let ev_id = ev.id();
if !evgr.dag.contains_key(ev_id.as_bytes()).unwrap() &&
ev.validate(&evgr.dag, genesis_timestamp, evgr.days_rotation, None).await?
{
println!("got {ev:?}");
evgr.dag_insert(&[ev]).await.unwrap();
} else {
println!("Event is invalid!")
}
}
}
fn main() {
smol::block_on(amain());
let _ = smol::block_on(amain());
}

View File

@@ -16,21 +16,336 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi::event_graph::Event;
use darkfi_serial::{
async_trait, deserialize_async, serialize_async, Encodable, SerialDecodable, SerialEncodable,
use darkfi::{
event_graph::{
util::{generate_genesis, next_rotation_timestamp, seconds_until_next_rotation},
Event, GENESIS_CONTENTS, INITIAL_GENESIS, NULL_ID, N_EVENT_PARENTS,
},
system::{sleep, Publisher, PublisherPtr, StoppableTask, StoppableTaskPtr},
Error, Result,
};
use darkfi_serial::{
async_trait, deserialize_async, serialize_async, SerialDecodable, SerialEncodable,
};
use log::{debug, error, info};
use sled_overlay::{sled, SledTreeOverlay};
use smol::{
lock::{OnceCell, RwLock},
Executor,
};
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};
use std::sync::Arc;
pub const PROTOCOL_VERSION: u32 = 1;
/// Atomic pointer to a [`LocalEventGraph`] instance.
pub type LocalEventGraphPtr = Arc<LocalEventGraph>;
pub struct LocalEventGraph {
pub unref_tips: Vec<(u64, blake3::Hash)>,
/// Sled tree containing the DAG
pub dag: sled::Tree,
/// The set of unreferenced DAG tips
pub unreferenced_tips: RwLock<BTreeMap<u64, HashSet<blake3::Hash>>>,
/// A `HashSet` containing event IDs and their 1-level parents.
/// These come from the events we've sent out using `EventPut`.
/// They are used with `EventReq` to decide if we should reply
/// or not. It is also used when we broadcast the `TipRep`
/// message telling peers about our unreferenced tips.
broadcasted_ids: RwLock<HashSet<blake3::Hash>>,
/// DAG Pruning Task
pub prune_task: OnceCell<StoppableTaskPtr>,
/// Event publisher, this notifies whenever an event is
/// inserted into the DAG
pub event_pub: PublisherPtr<Event>,
/// Current genesis event
pub current_genesis: RwLock<Event>,
/// Currently configured DAG rotation, in days
pub days_rotation: u64,
/// Flag signalling DAG has finished initial sync
pub synced: RwLock<bool>,
/// Enable graph debugging
pub deg_enabled: RwLock<bool>,
}
impl LocalEventGraph {
pub fn new() -> Self {
Self { unref_tips: vec![] }
pub async fn new(
sled_db: sled::Db,
dag_tree_name: &str,
days_rotation: u64,
ex: Arc<Executor<'_>>,
) -> Result<LocalEventGraphPtr> {
let dag = sled_db.open_tree(dag_tree_name)?;
let unreferenced_tips = RwLock::new(BTreeMap::new());
let broadcasted_ids = RwLock::new(HashSet::new());
let event_pub = Publisher::new();
// Create the current genesis event based on the `days_rotation`
let current_genesis = generate_genesis(days_rotation);
let self_ = Arc::new(Self {
dag: dag.clone(),
unreferenced_tips,
broadcasted_ids,
prune_task: OnceCell::new(),
event_pub,
current_genesis: RwLock::new(current_genesis.clone()),
days_rotation,
synced: RwLock::new(false),
deg_enabled: RwLock::new(false),
});
// Check if we have it in our DAG.
// If not, we can prune the DAG and insert this new genesis event.
if !dag.contains_key(current_genesis.id().as_bytes())? {
info!(
target: "event_graph::new()",
"[EVENTGRAPH] DAG does not contain current genesis, pruning existing data",
);
self_.dag_prune(current_genesis).await?;
}
// Find the unreferenced tips in the current DAG state.
*self_.unreferenced_tips.write().await = self_.find_unreferenced_tips().await;
// Spawn the DAG pruning task
if days_rotation > 0 {
let prune_task = StoppableTask::new();
let _ = self_.prune_task.set(prune_task.clone()).await;
prune_task.clone().start(
self_.clone().dag_prune_task(days_rotation),
|_| async move {
info!(target: "event_graph::_handle_stop()", "[EVENTGRAPH] Prune task stopped, flushing sled")
},
Error::DetachedTaskStopped,
ex.clone(),
);
}
Ok(self_)
}
async fn dag_prune(&self, genesis_event: Event) -> Result<()> {
debug!(target: "event_graph::dag_prune()", "Pruning DAG...");
// Acquire exclusive locks to unreferenced_tips, broadcasted_ids and
// current_genesis while this operation is happening. We do this to
// ensure that during the pruning operation, no other operations are
// able to access the intermediate state which could lead to producing
// the wrong state after pruning.
let mut unreferenced_tips = self.unreferenced_tips.write().await;
let mut broadcasted_ids = self.broadcasted_ids.write().await;
let mut current_genesis = self.current_genesis.write().await;
// Atomically clear the DAG and write the new genesis event.
let mut batch = sled::Batch::default();
for key in self.dag.iter().keys() {
batch.remove(key.unwrap());
}
batch.insert(genesis_event.id().as_bytes(), serialize_async(&genesis_event).await);
debug!(target: "event_graph::dag_prune()", "Applying batch...");
if let Err(e) = self.dag.apply_batch(batch) {
panic!("Failed pruning DAG, sled apply_batch error: {}", e);
}
// Clear unreferenced tips and bcast ids
*unreferenced_tips = BTreeMap::new();
unreferenced_tips.insert(0, HashSet::from([genesis_event.id()]));
*current_genesis = genesis_event;
*broadcasted_ids = HashSet::new();
drop(unreferenced_tips);
drop(broadcasted_ids);
drop(current_genesis);
debug!(target: "event_graph::dag_prune()", "DAG pruned successfully");
Ok(())
}
/// Background task periodically pruning the DAG.
async fn dag_prune_task(self: Arc<Self>, days_rotation: u64) -> Result<()> {
// The DAG should periodically be pruned. This can be a configurable
// parameter. By pruning, we should deterministically replace the
// genesis event (can use a deterministic timestamp) and drop everything
// in the DAG, leaving just the new genesis event.
debug!(target: "event_graph::dag_prune_task()", "Spawned background DAG pruning task");
loop {
// Find the next rotation timestamp:
let next_rotation = next_rotation_timestamp(INITIAL_GENESIS, days_rotation);
// Prepare the new genesis event
let current_genesis = Event {
timestamp: next_rotation,
content: GENESIS_CONTENTS.to_vec(),
parents: [NULL_ID; N_EVENT_PARENTS],
layer: 0,
};
// Sleep until it's time to rotate.
let s = seconds_until_next_rotation(next_rotation);
debug!(target: "event_graph::dag_prune_task()", "Sleeping {}s until next DAG prune", s);
sleep(s).await;
debug!(target: "event_graph::dag_prune_task()", "Rotation period reached");
// Trigger DAG prune
self.dag_prune(current_genesis).await?;
}
}
/// Find the unreferenced tips in the current DAG state, mapped by their layers.
async fn find_unreferenced_tips(&self) -> BTreeMap<u64, HashSet<blake3::Hash>> {
// First get all the event IDs
let mut tips = HashSet::new();
for iter_elem in self.dag.iter() {
let (id, _) = iter_elem.unwrap();
let id = blake3::Hash::from_bytes((&id as &[u8]).try_into().unwrap());
tips.insert(id);
}
// Iterate again to find unreferenced IDs
for iter_elem in self.dag.iter() {
let (_, event) = iter_elem.unwrap();
let event: Event = deserialize_async(&event).await.unwrap();
for parent in event.parents.iter() {
tips.remove(parent);
}
}
// Build the layers map
let mut map: BTreeMap<u64, HashSet<blake3::Hash>> = BTreeMap::new();
for tip in tips {
let event = self.dag_get(&tip).await.unwrap().unwrap();
if let Some(layer_tips) = map.get_mut(&event.layer) {
layer_tips.insert(tip);
} else {
let mut layer_tips = HashSet::new();
layer_tips.insert(tip);
map.insert(event.layer, layer_tips);
}
}
map
}
pub async fn dag_insert(&self, events: &[Event]) -> Result<Vec<blake3::Hash>> {
// Sanity check
if events.is_empty() {
return Ok(vec![])
}
// Acquire exclusive locks to `unreferenced_tips` and `broadcasted_ids`
let mut unreferenced_tips = self.unreferenced_tips.write().await;
let mut broadcasted_ids = self.broadcasted_ids.write().await;
// Here we keep the IDs to return
let mut ids = Vec::with_capacity(events.len());
// Create an overlay over the DAG tree
let mut overlay = SledTreeOverlay::new(&self.dag);
// Grab genesis timestamp
let genesis_timestamp = self.current_genesis.read().await.timestamp;
// Iterate over given events to validate them and
// write them to the overlay
for event in events {
let event_id = event.id();
debug!(
target: "event_graph::dag_insert()",
"Inserting event {} into the DAG", event_id,
);
if !event
.validate(&self.dag, genesis_timestamp, self.days_rotation, Some(&overlay))
.await?
{
error!(target: "event_graph::dag_insert()", "Event {} is invalid!", event_id);
return Err(Error::EventIsInvalid)
}
let event_se = serialize_async(event).await;
// Add the event to the overlay
overlay.insert(event_id.as_bytes(), &event_se)?;
// Note down the event ID to return
ids.push(event_id);
}
// Aggregate changes into a single batch
let batch = overlay.aggregate().unwrap();
// Atomically apply the batch.
// Panic if something is corrupted.
if let Err(e) = self.dag.apply_batch(batch) {
panic!("Failed applying dag_insert batch to sled: {}", e);
}
// Iterate over given events to update references and
// send out notifications about them
for event in events {
let event_id = event.id();
// Update the unreferenced DAG tips set
debug!(
target: "event_graph::dag_insert()",
"Event {} parents {:#?}", event_id, event.parents,
);
for parent_id in event.parents.iter() {
if parent_id != &NULL_ID {
debug!(
target: "event_graph::dag_insert()",
"Removing {} from unreferenced_tips", parent_id,
);
// Iterate over unreferenced tips in previous layers
// and remove the parent
// NOTE: this might be too exhaustive, but the
// assumption is that previous layers unreferenced
// tips will be few.
for (layer, tips) in unreferenced_tips.iter_mut() {
if layer >= &event.layer {
continue
}
tips.remove(parent_id);
}
broadcasted_ids.insert(*parent_id);
}
}
unreferenced_tips.retain(|_, tips| !tips.is_empty());
debug!(
target: "event_graph::dag_insert()",
"Adding {} to unreferenced tips", event_id,
);
if let Some(layer_tips) = unreferenced_tips.get_mut(&event.layer) {
layer_tips.insert(event_id);
} else {
let mut layer_tips = HashSet::new();
layer_tips.insert(event_id);
unreferenced_tips.insert(event.layer, layer_tips);
}
// Send out notifications about the new event
self.event_pub.notify(event.clone()).await;
}
// Drop the exclusive locks
drop(unreferenced_tips);
drop(broadcasted_ids);
Ok(ids)
}
/// Fetch an event from the DAG
pub async fn dag_get(&self, event_id: &blake3::Hash) -> Result<Option<Event>> {
let Some(bytes) = self.dag.get(event_id.as_bytes())? else { return Ok(None) };
let event: Event = deserialize_async(&bytes).await?;
Ok(Some(event))
}
}
@@ -45,13 +360,19 @@ impl VersionMessage {
}
}
impl Default for VersionMessage {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FetchEventsMessage {
pub unref_tips: Vec<(u64, blake3::Hash)>,
pub unref_tips: BTreeMap<u64, HashSet<blake3::Hash>>,
}
impl FetchEventsMessage {
pub fn new(unref_tips: Vec<(u64, blake3::Hash)>) -> Self {
pub fn new(unref_tips: BTreeMap<u64, HashSet<blake3::Hash>>) -> Self {
Self { unref_tips }
}
}
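
A minimal hedged sketch of building this request from the layer-keyed tips map, as the test client does; `some_tip_id` is a hypothetical placeholder for a hash read from `unreferenced_tips`:

use std::collections::{BTreeMap, HashSet};

// Hedged sketch: a fetch request keyed by DAG layer.
// `some_tip_id` is hypothetical; the client reads real tips
// from evgr.unreferenced_tips.
let mut unref_tips: BTreeMap<u64, HashSet<blake3::Hash>> = BTreeMap::new();
unref_tips.entry(0).or_default().insert(some_tip_id);
let fetchevs = FetchEventsMessage::new(unref_tips);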

View File

@@ -34,11 +34,11 @@ pub struct Event {
/// Timestamp of the event in whole seconds
pub timestamp: u64,
/// Content of the event
pub(crate) content: Vec<u8>,
pub content: Vec<u8>,
/// Parent nodes in the event DAG
pub(crate) parents: [blake3::Hash; N_EVENT_PARENTS],
pub parents: [blake3::Hash; N_EVENT_PARENTS],
/// DAG layer index of the event
pub(crate) layer: u64,
pub layer: u64,
}
impl Event {
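
With these fields now public, consumers can construct events directly; a hedged sketch mirroring the genesis construction in dag_prune_task() earlier in this commit:

// Hedged sketch: build a genesis-style event by hand.
// `next_rotation` would come from next_rotation_timestamp().
let genesis = Event {
    timestamp: next_rotation,
    content: GENESIS_CONTENTS.to_vec(),
    parents: [NULL_ID; N_EVENT_PARENTS],
    layer: 0,
};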

View File

@@ -60,7 +60,7 @@ pub mod util;
use util::{generate_genesis, next_rotation_timestamp};
// Debugging event graph
pub(crate) mod deg;
pub mod deg;
use deg::DegEvent;
#[cfg(test)]
@@ -68,12 +68,12 @@ mod tests;
/// Initial genesis timestamp in millis (07 Sep 2023, 00:00:00 UTC)
/// Must always be UTC midnight.
const INITIAL_GENESIS: u64 = 1_694_044_800_000;
pub const INITIAL_GENESIS: u64 = 1_694_044_800_000;
/// Genesis event contents
const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53];
pub const GENESIS_CONTENTS: &[u8] = &[0x47, 0x45, 0x4e, 0x45, 0x53, 0x49, 0x53];
/// The number of parents an event is supposed to have.
const N_EVENT_PARENTS: usize = 5;
pub const N_EVENT_PARENTS: usize = 5;
/// Allowed timestamp drift in milliseconds
const EVENT_TIME_DRIFT: u64 = 60_000;
/// Null event ID
@@ -843,10 +843,13 @@ impl EventGraph {
/// Fetch all the events that are on higher layers than the
/// provided ones.
pub async fn fetch_successors_of(&self, tips: Vec<(u64, blake3::Hash)>) -> Result<Vec<Event>> {
pub async fn fetch_successors_of(
&self,
tips: BTreeMap<u64, HashSet<blake3::Hash>>,
) -> Result<Vec<Event>> {
debug!(
target: "event_graph::fetch_successors_of()",
"fetching successors of c{tips:?}"
"fetching successors of {tips:?}"
);
let mut graph = HashMap::new();
@@ -859,17 +862,22 @@ impl EventGraph {
let mut result = vec![];
for tip in tips.iter() {
if !graph.contains_key(&tip.1) {
continue;
'outer: for tip in tips.iter() {
for i in tip.1.iter() {
if !graph.contains_key(i) {
continue 'outer;
}
}
for (_, ev) in graph.iter() {
if ev.layer > tip.0 && !result.contains(ev) {
if ev.layer > *tip.0 && !result.contains(ev) {
result.push(ev.clone())
}
}
}
result.sort_by(|a, b| a.layer.cmp(&b.layer));
Ok(result)
}
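
As a quick sanity check on the now-public constant: INITIAL_GENESIS divides evenly into whole UTC days, matching the "must always be UTC midnight" requirement noted above:

// 1_694_044_800_000 ms / 86_400_000 ms per day = 19_607 whole days,
// i.e. 00:00:00 UTC on 07 Sep 2023.
assert_eq!(1_694_044_800_000u64 % 86_400_000, 0);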

View File

@@ -69,7 +69,7 @@ pub(super) fn days_since(midnight_ts: u64) -> u64 {
}
/// Calculate the timestamp of the next DAG rotation.
pub(super) fn next_rotation_timestamp(starting_timestamp: u64, rotation_period: u64) -> u64 {
pub fn next_rotation_timestamp(starting_timestamp: u64, rotation_period: u64) -> u64 {
// Prevent division by 0
if rotation_period == 0 {
panic!("Rotation period cannot be 0");
@@ -99,7 +99,7 @@ pub(super) fn next_rotation_timestamp(starting_timestamp: u64, rotation_period:
/// Calculate the time in seconds until the next_rotation, given
/// as a timestamp.
/// `next_rotation` here represents a timestamp in UNIX epoch format.
pub(super) fn seconds_until_next_rotation(next_rotation: u64) -> u64 {
pub fn seconds_until_next_rotation(next_rotation: u64) -> u64 {
// Store `now` in a variable in order to avoid a TOCTOU error.
// There may be a drift of one second between this panic check and
// the return value if we get unlucky.
@@ -111,7 +111,7 @@ pub(super) fn seconds_until_next_rotation(next_rotation: u64) -> u64 {
}
/// Generate a deterministic genesis event corresponding to the DAG's configuration.
pub(super) fn generate_genesis(days_rotation: u64) -> Event {
pub fn generate_genesis(days_rotation: u64) -> Event {
// Days rotation is u64 except zero
let timestamp = if days_rotation == 0 {
INITIAL_GENESIS
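
A hedged worked sketch of the arithmetic next_rotation_timestamp() implements, assuming millisecond timestamps (as with INITIAL_GENESIS) and a rotation period given in days; this is an illustration under those assumptions, not the upstream implementation:

// Advance from the starting timestamp in whole rotation periods
// until the result lands strictly in the future.
fn next_rotation_sketch(starting_timestamp: u64, rotation_days: u64, now_ms: u64) -> u64 {
    assert!(rotation_days != 0, "Rotation period cannot be 0");
    let period_ms = rotation_days * 24 * 60 * 60 * 1000;
    let elapsed = now_ms.saturating_sub(starting_timestamp);
    let k = elapsed / period_ms + 1;
    starting_timestamp + k * period_ms
}

For example, with a one-day rotation and `now_ms` half a day past genesis, this yields genesis + 1 day; seconds_until_next_rotation() then converts that timestamp into the sleep duration used by the prune task.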