event_graph: faster parallel event download

- the previous implementation used join_all on the futures, thus the download speed was determined by the slowest peer
- the current implementation uses FuturesUnordered to schedule the next chunk download as soon as any peer finishes,
  thus the download speed is driven by the fastest peers (see the sketch below)
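
The scheduling difference is easy to demonstrate in isolation. The following is a minimal, self-contained sketch (not code from this repository) contrasting the two strategies; it assumes the futures crate and the smol executor, which the codebase already uses, and fetch_chunk is a hypothetical stand-in for request_event:

    use std::time::Duration;

    use futures::{stream::FuturesUnordered, StreamExt};

    // Hypothetical stand-in for request_event(): a peer that needs
    // `delay` milliseconds to serve one chunk.
    async fn fetch_chunk(peer: &str, chunk_id: usize, delay: u64) -> usize {
        smol::Timer::after(Duration::from_millis(delay)).await;
        println!("peer {peer} finished chunk {chunk_id}");
        chunk_id
    }

    fn main() {
        smol::block_on(async {
            // join_all: the whole batch resolves only when the slow 300 ms
            // peer does, so the fast peer sits idle in the meantime.
            futures::future::join_all(vec![
                fetch_chunk("fast", 0, 10),
                fetch_chunk("slow", 1, 300),
            ])
            .await;

            // FuturesUnordered: completions are yielded one by one as they
            // happen, so the fast peer can be handed the next chunk
            // immediately instead of waiting for the slow one.
            let mut futs = FuturesUnordered::new();
            futs.push(fetch_chunk("fast", 2, 10));
            futs.push(fetch_chunk("slow", 3, 300));
            while let Some(chunk_id) = futs.next().await {
                println!("chunk {chunk_id} done, its peer is free for more work");
            }
        });
    }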
oars
2025-09-10 17:26:47 +03:00
parent 9fb431f413
commit 76096c4e5f

@@ -18,7 +18,7 @@
 // use async_std::stream::from_iter;
 use std::{
-    collections::{BTreeMap, HashMap, HashSet, VecDeque},
+    collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
     path::PathBuf,
     str::FromStr,
     sync::Arc,
@@ -29,7 +29,6 @@ use blake3::Hash;
 use darkfi_serial::{deserialize_async, serialize_async};
 use event::Header;
 use futures::{
-    future::join_all,
     // future,
     stream::FuturesUnordered,
     StreamExt,
@@ -42,6 +41,7 @@ use smol::{
     Executor,
 };
 use tinyjson::JsonValue::{self};
+use url::Url;

 use crate::{
     event_graph::util::{midnight_timestamp, replayer_log},
@@ -68,7 +68,6 @@ use util::{generate_genesis, millis_until_next_rotation, next_rotation_timestamp
 // Debugging event graph
 pub mod deg;
-use crate::net::ChannelPtr;
 use deg::DegEvent;

 #[cfg(test)]
@@ -326,6 +325,12 @@ impl DAGStore {
     }
 }

+enum PeerStatus {
+    Free,
+    Busy,
+    Failed,
+}
+
 /// An Event Graph instance
 pub struct EventGraph {
     /// Pointer to the P2P network instance
@@ -602,72 +607,78 @@ impl EventGraph {
         info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieving {} Events", header_sorted.len());

         // Implement parallel download of events with a batch size
-        let batch = 5;
+        let batch = 20;
         // Mapping of the chunk group id to the chunk, using a BTreeMap help us to
         // prioritize the older headers when our request fails and we retry
-        let mut remaining_chunks: BTreeMap<usize, Vec<blake3::Hash>> = BTreeMap::new();
+        let mut chunks: BTreeMap<usize, Vec<blake3::Hash>> = BTreeMap::new();
         for (i, chunk) in header_sorted.chunks(batch).enumerate() {
-            remaining_chunks.insert(i, chunk.iter().map(|h| h.id()).collect());
+            chunks.insert(i, chunk.iter().map(|h| h.id()).collect());
         }
+        let mut remaining_chunk_ids: BTreeSet<usize> = chunks.keys().cloned().collect();

         // Mapping of the chunk group id to the received events, using a BTreeMap help
         // us to verify and insert the events in order
         let mut received_events: BTreeMap<usize, Vec<Event>> = BTreeMap::new();
-        // Track peers that failed us so we don't send request again
-        let mut failed_peers = vec![];
+        // Track peers status so that we don't send a new request to the same peer before they
+        // finish the first or send to a failed peer
+        let mut peer_status: HashMap<Url, PeerStatus> = HashMap::new();
         let mut retrieved_count = 0;
+        let mut futures = FuturesUnordered::new();

-        while remaining_chunks.len() > 0 {
+        while retrieved_count < header_sorted.len() {
             // Retrieve peers in each loop so we don't send requests to a closed channel
-            let channels: Vec<ChannelPtr> = self
-                .p2p
-                .hosts()
-                .peers()
-                .iter()
-                .filter(|c| !failed_peers.contains(c.address()))
-                .cloned()
-                .collect();
-            if channels.len() == 0 {
+            let mut free_channels = vec![];
+            let mut busy_channels = 0;
+            // Removing peers that failed us might be too strict but it is better than
+            // looping over failed peers knowing that they may never provide the event.
+            // Also the DAG sync is retried so it is not a problem.
+            self.p2p.hosts().peers().iter().for_each(|channel| {
+                if let Some(status) = peer_status.get(channel.address()) {
+                    match status {
+                        PeerStatus::Free => free_channels.push(channel.clone()),
+                        PeerStatus::Busy => busy_channels += 1,
+                        _ => {}
+                    }
+                } else {
+                    peer_status.insert(channel.address().clone(), PeerStatus::Free);
+                    free_channels.push(channel.clone());
+                }
+            });
+
+            // We don't have any channels we can assign to or wait to get response from
+            if free_channels.is_empty() && busy_channels == 0 {
                 return Err(Error::DagSyncFailed);
             }

             // We will distribute the remaining chunks to each channel
-            let requested_chunks_len = std::cmp::min(channels.len(), remaining_chunks.len());
-            let keys: Vec<_> =
-                remaining_chunks.keys().take(requested_chunks_len).cloned().collect();
-            let mut requested_chunk_ids = Vec::with_capacity(requested_chunks_len);
-            let mut requested_chunks = Vec::with_capacity(requested_chunks_len);
-            let mut futures = vec![];
-            for (i, key) in keys.iter().enumerate() {
-                if let Some(value) = remaining_chunks.remove(&key) {
-                    requested_chunk_ids.push(*key);
-                    requested_chunks.push(value.clone());
-                    futures.push(request_event(channels[i].clone(), value, comms_timeout));
-                }
+            let requested_chunks_len =
+                std::cmp::min(free_channels.len(), remaining_chunk_ids.len());
+            let requested_chunk_ids: Vec<usize> =
+                remaining_chunk_ids.iter().take(requested_chunks_len).copied().collect();
+            for (i, chunk_id) in requested_chunk_ids.iter().enumerate() {
+                futures.push(request_event(
+                    free_channels[i].clone(),
+                    chunks.get(chunk_id).unwrap().clone(),
+                    *chunk_id,
+                    comms_timeout,
+                ));
+                remaining_chunk_ids.remove(chunk_id);
+                peer_status.insert(free_channels[i].address().clone(), PeerStatus::Busy);
             }

             info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieving Events from {} peers", futures.len());
-            let rets = join_all(futures).await;
-            for (i, res) in rets.iter().enumerate() {
-                if let Ok(events) = res {
+            if let Some(resp) = futures.next().await {
+                let (events, chunk_id, channel) = resp;
+                if let Ok(events) = events {
                     retrieved_count += events.len();
-                    received_events.insert(requested_chunk_ids[i], events.clone());
+                    received_events.insert(chunk_id, events.clone());
+                    peer_status.insert(channel.address().clone(), PeerStatus::Free);
                 } else {
                     // The request has failed so insert the chunks back to remaining to try with another peer
                     // also note the peer so we don't ask again
-                    remaining_chunks
-                        .insert(requested_chunk_ids[i], requested_chunks[i].clone());
-                    failed_peers.push(channels[i].address().clone());
+                    remaining_chunk_ids.insert(chunk_id);
+                    peer_status.insert(channel.address().clone(), PeerStatus::Failed);
                 }
-            }
-            info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieved Events: {}/{}", retrieved_count, header_sorted.len());
+                info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieved Events: {}/{}", retrieved_count, header_sorted.len());
+            }
         }

         let mut verified_count = 0;
@@ -1213,8 +1224,9 @@ async fn request_header(
 async fn request_event(
     peer: Arc<Channel>,
     headers: Vec<Hash>,
+    chunk_id: usize,
     comms_timeout: u64,
-) -> Result<Vec<Event>> {
+) -> (Result<Vec<Event>>, usize, Arc<Channel>) {
     let url = peer.address();

     debug!(
@@ -1230,7 +1242,11 @@ async fn request_event(
                 "[EVENTGRAPH] Sync: Couldn't subscribe EventRep for peer {}, skipping ({})",
                 url, e,
             );
-            return Err(Error::EventNotFound("Couldn't subscribe EventRep".to_owned()));
+            return (
+                Err(Error::EventNotFound("Couldn't subscribe EventRep".to_owned())),
+                chunk_id,
+                peer,
+            );
         }
     };
@@ -1241,7 +1257,11 @@ async fn request_event(
             "[EVENTGRAPH] Sync: Failed communicating EventReq({:?}) to {}: {}",
             headers, url, e,
         );
-        return Err(Error::EventNotFound("Failed communicating EventReq".to_owned()));
+        return (
+            Err(Error::EventNotFound("Failed communicating EventReq".to_owned())),
+            chunk_id,
+            peer,
+        );
     }

     // Node waits for response
@@ -1251,8 +1271,12 @@ async fn request_event(
             "[EVENTGRAPH] Sync: Timeout waiting for parents {:?} from {}",
             headers, url,
         );
-        return Err(Error::EventNotFound("Timeout waiting for parents".to_owned()));
+        return (
+            Err(Error::EventNotFound("Timeout waiting for parents".to_owned())),
+            chunk_id,
+            peer,
+        );
     };

-    Ok(event.0.clone())
+    (Ok(event.0.clone()), chunk_id, peer)
 }
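
Because FuturesUnordered yields completions in arbitrary order, each future has to carry its own identity, which is why request_event now returns (Result<Vec<Event>>, usize, Arc<Channel>) instead of a bare Result. A generic sketch of this tagging-and-requeue pattern, using hypothetical names (fetch, peer_id) and the smol executor rather than the real darkfi types:

    use std::collections::BTreeSet;

    use futures::{stream::FuturesUnordered, StreamExt};

    // Hypothetical request: the output is tagged with chunk_id and peer_id
    // so the consumer knows what completed, whatever the completion order.
    async fn fetch(peer_id: usize, chunk_id: usize) -> (Result<Vec<u8>, ()>, usize, usize) {
        let res = if chunk_id % 2 == 0 { Ok(vec![]) } else { Err(()) };
        (res, chunk_id, peer_id)
    }

    fn main() {
        smol::block_on(async {
            let mut remaining: BTreeSet<usize> = (0..4).collect();
            let mut futs = FuturesUnordered::new();
            // Hand one chunk to each of four peers.
            for (peer_id, chunk_id) in remaining.clone().into_iter().enumerate() {
                futs.push(fetch(peer_id, chunk_id));
                remaining.remove(&chunk_id);
            }
            while let Some((res, chunk_id, peer_id)) = futs.next().await {
                match res {
                    // Success: this peer could now be given another chunk.
                    Ok(_) => println!("chunk {chunk_id} done, peer {peer_id} is free"),
                    // Failure: requeue the chunk so another peer can retry it
                    // (the real sync loop re-dispatches on its next iteration).
                    Err(()) => {
                        remaining.insert(chunk_id);
                        println!("chunk {chunk_id} failed on peer {peer_id}, requeued");
                    }
                }
            }
        });
    }

Tagging the output this way also lets the consumer flip the finished peer back to Free or Failed directly, without a side table indexed by request order, which is what the old requested_chunk_ids[i] bookkeeping had to provide.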