Files
reth/crates/engine/util/src/reorg.rs

322 lines
13 KiB
Rust

//! Stream wrapper that simulates reorgs.
use alloy_consensus::{BlockHeader, Transaction};
use alloy_rpc_types_engine::{ForkchoiceState, PayloadStatus};
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use itertools::Either;
use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth_engine_primitives::{
BeaconEngineMessage, BeaconOnNewPayloadError, ExecutionPayload as _, OnForkChoiceUpdated,
PayloadValidator,
};
use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
use reth_evm::{
execute::{BlockBuilder, BlockBuilderOutcome},
ConfigureEvm,
};
use reth_payload_primitives::{BuiltPayload, EngineApiMessageVersion, PayloadTypes};
use reth_primitives_traits::{
block::Block as _, BlockBody as _, BlockTy, HeaderTy, SealedBlock, SignedTransaction,
};
use reth_revm::{database::StateProviderDatabase, db::State};
use reth_storage_api::{errors::ProviderError, BlockReader, StateProviderFactory};
use std::{
collections::VecDeque,
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tracing::*;
#[derive(Debug)]
enum EngineReorgState<T: PayloadTypes> {
Forward,
Reorg { queue: VecDeque<BeaconEngineMessage<T>> },
}
type EngineReorgResponse = Result<
Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
oneshot::error::RecvError,
>;
type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
/// Engine API stream wrapper that simulates reorgs with specified frequency.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineReorg<S, T: PayloadTypes, Provider, Evm, Validator> {
/// Underlying stream
#[pin]
stream: S,
/// Database provider.
provider: Provider,
/// Evm configuration.
evm_config: Evm,
/// Payload validator.
payload_validator: Validator,
/// The frequency of reorgs.
frequency: usize,
/// The depth of reorgs.
depth: usize,
/// The number of forwarded forkchoice states.
/// This is reset after a reorg.
forkchoice_states_forwarded: usize,
/// Current state of the stream.
state: EngineReorgState<T>,
/// Last forkchoice state.
last_forkchoice_state: Option<ForkchoiceState>,
/// Pending engine responses to reorg messages.
reorg_responses: FuturesUnordered<ReorgResponseFut>,
}
impl<S, T: PayloadTypes, Provider, Evm, Validator> EngineReorg<S, T, Provider, Evm, Validator> {
/// Creates new [`EngineReorg`] stream wrapper.
pub fn new(
stream: S,
provider: Provider,
evm_config: Evm,
payload_validator: Validator,
frequency: usize,
depth: usize,
) -> Self {
Self {
stream,
provider,
evm_config,
payload_validator,
frequency,
depth,
state: EngineReorgState::Forward,
forkchoice_states_forwarded: 0,
last_forkchoice_state: None,
reorg_responses: FuturesUnordered::new(),
}
}
}
impl<S, T, Provider, Evm, Validator> Stream for EngineReorg<S, T, Provider, Evm, Validator>
where
S: Stream<Item = BeaconEngineMessage<T>>,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = Evm::Primitives>>,
Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
+ StateProviderFactory
+ ChainSpecProvider,
Evm: ConfigureEvm,
Validator: PayloadValidator<ExecutionData = T::ExecutionData, Block = BlockTy<Evm::Primitives>>,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
match response {
Ok(Either::Left(Ok(payload_status))) => {
debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
}
Ok(Either::Left(Err(payload_error))) => {
error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
}
Ok(Either::Right(Ok(fcu_status))) => {
debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
}
Ok(Either::Right(Err(fcu_error))) => {
error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
}
Err(_) => {}
};
continue
}
if let EngineReorgState::Reorg { queue } = &mut this.state {
match queue.pop_front() {
Some(msg) => return Poll::Ready(Some(msg)),
None => {
*this.forkchoice_states_forwarded = 0;
*this.state = EngineReorgState::Forward;
}
}
}
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match (next, &this.last_forkchoice_state) {
(
Some(BeaconEngineMessage::NewPayload { payload, tx }),
Some(last_forkchoice_state),
) if this.forkchoice_states_forwarded > this.frequency &&
// Only enter reorg state if new payload attaches to current head.
last_forkchoice_state.head_block_hash == payload.parent_hash() =>
{
// Enter the reorg state.
// The current payload will be immediately forwarded by being in front of the
// queue. Then we attempt to reorg the current head by generating a payload that
// attaches to the head's parent and is based on the non-conflicting
// transactions (txs from block `n + 1` that are valid at block `n` according to
// consensus checks) from the current payload as well as the corresponding
// forkchoice state. We will rely on CL to reorg us back to canonical chain.
// TODO: This is an expensive blocking operation, ideally it's spawned as a task
// so that the stream could yield the control back.
let reorg_block = match create_reorg_head(
this.provider,
this.evm_config,
this.payload_validator,
*this.depth,
payload.clone(),
) {
Ok(result) => result,
Err(error) => {
error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
// Forward the payload and attempt to create reorg on top of
// the next one
return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
payload,
tx,
}))
}
};
let reorg_forkchoice_state = ForkchoiceState {
finalized_block_hash: last_forkchoice_state.finalized_block_hash,
safe_block_hash: last_forkchoice_state.safe_block_hash,
head_block_hash: reorg_block.hash(),
};
let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
this.reorg_responses.extend([
Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
]);
let queue = VecDeque::from([
// Current payload
BeaconEngineMessage::NewPayload { payload, tx },
// Reorg payload
BeaconEngineMessage::NewPayload {
payload: T::block_to_payload(reorg_block),
tx: reorg_payload_tx,
},
// Reorg forkchoice state
BeaconEngineMessage::ForkchoiceUpdated {
state: reorg_forkchoice_state,
payload_attrs: None,
tx: reorg_fcu_tx,
version: EngineApiMessageVersion::default(),
},
]);
*this.state = EngineReorgState::Reorg { queue };
continue
}
(
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
}),
_,
) => {
// Record last forkchoice state forwarded to the engine.
// We do not care if it's valid since engine should be able to handle
// reorgs that rely on invalid forkchoice state.
*this.last_forkchoice_state = Some(state);
*this.forkchoice_states_forwarded += 1;
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
})
}
(item, _) => item,
};
return Poll::Ready(item)
}
}
}
fn create_reorg_head<Provider, Evm, Validator>(
provider: &Provider,
evm_config: &Evm,
payload_validator: &Validator,
mut depth: usize,
next_payload: Validator::ExecutionData,
) -> RethResult<SealedBlock<BlockTy<Evm::Primitives>>>
where
Provider: BlockReader<Header = HeaderTy<Evm::Primitives>, Block = BlockTy<Evm::Primitives>>
+ StateProviderFactory
+ ChainSpecProvider<ChainSpec: EthChainSpec>,
Evm: ConfigureEvm,
Validator: PayloadValidator<Block = BlockTy<Evm::Primitives>>,
{
// Ensure next payload is valid.
let next_block =
payload_validator.ensure_well_formed_payload(next_payload).map_err(RethError::msg)?;
// Fetch reorg target block depending on its depth and its parent.
let mut previous_hash = next_block.parent_hash();
let mut candidate_transactions = next_block.into_body().transactions().to_vec();
let reorg_target = 'target: {
loop {
let reorg_target = provider
.block_by_hash(previous_hash)?
.ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
if depth == 0 {
break 'target reorg_target.seal_slow()
}
depth -= 1;
previous_hash = reorg_target.header().parent_hash();
candidate_transactions = reorg_target.into_body().into_transactions();
}
};
let reorg_target_parent = provider
.sealed_header_by_hash(reorg_target.header().parent_hash())?
.ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.header().parent_hash().into()))?;
debug!(target: "engine::stream::reorg", number = reorg_target.header().number(), hash = %previous_hash, "Selected reorg target");
// Configure state
let state_provider = provider.state_by_block_hash(reorg_target.header().parent_hash())?;
let mut state = State::builder()
.with_database_ref(StateProviderDatabase::new(&state_provider))
.with_bundle_update()
.build();
let ctx = evm_config.context_for_block(&reorg_target);
let evm = evm_config.evm_for_block(&mut state, &reorg_target);
let mut builder = evm_config.create_block_builder(evm, &reorg_target_parent, ctx);
builder.apply_pre_execution_changes()?;
let mut cumulative_gas_used = 0;
for tx in candidate_transactions {
// ensure we still have capacity for this transaction
if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit() {
continue
}
let tx_recovered =
tx.try_into_recovered().map_err(|_| ProviderError::SenderRecoveryError)?;
let gas_used = match builder.execute_transaction(tx_recovered) {
Ok(gas_used) => gas_used,
Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
hash,
error,
})) => {
trace!(target: "engine::stream::reorg", hash = %hash, ?error, "Error executing transaction from next block");
continue
}
// Treat error as fatal
Err(error) => return Err(RethError::Execution(error)),
};
cumulative_gas_used += gas_used;
}
let BlockBuilderOutcome { block, .. } = builder.finish(&state_provider)?;
Ok(block.into_sealed_block())
}