From 94dfeb3adeb48f95e8b3f34f229bba6682b34eda Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 2 Aug 2023 18:36:48 +0200 Subject: [PATCH] fix: validate headers in full block downloader (#4034) --- crates/consensus/beacon/src/engine/mod.rs | 5 +- crates/consensus/beacon/src/engine/sync.rs | 38 +++++-- crates/interfaces/src/consensus.rs | 22 ++++ crates/interfaces/src/p2p/full_block.rs | 119 ++++++++++++--------- 4 files changed, 122 insertions(+), 62 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index e7531d4217..f1c7c12b05 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -26,7 +26,7 @@ use reth_primitives::{ Head, Header, SealedBlock, SealedHeader, H256, U256, }; use reth_provider::{ - BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ProviderError, + BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError, StageCheckpointReader, }; use reth_prune::Pruner; @@ -208,6 +208,7 @@ where + BlockIdReader + CanonChainTracker + StageCheckpointReader + + ChainSpecProvider + 'static, Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, { @@ -279,6 +280,7 @@ where task_spawner.clone(), run_pipeline_continuously, max_block, + blockchain.chain_spec(), ); let prune = pruner.map(|pruner| EnginePruneController::new(pruner, task_spawner)); let mut this = Self { @@ -1651,6 +1653,7 @@ where + BlockIdReader + CanonChainTracker + StageCheckpointReader + + ChainSpecProvider + Unpin + 'static, { diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index ba988f1be8..24b4d04a3b 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -1,6 +1,6 @@ //! Sync management for the engine implementation. -use crate::engine::metrics::EngineSyncMetrics; +use crate::{engine::metrics::EngineSyncMetrics, BeaconConsensus}; use futures::FutureExt; use reth_db::database::Database; use reth_interfaces::p2p::{ @@ -8,12 +8,13 @@ use reth_interfaces::p2p::{ full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient}, headers::client::HeadersClient, }; -use reth_primitives::{BlockNumber, SealedBlock, H256}; +use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, H256}; use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult}; use reth_tasks::TaskSpawner; use std::{ cmp::{Ordering, Reverse}, collections::{binary_heap::PeekMut, BinaryHeap}, + sync::Arc, task::{ready, Context, Poll}, }; use tokio::sync::oneshot; @@ -68,9 +69,13 @@ where pipeline_task_spawner: Box, run_pipeline_continuously: bool, max_block: Option, + chain_spec: Arc, ) -> Self { Self { - full_block_client: FullBlockClient::new(client), + full_block_client: FullBlockClient::new( + client, + Arc::new(BeaconConsensus::new(chain_spec)), + ), pipeline_task_spawner, pipeline_state: PipelineState::Idle(Some(pipeline)), pending_pipeline_target: None, @@ -394,7 +399,8 @@ mod tests { }; use reth_interfaces::{p2p::either::EitherDownloader, test_utils::TestFullBlockClient}; use reth_primitives::{ - stage::StageCheckpoint, BlockBody, ChainSpec, ChainSpecBuilder, SealedHeader, MAINNET, + constants::ETHEREUM_BLOCK_GAS_LIMIT, stage::StageCheckpoint, BlockBody, ChainSpec, + ChainSpecBuilder, Header, SealedHeader, MAINNET, }; use reth_provider::{test_utils::TestExecutorFactory, PostState}; use reth_stages::{test_utils::TestStages, ExecOutput, StageError}; @@ -491,6 +497,7 @@ mod tests { fn build( self, pipeline: Pipeline, + chain_spec: Arc, ) -> EngineSyncController> where DB: Database + 'static, @@ -508,6 +515,7 @@ mod tests { // run_pipeline_continuously: false here until we want to test this false, self.max_block, + chain_spec, ) } } @@ -539,10 +547,11 @@ mod tests { checkpoint: StageCheckpoint::new(5), done: true, })])) - .build(chain_spec); + .build(chain_spec.clone()); - let mut sync_controller = - TestSyncControllerBuilder::new().with_client(client.clone()).build(pipeline); + let mut sync_controller = TestSyncControllerBuilder::new() + .with_client(client.clone()) + .build(pipeline, chain_spec); let tip = client.highest_block().expect("there should be blocks here"); sync_controller.set_pipeline_sync_target(tip.hash); @@ -577,20 +586,27 @@ mod tests { ); let client = TestFullBlockClient::default(); - let mut header = SealedHeader::default(); + let mut header = Header { + base_fee_per_gas: Some(7), + gas_limit: ETHEREUM_BLOCK_GAS_LIMIT, + ..Default::default() + } + .seal_slow(); let body = BlockBody::default(); for _ in 0..10 { header.parent_hash = header.hash_slow(); header.number += 1; + header.timestamp += 1; header = header.header.seal_slow(); client.insert(header.clone(), body.clone()); } // set up a pipeline - let pipeline = TestPipelineBuilder::new().build(chain_spec); + let pipeline = TestPipelineBuilder::new().build(chain_spec.clone()); - let mut sync_controller = - TestSyncControllerBuilder::new().with_client(client.clone()).build(pipeline); + let mut sync_controller = TestSyncControllerBuilder::new() + .with_client(client.clone()) + .build(pipeline, chain_spec); let tip = client.highest_block().expect("there should be blocks here"); diff --git a/crates/interfaces/src/consensus.rs b/crates/interfaces/src/consensus.rs index 76c2175c9a..9176bd06d7 100644 --- a/crates/interfaces/src/consensus.rs +++ b/crates/interfaces/src/consensus.rs @@ -31,6 +31,28 @@ pub trait Consensus: Debug + Send + Sync { parent: &SealedHeader, ) -> Result<(), ConsensusError>; + /// Validates the given headers + /// + /// This ensures that the first header is valid on its own and all subsequent headers are valid + /// on its own and valid against its parent. + /// + /// Note: this expects that the headers are in natural order (ascending block number) + fn validate_header_range(&self, headers: &[SealedHeader]) -> Result<(), ConsensusError> { + if headers.is_empty() { + return Ok(()) + } + let first = headers.first().expect("checked empty"); + self.validate_header(first)?; + let mut parent = first; + for child in headers.iter().skip(1) { + self.validate_header(child)?; + self.validate_header_against_parent(child, parent)?; + parent = child; + } + + Ok(()) + } + /// Validate if the header is correct and follows the consensus specification, including /// computed properties (like total difficulty). /// diff --git a/crates/interfaces/src/p2p/full_block.rs b/crates/interfaces/src/p2p/full_block.rs index 6cc4e4b61b..2d3f0482f5 100644 --- a/crates/interfaces/src/p2p/full_block.rs +++ b/crates/interfaces/src/p2p/full_block.rs @@ -1,5 +1,6 @@ +use super::headers::client::HeadersRequest; use crate::{ - consensus::ConsensusError, + consensus::{Consensus, ConsensusError}, p2p::{ bodies::client::{BodiesClient, SingleBodyRequest}, error::PeerRequestResult, @@ -16,22 +17,28 @@ use std::{ fmt::Debug, future::Future, pin::Pin, + sync::Arc, task::{ready, Context, Poll}, }; use tracing::debug; -use super::headers::client::HeadersRequest; - /// A Client that can fetch full blocks from the network. #[derive(Debug, Clone)] pub struct FullBlockClient { client: Client, + consensus: Arc, } impl FullBlockClient { /// Creates a new instance of `FullBlockClient`. - pub fn new(client: Client) -> Self { - Self { client } + pub fn new(client: Client, consensus: Arc) -> Self { + Self { client, consensus } + } + + /// Returns a client with Test consensus + #[cfg(feature = "test-utils")] + pub fn test_client(client: Client) -> Self { + Self::new(client, Arc::new(crate::test_utils::TestConsensus::default())) } } @@ -95,6 +102,7 @@ where headers: None, pending_headers: VecDeque::new(), bodies: HashMap::new(), + consensus: Arc::clone(&self.consensus), } } } @@ -186,7 +194,7 @@ where if let Some(header) = maybe_header { if header.hash() != this.hash { debug!(target: "downloaders", expected=?this.hash, received=?header.hash, "Received wrong header"); - // received bad header + // received a different header than requested this.client.report_bad_message(peer) } else { this.header = Some(header); @@ -352,6 +360,8 @@ where { /// The client used to fetch headers and bodies. client: Client, + /// The consensus instance used to validate the blocks. + consensus: Arc, /// The block hash to start fetching from (inclusive). start_hash: H256, /// How many blocks to fetch: `len([start_hash, ..]) == count` @@ -381,6 +391,8 @@ where } /// Inserts a block body, matching it with the `next_header`. + /// + /// Note: this assumes the response matches the next header in the queue. fn insert_body(&mut self, body_response: BodyResponse) { if let Some(header) = self.pending_headers.pop_front() { self.bodies.insert(header, body_response); @@ -388,8 +400,8 @@ where } /// Inserts multiple block bodies. - fn insert_bodies(&mut self, bodies: Vec) { - for body in bodies { + fn insert_bodies(&mut self, bodies: impl IntoIterator) { + for body in bodies.into_iter() { self.insert_body(body); } } @@ -461,6 +473,46 @@ where Some(response) } + fn on_headers_response(&mut self, headers: WithPeerId>) { + let (peer, mut headers_falling) = + headers.map(|h| h.into_iter().map(|h| h.seal_slow()).collect::>()).split(); + + // fill in the response if it's the correct length + if headers_falling.len() == self.count as usize { + // sort headers from highest to lowest block number + headers_falling.sort_unstable_by_key(|h| Reverse(h.number)); + + // check the starting hash + if headers_falling[0].hash() != self.start_hash { + // received a different header than requested + self.client.report_bad_message(peer); + } else { + let headers_rising = headers_falling.iter().rev().cloned().collect::>(); + // ensure the downloaded headers are valid + if let Err(err) = self.consensus.validate_header_range(&headers_rising) { + debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response"); + self.client.report_bad_message(peer); + return + } + + // get the bodies request so it can be polled later + let hashes = headers_falling.iter().map(|h| h.hash()).collect::>(); + + // populate the pending headers + self.pending_headers = headers_falling.clone().into(); + + // set the actual request if it hasn't been started yet + if !self.has_bodies_request_started() { + // request the bodies for the downloaded headers + self.request.bodies = Some(self.client.get_block_bodies(hashes)); + } + + // set the headers response + self.headers = Some(headers_falling); + } + } + } + /// Returns whether or not a bodies request has been started, returning false if there is no /// pending request. fn has_bodies_request_started(&self) -> bool { @@ -500,39 +552,7 @@ where RangeResponseResult::Header(res) => { match res { Ok(headers) => { - let (peer, mut headers) = headers - .map(|h| { - h.iter().map(|h| h.clone().seal_slow()).collect::>() - }) - .split(); - - // fill in the response if it's the correct length - if headers.len() == this.count as usize { - // sort headers from highest to lowest block number - headers.sort_unstable_by_key(|h| Reverse(h.number)); - - // check the starting hash - if headers[0].hash() != this.start_hash { - // received bad response - this.client.report_bad_message(peer); - } else { - // get the bodies request so it can be polled later - let hashes = - headers.iter().map(|h| h.hash()).collect::>(); - - // populate the pending headers - this.pending_headers = headers.clone().into(); - - // set the actual request if it hasn't been started yet - if !this.has_bodies_request_started() { - this.request.bodies = - Some(this.client.get_block_bodies(hashes)); - } - - // set the headers response - this.headers = Some(headers); - } - } + this.on_headers_response(headers); } Err(err) => { debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed"); @@ -561,10 +581,9 @@ where // first insert the received bodies this.insert_bodies( new_bodies - .iter() - .map(|resp| WithPeerId::new(peer, resp.clone())) - .map(BodyResponse::PendingValidation) - .collect::>(), + .into_iter() + .map(|resp| WithPeerId::new(peer, resp)) + .map(BodyResponse::PendingValidation), ); if !this.is_bodies_complete() { @@ -723,7 +742,7 @@ mod tests { let header = SealedHeader::default(); let body = BlockBody::default(); client.insert(header.clone(), body.clone()); - let client = FullBlockClient::new(client); + let client = FullBlockClient::test_client(client); let received = client.get_full_block(header.hash()).await; assert_eq!(received, SealedBlock::new(header, body)); @@ -735,7 +754,7 @@ mod tests { let header = SealedHeader::default(); let body = BlockBody::default(); client.insert(header.clone(), body.clone()); - let client = FullBlockClient::new(client); + let client = FullBlockClient::test_client(client); let received = client.get_full_block_range(header.hash(), 1).await; let received = received.first().expect("response should include a block"); @@ -754,7 +773,7 @@ mod tests { header = header.header.seal_slow(); client.insert(header.clone(), body.clone()); } - let client = FullBlockClient::new(client); + let client = FullBlockClient::test_client(client); let received = client.get_full_block_range(header.hash(), 1).await; let received = received.first().expect("response should include a block"); @@ -780,7 +799,7 @@ mod tests { header = header.header.seal_slow(); client.insert(header.clone(), body.clone()); } - let client = FullBlockClient::new(client); + let client = FullBlockClient::test_client(client); let future = client.get_full_block_range(header.hash(), 1); let mut stream = FullBlockRangeStream::from(future); @@ -826,7 +845,7 @@ mod tests { header = header.header.seal_slow(); client.insert(header.clone(), body.clone()); } - let client = FullBlockClient::new(client); + let client = FullBlockClient::test_client(client); let received = client.get_full_block_range(header.hash(), 1).await; let received = received.first().expect("response should include a block");