diff --git a/src/error.rs b/src/error.rs index d1dce9a91..ea78a71a4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -524,6 +524,12 @@ pub enum Error { #[error("Geode chunk route not found")] GeodeChunkRouteNotFound, + // ================== + // Event Graph errors + // ================== + #[error("DAG sync failed")] + DagSyncFailed, + // ========= // Catch-all // ========= diff --git a/src/event_graph2/mod.rs b/src/event_graph2/mod.rs index 1a0f71527..226e5e5f2 100644 --- a/src/event_graph2/mod.rs +++ b/src/event_graph2/mod.rs @@ -18,14 +18,14 @@ use std::{ cmp::Ordering, - collections::{HashSet, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, sync::Arc, time::UNIX_EPOCH, }; use async_recursion::async_recursion; use darkfi_serial::{deserialize_async, serialize_async}; -use log::{debug, info}; +use log::{debug, error, info}; use num_bigint::BigUint; use smol::{ lock::{Mutex, RwLock}, @@ -44,6 +44,7 @@ pub use event::Event; /// P2P protocol implementation for the Event Graph pub mod proto; +use proto::TipReq; /// Utility functions mod util; @@ -162,7 +163,7 @@ impl EventGraph { } /// Sync the DAG from connected peers - pub async fn dag_sync(&self) { + pub async fn dag_sync(&self) -> Result<()> { // We do an optimistic sync where we ask all our connected peers for // the DAG tips (unreferenced events) and then we accept the ones we // see the most times. @@ -177,6 +178,80 @@ impl EventGraph { // * Since we should be pruning, if we're not synced after some reasonable // amount of iterations, these could be faulty peers and we can try again // from the beginning + + // Get references to all our peers. + let channels = self.p2p.channels().lock().await.clone(); + let mut communicated_peers = channels.len(); + info!( + target: "event_graph::dag_sync()", + "[EVENTGRAPH]Syncing DAG from {} peers...", communicated_peers, + ); + + // Here we keep track of the tips and how many time we've seen them. + let mut tips: HashMap = HashMap::new(); + + // Let's first ask all of our peers for their tips and collect them + // in our hashmap above. + for (url, channel) in channels.iter() { + if let Err(e) = channel.send(&TipReq {}).await { + error!( + target: "event_graph::dag_sync()", + "[EVENTGRAPH] Sync: Couldn't contact peer {}, skipping ({})", url, e, + ); + communicated_peers -= 1; + continue + }; + + // TODO: I can't access tip_rep_sub from here. + /* + let peer_tips = match timeout(REPLY_TIMEOUT, tip_rep_sub.receive()).await { + Ok(peer_tips) => peer_tips?, + Err(_) => { + error!( + target: "event_graph::dag_sync()", + "Peer {} didn't reply with tips in time, skipping", url, + ); + communicated_peers -= 1; + continue + } + }; + let peer_tips = &peer_tips.0; + */ + let peer_tips = vec![]; + + // Note down the seen tips + for tip in peer_tips { + if let Some(seen_tip) = tips.get_mut(tip) { + *seen_tip += 1; + } else { + tips.insert(*tip, 1); + } + } + } + + // After we've communicated all the peers, let's see what happened. + if tips.is_empty() { + error!( + target: "event_graph::dag_sync()", + "[EVENTGRAPH] Sync: Could not find any DAG tips", + ); + return Err(Error::DagSyncFailed) + } + + // We know the number of peers we've communicated with. + // Arbitrarily, let's not consider events we only got once. + // TODO: This should be more sensible depending on the peer number. + let mut considered_tips = HashSet::new(); + for (tip, amount) in tips.iter() { + if amount > &1 { + considered_tips.insert(*tip); + } + } + drop(tips); + + // Now begin fetching the events backwards. + // TODO: Reuse the recursive logic from EventPut handler. + todo!() } diff --git a/src/event_graph2/proto.rs b/src/event_graph2/proto.rs index cace8059d..0a79a7fce 100644 --- a/src/event_graph2/proto.rs +++ b/src/event_graph2/proto.rs @@ -35,7 +35,7 @@ use crate::{impl_p2p_message, net::*, system::timeout::timeout, Error, Result}; /// drop the peer from our P2P connection. const MALICIOUS_THRESHOLD: usize = 5; /// Time to wait for a parent ID reply -const REPLY_TIMEOUT: Duration = Duration::from_secs(5); +pub(super) const REPLY_TIMEOUT: Duration = Duration::from_secs(5); /// P2P protocol implementation for the Event Graph. pub struct ProtocolEventGraph { @@ -52,7 +52,7 @@ pub struct ProtocolEventGraph { /// `MessageSubscriber` for `TipReq` tip_req_sub: MessageSubscription, /// `MessageSubscriber` for `TipRep` - _tip_rep_sub: MessageSubscription, + tip_rep_sub: MessageSubscription, /// Peer malicious message count malicious_count: AtomicUsize, /// P2P jobs manager pointer @@ -121,7 +121,7 @@ impl ProtocolEventGraph { ev_req_sub, ev_rep_sub, tip_req_sub, - _tip_rep_sub: tip_rep_sub, + tip_rep_sub, malicious_count: AtomicUsize::new(0), jobsman: ProtocolJobsManager::new("ProtocolEventGraph", channel.clone()), }))