mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
459 lines
18 KiB
Rust
459 lines
18 KiB
Rust
use crate::{FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx};
|
|
use alloy_primitives::B256;
|
|
use alloy_rpc_types_engine::PayloadStatusEnum;
|
|
use op_alloy_rpc_types_engine::OpExecutionData;
|
|
use reth_engine_primitives::ConsensusEngineHandle;
|
|
use reth_optimism_payload_builder::OpPayloadTypes;
|
|
use reth_payload_primitives::{EngineApiMessageVersion, ExecutionPayload, PayloadTypes};
|
|
use tracing::*;
|
|
|
|
/// Consensus client that sends FCUs and new payloads using blocks from a [`FlashBlockService`].
|
|
///
|
|
/// This client receives completed flashblock sequences and:
|
|
/// - Attempts to submit `engine_newPayload` if `state_root` is available (non-zero)
|
|
/// - Always sends `engine_forkChoiceUpdated` to drive chain forward
|
|
///
|
|
/// [`FlashBlockService`]: crate::FlashBlockService
|
|
#[derive(Debug)]
|
|
pub struct FlashBlockConsensusClient<P = OpPayloadTypes>
|
|
where
|
|
P: PayloadTypes,
|
|
{
|
|
/// Handle to execution client.
|
|
engine_handle: ConsensusEngineHandle<P>,
|
|
/// Receiver for completed flashblock sequences from `FlashBlockService`.
|
|
sequence_receiver: FlashBlockCompleteSequenceRx,
|
|
}
|
|
|
|
impl<P> FlashBlockConsensusClient<P>
|
|
where
|
|
P: PayloadTypes,
|
|
P::ExecutionData: for<'a> TryFrom<&'a FlashBlockCompleteSequence, Error: std::fmt::Display>,
|
|
{
|
|
/// Create a new `FlashBlockConsensusClient` with the given Op engine and sequence receiver.
|
|
pub const fn new(
|
|
engine_handle: ConsensusEngineHandle<P>,
|
|
sequence_receiver: FlashBlockCompleteSequenceRx,
|
|
) -> eyre::Result<Self> {
|
|
Ok(Self { engine_handle, sequence_receiver })
|
|
}
|
|
|
|
/// Attempts to submit a new payload to the engine.
|
|
///
|
|
/// The `TryFrom` conversion will fail if `execution_outcome.state_root` is `B256::ZERO`,
|
|
/// in which case this returns the `parent_hash` instead to drive the chain forward.
|
|
///
|
|
/// Returns the block hash to use for FCU (either the new block or parent).
|
|
async fn submit_new_payload(&self, sequence: &FlashBlockCompleteSequence) -> B256 {
|
|
let payload = match P::ExecutionData::try_from(sequence) {
|
|
Ok(payload) => payload,
|
|
Err(err) => {
|
|
trace!(target: "flashblocks", %err, "Failed payload conversion, using parent hash");
|
|
return sequence.payload_base().parent_hash;
|
|
}
|
|
};
|
|
|
|
let block_number = payload.block_number();
|
|
let block_hash = payload.block_hash();
|
|
match self.engine_handle.new_payload(payload).await {
|
|
Ok(result) => {
|
|
debug!(
|
|
target: "flashblocks",
|
|
flashblock_count = sequence.count(),
|
|
block_number,
|
|
%block_hash,
|
|
?result,
|
|
"Submitted engine_newPayload",
|
|
);
|
|
|
|
if let PayloadStatusEnum::Invalid { validation_error } = result.status {
|
|
debug!(
|
|
target: "flashblocks",
|
|
block_number,
|
|
%block_hash,
|
|
%validation_error,
|
|
"Payload validation error",
|
|
);
|
|
};
|
|
}
|
|
Err(err) => {
|
|
error!(
|
|
target: "flashblocks",
|
|
%err,
|
|
block_number,
|
|
"Failed to submit new payload",
|
|
);
|
|
}
|
|
}
|
|
|
|
block_hash
|
|
}
|
|
|
|
/// Submit a forkchoice update to the engine.
|
|
async fn submit_forkchoice_update(
|
|
&self,
|
|
head_block_hash: B256,
|
|
sequence: &FlashBlockCompleteSequence,
|
|
) {
|
|
let block_number = sequence.block_number();
|
|
let safe_hash = sequence.payload_base().parent_hash;
|
|
let finalized_hash = sequence.payload_base().parent_hash;
|
|
let fcu_state = alloy_rpc_types_engine::ForkchoiceState {
|
|
head_block_hash,
|
|
safe_block_hash: safe_hash,
|
|
finalized_block_hash: finalized_hash,
|
|
};
|
|
|
|
match self
|
|
.engine_handle
|
|
.fork_choice_updated(fcu_state, None, EngineApiMessageVersion::V5)
|
|
.await
|
|
{
|
|
Ok(result) => {
|
|
debug!(
|
|
target: "flashblocks",
|
|
flashblock_count = sequence.count(),
|
|
block_number,
|
|
%head_block_hash,
|
|
%safe_hash,
|
|
%finalized_hash,
|
|
?result,
|
|
"Submitted engine_forkChoiceUpdated",
|
|
)
|
|
}
|
|
Err(err) => {
|
|
error!(
|
|
target: "flashblocks",
|
|
%err,
|
|
block_number,
|
|
%head_block_hash,
|
|
%safe_hash,
|
|
%finalized_hash,
|
|
"Failed to submit fork choice update",
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Runs the consensus client loop.
|
|
///
|
|
/// Continuously receives completed flashblock sequences and submits them to the execution
|
|
/// engine:
|
|
/// 1. Attempts `engine_newPayload` (only if `state_root` is available)
|
|
/// 2. Always sends `engine_forkChoiceUpdated` to drive chain forward
|
|
pub async fn run(mut self) {
|
|
loop {
|
|
let Ok(sequence) = self.sequence_receiver.recv().await else {
|
|
continue;
|
|
};
|
|
|
|
// Returns block_hash for FCU:
|
|
// - If state_root is available: submits newPayload and returns the new block's hash
|
|
// - If state_root is zero: skips newPayload and returns parent_hash (no progress yet)
|
|
let block_hash = self.submit_new_payload(&sequence).await;
|
|
|
|
self.submit_forkchoice_update(block_hash, &sequence).await;
|
|
}
|
|
}
|
|
}
|
|
|
|
impl TryFrom<&FlashBlockCompleteSequence> for OpExecutionData {
|
|
type Error = &'static str;
|
|
|
|
fn try_from(sequence: &FlashBlockCompleteSequence) -> Result<Self, Self::Error> {
|
|
let mut data = Self::from_flashblocks_unchecked(sequence);
|
|
|
|
// If execution outcome is available, use the computed state_root and block_hash.
|
|
// FlashBlockService computes these when building sequences on top of the local tip.
|
|
if let Some(execution_outcome) = sequence.execution_outcome() {
|
|
let payload = data.payload.as_v1_mut();
|
|
payload.state_root = execution_outcome.state_root;
|
|
payload.block_hash = execution_outcome.block_hash;
|
|
}
|
|
|
|
// Only proceed if we have a valid state_root (non-zero).
|
|
if data.payload.as_v1_mut().state_root == B256::ZERO {
|
|
return Err("No state_root available for payload");
|
|
}
|
|
|
|
Ok(data)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::{sequence::SequenceExecutionOutcome, test_utils::TestFlashBlockFactory};
|
|
|
|
mod op_execution_data_conversion {
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn test_try_from_fails_with_zero_state_root() {
|
|
// When execution_outcome is None, state_root remains zero and conversion fails
|
|
let factory = TestFlashBlockFactory::new();
|
|
let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
|
|
|
|
let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
|
|
|
|
let result = OpExecutionData::try_from(&sequence);
|
|
assert!(result.is_err());
|
|
assert_eq!(result.unwrap_err(), "No state_root available for payload");
|
|
}
|
|
|
|
#[test]
|
|
fn test_try_from_succeeds_with_execution_outcome() {
|
|
// When execution_outcome has state_root, conversion succeeds
|
|
let factory = TestFlashBlockFactory::new();
|
|
let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
|
|
|
|
let execution_outcome = SequenceExecutionOutcome {
|
|
block_hash: B256::random(),
|
|
state_root: B256::random(), // Non-zero
|
|
};
|
|
|
|
let sequence =
|
|
FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
|
|
|
|
let result = OpExecutionData::try_from(&sequence);
|
|
assert!(result.is_ok());
|
|
|
|
let mut data = result.unwrap();
|
|
assert_eq!(data.payload.as_v1_mut().state_root, execution_outcome.state_root);
|
|
assert_eq!(data.payload.as_v1_mut().block_hash, execution_outcome.block_hash);
|
|
}
|
|
|
|
#[test]
|
|
fn test_try_from_succeeds_with_provided_state_root() {
|
|
// When sequencer provides non-zero state_root, conversion succeeds
|
|
let factory = TestFlashBlockFactory::new();
|
|
let provided_state_root = B256::random();
|
|
let fb0 = factory.flashblock_at(0).state_root(provided_state_root).build();
|
|
|
|
let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
|
|
|
|
let result = OpExecutionData::try_from(&sequence);
|
|
assert!(result.is_ok());
|
|
|
|
let mut data = result.unwrap();
|
|
assert_eq!(data.payload.as_v1_mut().state_root, provided_state_root);
|
|
}
|
|
|
|
#[test]
|
|
fn test_try_from_execution_outcome_overrides_provided_state_root() {
|
|
// execution_outcome takes precedence over sequencer-provided state_root
|
|
let factory = TestFlashBlockFactory::new();
|
|
let provided_state_root = B256::random();
|
|
let fb0 = factory.flashblock_at(0).state_root(provided_state_root).build();
|
|
|
|
let execution_outcome = SequenceExecutionOutcome {
|
|
block_hash: B256::random(),
|
|
state_root: B256::random(), // Different from provided
|
|
};
|
|
|
|
let sequence =
|
|
FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
|
|
|
|
let result = OpExecutionData::try_from(&sequence);
|
|
assert!(result.is_ok());
|
|
|
|
let mut data = result.unwrap();
|
|
// Should use execution_outcome, not the provided state_root
|
|
assert_eq!(data.payload.as_v1_mut().state_root, execution_outcome.state_root);
|
|
assert_ne!(data.payload.as_v1_mut().state_root, provided_state_root);
|
|
}
|
|
|
|
#[test]
|
|
fn test_try_from_with_multiple_flashblocks() {
|
|
// Test conversion with sequence of multiple flashblocks
|
|
let factory = TestFlashBlockFactory::new();
|
|
let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
|
|
let fb1 = factory.flashblock_after(&fb0).state_root(B256::ZERO).build();
|
|
let fb2 = factory.flashblock_after(&fb1).state_root(B256::ZERO).build();
|
|
|
|
let execution_outcome =
|
|
SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
|
|
|
|
let sequence =
|
|
FlashBlockCompleteSequence::new(vec![fb0, fb1, fb2], Some(execution_outcome))
|
|
.unwrap();
|
|
|
|
let result = OpExecutionData::try_from(&sequence);
|
|
assert!(result.is_ok());
|
|
|
|
let mut data = result.unwrap();
|
|
assert_eq!(data.payload.as_v1_mut().state_root, execution_outcome.state_root);
|
|
assert_eq!(data.payload.as_v1_mut().block_hash, execution_outcome.block_hash);
|
|
}
|
|
}
|
|
|
|
mod consensus_client_creation {
|
|
use super::*;
|
|
use tokio::sync::broadcast;
|
|
|
|
#[test]
|
|
fn test_new_creates_client() {
|
|
let (engine_tx, _) = tokio::sync::mpsc::unbounded_channel();
|
|
let engine_handle = ConsensusEngineHandle::<OpPayloadTypes>::new(engine_tx);
|
|
|
|
let (_, sequence_rx) = broadcast::channel(1);
|
|
|
|
let result = FlashBlockConsensusClient::new(engine_handle, sequence_rx);
|
|
assert!(result.is_ok());
|
|
}
|
|
}
|
|
|
|
mod submit_new_payload_behavior {
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn test_submit_new_payload_returns_parent_hash_when_no_state_root() {
|
|
// When conversion fails (no state_root), should return parent_hash
|
|
let factory = TestFlashBlockFactory::new();
|
|
let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
|
|
let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
|
|
|
|
let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
|
|
|
|
// Verify conversion would fail
|
|
let conversion_result = OpExecutionData::try_from(&sequence);
|
|
assert!(conversion_result.is_err());
|
|
|
|
// In the actual run loop, submit_new_payload would return parent_hash
|
|
assert_eq!(sequence.payload_base().parent_hash, parent_hash);
|
|
}
|
|
|
|
#[test]
|
|
fn test_submit_new_payload_returns_block_hash_when_state_root_available() {
|
|
// When conversion succeeds, should return the new block's hash
|
|
let factory = TestFlashBlockFactory::new();
|
|
let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
|
|
|
|
let execution_outcome =
|
|
SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
|
|
|
|
let sequence =
|
|
FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
|
|
|
|
// Verify conversion succeeds
|
|
let conversion_result = OpExecutionData::try_from(&sequence);
|
|
assert!(conversion_result.is_ok());
|
|
|
|
let mut data = conversion_result.unwrap();
|
|
assert_eq!(data.payload.as_v1_mut().block_hash, execution_outcome.block_hash);
|
|
}
|
|
}
|
|
|
|
mod forkchoice_update_behavior {
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn test_forkchoice_state_uses_parent_hash_for_safe_and_finalized() {
|
|
// Both safe_hash and finalized_hash should be set to parent_hash
|
|
let factory = TestFlashBlockFactory::new();
|
|
let fb0 = factory.flashblock_at(0).build();
|
|
let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
|
|
|
|
let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
|
|
|
|
// Verify the expected forkchoice state
|
|
assert_eq!(sequence.payload_base().parent_hash, parent_hash);
|
|
}
|
|
|
|
#[test]
|
|
fn test_forkchoice_update_with_new_block_hash() {
|
|
// When newPayload succeeds, FCU should use the new block's hash as head
|
|
let factory = TestFlashBlockFactory::new();
|
|
let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
|
|
|
|
let execution_outcome =
|
|
SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
|
|
|
|
let sequence =
|
|
FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
|
|
|
|
// The head_block_hash for FCU would be execution_outcome.block_hash
|
|
assert_eq!(
|
|
sequence.execution_outcome().unwrap().block_hash,
|
|
execution_outcome.block_hash
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn test_forkchoice_update_with_parent_hash_when_no_state_root() {
|
|
// When newPayload is skipped (no state_root), FCU should use parent_hash as head
|
|
let factory = TestFlashBlockFactory::new();
|
|
let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
|
|
let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
|
|
|
|
let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
|
|
|
|
// The head_block_hash for FCU would be parent_hash (fallback)
|
|
assert_eq!(sequence.payload_base().parent_hash, parent_hash);
|
|
}
|
|
}
|
|
|
|
mod run_loop_logic {
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn test_run_loop_processes_sequence_with_state_root() {
|
|
// Scenario: Sequence with state_root should trigger both newPayload and FCU
|
|
let factory = TestFlashBlockFactory::new();
|
|
let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
|
|
|
|
let execution_outcome =
|
|
SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
|
|
|
|
let sequence =
|
|
FlashBlockCompleteSequence::new(vec![fb0], Some(execution_outcome)).unwrap();
|
|
|
|
// Verify sequence is ready for newPayload
|
|
let conversion = OpExecutionData::try_from(&sequence);
|
|
assert!(conversion.is_ok());
|
|
}
|
|
|
|
#[test]
|
|
fn test_run_loop_processes_sequence_without_state_root() {
|
|
// Scenario: Sequence without state_root should skip newPayload but still do FCU
|
|
let factory = TestFlashBlockFactory::new();
|
|
let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
|
|
|
|
let sequence = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
|
|
|
|
// Verify sequence cannot be converted (newPayload will be skipped)
|
|
let conversion = OpExecutionData::try_from(&sequence);
|
|
assert!(conversion.is_err());
|
|
|
|
// But FCU should still happen with parent_hash
|
|
assert!(sequence.payload_base().parent_hash != B256::ZERO);
|
|
}
|
|
|
|
#[test]
|
|
fn test_run_loop_handles_multiple_sequences() {
|
|
// Multiple sequences should be processed independently
|
|
let factory = TestFlashBlockFactory::new();
|
|
|
|
// Sequence 1: With state_root
|
|
let fb0_seq1 = factory.flashblock_at(0).state_root(B256::ZERO).build();
|
|
let outcome1 =
|
|
SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
|
|
let seq1 =
|
|
FlashBlockCompleteSequence::new(vec![fb0_seq1.clone()], Some(outcome1)).unwrap();
|
|
|
|
// Sequence 2: Without state_root (for next block)
|
|
let fb0_seq2 = factory.flashblock_for_next_block(&fb0_seq1).build();
|
|
let seq2 = FlashBlockCompleteSequence::new(vec![fb0_seq2], None).unwrap();
|
|
|
|
// Both should be valid sequences
|
|
assert_eq!(seq1.block_number(), 100);
|
|
assert_eq!(seq2.block_number(), 101);
|
|
|
|
// seq1 can be converted
|
|
assert!(OpExecutionData::try_from(&seq1).is_ok());
|
|
// seq2 cannot be converted
|
|
assert!(OpExecutionData::try_from(&seq2).is_err());
|
|
}
|
|
}
|
|
}
|