event-graph: DAG sync concept

This commit is contained in:
parazyd
2023-09-08 16:41:46 +02:00
parent f6699efea9
commit 7861949a64
3 changed files with 87 additions and 6 deletions

View File

@@ -524,6 +524,12 @@ pub enum Error {
#[error("Geode chunk route not found")]
GeodeChunkRouteNotFound,
// ==================
// Event Graph errors
// ==================
#[error("DAG sync failed")]
DagSyncFailed,
// =========
// Catch-all
// =========

View File

@@ -18,14 +18,14 @@
use std::{
cmp::Ordering,
collections::{HashSet, VecDeque},
collections::{HashMap, HashSet, VecDeque},
sync::Arc,
time::UNIX_EPOCH,
};
use async_recursion::async_recursion;
use darkfi_serial::{deserialize_async, serialize_async};
use log::{debug, info};
use log::{debug, error, info};
use num_bigint::BigUint;
use smol::{
lock::{Mutex, RwLock},
@@ -44,6 +44,7 @@ pub use event::Event;
/// P2P protocol implementation for the Event Graph
pub mod proto;
use proto::TipReq;
/// Utility functions
mod util;
@@ -162,7 +163,7 @@ impl EventGraph {
}
/// Sync the DAG from connected peers
pub async fn dag_sync(&self) {
pub async fn dag_sync(&self) -> Result<()> {
// We do an optimistic sync where we ask all our connected peers for
// the DAG tips (unreferenced events) and then we accept the ones we
// see the most times.
@@ -177,6 +178,80 @@ impl EventGraph {
// * Since we should be pruning, if we're not synced after some reasonable
// amount of iterations, these could be faulty peers and we can try again
// from the beginning
// Get references to all our peers.
let channels = self.p2p.channels().lock().await.clone();
let mut communicated_peers = channels.len();
info!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH]Syncing DAG from {} peers...", communicated_peers,
);
// Here we keep track of the tips and how many time we've seen them.
let mut tips: HashMap<blake3::Hash, usize> = HashMap::new();
// Let's first ask all of our peers for their tips and collect them
// in our hashmap above.
for (url, channel) in channels.iter() {
if let Err(e) = channel.send(&TipReq {}).await {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Couldn't contact peer {}, skipping ({})", url, e,
);
communicated_peers -= 1;
continue
};
// TODO: I can't access tip_rep_sub from here.
/*
let peer_tips = match timeout(REPLY_TIMEOUT, tip_rep_sub.receive()).await {
Ok(peer_tips) => peer_tips?,
Err(_) => {
error!(
target: "event_graph::dag_sync()",
"Peer {} didn't reply with tips in time, skipping", url,
);
communicated_peers -= 1;
continue
}
};
let peer_tips = &peer_tips.0;
*/
let peer_tips = vec![];
// Note down the seen tips
for tip in peer_tips {
if let Some(seen_tip) = tips.get_mut(tip) {
*seen_tip += 1;
} else {
tips.insert(*tip, 1);
}
}
}
// After we've communicated all the peers, let's see what happened.
if tips.is_empty() {
error!(
target: "event_graph::dag_sync()",
"[EVENTGRAPH] Sync: Could not find any DAG tips",
);
return Err(Error::DagSyncFailed)
}
// We know the number of peers we've communicated with.
// Arbitrarily, let's not consider events we only got once.
// TODO: This should be more sensible depending on the peer number.
let mut considered_tips = HashSet::new();
for (tip, amount) in tips.iter() {
if amount > &1 {
considered_tips.insert(*tip);
}
}
drop(tips);
// Now begin fetching the events backwards.
// TODO: Reuse the recursive logic from EventPut handler.
todo!()
}

View File

@@ -35,7 +35,7 @@ use crate::{impl_p2p_message, net::*, system::timeout::timeout, Error, Result};
/// drop the peer from our P2P connection.
const MALICIOUS_THRESHOLD: usize = 5;
/// Time to wait for a parent ID reply
const REPLY_TIMEOUT: Duration = Duration::from_secs(5);
pub(super) const REPLY_TIMEOUT: Duration = Duration::from_secs(5);
/// P2P protocol implementation for the Event Graph.
pub struct ProtocolEventGraph {
@@ -52,7 +52,7 @@ pub struct ProtocolEventGraph {
/// `MessageSubscriber` for `TipReq`
tip_req_sub: MessageSubscription<TipReq>,
/// `MessageSubscriber` for `TipRep`
_tip_rep_sub: MessageSubscription<TipRep>,
tip_rep_sub: MessageSubscription<TipRep>,
/// Peer malicious message count
malicious_count: AtomicUsize,
/// P2P jobs manager pointer
@@ -121,7 +121,7 @@ impl ProtocolEventGraph {
ev_req_sub,
ev_rep_sub,
tip_req_sub,
_tip_rep_sub: tip_rep_sub,
tip_rep_sub,
malicious_count: AtomicUsize::new(0),
jobsman: ProtocolJobsManager::new("ProtocolEventGraph", channel.clone()),
}))