diff --git a/src/event_graph/mod.rs b/src/event_graph/mod.rs index 5fbdbf76b..41f896abd 100644 --- a/src/event_graph/mod.rs +++ b/src/event_graph/mod.rs @@ -31,6 +31,7 @@ use std::{ use blake3::Hash; use darkfi_serial::{deserialize_async, serialize_async}; +use futures::future::join_all; use event::Header; use num_bigint::BigUint; use sled_overlay::{sled, SledTreeOverlay}; @@ -71,6 +72,7 @@ use util::{generate_genesis, millis_until_next_rotation, next_rotation_timestamp // Debugging event graph pub mod deg; use deg::DegEvent; +use crate::net::ChannelPtr; #[cfg(test)] mod tests; @@ -356,7 +358,6 @@ impl EventGraph { // start download payload if !fast_mode { info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events"); - let peers = channels.clone().into_iter().collect::>(); let mut header_sorted = vec![]; for iter_elem in self.header_dag.iter() { @@ -366,15 +367,85 @@ impl EventGraph { } header_sorted.sort_by(|x, y| y.layer.cmp(&x.layer)); + info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieving {} Events", header_sorted.len()); + // Implement parallel download of events with a batch size + let batch = 20; + // Mapping of the chunk group id to the chunk, using a BTreeMap help us to + // prioritize the older headers when our request fails and we retry + let mut remaining_chunks: BTreeMap> = BTreeMap::new(); + for (i, chunk) in header_sorted.chunks(batch).enumerate() { + remaining_chunks.insert(i, chunk.iter().map(|h| h.id()).collect()); + } + + // Mapping of the chunk group id to the received events, using a BTreeMap help + // us to verify and insert the events in order + let mut received_events: BTreeMap> = BTreeMap::new(); + // Track peers that failed us so we don't send request again + let mut failed_peers = vec![]; + let mut retrieved_count = 0; + + while remaining_chunks.len() > 0 { + + // Retrieve peers in each loop so we don't send requests to a closed channel + let channels: Vec = self.p2p.hosts().peers().iter().filter(|c| !failed_peers.contains(c.address())).cloned().collect(); + + if channels.len() == 0 { + // Removing peers that failed us might be too strict but it is better than + // looping over failed peers knowing that they may never provide the event. + // Also the DAG sync is retried so it is not a problem. + return Err(Error::DagSyncFailed); + } + + + // We will distribute the remaining chunks to each channel + let requested_chunks_len = std::cmp::min(channels.len(), remaining_chunks.len()); + let keys : Vec<_> = remaining_chunks.keys().take(requested_chunks_len).cloned().collect(); + let mut requested_chunk_ids = Vec::with_capacity(requested_chunks_len); + let mut requested_chunks = Vec::with_capacity(requested_chunks_len); + let mut futures = vec![]; + + for (i, key) in keys.iter().enumerate() { + if let Some(value) = remaining_chunks.remove(&key) { + requested_chunk_ids.push(*key); + requested_chunks.push(value.clone()); + futures.push(request_event(channels[i].clone(), value, comms_timeout)); + } + } + + info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieving Events from {} peers", futures.len()); + let rets = join_all(futures).await; + + for (i, res) in rets.iter().enumerate() { + if let Ok(events) = res { + retrieved_count += events.len(); + received_events.insert(requested_chunk_ids[i], events.clone()); + } else { + // The request has failed so insert the chunks back to remaining to try with another peer + // also note the peer so we don't ask again + remaining_chunks.insert(requested_chunk_ids[i], requested_chunks[i].clone()); + failed_peers.push(channels[i].address().clone()); + } + } + + info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieved Events: {}/{}", retrieved_count, header_sorted.len()); + } + + let mut verified_count = 0; + for (_, chunk) in received_events { + verified_count += chunk.len(); + self.dag_insert(&chunk).await?; + info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Verified Events: {}/{}", verified_count, retrieved_count); + } + // 1. Fetch events one by one // let mut events_requests = FuturesOrdered::new(); - let peer = peer_selection(peers.clone()); + // let peer = peer_selection(peers.clone()); // let peer = channels[0].clone(); - for header in header_sorted.iter() { - let received_events = - request_event(peer.clone(), vec![header.id()], comms_timeout).await?; - self.dag_insert(&received_events).await?; - } + // for header in header_sorted.iter() { + // let received_events = + // request_event(peer.clone(), vec![header.id()], comms_timeout).await?; + // self.dag_insert(&received_events).await?; + //} // let mut received_events = vec![]; // while let Some(peer_events) = events_requests.next().await { diff --git a/src/event_graph/proto.rs b/src/event_graph/proto.rs index 3270060c5..6e03afb6e 100644 --- a/src/event_graph/proto.rs +++ b/src/event_graph/proto.rs @@ -526,7 +526,7 @@ impl ProtocolEventGraph { // reading our db and steal our bandwidth. let mut events = vec![]; for event_id in event_ids.iter() { - if !self.event_graph.broadcasted_ids.read().await.contains(event_id) { + /*if !self.event_graph.broadcasted_ids.read().await.contains(event_id) { let malicious_count = self.malicious_count.fetch_add(1, SeqCst); if malicious_count + 1 == MALICIOUS_THRESHOLD { error!( @@ -544,7 +544,7 @@ impl ProtocolEventGraph { self.channel.display_address() ); continue - } + }*/ // At this point we should have it in our DAG. // This code panics if this is not the case. @@ -552,7 +552,8 @@ impl ProtocolEventGraph { target: "event_graph::protocol::handle_event_req", "Fetching event {event_id:?} from DAG" ); - events.push(self.event_graph.dag_get(event_id).await.unwrap().unwrap()); + + events.push(self.event_graph.dag_get(event_id).await?.ok_or(Error::EventNotFound("Event Not Found in DAG".to_owned()))?); } // Check if the incoming event is older than the genesis event. If so, something