chore(sync): basic header response validation (#276)

This commit is contained in:
Roman Krasiuk
2022-11-28 19:32:46 +02:00
committed by GitHub
parent 96afa2d41d
commit 4d708ce8af
2 changed files with 69 additions and 26 deletions

View File

@@ -17,11 +17,15 @@ pub enum StageError {
error: consensus::Error,
},
/// The stage encountered a database error.
#[error("An internal database error occurred.")]
#[error("An internal database error occurred: {0}")]
Database(#[from] DbError),
/// The stage encountered a database integrity error.
#[error("A database integrity error occurred.")]
#[error("A database integrity error occurred: {0}")]
DatabaseIntegrity(#[from] DatabaseIntegrityError),
/// Invalid download response. Applicable for stages which
/// rely on external downloaders
#[error("Invalid download response: {0}")]
Download(String),
/// The stage encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),

View File

@@ -5,7 +5,11 @@ use crate::{
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
db::{models::blocks::BlockNumHash, tables, Database, DbCursorRO, DbCursorRW, DbTx, DbTxMut},
p2p::headers::{client::HeadersClient, downloader::HeaderDownloader, error::DownloadError},
p2p::headers::{
client::HeadersClient,
downloader::{ensure_parent, HeaderDownloader},
error::DownloadError,
},
};
use reth_primitives::{BlockNumber, SealedHeader, H256, U256};
use std::{fmt::Debug, sync::Arc};
@@ -50,48 +54,52 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient> Stage<DB
db: &mut StageDB<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let last_block_num = input.stage_progress.unwrap_or_default();
self.update_head::<DB>(db, last_block_num).await?;
let stage_progress = input.stage_progress.unwrap_or_default();
self.update_head::<DB>(db, stage_progress).await?;
// TODO: add batch size
// download the headers
let last_hash = db.get_block_hash(last_block_num)?;
// Lookup the last stored header
let last_hash = db.get_block_hash(stage_progress)?;
let last_header =
db.get::<tables::Headers>((last_block_num, last_hash).into())?.ok_or({
DatabaseIntegrityError::Header { number: last_block_num, hash: last_hash }
db.get::<tables::Headers>((stage_progress, last_hash).into())?.ok_or({
DatabaseIntegrityError::Header { number: stage_progress, hash: last_hash }
})?;
let head = SealedHeader::new(last_header, last_hash);
let forkchoice = self.next_fork_choice_state(&head.hash()).await;
if let Some(number) = db.get::<tables::HeaderNumbers>(forkchoice.head_block_hash)? {
if number < head.number {
// Nothing to do here
warn!("Consensus reported old head {}", forkchoice.head_block_hash);
return Ok(ExecOutput { stage_progress, done: true, reached_tip: true })
}
}
// The stage relies on the downloader to return the headers
// in descending order starting from the tip down to
// the local head (latest block in db)
let headers = match self.downloader.download(head, forkchoice).await {
// TODO: add batching
let headers = match self.downloader.download(head.clone(), forkchoice.clone()).await {
Ok(res) => {
// TODO: validate the result order?
// at least check if it attaches (first == tip && last == last_hash)
// Perform basic response validation
self.validate_header_response(&res, head, forkchoice)?;
res
}
Err(e) => match e {
DownloadError::Timeout => {
warn!("No response for header request");
return Ok(ExecOutput {
stage_progress: last_block_num,
reached_tip: false,
done: false,
})
return Ok(ExecOutput { stage_progress, reached_tip: false, done: false })
}
DownloadError::HeaderValidation { hash, error } => {
warn!("Validation error for header {hash}: {error}");
return Err(StageError::Validation { block: last_block_num, error })
return Err(StageError::Validation { block: stage_progress, error })
}
error => {
warn!("Unexpected error occurred: {error}");
return Err(StageError::Internal(Box::new(error)))
return Err(StageError::Download(error.to_string()))
}
},
};
let stage_progress = self.write_headers::<DB>(db, headers).await?.unwrap_or(last_block_num);
let stage_progress = self.write_headers::<DB>(db, headers).await?.unwrap_or(stage_progress);
Ok(ExecOutput { stage_progress, reached_tip: true, done: true })
}
@@ -137,6 +145,31 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
}
}
/// Perform basic header response validation
fn validate_header_response(
&self,
headers: &[SealedHeader],
head: SealedHeader,
forkchoice: ForkchoiceState,
) -> Result<(), StageError> {
// The response must include at least head and tip
if headers.len() < 2 {
return Err(StageError::Download("Not enough headers".to_owned()))
}
let mut headers_iter = headers.iter().rev().peekable();
if headers_iter.peek().unwrap().hash() != forkchoice.head_block_hash {
return Err(StageError::Download("Response must end with tip".to_owned()))
}
while let Some(header) = headers_iter.next() {
ensure_parent(header, headers_iter.peek().unwrap_or(&&head))
.map_err(|err| StageError::Download(err.to_string()))?;
}
Ok(())
}
/// Write downloaded headers to the database
async fn write_headers<DB: Database>(
&self,
@@ -214,7 +247,11 @@ mod tests {
async fn execute_validation_error() {
let mut runner = HeadersTestRunner::default();
runner.consensus.set_fail_validation(true);
let input = ExecInput::default();
let (stage_progress, previous_stage) = (1000, 1200);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
runner.after_execution(headers).await.expect("failed to run after execution hook");
@@ -225,7 +262,7 @@ mod tests {
/// Check that unexpected download errors are caught
#[tokio::test]
async fn executed_request_error() {
async fn executed_download_error() {
let mut runner = HeadersTestRunner::default();
let (stage_progress, previous_stage) = (1000, 1200);
let input = ExecInput {
@@ -242,7 +279,7 @@ mod tests {
runner.consensus.update_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Err(StageError::Internal(_)));
assert_matches!(result, Err(StageError::Download(_)));
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
@@ -291,7 +328,7 @@ mod tests {
TestConsensus, TestHeaderDownloader, TestHeadersClient,
},
};
use reth_primitives::{BlockNumber, SealedHeader, H256, U256};
use reth_primitives::{BlockNumber, SealedHeader, U256};
use std::sync::Arc;
pub(crate) struct HeadersTestRunner<D: HeaderDownloader> {
@@ -402,7 +439,9 @@ mod tests {
let tip = if !headers.is_empty() {
headers.last().unwrap().hash()
} else {
H256::from_low_u64_be(rand::random())
let tip = random_header(0, None);
self.db.insert_headers(std::iter::once(&tip))?;
tip.hash()
};
self.consensus.update_tip(tip);
Ok(())