From 7d14525307b328cdaf6f1a8742378c5da5b37815 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 2 Jun 2023 16:28:03 +0200 Subject: [PATCH] feat: improve block downloads (#2941) --- .../consensus/beacon/src/engine/forkchoice.rs | 18 ++ crates/consensus/beacon/src/engine/message.rs | 11 +- crates/consensus/beacon/src/engine/mod.rs | 240 +++++++++++++----- crates/consensus/beacon/src/engine/sync.rs | 5 + crates/interfaces/src/blockchain_tree/mod.rs | 2 +- .../src/providers/state/historical.rs | 2 +- 6 files changed, 210 insertions(+), 68 deletions(-) diff --git a/crates/consensus/beacon/src/engine/forkchoice.rs b/crates/consensus/beacon/src/engine/forkchoice.rs index a7c92fb707..57bafaa393 100644 --- a/crates/consensus/beacon/src/engine/forkchoice.rs +++ b/crates/consensus/beacon/src/engine/forkchoice.rs @@ -37,7 +37,25 @@ impl ForkchoiceStateTracker { self.last_valid = Some(state); } + /// Returns the [ForkchoiceStatus] of the latest received FCU. + pub(crate) fn latest_status(&self) -> Option { + self.latest.as_ref().map(|s| s.status) + } + + /// Returns whether the latest received FCU is valid: [ForkchoiceStatus::Valid] + #[allow(unused)] + pub(crate) fn is_latest_valid(&self) -> bool { + self.latest_status().map(|s| s.is_valid()).unwrap_or(false) + } + + /// Returns the last valid head hash. + #[allow(unused)] + pub(crate) fn last_valid_head(&self) -> Option { + self.last_valid.as_ref().map(|s| s.head_block_hash) + } + /// Returns the head hash of the latest received FCU to which we need to sync. + #[allow(unused)] pub(crate) fn sync_target(&self) -> Option { self.last_syncing.as_ref().map(|s| s.head_block_hash) } diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index 546e03237d..625de5c113 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -7,7 +7,7 @@ use reth_interfaces::consensus::ForkchoiceState; use reth_payload_builder::error::PayloadBuilderError; use reth_rpc_types::engine::{ ExecutionPayload, ForkChoiceUpdateResult, ForkchoiceUpdateError, ForkchoiceUpdated, - PayloadAttributes, PayloadId, PayloadStatus, + PayloadAttributes, PayloadId, PayloadStatus, PayloadStatusEnum, }; use std::{ future::Future, @@ -44,6 +44,15 @@ impl OnForkChoiceUpdated { self.forkchoice_status } + /// Creates a new instance of `OnForkChoiceUpdated` for the `SYNCING` state + pub(crate) fn syncing() -> Self { + let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing); + Self { + forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status), + fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))), + } + } + /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no /// payload attributes were provided. pub(crate) fn valid(status: PayloadStatus) -> Self { diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 0a8cae33d8..814c037ee2 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -135,15 +135,65 @@ impl BeaconConsensusEngineHandle { /// The beacon consensus engine is the driver that switches between historical and live sync. /// /// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are -/// received by Engine API. +/// received by Engine API (JSON-RPC). /// /// The consensus engine is idle until it receives the first /// [BeaconEngineMessage::ForkchoiceUpdated] message from the CL which would initiate the sync. At /// first, the consensus engine would run the [Pipeline] until the latest known block hash. -/// Afterwards, it would attempt to create/restore the [`BlockchainTreeEngine`] from the blocks +/// Afterward, it would attempt to create/restore the [`BlockchainTreeEngine`] from the blocks /// that are currently available. In case the restoration is successful, the consensus engine would -/// run in a live sync mode, which mean it would solemnly rely on the messages from Engine API to -/// construct the chain forward. +/// run in a live sync mode, populating the [`BlockchainTreeEngine`] with new blocks as they arrive +/// via engine API and downloading any missing blocks from the network to fill potential gaps. +/// +/// The consensus engine has two data input sources: +/// +/// ## New Payload (`engine_newPayloadV{}`) +/// +/// The engine receives new payloads from the CL. If the payload is connected to the canonical +/// chain, it will be fully validated added to a chain in the [BlockchainTreeEngine]: `VALID` +/// +/// If the payload's chain is disconnected (at least 1 block is missing) then it will be buffered: +/// `SYNCING` ([BlockStatus::Disconnected]). +/// +/// ## Forkchoice Update (FCU) (`engine_forkchoiceUpdatedV{}`) +/// +/// This contains the latest forkchoice state and the payload attributes. The engine will attempt to +/// make a new canonical chain based on the `head_hash` of the update and trigger payload building +/// if the `payload_attrs` are present and the FCU is `VALID`. +/// +/// The `head_hash` forms a chain by walking backwards from the `head_hash` towards the canonical +/// blocks of the chain. +/// +/// Making a new canonical chain can result in the following relevant outcomes: +/// +/// ### The chain is connected +/// +/// All blocks of the `head_hash`'s chain are present in the [BlockchainTreeEngine] and are +/// committed to the canonical chain. This also includes reorgs. +/// +/// ### The chain is disconnected +/// +/// In this case the [BlockchainTreeEngine] doesn't know how the new chain connects to the existing +/// canonical chain. It could be a simple commit (new blocks extend the current head) or a re-org +/// that requires unwinding the canonical chain. +/// +/// This further distinguishes between two variants: +/// +/// #### `head_hash`'s block exists +/// +/// The `head_hash`'s block was already received/downloaded, but at least one block is missing to +/// form a _connected_ chain. The engine will attempt to download the missing blocks from the +/// network by walking backwards (`parent_hash`), and then try to make the block canonical as soon +/// as the chain becomes connected. +/// +/// However, it still can be the case that the chain and the FCU is `INVALID`. +/// +/// #### `head_hash` block is missing +/// +/// This is similar to the previous case, but the `head_hash`'s block is missing. At which point the +/// engine doesn't know where the new head will point to: new chain could be a re-org or a simple +/// commit. The engine will download the missing head first and then proceed as in the previous +/// case. /// /// # Panics /// @@ -408,6 +458,7 @@ where tx: oneshot::Sender>, ) -> bool { self.metrics.forkchoice_updated_messages.increment(1); + self.blockchain.on_forkchoice_update_received(&state); let on_updated = match self.forkchoice_updated(state, attrs) { Ok(response) => response, @@ -423,9 +474,15 @@ where let is_valid_response = on_updated.is_valid_update(); let _ = tx.send(Ok(on_updated)); + // notify listeners about new processed FCU + self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state)); + // Terminate the sync early if it's reached the maximum user // configured block. if is_valid_response { + // new VALID update that moved the canonical chain forward + let _ = self.update_canon_chain(&state); + let tip_number = self.blockchain.canonical_tip().number; if self.sync.has_reached_max_block(tip_number) { return true @@ -458,52 +515,39 @@ where return Ok(OnForkChoiceUpdated::with_invalid(status)) } - // TODO: check PoW / EIP-3675 terminal block conditions for the fork choice head - // TODO: ensure validity of the payload (is this satisfied already?) - - let status = if self.sync.is_pipeline_idle() { + if self.sync.is_pipeline_active() { // We can only process new forkchoice updates if the pipeline is idle, since it requires // exclusive access to the database - match self.blockchain.make_canonical(&state.head_block_hash) { - Ok(outcome) => { - let header = outcome.into_header(); - debug!(target: "consensus::engine", hash=?state.head_block_hash, number=header.number, "canonicalized new head"); - - if let Some(attrs) = attrs { - let payload_response = - self.process_payload_attributes(attrs, header.unseal(), state); - if payload_response.is_valid_update() { - // we will return VALID, so let's make sure the info tracker is - // properly updated - self.update_canon_chain(&state)?; - } - self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state)); - trace!(target: "consensus::engine", status = ?payload_response, ?state, "Returning forkchoice status "); - return Ok(payload_response) - } - - // we will return VALID, so let's make sure the info tracker is - // properly updated - self.update_canon_chain(&state)?; - PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)) - } - Err(error) => { - if let Error::Execution(ref err) = error { - if err.is_fatal() { - tracing::error!(target: "consensus::engine", ?err, "Encountered fatal error"); - return Err(error) - } - } - - self.on_failed_canonical_forkchoice_update(&state, error) - } - } - } else { trace!(target: "consensus::engine", "Pipeline is syncing, skipping forkchoice update"); - PayloadStatus::from_status(PayloadStatusEnum::Syncing) - }; + return Ok(OnForkChoiceUpdated::syncing()) + } - self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state)); + let status = match self.blockchain.make_canonical(&state.head_block_hash) { + Ok(outcome) => { + let header = outcome.into_header(); + debug!(target: "consensus::engine", hash=?state.head_block_hash, number=header.number, "canonicalized new head"); + + if let Some(attrs) = attrs { + // the CL requested to build a new payload on top of this new VALID head + let payload_response = + self.process_payload_attributes(attrs, header.unseal(), state); + trace!(target: "consensus::engine", status = ?payload_response, ?state, "Returning forkchoice status "); + return Ok(payload_response) + } + + PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)) + } + Err(error) => { + if let Error::Execution(ref err) = error { + if err.is_fatal() { + tracing::error!(target: "consensus::engine", ?err, "Encountered fatal error"); + return Err(error) + } + } + + self.on_failed_canonical_forkchoice_update(&state, error) + } + }; trace!(target: "consensus::engine", ?status, ?state, "Returning forkchoice status"); Ok(OnForkChoiceUpdated::valid(status)) @@ -554,7 +598,6 @@ where total_difficulty: head_td, }); self.blockchain.set_canonical_head(head); - self.blockchain.on_forkchoice_update_received(update); Ok(()) } @@ -894,6 +937,90 @@ where Ok(()) } + /// Invoked if we successfully downloaded a new block from the network. + /// + /// This will attempt to insert the block into the tree. + /// + /// There are several scenarios: + /// + /// ## [BlockStatus::Valid] + /// + /// The block is connected to the current canonical head and is valid. + /// If the engine is still SYNCING, then we can try again to make the chain canonical. + /// + /// ## [BlockStatus::Accepted] + /// + /// All ancestors are known, but the block is not connected to the current canonical _head_. If + /// the block is an ancestor of the current forkchoice head, then we can try again to make the + /// chain canonical, which would trigger a reorg in this case since the new head is therefore + /// not connected to the current head. + /// + /// ## [BlockStatus::Disconnected] + /// + /// The block is not connected to the canonical head, and we need to download the missing parent + /// first. + /// + /// ## Insert Error + /// + /// If the insertion into the tree failed, then the block was well-formed (valid hash), but its + /// chain is invalid, which means the FCU that triggered the download is invalid. Here we can + /// stop because there's nothing to do here and the engine needs to wait for another FCU. + fn on_downloaded_block(&mut self, block: SealedBlock) { + trace!(target: "consensus::engine", hash=?block.hash, number=%block.number, "Downloaded full block"); + // check if the block's parent is already marked as invalid + if self.check_invalid_ancestor_with_head(block.parent_hash, block.hash).is_some() { + // can skip this invalid block + return + } + + match self.blockchain.insert_block_without_senders(block) { + Ok(status) => { + match status { + BlockStatus::Valid => { + // block is connected to the current canonical head and is valid. + self.try_make_sync_target_canonical(); + } + BlockStatus::Accepted => { + // block is connected to the canonical chain, but not the current head + self.try_make_sync_target_canonical(); + } + BlockStatus::Disconnected { missing_parent } => { + // continue downloading the missing parent + self.sync.download_full_block(missing_parent.hash); + } + } + } + Err(err) => { + debug!(target: "consensus::engine", ?err, "Failed to insert downloaded block"); + } + } + } + + /// Attempt to form a new canonical chain based on the current sync target. + /// + /// This is invoked when we successfully downloaded a new block from the network which resulted + /// in either [BlockStatus::Accepted] or [BlockStatus::Valid]. + /// + /// Note: This will not succeed if the sync target has changed since the block download request + /// was issued and the new target is still disconnected and additional missing blocks are + /// downloaded + fn try_make_sync_target_canonical(&mut self) { + if let Some(target) = self.forkchoice_state_tracker.sync_target_state() { + // optimistically try to make the chain canonical, the sync target might have changed + // since the block download request was issued (new FCU received) + if let Ok(outcome) = self.blockchain.make_canonical(&target.head_block_hash) { + let new_head = outcome.into_header(); + debug!(target: "consensus::engine", hash=?new_head.hash, number=new_head.number, "canonicalized new head"); + + // we're no longer syncing + self.sync_state_updater.update_sync_state(SyncState::Idle); + + // we can update the FCU blocks + let _ = self.update_canon_chain(&target); + } + } + } + /// Event handler for events emitted by the [EngineSyncController]. /// /// This returns a result to indicate whether the engine future should resolve (fatal error). @@ -903,24 +1030,7 @@ where ) -> Option> { match ev { EngineSyncEvent::FetchedFullBlock(block) => { - trace!(target: "consensus::engine", hash=?block.hash, "Fetched full block"); - // it is guaranteed that the pipeline is not active at this point. - - // TODO(mattsse): better error handling and start closing the gap if there's any by - // closing the gap either via pipeline, or by fetching the blocks via block number - // [head..FCU.number] - - if self - .try_insert_new_payload(block) - .map(|status| status.is_valid()) - .unwrap_or_default() - { - // payload is valid - self.sync_state_updater.update_sync_state(SyncState::Idle); - } else if let Some(target) = self.forkchoice_state_tracker.sync_target() { - // if the payload is invalid, we run the pipeline to the head block. - self.sync.set_pipeline_sync_target(target); - } + self.on_downloaded_block(block); } EngineSyncEvent::PipelineStarted(target) => { trace!(target: "consensus::engine", ?target, continuous = target.is_none(), "Started the pipeline"); diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index 164197f08c..cc678eca22 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -100,6 +100,11 @@ where self.pipeline_state.is_idle() } + /// Returns `true` if the pipeline is active. + pub(crate) fn is_pipeline_active(&self) -> bool { + !self.is_pipeline_idle() + } + /// Returns true if there's already a request for the given hash. pub(crate) fn is_inflight_request(&self, hash: H256) -> bool { self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash) diff --git a/crates/interfaces/src/blockchain_tree/mod.rs b/crates/interfaces/src/blockchain_tree/mod.rs index 85c3707fcc..a695a53838 100644 --- a/crates/interfaces/src/blockchain_tree/mod.rs +++ b/crates/interfaces/src/blockchain_tree/mod.rs @@ -120,7 +120,7 @@ pub enum BlockStatus { /// If block validation is valid and block extends canonical chain. /// In BlockchainTree sense it forks on canonical tip. Valid, - /// If the block is valid, but it does not extend canonical chain + /// If the block is valid, but it does not extend canonical chain. /// (It is side chain) or hasn't been fully validated but ancestors of a payload are known. Accepted, /// If blocks is not connected to canonical chain. diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index 74f58037c7..2e3ddf3b3b 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -131,7 +131,7 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for HistoricalStateProviderRef<'a, 'b, Ok(Some(storage_entry.value)) } else if changeset_block_number == None { // if there is no shards, return empty account. - return Ok(None); + return Ok(None) } else { // if changeset is not present that means that there was history shard but we need to // use newest value from plain state