event_graph: parallel event download

This commit is contained in:
oars
2025-09-06 14:47:48 +03:00
committed by dasman
parent eab0aa9b88
commit 19b8d125c7
2 changed files with 82 additions and 10 deletions

View File

@@ -31,6 +31,7 @@ use std::{
use blake3::Hash;
use darkfi_serial::{deserialize_async, serialize_async};
use futures::future::join_all;
use event::Header;
use num_bigint::BigUint;
use sled_overlay::{sled, SledTreeOverlay};
@@ -71,6 +72,7 @@ use util::{generate_genesis, millis_until_next_rotation, next_rotation_timestamp
// Debugging event graph
pub mod deg;
use deg::DegEvent;
use crate::net::ChannelPtr;
#[cfg(test)]
mod tests;
@@ -356,7 +358,6 @@ impl EventGraph {
// start download payload
if !fast_mode {
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Fetching events");
let peers = channels.clone().into_iter().collect::<Vec<_>>();
let mut header_sorted = vec![];
for iter_elem in self.header_dag.iter() {
@@ -366,15 +367,85 @@ impl EventGraph {
}
header_sorted.sort_by(|x, y| y.layer.cmp(&x.layer));
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieving {} Events", header_sorted.len());
// Download the events in parallel, in batches of `batch` headers per request
let batch = 20;
// Mapping of the chunk group id to the chunk. Using a BTreeMap helps us
// prioritize the older headers when a request fails and we retry
let mut remaining_chunks: BTreeMap<usize, Vec<blake3::Hash>> = BTreeMap::new();
for (i, chunk) in header_sorted.chunks(batch).enumerate() {
remaining_chunks.insert(i, chunk.iter().map(|h| h.id()).collect());
}
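// e.g. 50 sorted headers with batch = 20 yield chunks keyed {0, 1, 2} holding
// 20, 20 and 10 event ids respectively; lower keys are handed out and retried first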
// Mapping of the chunk group id to the received events. Using a BTreeMap
// helps us verify and insert the events in order
let mut received_events: BTreeMap<usize, Vec<Event>> = BTreeMap::new();
// Track peers that failed us so we don't send them requests again
let mut failed_peers = vec![];
let mut retrieved_count = 0;
while !remaining_chunks.is_empty() {
// Refresh the peer list on every iteration so we don't send requests to a closed channel
let channels: Vec<ChannelPtr> = self.p2p.hosts().peers().iter().filter(|c| !failed_peers.contains(c.address())).cloned().collect();
if channels.is_empty() {
// Removing peers that failed us might be too strict, but it is better than
// looping over failed peers that may never provide the events.
// The DAG sync is retried anyway, so this is not a problem.
return Err(Error::DagSyncFailed);
}
// Distribute the remaining chunks across the available channels, one chunk per channel
let requested_chunks_len = std::cmp::min(channels.len(), remaining_chunks.len());
let keys: Vec<_> = remaining_chunks.keys().take(requested_chunks_len).cloned().collect();
let mut requested_chunk_ids = Vec::with_capacity(requested_chunks_len);
let mut requested_chunks = Vec::with_capacity(requested_chunks_len);
let mut futures = vec![];
for (i, key) in keys.iter().enumerate() {
if let Some(value) = remaining_chunks.remove(key) {
requested_chunk_ids.push(*key);
requested_chunks.push(value.clone());
futures.push(request_event(channels[i].clone(), value, comms_timeout));
}
}
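// Each usable channel gets at most one chunk per round: the chunk stored at
// requested_chunk_ids[i] is in flight on channels[i], so the indexes line up
// when the results come back below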
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieving Events from {} peers", futures.len());
let rets = join_all(futures).await;
for (i, res) in rets.iter().enumerate() {
if let Ok(events) = res {
retrieved_count += events.len();
received_events.insert(requested_chunk_ids[i], events.clone());
} else {
// The request failed, so insert the chunk back into the remaining set to retry
// it with another peer, and note the peer so we don't ask it again
remaining_chunks.insert(requested_chunk_ids[i], requested_chunks[i].clone());
failed_peers.push(channels[i].address().clone());
}
}
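// A failed chunk keeps its original key, so the BTreeMap hands it out again
// ahead of higher-keyed (newer) chunks on the next pass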
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieved Events: {}/{}", retrieved_count, header_sorted.len());
}
let mut verified_count = 0;
for chunk in received_events.into_values() {
verified_count += chunk.len();
self.dag_insert(&chunk).await?;
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Verified Events: {}/{}", verified_count, retrieved_count);
}
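// Draining received_events in ascending key order inserts the events in the
// same order their headers were sorted above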
// 1. Fetch events one by one
// let mut events_requests = FuturesOrdered::new();
let peer = peer_selection(peers.clone());
// let peer = peer_selection(peers.clone());
// let peer = channels[0].clone();
for header in header_sorted.iter() {
let received_events =
request_event(peer.clone(), vec![header.id()], comms_timeout).await?;
self.dag_insert(&received_events).await?;
}
// for header in header_sorted.iter() {
// let received_events =
// request_event(peer.clone(), vec![header.id()], comms_timeout).await?;
// self.dag_insert(&received_events).await?;
//}
// let mut received_events = vec![];
// while let Some(peer_events) = events_requests.next().await {
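The scatter/gather loop above is compact enough to sketch on its own. Below is a minimal, self-contained version of the same pattern, built on futures::future::join_all as in the commit, but with a hypothetical Peer type and fetch function standing in for darkfi's ChannelPtr and request_event:

use std::collections::BTreeMap;

use futures::future::join_all;

#[derive(Clone, Debug)]
struct Peer(usize);

// Hypothetical fetch: even-numbered peers answer, odd-numbered ones fail.
async fn fetch(peer: Peer, chunk: Vec<u32>) -> Result<Vec<u32>, ()> {
    if peer.0 % 2 == 0 { Ok(chunk) } else { Err(()) }
}

async fn download(ids: Vec<u32>, mut peers: Vec<Peer>) -> Option<Vec<u32>> {
    let batch = 3;
    // Chunk the ids and key each chunk by position, like remaining_chunks above.
    let mut remaining: BTreeMap<usize, Vec<u32>> =
        ids.chunks(batch).map(|c| c.to_vec()).enumerate().collect();
    let mut received: BTreeMap<usize, Vec<u32>> = BTreeMap::new();

    while !remaining.is_empty() {
        if peers.is_empty() {
            return None; // Every peer failed; the caller retries the whole sync
        }
        // Scatter: hand out at most one chunk per peer, lowest keys first.
        let n = peers.len().min(remaining.len());
        let keys: Vec<usize> = remaining.keys().take(n).copied().collect();
        let jobs: Vec<(usize, Vec<u32>)> =
            keys.iter().map(|k| (*k, remaining.remove(k).unwrap())).collect();
        let futs = jobs.iter().zip(&peers).map(|((_, c), p)| fetch(p.clone(), c.clone()));
        // Gather: keep successes, re-queue failed chunks, drop failing peers.
        let mut failed = vec![];
        for (i, res) in join_all(futs).await.into_iter().enumerate() {
            match res {
                Ok(events) => {
                    received.insert(jobs[i].0, events);
                }
                Err(()) => {
                    remaining.insert(jobs[i].0, jobs[i].1.clone());
                    failed.push(i);
                }
            }
        }
        for i in failed.into_iter().rev() {
            peers.remove(i);
        }
    }
    // Ascending key order restores the original chunk order.
    Some(received.into_values().flatten().collect())
}

fn main() {
    let peers = vec![Peer(0), Peer(1), Peer(2)];
    let out = futures::executor::block_on(download((0..10).collect(), peers));
    assert_eq!(out, Some((0..10).collect()));
}

Failed chunks are re-queued under their original keys and failing peers are dropped, mirroring the remaining_chunks/failed_peers bookkeeping above; the BTreeMap's ascending-key iteration restores the original chunk order at the end.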

View File

@@ -526,7 +526,7 @@ impl ProtocolEventGraph {
// reading our db and steal our bandwidth.
let mut events = vec![];
for event_id in event_ids.iter() {
if !self.event_graph.broadcasted_ids.read().await.contains(event_id) {
/*if !self.event_graph.broadcasted_ids.read().await.contains(event_id) {
let malicious_count = self.malicious_count.fetch_add(1, SeqCst);
if malicious_count + 1 == MALICIOUS_THRESHOLD {
error!(
@@ -544,7 +544,7 @@ impl ProtocolEventGraph {
self.channel.display_address()
);
continue
}
}*/
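// NOTE: the broadcasted-ids/malicious-counter check above is disabled by this
// commit; a plausible reason (not stated in the commit message) is that the new
// batched sync requests events we have already marked as broadcast, which would
// trip the counter for honest syncing peers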
// At this point we should have it in our DAG.
// If it is not, we now return an error instead of panicking.
@@ -552,7 +552,8 @@ impl ProtocolEventGraph {
target: "event_graph::protocol::handle_event_req",
"Fetching event {event_id:?} from DAG"
);
events.push(self.event_graph.dag_get(event_id).await.unwrap().unwrap());
events.push(self.event_graph.dag_get(event_id).await?.ok_or(Error::EventNotFound("Event Not Found in DAG".to_owned()))?);
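// NOTE: the `?.ok_or(...)?` chain replaces the old double `unwrap()`, turning a
// missing event into a recoverable Error::EventNotFound instead of a panic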
}
// Check if the incoming event is older than the genesis event. If so, something