From 4d708ce8afad56e02b4383bb44e0f253590edca7 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Mon, 28 Nov 2022 19:32:46 +0200 Subject: [PATCH] chore(sync): basic header response validation (#276) --- crates/stages/src/error.rs | 8 ++- crates/stages/src/stages/headers.rs | 87 +++++++++++++++++++++-------- 2 files changed, 69 insertions(+), 26 deletions(-) diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index a1bd0d60ab..45a5c9133d 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -17,11 +17,15 @@ pub enum StageError { error: consensus::Error, }, /// The stage encountered a database error. - #[error("An internal database error occurred.")] + #[error("An internal database error occurred: {0}")] Database(#[from] DbError), /// The stage encountered a database integrity error. - #[error("A database integrity error occurred.")] + #[error("A database integrity error occurred: {0}")] DatabaseIntegrity(#[from] DatabaseIntegrityError), + /// Invalid download response. Applicable for stages which + /// rely on external downloaders + #[error("Invalid download response: {0}")] + Download(String), /// The stage encountered an internal error. #[error(transparent)] Internal(Box), diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 78c5108791..3df83486a4 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -5,7 +5,11 @@ use crate::{ use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, db::{models::blocks::BlockNumHash, tables, Database, DbCursorRO, DbCursorRW, DbTx, DbTxMut}, - p2p::headers::{client::HeadersClient, downloader::HeaderDownloader, error::DownloadError}, + p2p::headers::{ + client::HeadersClient, + downloader::{ensure_parent, HeaderDownloader}, + error::DownloadError, + }, }; use reth_primitives::{BlockNumber, SealedHeader, H256, U256}; use std::{fmt::Debug, sync::Arc}; @@ -50,48 +54,52 @@ impl Stage, input: ExecInput, ) -> Result { - let last_block_num = input.stage_progress.unwrap_or_default(); - self.update_head::(db, last_block_num).await?; + let stage_progress = input.stage_progress.unwrap_or_default(); + self.update_head::(db, stage_progress).await?; - // TODO: add batch size - // download the headers - let last_hash = db.get_block_hash(last_block_num)?; + // Lookup the last stored header + let last_hash = db.get_block_hash(stage_progress)?; let last_header = - db.get::((last_block_num, last_hash).into())?.ok_or({ - DatabaseIntegrityError::Header { number: last_block_num, hash: last_hash } + db.get::((stage_progress, last_hash).into())?.ok_or({ + DatabaseIntegrityError::Header { number: stage_progress, hash: last_hash } })?; let head = SealedHeader::new(last_header, last_hash); let forkchoice = self.next_fork_choice_state(&head.hash()).await; + if let Some(number) = db.get::(forkchoice.head_block_hash)? { + if number < head.number { + // Nothing to do here + warn!("Consensus reported old head {}", forkchoice.head_block_hash); + return Ok(ExecOutput { stage_progress, done: true, reached_tip: true }) + } + } + // The stage relies on the downloader to return the headers // in descending order starting from the tip down to // the local head (latest block in db) - let headers = match self.downloader.download(head, forkchoice).await { + // TODO: add batching + let headers = match self.downloader.download(head.clone(), forkchoice.clone()).await { Ok(res) => { - // TODO: validate the result order? - // at least check if it attaches (first == tip && last == last_hash) + // Perform basic response validation + self.validate_header_response(&res, head, forkchoice)?; res } Err(e) => match e { DownloadError::Timeout => { warn!("No response for header request"); - return Ok(ExecOutput { - stage_progress: last_block_num, - reached_tip: false, - done: false, - }) + return Ok(ExecOutput { stage_progress, reached_tip: false, done: false }) } DownloadError::HeaderValidation { hash, error } => { warn!("Validation error for header {hash}: {error}"); - return Err(StageError::Validation { block: last_block_num, error }) + return Err(StageError::Validation { block: stage_progress, error }) } error => { warn!("Unexpected error occurred: {error}"); - return Err(StageError::Internal(Box::new(error))) + return Err(StageError::Download(error.to_string())) } }, }; - let stage_progress = self.write_headers::(db, headers).await?.unwrap_or(last_block_num); + let stage_progress = self.write_headers::(db, headers).await?.unwrap_or(stage_progress); Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) } @@ -137,6 +145,31 @@ impl HeaderStage { } } + /// Perform basic header response validation + fn validate_header_response( + &self, + headers: &[SealedHeader], + head: SealedHeader, + forkchoice: ForkchoiceState, + ) -> Result<(), StageError> { + // The response must include at least head and tip + if headers.len() < 2 { + return Err(StageError::Download("Not enough headers".to_owned())) + } + + let mut headers_iter = headers.iter().rev().peekable(); + if headers_iter.peek().unwrap().hash() != forkchoice.head_block_hash { + return Err(StageError::Download("Response must end with tip".to_owned())) + } + + while let Some(header) = headers_iter.next() { + ensure_parent(header, headers_iter.peek().unwrap_or(&&head)) + .map_err(|err| StageError::Download(err.to_string()))?; + } + + Ok(()) + } + /// Write downloaded headers to the database async fn write_headers( &self, @@ -214,7 +247,11 @@ mod tests { async fn execute_validation_error() { let mut runner = HeadersTestRunner::default(); runner.consensus.set_fail_validation(true); - let input = ExecInput::default(); + let (stage_progress, previous_stage) = (1000, 1200); + let input = ExecInput { + previous_stage: Some((PREV_STAGE_ID, previous_stage)), + stage_progress: Some(stage_progress), + }; let headers = runner.seed_execution(input).expect("failed to seed execution"); let rx = runner.execute(input); runner.after_execution(headers).await.expect("failed to run after execution hook"); @@ -225,7 +262,7 @@ mod tests { /// Check that unexpected download errors are caught #[tokio::test] - async fn executed_request_error() { + async fn executed_download_error() { let mut runner = HeadersTestRunner::default(); let (stage_progress, previous_stage) = (1000, 1200); let input = ExecInput { @@ -242,7 +279,7 @@ mod tests { runner.consensus.update_tip(tip.hash()); let result = rx.await.unwrap(); - assert_matches!(result, Err(StageError::Internal(_))); + assert_matches!(result, Err(StageError::Download(_))); assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed"); } @@ -291,7 +328,7 @@ mod tests { TestConsensus, TestHeaderDownloader, TestHeadersClient, }, }; - use reth_primitives::{BlockNumber, SealedHeader, H256, U256}; + use reth_primitives::{BlockNumber, SealedHeader, U256}; use std::sync::Arc; pub(crate) struct HeadersTestRunner { @@ -402,7 +439,9 @@ mod tests { let tip = if !headers.is_empty() { headers.last().unwrap().hash() } else { - H256::from_low_u64_be(rand::random()) + let tip = random_header(0, None); + self.db.insert_headers(std::iter::once(&tip))?; + tip.hash() }; self.consensus.update_tip(tip); Ok(())