Downloader fixes (#193)

This commit is contained in:
Artem Vorotnikov
2022-05-30 17:28:32 +03:00
committed by GitHub
parent fec42fe096
commit b0df894adb
2 changed files with 94 additions and 69 deletions

View File

@@ -26,7 +26,7 @@ pub mod collections {
skip_list: LruCache<H256, HashSet<H256>>,
raw: LruCache<H256, BlockHeader>,
q: LruCache<Link, ()>,
q: LruCache<H256, ()>,
}
impl Default for Graph {
@@ -60,7 +60,6 @@ pub mod collections {
#[inline]
pub fn clear(&mut self) {
self.q.clear();
self.raw.clear();
self.skip_list.clear();
self.chains.clear();
@@ -79,29 +78,24 @@ pub mod collections {
#[inline]
pub fn insert(&mut self, header: BlockHeader) {
let hash = header.hash();
if self.raw.contains_key(&hash) {
if self.q.contains_key(&hash) {
return;
}
let link = Link {
height: header.number,
hash,
parent_hash: header.parent_hash,
};
self.skip_list
.entry(header.parent_hash)
.or_insert(HashSet::new())
.insert(hash);
self.raw.insert(hash, header);
self.q.insert(link, ());
self.q.insert(hash, ());
}
pub fn dfs(&mut self) -> Option<H256> {
let mut roots = HashSet::new();
for (node, _) in self.q.iter() {
if !self.skip_list.contains_key(&node.hash) && self.raw.contains_key(&node.hash) {
roots.insert(node.hash);
for (hash, _) in self.q.iter() {
if !self.skip_list.contains_key(hash) && self.raw.contains_key(hash) {
roots.insert(*hash);
}
}
if roots.is_empty() {
@@ -112,11 +106,11 @@ pub mod collections {
let mut current = root;
let mut td = U256::ZERO;
let mut depth = 0;
while let Some(header) = self.raw.get(&current) {
if self.chains.contains_key(&current) {
break;
}
td += header.difficulty;
current = header.parent_hash;
depth += 1;
@@ -158,8 +152,6 @@ pub mod collections {
headers.push((current, header));
current = parent_hash;
}
self.chains.remove(tail);
headers.reverse();
headers
}

View File

@@ -1,3 +1,5 @@
#![allow(unreachable_code)]
use crate::{
consensus::Consensus,
kv::{mdbx::*, tables},
@@ -8,19 +10,22 @@ use crate::{
types::{BlockId, HeaderRequest, Message, Status},
},
stagedsync::{stage::*, stages::HEADERS},
TaskGuard,
};
use anyhow::format_err;
use async_trait::async_trait;
use hashbrown::HashMap;
use ethereum_types::H512;
use parking_lot::Mutex;
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
use std::{
collections::{hash_map::Entry, HashMap},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use tokio::time::Instant;
use tokio::{sync::mpsc, time::Instant};
use tokio_stream::StreamExt;
use tracing::*;
@@ -169,6 +174,18 @@ where
}
}
/// Cheap sanity check that a batch of headers carries strictly
/// consecutive block numbers (each header's `number` equals its
/// predecessor's `number + 1`).
///
/// Returns `true` for empty and single-element slices — unlike the
/// previous index-based version, this never panics on an empty batch,
/// so callers need not pre-check `is_empty()`.
///
/// NOTE(review): this only validates the numbering sequence; it does
/// NOT check parent-hash linkage or any consensus rule — presumably it
/// is just a fast pre-filter used to penalize peers sending garbage.
#[inline]
fn dummy_check_headers(headers: &[BlockHeader]) -> bool {
    // `windows(2)` yields nothing for len < 2, making those cases
    // trivially valid without any special-casing.
    headers
        .windows(2)
        .all(|pair| pair[1].number == pair[0].number + 1)
}
impl<E> HeaderDownload<E>
where
E: EnvironmentKind,
@@ -181,23 +198,23 @@ where
) -> HashMap<BlockNumber, HeaderRequest> {
assert!(starting_block < target);
let cap = (target.0 - starting_block.0) as usize / HEADERS_UPPER_BOUND;
let mut requests = HashMap::with_capacity(cap + 1);
for start in (starting_block..target).step_by(HEADERS_UPPER_BOUND) {
let limit = if start + HEADERS_UPPER_BOUND < target {
HEADERS_UPPER_BOUND as u64
} else {
*target - *start
};
(starting_block..target)
.step_by(HEADERS_UPPER_BOUND)
.map(|start| {
let limit = if start + HEADERS_UPPER_BOUND < target {
HEADERS_UPPER_BOUND as u64
} else {
*target - *start
};
let request = HeaderRequest {
start: BlockId::Number(start),
limit,
..Default::default()
};
requests.insert(start, request);
}
requests
let request = HeaderRequest {
start: BlockId::Number(start),
limit,
..Default::default()
};
(start, request)
})
.collect()
}
pub async fn download_headers(
@@ -205,58 +222,74 @@ where
start: BlockNumber,
end: BlockNumber,
) -> anyhow::Result<Vec<(H256, BlockHeader)>> {
let mut requests = Self::prepare_requests(start, end);
let requests = Arc::new(Mutex::new(Self::prepare_requests(start, end)));
let mut stream = self.node.stream_headers().await;
let is_bounded = |block_number: BlockNumber| block_number >= start && block_number <= end;
let mut took = Instant::now();
let mut instant = Instant::now();
let mut ticker = tokio::time::interval(Self::BACK_OFF);
{
let _g = TaskGuard(tokio::task::spawn({
let node = self.node.clone();
let requests = requests.clone();
while !requests.is_empty() {
let mut message_processed = false;
async move {
loop {
let reqs = requests.lock().values().copied().collect::<Vec<_>>();
node.clone().send_many_header_requests(reqs).await?;
tokio::time::sleep(Self::BACK_OFF).await;
}
Ok::<_, anyhow::Error>(())
}
}));
let (penalization_tx, mut penalization_rx) = mpsc::channel::<H512>(128);
let _guard = TaskGuard(tokio::task::spawn({
let node = self.node.clone();
async move {
while let Some(penalty) = penalization_rx.recv().await {
node.penalize_peer(penalty).await?;
}
Ok::<_, anyhow::Error>(())
}
}));
while !requests.lock().is_empty() {
if let Some(msg) = stream.next().await {
let peer_id = msg.peer_id;
tokio::select! {
Some(msg) = stream.next() => {
if let Message::BlockHeaders(inner) = msg.msg {
if inner.headers.is_empty() {
continue;
}
let num = inner.headers[0].number;
let last_hash = inner.headers[inner.headers.len() - 1].hash();
if requests.contains_key(&num) || (is_bounded(num) && !self.graph.contains(last_hash)) {
requests.remove(&num);
debug!(
"Received={} headers, Graph={}",
inner.headers.len(),
self.graph.len()
);
self.graph.extend(inner.headers);
message_processed = true;
let is_valid = dummy_check_headers(&inner.headers);
if is_valid {
let num = inner.headers[0].number;
let last_hash = inner.headers[inner.headers.len() - 1].hash();
let mut requests = requests.lock();
if let Entry::Occupied(entry) = requests.entry(num) {
let limit = entry.get().limit as usize;
if inner.headers.len() == limit {
entry.remove();
self.graph.extend(inner.headers);
}
} else if !self.graph.contains(&last_hash) && is_bounded(num) {
self.graph.extend(inner.headers);
}
} else {
penalization_tx.send(peer_id).await?;
continue;
}
}
}
_ = ticker.tick() => {}
}
if instant.elapsed() > Duration::from_secs(30) {
instant = Instant::now();
let all = (*end - *start) as usize;
info!(
"Downloading headers... left={} out of {}",
all - self.graph.len(),
all
);
}
if !message_processed {
self.node
.clone()
.send_many_header_requests(requests.values().copied())
.await?;
}
}