mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 23:38:10 -05:00
feat(op-reth): initial setup FlashBlockConsensusClient engine sidecar (#18443)
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -9304,13 +9304,16 @@ dependencies = [
|
||||
"reth-errors",
|
||||
"reth-evm",
|
||||
"reth-execution-types",
|
||||
"reth-node-api",
|
||||
"reth-optimism-evm",
|
||||
"reth-optimism-payload-builder",
|
||||
"reth-optimism-primitives",
|
||||
"reth-primitives-traits",
|
||||
"reth-revm",
|
||||
"reth-rpc-eth-types",
|
||||
"reth-storage-api",
|
||||
"reth-tasks",
|
||||
"ringbuffer",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"test-case",
|
||||
|
||||
@@ -19,9 +19,11 @@ reth-primitives-traits = { workspace = true, features = ["serde"] }
|
||||
reth-execution-types = { workspace = true, features = ["serde"] }
|
||||
reth-evm.workspace = true
|
||||
reth-revm.workspace = true
|
||||
reth-optimism-payload-builder.workspace = true
|
||||
reth-rpc-eth-types.workspace = true
|
||||
reth-errors.workspace = true
|
||||
reth-storage-api.workspace = true
|
||||
reth-node-api.workspace = true
|
||||
reth-tasks.workspace = true
|
||||
|
||||
# alloy
|
||||
@@ -29,6 +31,7 @@ alloy-eips = { workspace = true, features = ["serde"] }
|
||||
alloy-serde.workspace = true
|
||||
alloy-primitives = { workspace = true, features = ["serde"] }
|
||||
alloy-rpc-types-engine = { workspace = true, features = ["serde"] }
|
||||
alloy-consensus.workspace = true
|
||||
|
||||
# io
|
||||
tokio.workspace = true
|
||||
@@ -45,6 +48,8 @@ tracing.workspace = true
|
||||
# errors
|
||||
eyre.workspace = true
|
||||
|
||||
ringbuffer.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
test-case.workspace = true
|
||||
alloy-consensus.workspace = true
|
||||
|
||||
81
crates/optimism/flashblocks/src/consensus.rs
Normal file
81
crates/optimism/flashblocks/src/consensus.rs
Normal file
@@ -0,0 +1,81 @@
|
||||
use crate::FlashBlockCompleteSequenceRx;
|
||||
use alloy_primitives::B256;
|
||||
use reth_node_api::{ConsensusEngineHandle, EngineApiMessageVersion};
|
||||
use reth_optimism_payload_builder::OpPayloadTypes;
|
||||
use ringbuffer::{AllocRingBuffer, RingBuffer};
|
||||
use tracing::warn;
|
||||
|
||||
/// Consensus client that sends FCUs and new payloads using blocks from a [`FlashBlockService`]
|
||||
///
|
||||
/// [`FlashBlockService`]: crate::FlashBlockService
|
||||
#[derive(Debug)]
|
||||
pub struct FlashBlockConsensusClient {
|
||||
/// Handle to execution client.
|
||||
engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
|
||||
sequence_receiver: FlashBlockCompleteSequenceRx,
|
||||
}
|
||||
|
||||
impl FlashBlockConsensusClient {
|
||||
/// Create a new `FlashBlockConsensusClient` with the given Op engine and sequence receiver.
|
||||
pub const fn new(
|
||||
engine_handle: ConsensusEngineHandle<OpPayloadTypes>,
|
||||
sequence_receiver: FlashBlockCompleteSequenceRx,
|
||||
) -> eyre::Result<Self> {
|
||||
Ok(Self { engine_handle, sequence_receiver })
|
||||
}
|
||||
|
||||
/// Get previous block hash using previous block hash buffer. If it isn't available (buffer
|
||||
/// started more recently than `offset`), return default zero hash
|
||||
fn get_previous_block_hash(
|
||||
&self,
|
||||
previous_block_hashes: &AllocRingBuffer<B256>,
|
||||
offset: usize,
|
||||
) -> B256 {
|
||||
*previous_block_hashes
|
||||
.len()
|
||||
.checked_sub(offset)
|
||||
.and_then(|index| previous_block_hashes.get(index))
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
|
||||
/// blocks.
|
||||
pub async fn run(mut self) {
|
||||
let mut previous_block_hashes = AllocRingBuffer::new(64);
|
||||
|
||||
loop {
|
||||
match self.sequence_receiver.recv().await {
|
||||
Ok(sequence) => {
|
||||
let block_hash = sequence.payload_base().parent_hash;
|
||||
previous_block_hashes.push(block_hash);
|
||||
|
||||
// Load previous block hashes. We're using (head - 32) and (head - 64) as the
|
||||
// safe and finalized block hashes.
|
||||
let safe_block_hash = self.get_previous_block_hash(&previous_block_hashes, 32);
|
||||
let finalized_block_hash =
|
||||
self.get_previous_block_hash(&previous_block_hashes, 64);
|
||||
|
||||
let state = alloy_rpc_types_engine::ForkchoiceState {
|
||||
head_block_hash: block_hash,
|
||||
safe_block_hash,
|
||||
finalized_block_hash,
|
||||
};
|
||||
|
||||
// Send FCU
|
||||
let _ = self
|
||||
.engine_handle
|
||||
.fork_choice_updated(state, None, EngineApiMessageVersion::V3)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
target: "consensus::flashblock-client",
|
||||
%err,
|
||||
"error while fetching flashblock completed sequence"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,8 @@ use reth_rpc_eth_types::PendingBlock;
|
||||
pub use service::FlashBlockService;
|
||||
pub use ws::{WsConnect, WsFlashBlockStream};
|
||||
|
||||
mod consensus;
|
||||
pub use consensus::FlashBlockConsensusClient;
|
||||
mod payload;
|
||||
mod sequence;
|
||||
pub use sequence::FlashBlockCompleteSequence;
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use crate::{ExecutionPayloadBaseV1, FlashBlock};
|
||||
use crate::{ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx};
|
||||
use alloy_eips::eip2718::WithEncoded;
|
||||
use core::mem;
|
||||
use eyre::{bail, OptionExt};
|
||||
use reth_primitives_traits::{Recovered, SignedTransaction};
|
||||
use std::collections::BTreeMap;
|
||||
use std::{collections::BTreeMap, ops::Deref};
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
@@ -34,9 +34,7 @@ where
|
||||
}
|
||||
|
||||
/// Gets a subscriber to the flashblock sequences produced.
|
||||
pub(crate) fn subscribe_block_sequence(
|
||||
&self,
|
||||
) -> broadcast::Receiver<FlashBlockCompleteSequence> {
|
||||
pub(crate) fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
|
||||
self.block_broadcaster.subscribe()
|
||||
}
|
||||
|
||||
@@ -168,6 +166,19 @@ impl FlashBlockCompleteSequence {
|
||||
pub const fn count(&self) -> usize {
|
||||
self.0.len()
|
||||
}
|
||||
|
||||
/// Returns the last flashblock in the sequence.
|
||||
pub fn last(&self) -> &FlashBlock {
|
||||
self.0.last().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for FlashBlockCompleteSequence {
|
||||
type Target = Vec<FlashBlock>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> TryFrom<FlashBlockPendingSequence<T>> for FlashBlockCompleteSequence {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
sequence::FlashBlockPendingSequence,
|
||||
worker::{BuildArgs, FlashBlockBuilder},
|
||||
ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequence,
|
||||
ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx,
|
||||
};
|
||||
use alloy_eips::eip2718::WithEncoded;
|
||||
use alloy_primitives::B256;
|
||||
@@ -20,10 +20,7 @@ use std::{
|
||||
task::{ready, Context, Poll},
|
||||
time::Instant,
|
||||
};
|
||||
use tokio::{
|
||||
pin,
|
||||
sync::{broadcast, oneshot},
|
||||
};
|
||||
use tokio::{pin, sync::oneshot};
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
/// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of
|
||||
@@ -84,7 +81,7 @@ where
|
||||
}
|
||||
|
||||
/// Returns a subscriber to the flashblock sequence.
|
||||
pub fn subscribe_block_sequence(&self) -> broadcast::Receiver<FlashBlockCompleteSequence> {
|
||||
pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
|
||||
self.blocks.subscribe_block_sequence()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user