From aea35263dad8f8e1e04c33804fa939278c9c362c Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 31 May 2023 21:21:56 +0200 Subject: [PATCH] feat: track fcu validity (#2934) --- .../consensus/beacon/src/engine/forkchoice.rs | 103 ++++++++++++++++++ crates/consensus/beacon/src/engine/message.rs | 29 +++-- crates/consensus/beacon/src/engine/mod.rs | 51 +++++---- 3 files changed, 150 insertions(+), 33 deletions(-) create mode 100644 crates/consensus/beacon/src/engine/forkchoice.rs diff --git a/crates/consensus/beacon/src/engine/forkchoice.rs b/crates/consensus/beacon/src/engine/forkchoice.rs new file mode 100644 index 0000000000..8f17bad61d --- /dev/null +++ b/crates/consensus/beacon/src/engine/forkchoice.rs @@ -0,0 +1,103 @@ +use reth_primitives::H256; +use reth_rpc_types::engine::{ForkchoiceState, PayloadStatusEnum}; + +/// The struct that keeps track of the received forkchoice state and their status. +#[derive(Debug, Clone, Default)] +pub(crate) struct ForkchoiceStateTracker { + /// The latest forkchoice state that we received. + /// + /// Caution: this can be invalid. + latest: Option, + + /// Tracks the latest forkchoice state that we received to which we need to sync. + last_syncing: Option, + /// The latest valid forkchoice state that we received and processed as valid. + last_valid: Option, +} + +impl ForkchoiceStateTracker { + /// Sets the latest forkchoice state that we received. + /// + /// If the status is valid, we also update the last valid forkchoice state. + pub(crate) fn set_latest(&mut self, state: ForkchoiceState, status: ForkchoiceStatus) { + if status.is_valid() { + self.set_valid(state); + } else if status.is_syncing() { + self.last_syncing = Some(state); + } + + let received = ReceivedForkchoiceState { state, status }; + self.latest = Some(received); + } + + fn set_valid(&mut self, state: ForkchoiceState) { + // we no longer need to sync to this state. + self.last_syncing = None; + + self.last_valid = Some(state); + } + + /// Returns the head hash of the latest received FCU to which we need to sync. + pub(crate) fn sync_target(&self) -> Option { + self.last_syncing.as_ref().map(|s| s.head_block_hash) + } + + /// Returns the last received ForkchoiceState to which we need to sync. + pub(crate) fn sync_target_state(&self) -> Option { + self.last_syncing + } + + /// Returns true if no forkchoice state has been received yet. + pub(crate) fn is_empty(&self) -> bool { + self.latest.is_none() + } +} + +/// Represents a forkchoice update and tracks the status we assigned to it. +#[derive(Debug, Clone)] +#[allow(unused)] +pub(crate) struct ReceivedForkchoiceState { + state: ForkchoiceState, + status: ForkchoiceStatus, +} + +/// A simplified representation of [PayloadStatusEnum] specifically for FCU. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum ForkchoiceStatus { + /// The forkchoice state is valid. + Valid, + /// The forkchoice state is invalid. + Invalid, + /// The forkchoice state is unknown. + Syncing, +} + +impl ForkchoiceStatus { + pub(crate) fn is_valid(&self) -> bool { + matches!(self, ForkchoiceStatus::Valid) + } + + pub(crate) fn is_syncing(&self) -> bool { + matches!(self, ForkchoiceStatus::Syncing) + } + + /// Converts the general purpose [PayloadStatusEnum] into a [ForkchoiceStatus]. + pub(crate) fn from_payload_status(status: &PayloadStatusEnum) -> Self { + match status { + PayloadStatusEnum::Valid => ForkchoiceStatus::Valid, + PayloadStatusEnum::Invalid { .. } => ForkchoiceStatus::Invalid, + PayloadStatusEnum::Syncing => ForkchoiceStatus::Syncing, + PayloadStatusEnum::Accepted => { + // This is only returned on `newPayload` accepted would be a valid state here. + ForkchoiceStatus::Valid + } + PayloadStatusEnum::InvalidBlockHash { .. } => ForkchoiceStatus::Invalid, + } + } +} + +impl From for ForkchoiceStatus { + fn from(status: PayloadStatusEnum) -> Self { + ForkchoiceStatus::from_payload_status(&status) + } +} diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index 7dbe3ae36e..546e03237d 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -1,4 +1,7 @@ -use crate::{engine::error::BeaconOnNewPayloadError, BeaconConsensusEngineEvent}; +use crate::{ + engine::{error::BeaconOnNewPayloadError, forkchoice::ForkchoiceStatus}, + BeaconConsensusEngineEvent, +}; use futures::{future::Either, FutureExt}; use reth_interfaces::consensus::ForkchoiceState; use reth_payload_builder::error::PayloadBuilderError; @@ -19,8 +22,11 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot}; #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct OnForkChoiceUpdated { - /// Tracks if this update was valid. - is_valid_update: bool, + /// Represents the status of the forkchoice update. + /// + /// Note: This is separate from the response `fut`, because we still can return an error + /// depending on the payload attributes, even if the forkchoice update itself is valid. + forkchoice_status: ForkchoiceStatus, /// Returns the result of the forkchoice update. fut: Either, PendingPayloadId>, } @@ -30,14 +36,19 @@ pub struct OnForkChoiceUpdated { impl OnForkChoiceUpdated { /// Returns true if this update is valid pub(crate) fn is_valid_update(&self) -> bool { - self.is_valid_update + self.forkchoice_status.is_valid() + } + + /// Returns the determined status of the received ForkchoiceState. + pub(crate) fn forkchoice_status(&self) -> ForkchoiceStatus { + self.forkchoice_status } /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no /// payload attributes were provided. pub(crate) fn valid(status: PayloadStatus) -> Self { Self { - is_valid_update: status.is_valid(), + forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status), fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))), } } @@ -46,7 +57,7 @@ impl OnForkChoiceUpdated { /// forkchoice update failed due to an invalid payload. pub(crate) fn with_invalid(status: PayloadStatus) -> Self { Self { - is_valid_update: false, + forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status), fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))), } } @@ -55,7 +66,7 @@ impl OnForkChoiceUpdated { /// given state is considered invalid pub(crate) fn invalid_state() -> Self { Self { - is_valid_update: false, + forkchoice_status: ForkchoiceStatus::Invalid, fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))), } } @@ -65,7 +76,7 @@ impl OnForkChoiceUpdated { pub(crate) fn invalid_payload_attributes() -> Self { Self { // This is valid because this is only reachable if the state and payload is valid - is_valid_update: true, + forkchoice_status: ForkchoiceStatus::Valid, fut: Either::Left(futures::future::ready(Err( ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes, ))), @@ -78,7 +89,7 @@ impl OnForkChoiceUpdated { pending_payload_id: oneshot::Receiver>, ) -> Self { Self { - is_valid_update: payload_status.is_valid(), + forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status), fut: Either::Right(PendingPayloadId { payload_status: Some(payload_status), pending_payload_id, diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 1ac44c7b93..a98edbe75d 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -55,8 +55,10 @@ pub use error::{ mod metrics; mod event; +mod forkchoice; pub(crate) mod sync; +use crate::engine::forkchoice::ForkchoiceStateTracker; pub use event::BeaconConsensusEngineEvent; /// The maximum number of invalid headers that can be tracked by the engine. @@ -163,9 +165,8 @@ where engine_message_rx: UnboundedReceiverStream, /// A clone of the handle handle: BeaconConsensusEngineHandle, - /// Current forkchoice state. The engine must receive the initial state in order to start - /// syncing. - forkchoice_state: Option, + /// Tracks the received forkchoice state updates received by the CL. + forkchoice_state_tracker: ForkchoiceStateTracker, /// The payload store. payload_builder: PayloadBuilderHandle, /// Listeners for engine events. @@ -246,7 +247,7 @@ where sync_state_updater, engine_message_rx: UnboundedReceiverStream::new(rx), handle: handle.clone(), - forkchoice_state: None, + forkchoice_state_tracker: Default::default(), payload_builder, listeners: EventListeners::default(), invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS), @@ -354,6 +355,9 @@ where } }; + // update the forkchoice state tracker + self.forkchoice_state_tracker.set_latest(state, on_updated.forkchoice_status()); + let is_valid_response = on_updated.is_valid_update(); let _ = tx.send(Ok(on_updated)); @@ -395,9 +399,6 @@ where // TODO: check PoW / EIP-3675 terminal block conditions for the fork choice head // TODO: ensure validity of the payload (is this satisfied already?) - let is_first_forkchoice = self.forkchoice_state.is_none(); - self.forkchoice_state = Some(state); - let status = if self.sync.is_pipeline_idle() { // We can only process new forkchoice updates if the pipeline is idle, since it requires // exclusive access to the database @@ -445,7 +446,7 @@ where } } - self.on_failed_canonical_forkchoice_update(&state, error, is_first_forkchoice) + self.on_failed_canonical_forkchoice_update(&state, error) } } } else { @@ -519,7 +520,6 @@ where &mut self, state: &ForkchoiceState, error: Error, - is_first_forkchoice: bool, ) -> PayloadStatus { debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle"); warn!(target: "consensus::engine", ?error, ?state, "Error canonicalizing the head hash"); @@ -550,7 +550,7 @@ where // if this is the first FCU we received from the beacon node, then we start triggering the // pipeline - if is_first_forkchoice { + if self.forkchoice_state_tracker.is_empty() { // find the appropriate target to sync to, if we don't have the safe block hash then we // start syncing to the safe block via pipeline first let target = if !state.safe_block_hash.is_zero() && @@ -859,9 +859,9 @@ where { // payload is valid self.sync_state_updater.update_sync_state(SyncState::Idle); - } else if let Some(ref state) = self.forkchoice_state { + } else if let Some(target) = self.forkchoice_state_tracker.sync_target() { // if the payload is invalid, we run the pipeline to the head block. - self.sync.set_pipeline_sync_target(state.head_block_hash); + self.sync.set_pipeline_sync_target(target); } } EngineSyncEvent::PipelineStarted(target) => { @@ -883,16 +883,6 @@ where return Some(Ok(())) } - let current_state = match self.forkchoice_state { - Some(state) => state, - None => { - // This is only possible if the node was run with `debug.tip` - // argument and without CL. - warn!(target: "consensus::engine", "No forkchoice state available"); - return None - } - }; - if let ControlFlow::Unwind { bad_block, .. } = ctrl { trace!(target: "consensus::engine", hash=?bad_block.hash, "Bad block detected in unwind"); @@ -922,6 +912,19 @@ where self.blockchain.set_canonical_head(max_header); } + let sync_target_state = match self + .forkchoice_state_tracker + .sync_target_state() + { + Some(current_state) => current_state, + None => { + // This is only possible if the node was run with `debug.tip` + // argument and without CL. + warn!(target: "consensus::engine", "No forkchoice state available"); + return None + } + }; + // TODO: figure out how to make this less complex: // restore_tree_if_possible will run the pipeline if the current_state head // hash is missing. This can arise if we buffer the forkchoice head, and if @@ -943,11 +946,11 @@ where // exists), or if the head is invalid. ideally we want "is a descendant of // this block invalid" let lowest_buffered_ancestor = - self.lowest_buffered_ancestor_or(current_state.head_block_hash); + self.lowest_buffered_ancestor_or(sync_target_state.head_block_hash); if self.invalid_headers.get(&lowest_buffered_ancestor).is_none() { // Update the state and hashes of the blockchain tree if possible. - match self.restore_tree_if_possible(current_state) { + match self.restore_tree_if_possible(sync_target_state) { Ok(_) => self.sync_state_updater.update_sync_state(SyncState::Idle), Err(error) => { error!(target: "consensus::engine", ?error, "Error restoring blockchain tree");