chore: remove trait EngineApiTreeHandler (#10136)

This commit is contained in:
Miguel Tavares
2024-08-06 10:55:15 -03:00
committed by GitHub
parent f4e6a09bf6
commit 365012b9ca

View File

@@ -21,7 +21,7 @@ use reth_engine_primitives::EngineTypes;
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::execute::{BlockExecutorProvider, Executor};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes, PayloadTypes};
use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes};
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{
Block, BlockNumHash, BlockNumber, GotExpected, Header, Receipts, Requests, SealedBlock,
@@ -306,47 +306,6 @@ impl EngineApiTreeState {
}
}
/// The type responsible for processing engine API requests.
pub trait EngineApiTreeHandler {
/// The engine type that this handler is for.
type Engine: EngineTypes;
/// Invoked when previously requested blocks were downloaded.
fn on_downloaded(&mut self, blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent>;
/// When the Consensus layer receives a new block via the consensus gossip protocol,
/// the transactions in the block are sent to the execution layer in the form of a
/// [`ExecutionPayload`]. The Execution layer executes the transactions and validates the
/// state in the block header, then passes validation data back to Consensus layer, that
/// adds the block to the head of its own blockchain and attests to it. The block is then
/// broadcast over the consensus p2p network in the form of a "Beacon block".
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
///
/// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
/// returns an error if an internal error occurred.
fn on_new_payload(
&mut self,
payload: ExecutionPayload,
cancun_fields: Option<CancunPayloadFields>,
) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>;
/// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
/// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
/// chain.
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns an error if an internal error occurred like a database error.
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>>;
}
/// The outcome of a tree operation.
#[derive(Debug)]
pub struct TreeOutcome<T> {
@@ -369,7 +328,7 @@ impl<T> TreeOutcome<T> {
}
}
/// Events that can be emitted by the [`EngineApiTreeHandler`].
/// Events that are triggered by Tree Chain
#[derive(Debug)]
pub enum TreeEvent {
/// Tree action is needed.
@@ -539,6 +498,262 @@ where
}
}
/// Invoked when previously requested blocks were downloaded.
fn on_downloaded(&mut self, blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
trace!(target: "engine", block_count = %blocks.len(), "received downloaded blocks");
for block in blocks {
if let Some(event) = self.on_downloaded_block(block) {
let needs_backfill = event.is_backfill_action();
self.on_tree_event(event);
if needs_backfill {
// can exit early if backfill is needed
break
}
}
}
None
}
/// When the Consensus layer receives a new block via the consensus gossip protocol,
/// the transactions in the block are sent to the execution layer in the form of a
/// [`ExecutionPayload`]. The Execution layer executes the transactions and validates the
/// state in the block header, then passes validation data back to Consensus layer, that
/// adds the block to the head of its own blockchain and attests to it. The block is then
/// broadcast over the consensus p2p network in the form of a "Beacon block".
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
///
/// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
/// returns an error if an internal error occurred.
#[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine")]
fn on_new_payload(
&mut self,
payload: ExecutionPayload,
cancun_fields: Option<CancunPayloadFields>,
) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
trace!(target: "engine", "invoked new payload");
self.metrics.new_payload_messages.increment(1);
// Ensures that the given payload does not violate any consensus rules that concern the
// block's layout, like:
// - missing or invalid base fee
// - invalid extra data
// - invalid transactions
// - incorrect hash
// - the versioned hashes passed with the payload do not exactly match transaction
// versioned hashes
// - the block does not contain blob transactions if it is pre-cancun
//
// This validates the following engine API rule:
//
// 3. Given the expected array of blob versioned hashes client software **MUST** run its
// validation by taking the following steps:
//
// 1. Obtain the actual array by concatenating blob versioned hashes lists
// (`tx.blob_versioned_hashes`) of each [blob
// transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
// in the payload, respecting the order of inclusion. If the payload has no blob
// transactions the expected array **MUST** be `[]`.
//
// 2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
// null}` if the expected and the actual arrays don't match.
//
// This validation **MUST** be instantly run in all cases even during active sync process.
let parent_hash = payload.parent_hash();
let block = match self
.payload_validator
.ensure_well_formed_payload(payload, cancun_fields.into())
{
Ok(block) => block,
Err(error) => {
error!(target: "engine::tree", %error, "Invalid payload");
// we need to convert the error to a payload status (response to the CL)
let latest_valid_hash =
if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
// Engine-API rules:
// > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
// > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
None
} else {
self.latest_valid_hash_for_invalid_payload(parent_hash)?
};
let status = PayloadStatusEnum::from(error);
return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
}
};
let block_hash = block.hash();
let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
if lowest_buffered_ancestor == block_hash {
lowest_buffered_ancestor = block.parent_hash;
}
// now check the block itself
if let Some(status) =
self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block_hash)?
{
return Ok(TreeOutcome::new(status))
}
let status = if !self.backfill_sync_state.is_idle() {
if let Err(error) = self.buffer_block_without_senders(block) {
self.on_insert_block_error(error)?
} else {
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
}
} else {
let mut latest_valid_hash = None;
match self.insert_block_without_senders(block) {
Ok(status) => {
let status = match status {
InsertPayloadOk::Inserted(BlockStatus::Valid(_)) |
InsertPayloadOk::AlreadySeen(BlockStatus::Valid(_)) => {
latest_valid_hash = Some(block_hash);
PayloadStatusEnum::Valid
}
InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
// not known to be invalid, but we don't know anything else
PayloadStatusEnum::Syncing
}
};
PayloadStatus::new(status, latest_valid_hash)
}
Err(error) => self.on_insert_block_error(error)?,
}
};
let mut outcome = TreeOutcome::new(status);
if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
// if the block is valid and it is the sync target head, make it canonical
outcome =
outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical(block_hash)));
}
Ok(outcome)
}
/// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
/// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
/// chain.
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns an error if an internal error occurred like a database error.
#[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine")]
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<T::PayloadAttributes>,
) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
trace!(target: "engine", ?attrs, "invoked forkchoice update");
self.metrics.forkchoice_updated_messages.increment(1);
self.canonical_in_memory_state.on_forkchoice_update_received();
if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
return Ok(TreeOutcome::new(on_updated))
}
let valid_outcome = |head| {
TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
PayloadStatusEnum::Valid,
Some(head),
)))
};
// Process the forkchoice update by trying to make the head block canonical
//
// We can only process this forkchoice update if:
// - we have the `head` block
// - the head block is part of a chain that is connected to the canonical chain. This
// includes reorgs.
//
// Performing a FCU involves:
// - marking the FCU's head block as canonical
// - updating in memory state to reflect the new canonical chain
// - updating canonical state trackers
// - emitting a canonicalization event for the new chain (including reorg)
// - if we have payload attributes, delegate them to the payload service
// 1. ensure we have a new head block
if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
trace!(target: "engine", "fcu head hash is already canonical");
// we still need to process payload attributes if the head is already canonical
if let Some(attr) = attrs {
let tip = self
.block_by_hash(self.state.tree_state.canonical_block_hash())?
.ok_or_else(|| {
// If we can't find the canonical block, then something is wrong and we need
// to return an error
ProviderError::HeaderNotFound(state.head_block_hash.into())
})?;
let updated = self.process_payload_attributes(attr, &tip, state);
return Ok(TreeOutcome::new(updated))
}
// the head block is already canonical
return Ok(valid_outcome(state.head_block_hash))
}
// 2. ensure we can apply a new chain update for the head block
if let Some(chain_update) = self.state.tree_state.on_new_head(state.head_block_hash) {
let tip = chain_update.tip().header.clone();
self.on_canonical_chain_update(chain_update);
// update the safe and finalized blocks and ensure their values are valid, but only
// after the head block is made canonical
if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
// safe or finalized hashes are invalid
return Ok(TreeOutcome::new(outcome))
}
if let Some(attr) = attrs {
let updated = self.process_payload_attributes(attr, &tip, state);
return Ok(TreeOutcome::new(updated))
}
return Ok(valid_outcome(state.head_block_hash))
}
// 3. check if the head is already part of the canonical chain
if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
debug!(target: "engine", head = canonical_header.number, "fcu head block is already canonical");
// the head block is already canonical
return Ok(valid_outcome(state.head_block_hash))
}
// 4. we don't have the block to perform the update
// we assume the FCU is valid and at least the head is missing,
// so we need to start syncing to it
//
// find the appropriate target to sync to, if we don't have the safe block hash then we
// start syncing to the safe block via backfill first
let target = if self.state.forkchoice_state_tracker.is_empty() &&
// check that safe block is valid and missing
!state.safe_block_hash.is_zero() &&
self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
{
debug!(target: "engine", "missing safe block on initial FCU, downloading safe block");
state.safe_block_hash
} else {
state.head_block_hash
};
let target = self.lowest_buffered_ancestor_or(target);
trace!(target: "engine", %target, "downloading missing block");
Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
PayloadStatusEnum::Syncing,
)))
.with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
}
/// Attempts to receive the next engine request.
///
/// If there's currently no persistence action in progress, this will block until a new request
@@ -1670,250 +1885,6 @@ where
}
}
impl<P, E, T> EngineApiTreeHandler for EngineApiTreeHandlerImpl<P, E, T>
where
P: BlockReader + StateProviderFactory + Clone + 'static,
E: BlockExecutorProvider,
T: EngineTypes,
{
type Engine = T;
fn on_downloaded(&mut self, blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
trace!(target: "engine", block_count = %blocks.len(), "received downloaded blocks");
for block in blocks {
if let Some(event) = self.on_downloaded_block(block) {
let needs_backfill = event.is_backfill_action();
self.on_tree_event(event);
if needs_backfill {
// can exit early if backfill is needed
break
}
}
}
None
}
#[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine")]
fn on_new_payload(
&mut self,
payload: ExecutionPayload,
cancun_fields: Option<CancunPayloadFields>,
) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
trace!(target: "engine", "invoked new payload");
self.metrics.new_payload_messages.increment(1);
// Ensures that the given payload does not violate any consensus rules that concern the
// block's layout, like:
// - missing or invalid base fee
// - invalid extra data
// - invalid transactions
// - incorrect hash
// - the versioned hashes passed with the payload do not exactly match transaction
// versioned hashes
// - the block does not contain blob transactions if it is pre-cancun
//
// This validates the following engine API rule:
//
// 3. Given the expected array of blob versioned hashes client software **MUST** run its
// validation by taking the following steps:
//
// 1. Obtain the actual array by concatenating blob versioned hashes lists
// (`tx.blob_versioned_hashes`) of each [blob
// transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
// in the payload, respecting the order of inclusion. If the payload has no blob
// transactions the expected array **MUST** be `[]`.
//
// 2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
// null}` if the expected and the actual arrays don't match.
//
// This validation **MUST** be instantly run in all cases even during active sync process.
let parent_hash = payload.parent_hash();
let block = match self
.payload_validator
.ensure_well_formed_payload(payload, cancun_fields.into())
{
Ok(block) => block,
Err(error) => {
error!(target: "engine::tree", %error, "Invalid payload");
// we need to convert the error to a payload status (response to the CL)
let latest_valid_hash =
if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
// Engine-API rules:
// > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
// > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
None
} else {
self.latest_valid_hash_for_invalid_payload(parent_hash)?
};
let status = PayloadStatusEnum::from(error);
return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
}
};
let block_hash = block.hash();
let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
if lowest_buffered_ancestor == block_hash {
lowest_buffered_ancestor = block.parent_hash;
}
// now check the block itself
if let Some(status) =
self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block_hash)?
{
return Ok(TreeOutcome::new(status))
}
let status = if !self.backfill_sync_state.is_idle() {
if let Err(error) = self.buffer_block_without_senders(block) {
self.on_insert_block_error(error)?
} else {
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
}
} else {
let mut latest_valid_hash = None;
match self.insert_block_without_senders(block) {
Ok(status) => {
let status = match status {
InsertPayloadOk::Inserted(BlockStatus::Valid(_)) |
InsertPayloadOk::AlreadySeen(BlockStatus::Valid(_)) => {
latest_valid_hash = Some(block_hash);
PayloadStatusEnum::Valid
}
InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
// not known to be invalid, but we don't know anything else
PayloadStatusEnum::Syncing
}
};
PayloadStatus::new(status, latest_valid_hash)
}
Err(error) => self.on_insert_block_error(error)?,
}
};
let mut outcome = TreeOutcome::new(status);
if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
// if the block is valid and it is the sync target head, make it canonical
outcome =
outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical(block_hash)));
}
Ok(outcome)
}
#[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine")]
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
trace!(target: "engine", ?attrs, "invoked forkchoice update");
self.metrics.forkchoice_updated_messages.increment(1);
self.canonical_in_memory_state.on_forkchoice_update_received();
if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
return Ok(TreeOutcome::new(on_updated))
}
let valid_outcome = |head| {
TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
PayloadStatusEnum::Valid,
Some(head),
)))
};
// Process the forkchoice update by trying to make the head block canonical
//
// We can only process this forkchoice update if:
// - we have the `head` block
// - the head block is part of a chain that is connected to the canonical chain. This
// includes reorgs.
//
// Performing a FCU involves:
// - marking the FCU's head block as canonical
// - updating in memory state to reflect the new canonical chain
// - updating canonical state trackers
// - emitting a canonicalization event for the new chain (including reorg)
// - if we have payload attributes, delegate them to the payload service
// 1. ensure we have a new head block
if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
trace!(target: "engine", "fcu head hash is already canonical");
// we still need to process payload attributes if the head is already canonical
if let Some(attr) = attrs {
let tip = self
.block_by_hash(self.state.tree_state.canonical_block_hash())?
.ok_or_else(|| {
// If we can't find the canonical block, then something is wrong and we need
// to return an error
ProviderError::HeaderNotFound(state.head_block_hash.into())
})?;
let updated = self.process_payload_attributes(attr, &tip, state);
return Ok(TreeOutcome::new(updated))
}
// the head block is already canonical
return Ok(valid_outcome(state.head_block_hash))
}
// 2. ensure we can apply a new chain update for the head block
if let Some(chain_update) = self.state.tree_state.on_new_head(state.head_block_hash) {
let tip = chain_update.tip().header.clone();
self.on_canonical_chain_update(chain_update);
// update the safe and finalized blocks and ensure their values are valid, but only
// after the head block is made canonical
if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
// safe or finalized hashes are invalid
return Ok(TreeOutcome::new(outcome))
}
if let Some(attr) = attrs {
let updated = self.process_payload_attributes(attr, &tip, state);
return Ok(TreeOutcome::new(updated))
}
return Ok(valid_outcome(state.head_block_hash))
}
// 3. check if the head is already part of the canonical chain
if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
debug!(target: "engine", head = canonical_header.number, "fcu head block is already canonical");
// the head block is already canonical
return Ok(valid_outcome(state.head_block_hash))
}
// 4. we don't have the block to perform the update
// we assume the FCU is valid and at least the head is missing,
// so we need to start syncing to it
//
// find the appropriate target to sync to, if we don't have the safe block hash then we
// start syncing to the safe block via backfill first
let target = if self.state.forkchoice_state_tracker.is_empty() &&
// check that safe block is valid and missing
!state.safe_block_hash.is_zero() &&
self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
{
debug!(target: "engine", "missing safe block on initial FCU, downloading safe block");
state.safe_block_hash
} else {
state.head_block_hash
};
let target = self.lowest_buffered_ancestor_or(target);
trace!(target: "engine", %target, "downloading missing block");
Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
PayloadStatusEnum::Syncing,
)))
.with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
}
}
/// The state of the persistence task.
#[derive(Default, Debug)]
pub struct PersistenceState {