event_graph: chore clippy and fmt

This commit is contained in:
dasman
2025-09-10 02:00:11 +03:00
parent 4a6bb97878
commit d85dd658b0

View File

@@ -17,9 +17,11 @@
*/
// use async_std::stream::from_iter;
// use futures::stream::FuturesOrdered;
use futures::{
future::join_all,
// future,
stream::{FuturesOrdered, FuturesUnordered},
stream::FuturesUnordered,
StreamExt,
};
use rand::{rngs::OsRng, seq::SliceRandom};
@@ -31,7 +33,6 @@ use std::{
use blake3::Hash;
use darkfi_serial::{deserialize_async, serialize_async};
use futures::future::join_all;
use event::Header;
use log::{debug, error, info, warn};
use num_bigint::BigUint;
@@ -67,8 +68,8 @@ use util::{generate_genesis, millis_until_next_rotation, next_rotation_timestamp
// Debugging event graph
pub mod deg;
use deg::DegEvent;
use crate::net::ChannelPtr;
use deg::DegEvent;
#[cfg(test)]
mod tests;
@@ -366,7 +367,7 @@ impl EventGraph {
info!(target: "event_graph::dag_sync()", "[EVENTGRAPH] Retrieving {} Events", header_sorted.len());
// Implement parallel download of events with a batch size
let batch = 20;
let batch = 5;
// Mapping of the chunk group id to the chunk, using a BTreeMap help us to
// prioritize the older headers when our request fails and we retry
let mut remaining_chunks: BTreeMap<usize, Vec<blake3::Hash>> = BTreeMap::new();
@@ -382,9 +383,15 @@ impl EventGraph {
let mut retrieved_count = 0;
while remaining_chunks.len() > 0 {
// Retrieve peers in each loop so we don't send requests to a closed channel
let channels: Vec<ChannelPtr> = self.p2p.hosts().peers().iter().filter(|c| !failed_peers.contains(c.address())).cloned().collect();
let channels: Vec<ChannelPtr> = self
.p2p
.hosts()
.peers()
.iter()
.filter(|c| !failed_peers.contains(c.address()))
.cloned()
.collect();
if channels.len() == 0 {
// Removing peers that failed us might be too strict but it is better than
@@ -393,10 +400,10 @@ impl EventGraph {
return Err(Error::DagSyncFailed);
}
// We will distribute the remaining chunks to each channel
let requested_chunks_len = std::cmp::min(channels.len(), remaining_chunks.len());
let keys : Vec<_> = remaining_chunks.keys().take(requested_chunks_len).cloned().collect();
let keys: Vec<_> =
remaining_chunks.keys().take(requested_chunks_len).cloned().collect();
let mut requested_chunk_ids = Vec::with_capacity(requested_chunks_len);
let mut requested_chunks = Vec::with_capacity(requested_chunks_len);
let mut futures = vec![];
@@ -419,7 +426,8 @@ impl EventGraph {
} else {
// The request has failed so insert the chunks back to remaining to try with another peer
// also note the peer so we don't ask again
remaining_chunks.insert(requested_chunk_ids[i], requested_chunks[i].clone());
remaining_chunks
.insert(requested_chunk_ids[i], requested_chunks[i].clone());
failed_peers.push(channels[i].address().clone());
}
}
@@ -1106,7 +1114,7 @@ async fn request_event(
Ok(event.0.clone())
}
fn peer_selection(peers: Vec<Arc<Channel>>) -> Arc<Channel> {
fn _peer_selection(peers: Vec<Arc<Channel>>) -> Arc<Channel> {
peers.choose(&mut OsRng).unwrap().clone()
}