Compare commits

...

3 Commits

Author SHA1 Message Date
Matthias Seitz
16b7dffe91 chore: add changelog entry for debug consensus client refactor
Amp-Thread-ID: https://ampcode.com/threads/T-019c7791-6737-7239-abbc-56636dd117cc
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 21:23:49 +01:00
Matthias Seitz
f6bb458716 fix: resolve doc link warnings for RpcBlockProvider and EtherscanBlockProvider
Amp-Thread-ID: https://ampcode.com/threads/T-019c7737-2c16-72a3-884c-ae406c1d98d0
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 21:18:25 +01:00
Matthias Seitz
6479de0c90 refactor: decouple forkchoice strategy from debug consensus client
Split the old BlockProvider trait into two layers:
- RawBlockProvider: low-level block sourcing (subscribe + get_block)
- BlockProvider: high-level trait emitting NewBlock events (block + ForkchoiceState)

Introduce ForkchoiceProvider<P> with configurable ForkchoiceMode enum:
- Offset { safe, finalized }: derives safe/finalized from block number offsets
  using a ring buffer that stores BlockNumHash for gap-safe lookups
- Finalized: buffers blocks and only yields them once finalized, using
  FinalizedBlockBuffer which walks parent_hash chains and prunes sidechains
- Tag: fetches safe/finalized blocks by their RPC tags

Add FinalizedBlockBuffer with parent-hash-chain-based draining that correctly
handles forks — only the canonical chain is drained, sidechain blocks at or
below the finalized height are pruned.

Make ForkchoiceMode configurable via DebugNodeLauncherFuture::with_forkchoice_mode().

Additional improvements:
- RawBlockProvider::get_block takes BlockNumberOrTag instead of u64
- RPC provider retries on connection failure instead of exiting
- Document that Finalized/Tag modes require RPC provider (not Etherscan)

Amp-Thread-ID: https://ampcode.com/threads/T-019c7737-2c16-72a3-884c-ae406c1d98d0
Co-authored-by: Amp <amp@ampcode.com>
2026-02-19 21:12:17 +01:00
8 changed files with 587 additions and 151 deletions

View File

@@ -0,0 +1,6 @@
---
reth-consensus-debug-client: minor
reth-node-builder: minor
---
Refactored the debug consensus client to decouple forkchoice strategy from block providers. Introduced `RawBlockProvider` trait, `ForkchoiceProvider` wrapper, and `ForkchoiceMode` enum supporting `Offset`, `Finalized`, and `Tag` strategies. Added `FinalizedBlockBuffer` for buffering blocks until finalized, and updated `EtherscanBlockProvider` and `RpcBlockProvider` to implement `RawBlockProvider` with `BlockNumberOrTag`-based `get_block`.

1
Cargo.lock generated
View File

@@ -7952,6 +7952,7 @@ dependencies = [
"eyre",
"futures",
"reqwest",
"reth-ethereum-primitives",
"reth-node-api",
"reth-primitives-traits",
"reth-tracing",

View File

@@ -35,3 +35,7 @@ tokio = { workspace = true, features = ["time"] }
serde_json.workspace = true
ringbuffer.workspace = true
[dev-dependencies]
reth-ethereum-primitives.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }

View File

@@ -1,60 +1,18 @@
use alloy_consensus::Sealable;
use alloy_consensus::{BlockHeader, Sealable};
use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash, BlockNumberOrTag};
use alloy_primitives::B256;
use alloy_rpc_types_engine::ForkchoiceState;
use reth_node_api::{
BuiltPayload, ConsensusEngineHandle, EngineApiMessageVersion, ExecutionPayload, NodePrimitives,
PayloadTypes,
BuiltPayload, ConsensusEngineHandle, EngineApiMessageVersion, NodePrimitives, PayloadTypes,
};
use reth_primitives_traits::{Block, SealedBlock};
use reth_tracing::tracing::warn;
use ringbuffer::{AllocRingBuffer, RingBuffer};
use std::future::Future;
use std::{collections::HashMap, future::Future};
use tokio::sync::mpsc;
/// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks
/// by number to fetch past finalized and safe blocks.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BlockProvider: Send + Sync + 'static {
/// The block type.
type Block: Block;
/// Runs a block provider to send new blocks to the given sender.
///
/// Note: This is expected to be spawned in a separate task, and as such it should ignore
/// errors.
fn subscribe_blocks(&self, tx: mpsc::Sender<Self::Block>) -> impl Future<Output = ()> + Send;
/// Get a past block by number.
fn get_block(
&self,
block_number: u64,
) -> impl Future<Output = eyre::Result<Self::Block>> + Send;
/// Get previous block hash using previous block hash buffer. If it isn't available (buffer
/// started more recently than `offset`), fetch it using `get_block`.
fn get_or_fetch_previous_block(
&self,
previous_block_hashes: &AllocRingBuffer<B256>,
current_block_number: u64,
offset: usize,
) -> impl Future<Output = eyre::Result<B256>> + Send {
async move {
if let Some(hash) = get_hash_at_offset(previous_block_hashes, offset) {
return Ok(hash);
}
// Return zero hash if the chain isn't long enough to have the block at the offset.
let previous_block_number = match current_block_number.checked_sub(offset as u64) {
Some(number) => number,
None => return Ok(B256::default()),
};
let block = self.get_block(previous_block_number).await?;
Ok(block.header().hash_slow())
}
}
}
/// Debug consensus client that sends FCUs and new payloads using recent blocks from an external
/// provider like Etherscan or an RPC endpoint.
/// Debug consensus client that sends FCUs and new payloads using recent blocks from a
/// [`BlockProvider`].
#[derive(Debug)]
pub struct DebugConsensusClient<P: BlockProvider, T: PayloadTypes> {
/// Handle to execution client.
@@ -79,71 +37,365 @@ where
/// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
/// blocks.
pub async fn run(self) {
let mut previous_block_hashes = AllocRingBuffer::new(65);
let mut block_stream = {
let (tx, rx) = mpsc::channel::<P::Block>(64);
let block_provider = self.block_provider.clone();
tokio::spawn(async move {
block_provider.subscribe_blocks(tx).await;
});
rx
};
let (tx, mut rx) = mpsc::channel(64);
let block_provider = self.block_provider.clone();
tokio::spawn(async move {
block_provider.subscribe(tx).await;
});
while let Some(block) = block_stream.recv().await {
let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
let block_hash = payload.block_hash();
let block_number = payload.block_number();
previous_block_hashes.enqueue(block_hash);
// Send new events to execution client
while let Some(new_block) = rx.recv().await {
let payload = T::block_to_payload(SealedBlock::new_unhashed(new_block.block));
let _ = self.engine_handle.new_payload(payload).await;
// Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and
// finalized block hashes.
let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
&previous_block_hashes,
block_number,
32,
);
let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
&previous_block_hashes,
block_number,
64,
);
let (safe_block_hash, finalized_block_hash) =
tokio::join!(safe_block_hash, finalized_block_hash);
let (safe_block_hash, finalized_block_hash) = match (
safe_block_hash,
finalized_block_hash,
) {
(Ok(safe_block_hash), Ok(finalized_block_hash)) => {
(safe_block_hash, finalized_block_hash)
}
(safe_block_hash, finalized_block_hash) => {
warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
continue;
}
};
let state = alloy_rpc_types_engine::ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash,
finalized_block_hash,
};
let _ = self
.engine_handle
.fork_choice_updated(state, None, EngineApiMessageVersion::V3)
.fork_choice_updated(new_block.forkchoice_state, None, EngineApiMessageVersion::V3)
.await;
}
}
}
/// Looks up a block hash from the ring buffer at the given offset from the most recent entry.
/// A new block event with its associated forkchoice state.
#[derive(Debug, Clone)]
pub struct NewBlock<B> {
/// The new block.
pub block: B,
/// The forkchoice state to submit alongside this block.
pub forkchoice_state: ForkchoiceState,
}
/// Supplies the consensus client with new blocks and their forkchoice state.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BlockProvider: Send + Sync + 'static {
/// The block type.
type Block: Block;
/// Streams new blocks with their forkchoice state.
fn subscribe(&self, tx: mpsc::Sender<NewBlock<Self::Block>>)
-> impl Future<Output = ()> + Send;
}
/// Provides raw blocks without forkchoice information.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait RawBlockProvider: Send + Sync + 'static {
/// The block type.
type Block: Block;
/// Streams new blocks.
fn subscribe_blocks(&self, tx: mpsc::Sender<Self::Block>) -> impl Future<Output = ()> + Send;
/// Gets a block by number or tag (e.g. `Safe`, `Finalized`, `Number(u64)`).
fn get_block(
&self,
block: BlockNumberOrTag,
) -> impl Future<Output = eyre::Result<Self::Block>> + Send;
}
/// Determines how the [`ForkchoiceProvider`] derives safe and finalized block hashes.
#[derive(Debug, Clone, Copy)]
pub enum ForkchoiceMode {
/// Derives safe and finalized hashes from fixed offsets relative to the head.
///
/// Uses a ring buffer to look up past block hashes, falling back to
/// [`RawBlockProvider::get_block`] for blocks outside the buffer.
Offset {
/// Offset from head for the safe block hash.
safe: usize,
/// Offset from head for the finalized block hash.
finalized: usize,
},
/// Buffers incoming blocks and only yields them once they are finalized.
///
/// On each new block, the current finalized block number is fetched via
/// [`BlockNumberOrTag::Finalized`]. All buffered blocks up to that number are then
/// emitted with head=safe=finalized set to the block's own hash.
///
/// **Note:** Requires a provider that supports [`BlockNumberOrTag::Finalized`] lookups
/// (e.g., [`crate::RpcBlockProvider`]). Not supported by [`crate::EtherscanBlockProvider`].
Finalized,
/// Fetches the safe and finalized blocks by their respective tags
/// ([`BlockNumberOrTag::Safe`] and [`BlockNumberOrTag::Finalized`]) from the provider.
///
/// **Note:** Requires a provider that supports tag-based lookups
/// (e.g., [`crate::RpcBlockProvider`]). Not supported by [`crate::EtherscanBlockProvider`].
Tag,
}
impl ForkchoiceMode {
/// The default offset mode: safe = head - 32, finalized = head - 64.
pub const DEFAULT_OFFSET: Self =
Self::Offset { safe: EPOCH_SLOTS as usize, finalized: (EPOCH_SLOTS * 2) as usize };
}
impl Default for ForkchoiceMode {
fn default() -> Self {
Self::DEFAULT_OFFSET
}
}
/// Wraps a [`RawBlockProvider`] and derives forkchoice state for each block
/// according to the configured [`ForkchoiceMode`].
#[derive(Debug, Clone)]
pub struct ForkchoiceProvider<P> {
/// Inner raw block provider.
inner: P,
/// How to derive safe and finalized hashes.
mode: ForkchoiceMode,
}
impl<P> ForkchoiceProvider<P> {
/// Creates a new provider with the default offset mode (safe=32, finalized=64).
pub fn new(inner: P) -> Self {
Self { inner, mode: ForkchoiceMode::default() }
}
/// Creates a new provider with the given mode.
pub const fn with_mode(inner: P, mode: ForkchoiceMode) -> Self {
Self { inner, mode }
}
}
impl<P> BlockProvider for ForkchoiceProvider<P>
where
P: RawBlockProvider + Clone,
{
type Block = P::Block;
async fn subscribe(&self, tx: mpsc::Sender<NewBlock<Self::Block>>) {
let (block_tx, mut block_rx) = mpsc::channel::<P::Block>(64);
let inner = self.inner.clone();
tokio::spawn(async move {
inner.subscribe_blocks(block_tx).await;
});
match self.mode {
ForkchoiceMode::Offset { safe: safe_offset, finalized: finalized_offset } => {
let mut previous_block_hashes =
AllocRingBuffer::new(safe_offset.max(finalized_offset) + 1);
while let Some(block) = block_rx.recv().await {
let block_hash = block.header().hash_slow();
let block_number = block.header().number();
previous_block_hashes.enqueue(BlockNumHash::new(block_number, block_hash));
let safe_block_hash = get_or_fetch_hash(
&self.inner,
&previous_block_hashes,
block_number,
safe_offset,
);
let finalized_block_hash = get_or_fetch_hash(
&self.inner,
&previous_block_hashes,
block_number,
finalized_offset,
);
let (safe_block_hash, finalized_block_hash) =
tokio::join!(safe_block_hash, finalized_block_hash);
let (safe_block_hash, finalized_block_hash) = match (
safe_block_hash,
finalized_block_hash,
) {
(Ok(safe), Ok(finalized)) => (safe, finalized),
(safe_block_hash, finalized_block_hash) => {
warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash");
continue;
}
};
let new_block = NewBlock {
block,
forkchoice_state: ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash,
finalized_block_hash,
},
};
if tx.send(new_block).await.is_err() {
break;
}
}
}
ForkchoiceMode::Finalized => {
let mut buffer = FinalizedBlockBuffer::new();
while let Some(block) = block_rx.recv().await {
buffer.insert(block);
let finalized = match self.inner.get_block(BlockNumberOrTag::Finalized).await {
Ok(b) => BlockNumHash::new(b.header().number(), b.header().hash_slow()),
Err(err) => {
warn!(target: "consensus::debug-client", %err, "failed to fetch finalized block");
continue;
}
};
let finalized_blocks = buffer.drain_finalized(finalized);
for finalized_block in finalized_blocks {
let hash = finalized_block.header().hash_slow();
let new_block = NewBlock {
block: finalized_block,
forkchoice_state: ForkchoiceState {
head_block_hash: hash,
safe_block_hash: hash,
finalized_block_hash: hash,
},
};
if tx.send(new_block).await.is_err() {
return;
}
}
}
}
ForkchoiceMode::Tag => {
while let Some(block) = block_rx.recv().await {
let block_hash = block.header().hash_slow();
let safe = self.inner.get_block(BlockNumberOrTag::Safe);
let finalized = self.inner.get_block(BlockNumberOrTag::Finalized);
let (safe, finalized) = tokio::join!(safe, finalized);
let (safe_block_hash, finalized_block_hash) = match (safe, finalized) {
(Ok(safe), Ok(finalized)) => {
(safe.header().hash_slow(), finalized.header().hash_slow())
}
(safe, finalized) => {
warn!(target: "consensus::debug-client", ?safe, ?finalized, "failed to fetch safe or finalized block by tag");
continue;
}
};
let new_block = NewBlock {
block,
forkchoice_state: ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash,
finalized_block_hash,
},
};
if tx.send(new_block).await.is_err() {
break;
}
}
}
}
}
}
/// Fetches a block hash at the given offset, first checking the ring buffer, then falling back
/// to the raw block provider.
async fn get_or_fetch_hash<P: RawBlockProvider>(
provider: &P,
buffer: &AllocRingBuffer<BlockNumHash>,
current_block_number: u64,
offset: usize,
) -> eyre::Result<B256> {
let target_number = match current_block_number.checked_sub(offset as u64) {
Some(number) => number,
None => return Ok(B256::default()),
};
if let Some(hash) = find_hash_by_number(buffer, target_number) {
return Ok(hash);
}
let block = provider.get_block(BlockNumberOrTag::Number(target_number)).await?;
Ok(block.header().hash_slow())
}
/// Finds a block hash by block number in the ring buffer, searching from newest to oldest.
fn find_hash_by_number(buffer: &AllocRingBuffer<BlockNumHash>, target_number: u64) -> Option<B256> {
buffer.iter().rev().find(|entry| entry.number == target_number).map(|entry| entry.hash)
}
/// Buffers blocks and releases them once they become finalized.
///
/// Returns `None` if the buffer doesn't have enough entries to satisfy the offset.
fn get_hash_at_offset(buffer: &AllocRingBuffer<B256>, offset: usize) -> Option<B256> {
buffer.len().checked_sub(offset + 1).and_then(|index| buffer.get(index).copied())
/// Blocks are indexed by their hash. When [`drain_finalized`](Self::drain_finalized) is called,
/// it walks the `parent_hash` chain from the finalized block backwards, collecting all buffered
/// blocks that belong to the finalized chain. Any remaining blocks at or below the finalized
/// height are pruned as sidechain blocks.
#[derive(Debug)]
pub struct FinalizedBlockBuffer<B> {
blocks: HashMap<B256, B>,
/// The last finalized block's number and hash.
last_finalized: Option<BlockNumHash>,
}
impl<B: Block> FinalizedBlockBuffer<B> {
/// Creates an empty buffer.
pub fn new() -> Self {
Self { blocks: HashMap::new(), last_finalized: None }
}
/// Inserts a block into the buffer.
pub fn insert(&mut self, block: B) {
let hash = block.header().hash_slow();
self.blocks.insert(hash, block);
}
/// Walks the parent hash chain from `finalized` back to the last known finalized block,
/// removes all matching blocks from the buffer, and returns them in ascending order
/// (oldest first).
///
/// Any remaining buffered blocks at or below the finalized height are pruned as sidechain
/// blocks that can no longer become canonical.
///
/// Returns an empty vec if the finalized hash is not in the buffer.
pub fn drain_finalized(&mut self, finalized: BlockNumHash) -> Vec<B> {
if !self.blocks.contains_key(&finalized.hash) {
return Vec::new();
}
// Walk backwards from the finalized hash, collecting hashes of blocks to drain.
let mut chain = Vec::new();
let mut current = finalized.hash;
while self.blocks.contains_key(&current) {
chain.push(current);
// Stop if we've reached the previous finalized boundary.
if self.last_finalized.as_ref().is_some_and(|last| last.hash == current) {
break;
}
let parent = self.blocks[&current].header().parent_hash();
current = parent;
}
self.last_finalized = Some(finalized);
// Reverse so we return oldest-first, then remove from map.
chain.reverse();
let result: Vec<B> =
chain.into_iter().filter_map(|hash| self.blocks.remove(&hash)).collect();
// Prune any sidechain blocks at or below the finalized height.
self.blocks.retain(|_, b| b.header().number() > finalized.number);
result
}
/// Returns the number of buffered blocks.
pub fn len(&self) -> usize {
self.blocks.len()
}
/// Returns `true` if the buffer is empty.
pub fn is_empty(&self) -> bool {
self.blocks.is_empty()
}
}
impl<B: Block> Default for FinalizedBlockBuffer<B> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
@@ -151,47 +403,172 @@ mod tests {
use super::*;
#[test]
fn test_get_hash_at_offset() {
let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
fn test_find_hash_by_number() {
let mut buffer: AllocRingBuffer<BlockNumHash> = AllocRingBuffer::new(65);
// Empty buffer returns None for any offset
assert_eq!(get_hash_at_offset(&buffer, 0), None);
assert_eq!(get_hash_at_offset(&buffer, 1), None);
// Empty buffer returns None for any number
assert_eq!(find_hash_by_number(&buffer, 0), None);
assert_eq!(find_hash_by_number(&buffer, 1), None);
// Push hashes 0..65
for i in 0..65u8 {
buffer.enqueue(B256::with_last_byte(i));
// Push entries for block numbers 0..65
for i in 0..65u64 {
buffer.enqueue(BlockNumHash::new(i, B256::with_last_byte(i as u8)));
}
// offset=0 should return the most recent (64)
assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(64)));
// Look up the most recent (block 64)
assert_eq!(find_hash_by_number(&buffer, 64), Some(B256::with_last_byte(64)));
// offset=32 (safe block) should return hash 32
assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(32)));
// Look up block 32
assert_eq!(find_hash_by_number(&buffer, 32), Some(B256::with_last_byte(32)));
// offset=64 (finalized block) should return hash 0 (the oldest)
assert_eq!(get_hash_at_offset(&buffer, 64), Some(B256::with_last_byte(0)));
// Look up the oldest (block 0)
assert_eq!(find_hash_by_number(&buffer, 0), Some(B256::with_last_byte(0)));
// offset=65 exceeds buffer, should return None
assert_eq!(get_hash_at_offset(&buffer, 65), None);
// Non-existent block number returns None
assert_eq!(find_hash_by_number(&buffer, 100), None);
}
#[test]
fn test_get_hash_at_offset_insufficient_entries() {
let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
fn test_find_hash_by_number_insufficient_entries() {
let mut buffer: AllocRingBuffer<BlockNumHash> = AllocRingBuffer::new(65);
// With only 1 entry, only offset=0 works
buffer.enqueue(B256::with_last_byte(1));
assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
assert_eq!(get_hash_at_offset(&buffer, 1), None);
assert_eq!(get_hash_at_offset(&buffer, 32), None);
assert_eq!(get_hash_at_offset(&buffer, 64), None);
// With only 1 entry, only that block number works
buffer.enqueue(BlockNumHash::new(1, B256::with_last_byte(1)));
assert_eq!(find_hash_by_number(&buffer, 1), Some(B256::with_last_byte(1)));
assert_eq!(find_hash_by_number(&buffer, 0), None);
assert_eq!(find_hash_by_number(&buffer, 32), None);
assert_eq!(find_hash_by_number(&buffer, 64), None);
// With 33 entries, offset=32 works but offset=64 doesn't
for i in 2..=33u8 {
buffer.enqueue(B256::with_last_byte(i));
// With 33 entries (blocks 1..=33), block 1 works but block 64 doesn't
for i in 2..=33u64 {
buffer.enqueue(BlockNumHash::new(i, B256::with_last_byte(i as u8)));
}
assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
assert_eq!(get_hash_at_offset(&buffer, 64), None);
assert_eq!(find_hash_by_number(&buffer, 1), Some(B256::with_last_byte(1)));
assert_eq!(find_hash_by_number(&buffer, 64), None);
}
#[test]
fn test_find_hash_by_number_with_gaps() {
let mut buffer: AllocRingBuffer<BlockNumHash> = AllocRingBuffer::new(10);
// Insert non-contiguous block numbers (simulating skipped blocks)
buffer.enqueue(BlockNumHash::new(10, B256::with_last_byte(10)));
buffer.enqueue(BlockNumHash::new(13, B256::with_last_byte(13)));
buffer.enqueue(BlockNumHash::new(17, B256::with_last_byte(17)));
// Existing block numbers are found
assert_eq!(find_hash_by_number(&buffer, 10), Some(B256::with_last_byte(10)));
assert_eq!(find_hash_by_number(&buffer, 13), Some(B256::with_last_byte(13)));
assert_eq!(find_hash_by_number(&buffer, 17), Some(B256::with_last_byte(17)));
// Skipped block numbers return None
assert_eq!(find_hash_by_number(&buffer, 11), None);
assert_eq!(find_hash_by_number(&buffer, 12), None);
assert_eq!(find_hash_by_number(&buffer, 15), None);
}
/// Creates a test block with the given number and parent hash.
fn test_block(number: u64, parent_hash: B256) -> reth_ethereum_primitives::Block {
use reth_primitives_traits::{block::TestBlock, header::test_utils::TestHeader};
let mut block = reth_ethereum_primitives::Block::default();
block.header_mut().set_block_number(number);
block.header_mut().set_parent_hash(parent_hash);
block
}
fn num_hash(block: &reth_ethereum_primitives::Block) -> BlockNumHash {
BlockNumHash::new(block.header().number(), block.header().hash_slow())
}
#[test]
fn test_finalized_buffer_empty() {
let mut buf = FinalizedBlockBuffer::<reth_ethereum_primitives::Block>::new();
assert!(buf.is_empty());
assert!(buf.drain_finalized(BlockNumHash::new(0, B256::ZERO)).is_empty());
}
#[test]
fn test_finalized_buffer_drain_chain() {
let mut buf = FinalizedBlockBuffer::new();
let b1 = test_block(1, B256::ZERO);
let b1_nh = num_hash(&b1);
let b2 = test_block(2, b1_nh.hash);
let b2_nh = num_hash(&b2);
let b3 = test_block(3, b2_nh.hash);
let b3_nh = num_hash(&b3);
buf.insert(b1);
buf.insert(b2);
buf.insert(b3);
assert_eq!(buf.len(), 3);
// Finalize up to block 2 — should drain blocks 1 and 2
let drained = buf.drain_finalized(b2_nh);
assert_eq!(drained.len(), 2);
assert_eq!(drained[0].header().number(), 1);
assert_eq!(drained[1].header().number(), 2);
// Block 3 remains
assert_eq!(buf.len(), 1);
// Finalize block 3
let drained = buf.drain_finalized(b3_nh);
assert_eq!(drained.len(), 1);
assert_eq!(drained[0].header().number(), 3);
assert!(buf.is_empty());
}
#[test]
fn test_finalized_buffer_prunes_sidechains() {
let mut buf = FinalizedBlockBuffer::new();
// Canonical chain: b1 -> b2
let b1 = test_block(1, B256::ZERO);
let b1_nh = num_hash(&b1);
let b2 = test_block(2, b1_nh.hash);
let b2_nh = num_hash(&b2);
// Fork: b1 -> b2_fork (different block at height 2)
let b2_fork = test_block(2, B256::with_last_byte(0xaa));
let b2_fork_nh = num_hash(&b2_fork);
assert_ne!(b2_nh.hash, b2_fork_nh.hash);
// Block 3 on top of canonical b2
let b3 = test_block(3, b2_nh.hash);
buf.insert(b1);
buf.insert(b2);
buf.insert(b2_fork);
buf.insert(b3);
assert_eq!(buf.len(), 4);
// Finalize canonical b2 — drains b1 and b2, prunes b2_fork (height <= 2)
let drained = buf.drain_finalized(b2_nh);
assert_eq!(drained.len(), 2);
// Only b3 (height 3) remains, b2_fork was pruned
assert_eq!(buf.len(), 1);
}
#[test]
fn test_finalized_buffer_unknown_hash_no_change() {
let mut buf = FinalizedBlockBuffer::new();
buf.insert(test_block(1, B256::ZERO));
// Unknown hash does not modify the buffer at all
let drained = buf.drain_finalized(BlockNumHash::new(99, B256::with_last_byte(0xff)));
assert!(drained.is_empty());
assert_eq!(buf.len(), 1);
}
#[test]
fn test_finalized_buffer_unknown_hash_keeps_higher_blocks() {
let mut buf = FinalizedBlockBuffer::new();
buf.insert(test_block(5, B256::ZERO));
// Unknown hash does not modify the buffer at all
let drained = buf.drain_finalized(BlockNumHash::new(3, B256::with_last_byte(0xff)));
assert!(drained.is_empty());
assert_eq!(buf.len(), 1);
}
}

View File

@@ -15,5 +15,8 @@
mod client;
mod providers;
pub use client::{BlockProvider, DebugConsensusClient};
pub use client::{
BlockProvider, DebugConsensusClient, FinalizedBlockBuffer, ForkchoiceMode, ForkchoiceProvider,
NewBlock, RawBlockProvider,
};
pub use providers::{EtherscanBlockProvider, RpcBlockProvider};

View File

@@ -1,4 +1,4 @@
use crate::BlockProvider;
use crate::RawBlockProvider;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumberOrTag;
use alloy_json_rpc::{Response, ResponsePayload};
@@ -9,7 +9,7 @@ use std::{sync::Arc, time::Duration};
use tokio::{sync::mpsc, time::interval};
/// Block provider that fetches new blocks from Etherscan API.
#[derive(derive_more::Debug, Clone)]
#[derive(derive_more::Debug)]
pub struct EtherscanBlockProvider<RpcBlock, PrimitiveBlock> {
http_client: Client,
base_url: String,
@@ -20,6 +20,19 @@ pub struct EtherscanBlockProvider<RpcBlock, PrimitiveBlock> {
convert: Arc<dyn Fn(RpcBlock) -> PrimitiveBlock + Send + Sync>,
}
impl<RpcBlock, PrimitiveBlock> Clone for EtherscanBlockProvider<RpcBlock, PrimitiveBlock> {
fn clone(&self) -> Self {
Self {
http_client: self.http_client.clone(),
base_url: self.base_url.clone(),
api_key: self.api_key.clone(),
chain_id: self.chain_id,
interval: self.interval,
convert: self.convert.clone(),
}
}
}
impl<RpcBlock, PrimitiveBlock> EtherscanBlockProvider<RpcBlock, PrimitiveBlock>
where
RpcBlock: Serialize + DeserializeOwned,
@@ -88,7 +101,7 @@ where
}
}
impl<RpcBlock, PrimitiveBlock> BlockProvider for EtherscanBlockProvider<RpcBlock, PrimitiveBlock>
impl<RpcBlock, PrimitiveBlock> RawBlockProvider for EtherscanBlockProvider<RpcBlock, PrimitiveBlock>
where
RpcBlock: Serialize + DeserializeOwned + 'static,
PrimitiveBlock: reth_primitives_traits::Block + 'static,
@@ -125,7 +138,7 @@ where
}
}
async fn get_block(&self, block_number: u64) -> eyre::Result<Self::Block> {
self.load_block(BlockNumberOrTag::Number(block_number)).await
async fn get_block(&self, block: BlockNumberOrTag) -> eyre::Result<Self::Block> {
self.load_block(block).await
}
}

View File

@@ -1,4 +1,5 @@
use crate::BlockProvider;
use crate::RawBlockProvider;
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{ConnectionConfig, Network, Provider, ProviderBuilder, WebSocketConfig};
use alloy_transport::TransportResult;
use futures::{Stream, StreamExt};
@@ -9,7 +10,7 @@ use tokio::sync::mpsc::Sender;
/// Block provider that fetches new blocks from an RPC endpoint using a connection that supports
/// RPC subscriptions.
#[derive(derive_more::Debug, Clone)]
#[derive(derive_more::Debug)]
pub struct RpcBlockProvider<N: Network, PrimitiveBlock> {
#[debug(skip)]
provider: Arc<dyn Provider<N>>,
@@ -18,6 +19,16 @@ pub struct RpcBlockProvider<N: Network, PrimitiveBlock> {
convert: Arc<dyn Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync>,
}
impl<N: Network, PrimitiveBlock> Clone for RpcBlockProvider<N, PrimitiveBlock> {
fn clone(&self) -> Self {
Self {
provider: self.provider.clone(),
url: self.url.clone(),
convert: self.convert.clone(),
}
}
}
impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
/// Create a new RPC block provider with the given RPC URL.
pub async fn new(
@@ -66,7 +77,7 @@ impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
}
}
impl<N: Network, PrimitiveBlock> BlockProvider for RpcBlockProvider<N, PrimitiveBlock>
impl<N: Network, PrimitiveBlock> RawBlockProvider for RpcBlockProvider<N, PrimitiveBlock>
where
PrimitiveBlock: Block + 'static,
{
@@ -79,10 +90,11 @@ where
target: "consensus::debug-client",
%err,
url=%self.url,
"Failed to subscribe to blocks",
"Failed to subscribe to blocks, retrying in 1s",
);
}) else {
return
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue
};
while let Some(res) = stream.next().await {
@@ -112,13 +124,13 @@ where
}
}
async fn get_block(&self, block_number: u64) -> eyre::Result<Self::Block> {
async fn get_block(&self, block: BlockNumberOrTag) -> eyre::Result<Self::Block> {
let block = self
.provider
.get_block_by_number(block_number.into())
.get_block_by_number(block)
.full()
.await?
.ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
.ok_or_else(|| eyre::eyre!("block not found for {block}"))?;
Ok((self.convert)(block))
}
}

View File

@@ -5,7 +5,8 @@ use alloy_provider::network::AnyNetwork;
use jsonrpsee::core::{DeserializeOwned, Serialize};
use reth_chainspec::EthChainSpec;
use reth_consensus_debug_client::{
BlockProvider, DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider,
BlockProvider, DebugConsensusClient, EtherscanBlockProvider, ForkchoiceMode,
ForkchoiceProvider, RpcBlockProvider,
};
use reth_engine_local::{LocalMiner, MiningMode};
use reth_node_api::{
@@ -114,9 +115,11 @@ impl<L> DebugNodeLauncher<L> {
/// Type alias for the default debug block provider. We use etherscan provider to satisfy the
/// bounds.
pub type DefaultDebugBlockProvider<N> = EtherscanBlockProvider<
<<N as FullNodeTypes>::Types as DebugNode<N>>::RpcBlock,
BlockTy<<N as FullNodeTypes>::Types>,
pub type DefaultDebugBlockProvider<N> = ForkchoiceProvider<
EtherscanBlockProvider<
<<N as FullNodeTypes>::Types as DebugNode<N>>::RpcBlock,
BlockTy<<N as FullNodeTypes>::Types>,
>,
>;
/// Future for the [`DebugNodeLauncher`].
@@ -132,6 +135,7 @@ where
map_attributes:
Option<Box<dyn Fn(PayloadAttrTy<N::Types>) -> PayloadAttrTy<N::Types> + Send + Sync>>,
debug_block_provider: Option<B>,
forkchoice_mode: ForkchoiceMode,
mining_mode: Option<MiningMode<N::Pool>>,
}
@@ -153,6 +157,7 @@ where
local_payload_attributes_builder: Some(Box::new(builder)),
map_attributes: None,
debug_block_provider: self.debug_block_provider,
forkchoice_mode: self.forkchoice_mode,
mining_mode: self.mining_mode,
}
}
@@ -168,10 +173,20 @@ where
local_payload_attributes_builder: None,
map_attributes: Some(Box::new(f)),
debug_block_provider: self.debug_block_provider,
forkchoice_mode: self.forkchoice_mode,
mining_mode: self.mining_mode,
}
}
/// Sets the [`ForkchoiceMode`] used by the default RPC and Etherscan block providers.
///
/// This has no effect when a custom block provider is set via
/// [`with_debug_block_provider`](Self::with_debug_block_provider).
pub const fn with_forkchoice_mode(mut self, mode: ForkchoiceMode) -> Self {
self.forkchoice_mode = mode;
self
}
/// Sets a custom [`MiningMode`] for the local miner in dev mode.
///
/// This overrides the default mining mode that is derived from the node configuration
@@ -198,6 +213,7 @@ where
local_payload_attributes_builder: self.local_payload_attributes_builder,
map_attributes: self.map_attributes,
debug_block_provider: Some(provider),
forkchoice_mode: self.forkchoice_mode,
mining_mode: self.mining_mode,
}
}
@@ -209,6 +225,7 @@ where
local_payload_attributes_builder,
map_attributes,
debug_block_provider,
forkchoice_mode,
mining_mode,
} = self;
@@ -242,6 +259,7 @@ where
N::Types::rpc_to_primitive_block(rpc_block)
})
.await?;
let block_provider = ForkchoiceProvider::with_mode(block_provider, forkchoice_mode);
let rpc_consensus_client = DebugConsensusClient::new(
handle.node.add_ons_handle.beacon_engine_handle.clone(),
@@ -272,6 +290,7 @@ where
chain.id(),
N::Types::rpc_to_primitive_block,
);
let block_provider = ForkchoiceProvider::with_mode(block_provider, forkchoice_mode);
let rpc_consensus_client = DebugConsensusClient::new(
handle.node.add_ons_handle.beacon_engine_handle.clone(),
Arc::new(block_provider),
@@ -358,6 +377,7 @@ where
local_payload_attributes_builder: None,
map_attributes: None,
debug_block_provider: None,
forkchoice_mode: ForkchoiceMode::default(),
mining_mode: None,
}
}