feat: track fcu validity (#2934)

This commit is contained in:
Matthias Seitz
2023-05-31 21:21:56 +02:00
committed by GitHub
parent 1641f555f2
commit aea35263da
3 changed files with 150 additions and 33 deletions

View File

@@ -0,0 +1,103 @@
use reth_primitives::H256;
use reth_rpc_types::engine::{ForkchoiceState, PayloadStatusEnum};
/// The struct that keeps track of the received forkchoice state and their status.
#[derive(Debug, Clone, Default)]
pub(crate) struct ForkchoiceStateTracker {
/// The latest forkchoice state that we received.
///
/// Caution: this can be invalid.
latest: Option<ReceivedForkchoiceState>,
/// Tracks the latest forkchoice state that we received to which we need to sync.
last_syncing: Option<ForkchoiceState>,
/// The latest valid forkchoice state that we received and processed as valid.
last_valid: Option<ForkchoiceState>,
}
impl ForkchoiceStateTracker {
/// Sets the latest forkchoice state that we received.
///
/// If the status is valid, we also update the last valid forkchoice state.
pub(crate) fn set_latest(&mut self, state: ForkchoiceState, status: ForkchoiceStatus) {
if status.is_valid() {
self.set_valid(state);
} else if status.is_syncing() {
self.last_syncing = Some(state);
}
let received = ReceivedForkchoiceState { state, status };
self.latest = Some(received);
}
fn set_valid(&mut self, state: ForkchoiceState) {
// we no longer need to sync to this state.
self.last_syncing = None;
self.last_valid = Some(state);
}
/// Returns the head hash of the latest received FCU to which we need to sync.
pub(crate) fn sync_target(&self) -> Option<H256> {
self.last_syncing.as_ref().map(|s| s.head_block_hash)
}
/// Returns the last received ForkchoiceState to which we need to sync.
pub(crate) fn sync_target_state(&self) -> Option<ForkchoiceState> {
self.last_syncing
}
/// Returns true if no forkchoice state has been received yet.
pub(crate) fn is_empty(&self) -> bool {
self.latest.is_none()
}
}
/// Represents a forkchoice update and tracks the status we assigned to it.
#[derive(Debug, Clone)]
#[allow(unused)]
pub(crate) struct ReceivedForkchoiceState {
state: ForkchoiceState,
status: ForkchoiceStatus,
}
/// A simplified representation of [PayloadStatusEnum] specifically for FCU.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum ForkchoiceStatus {
/// The forkchoice state is valid.
Valid,
/// The forkchoice state is invalid.
Invalid,
/// The forkchoice state is unknown.
Syncing,
}
impl ForkchoiceStatus {
pub(crate) fn is_valid(&self) -> bool {
matches!(self, ForkchoiceStatus::Valid)
}
pub(crate) fn is_syncing(&self) -> bool {
matches!(self, ForkchoiceStatus::Syncing)
}
/// Converts the general purpose [PayloadStatusEnum] into a [ForkchoiceStatus].
pub(crate) fn from_payload_status(status: &PayloadStatusEnum) -> Self {
match status {
PayloadStatusEnum::Valid => ForkchoiceStatus::Valid,
PayloadStatusEnum::Invalid { .. } => ForkchoiceStatus::Invalid,
PayloadStatusEnum::Syncing => ForkchoiceStatus::Syncing,
PayloadStatusEnum::Accepted => {
// This is only returned on `newPayload` accepted would be a valid state here.
ForkchoiceStatus::Valid
}
PayloadStatusEnum::InvalidBlockHash { .. } => ForkchoiceStatus::Invalid,
}
}
}
impl From<PayloadStatusEnum> for ForkchoiceStatus {
fn from(status: PayloadStatusEnum) -> Self {
ForkchoiceStatus::from_payload_status(&status)
}
}

View File

@@ -1,4 +1,7 @@
use crate::{engine::error::BeaconOnNewPayloadError, BeaconConsensusEngineEvent};
use crate::{
engine::{error::BeaconOnNewPayloadError, forkchoice::ForkchoiceStatus},
BeaconConsensusEngineEvent,
};
use futures::{future::Either, FutureExt};
use reth_interfaces::consensus::ForkchoiceState;
use reth_payload_builder::error::PayloadBuilderError;
@@ -19,8 +22,11 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot};
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct OnForkChoiceUpdated {
/// Tracks if this update was valid.
is_valid_update: bool,
/// Represents the status of the forkchoice update.
///
/// Note: This is separate from the response `fut`, because we still can return an error
/// depending on the payload attributes, even if the forkchoice update itself is valid.
forkchoice_status: ForkchoiceStatus,
/// Returns the result of the forkchoice update.
fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
}
@@ -30,14 +36,19 @@ pub struct OnForkChoiceUpdated {
impl OnForkChoiceUpdated {
/// Returns true if this update is valid
pub(crate) fn is_valid_update(&self) -> bool {
self.is_valid_update
self.forkchoice_status.is_valid()
}
/// Returns the determined status of the received ForkchoiceState.
pub(crate) fn forkchoice_status(&self) -> ForkchoiceStatus {
self.forkchoice_status
}
/// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no
/// payload attributes were provided.
pub(crate) fn valid(status: PayloadStatus) -> Self {
Self {
is_valid_update: status.is_valid(),
forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
}
}
@@ -46,7 +57,7 @@ impl OnForkChoiceUpdated {
/// forkchoice update failed due to an invalid payload.
pub(crate) fn with_invalid(status: PayloadStatus) -> Self {
Self {
is_valid_update: false,
forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
}
}
@@ -55,7 +66,7 @@ impl OnForkChoiceUpdated {
/// given state is considered invalid
pub(crate) fn invalid_state() -> Self {
Self {
is_valid_update: false,
forkchoice_status: ForkchoiceStatus::Invalid,
fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
}
}
@@ -65,7 +76,7 @@ impl OnForkChoiceUpdated {
pub(crate) fn invalid_payload_attributes() -> Self {
Self {
// This is valid because this is only reachable if the state and payload is valid
is_valid_update: true,
forkchoice_status: ForkchoiceStatus::Valid,
fut: Either::Left(futures::future::ready(Err(
ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
))),
@@ -78,7 +89,7 @@ impl OnForkChoiceUpdated {
pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
) -> Self {
Self {
is_valid_update: payload_status.is_valid(),
forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
fut: Either::Right(PendingPayloadId {
payload_status: Some(payload_status),
pending_payload_id,

View File

@@ -55,8 +55,10 @@ pub use error::{
mod metrics;
mod event;
mod forkchoice;
pub(crate) mod sync;
use crate::engine::forkchoice::ForkchoiceStateTracker;
pub use event::BeaconConsensusEngineEvent;
/// The maximum number of invalid headers that can be tracked by the engine.
@@ -163,9 +165,8 @@ where
engine_message_rx: UnboundedReceiverStream<BeaconEngineMessage>,
/// A clone of the handle
handle: BeaconConsensusEngineHandle,
/// Current forkchoice state. The engine must receive the initial state in order to start
/// syncing.
forkchoice_state: Option<ForkchoiceState>,
/// Tracks the received forkchoice state updates received by the CL.
forkchoice_state_tracker: ForkchoiceStateTracker,
/// The payload store.
payload_builder: PayloadBuilderHandle,
/// Listeners for engine events.
@@ -246,7 +247,7 @@ where
sync_state_updater,
engine_message_rx: UnboundedReceiverStream::new(rx),
handle: handle.clone(),
forkchoice_state: None,
forkchoice_state_tracker: Default::default(),
payload_builder,
listeners: EventListeners::default(),
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
@@ -354,6 +355,9 @@ where
}
};
// update the forkchoice state tracker
self.forkchoice_state_tracker.set_latest(state, on_updated.forkchoice_status());
let is_valid_response = on_updated.is_valid_update();
let _ = tx.send(Ok(on_updated));
@@ -395,9 +399,6 @@ where
// TODO: check PoW / EIP-3675 terminal block conditions for the fork choice head
// TODO: ensure validity of the payload (is this satisfied already?)
let is_first_forkchoice = self.forkchoice_state.is_none();
self.forkchoice_state = Some(state);
let status = if self.sync.is_pipeline_idle() {
// We can only process new forkchoice updates if the pipeline is idle, since it requires
// exclusive access to the database
@@ -445,7 +446,7 @@ where
}
}
self.on_failed_canonical_forkchoice_update(&state, error, is_first_forkchoice)
self.on_failed_canonical_forkchoice_update(&state, error)
}
}
} else {
@@ -519,7 +520,6 @@ where
&mut self,
state: &ForkchoiceState,
error: Error,
is_first_forkchoice: bool,
) -> PayloadStatus {
debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle");
warn!(target: "consensus::engine", ?error, ?state, "Error canonicalizing the head hash");
@@ -550,7 +550,7 @@ where
// if this is the first FCU we received from the beacon node, then we start triggering the
// pipeline
if is_first_forkchoice {
if self.forkchoice_state_tracker.is_empty() {
// find the appropriate target to sync to, if we don't have the safe block hash then we
// start syncing to the safe block via pipeline first
let target = if !state.safe_block_hash.is_zero() &&
@@ -859,9 +859,9 @@ where
{
// payload is valid
self.sync_state_updater.update_sync_state(SyncState::Idle);
} else if let Some(ref state) = self.forkchoice_state {
} else if let Some(target) = self.forkchoice_state_tracker.sync_target() {
// if the payload is invalid, we run the pipeline to the head block.
self.sync.set_pipeline_sync_target(state.head_block_hash);
self.sync.set_pipeline_sync_target(target);
}
}
EngineSyncEvent::PipelineStarted(target) => {
@@ -883,16 +883,6 @@ where
return Some(Ok(()))
}
let current_state = match self.forkchoice_state {
Some(state) => state,
None => {
// This is only possible if the node was run with `debug.tip`
// argument and without CL.
warn!(target: "consensus::engine", "No forkchoice state available");
return None
}
};
if let ControlFlow::Unwind { bad_block, .. } = ctrl {
trace!(target: "consensus::engine", hash=?bad_block.hash, "Bad block detected in unwind");
@@ -922,6 +912,19 @@ where
self.blockchain.set_canonical_head(max_header);
}
let sync_target_state = match self
.forkchoice_state_tracker
.sync_target_state()
{
Some(current_state) => current_state,
None => {
// This is only possible if the node was run with `debug.tip`
// argument and without CL.
warn!(target: "consensus::engine", "No forkchoice state available");
return None
}
};
// TODO: figure out how to make this less complex:
// restore_tree_if_possible will run the pipeline if the current_state head
// hash is missing. This can arise if we buffer the forkchoice head, and if
@@ -943,11 +946,11 @@ where
// exists), or if the head is invalid. ideally we want "is a descendant of
// this block invalid"
let lowest_buffered_ancestor =
self.lowest_buffered_ancestor_or(current_state.head_block_hash);
self.lowest_buffered_ancestor_or(sync_target_state.head_block_hash);
if self.invalid_headers.get(&lowest_buffered_ancestor).is_none() {
// Update the state and hashes of the blockchain tree if possible.
match self.restore_tree_if_possible(current_state) {
match self.restore_tree_if_possible(sync_target_state) {
Ok(_) => self.sync_state_updater.update_sync_state(SyncState::Idle),
Err(error) => {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree");