mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
3 Commits
main
...
refactor/d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
16b7dffe91 | ||
|
|
f6bb458716 | ||
|
|
6479de0c90 |
6
.changelog/kind-slugs-shout.md
Normal file
6
.changelog/kind-slugs-shout.md
Normal file
@@ -0,0 +1,6 @@
|
||||
---
|
||||
reth-consensus-debug-client: minor
|
||||
reth-node-builder: minor
|
||||
---
|
||||
|
||||
Refactored the debug consensus client to decouple forkchoice strategy from block providers. Introduced `RawBlockProvider` trait, `ForkchoiceProvider` wrapper, and `ForkchoiceMode` enum supporting `Offset`, `Finalized`, and `Tag` strategies. Added `FinalizedBlockBuffer` for buffering blocks until finalized, and updated `EtherscanBlockProvider` and `RpcBlockProvider` to implement `RawBlockProvider` with `BlockNumberOrTag`-based `get_block`.
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7952,6 +7952,7 @@ dependencies = [
|
||||
"eyre",
|
||||
"futures",
|
||||
"reqwest",
|
||||
"reth-ethereum-primitives",
|
||||
"reth-node-api",
|
||||
"reth-primitives-traits",
|
||||
"reth-tracing",
|
||||
|
||||
@@ -35,3 +35,7 @@ tokio = { workspace = true, features = ["time"] }
|
||||
serde_json.workspace = true
|
||||
|
||||
ringbuffer.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
reth-ethereum-primitives.workspace = true
|
||||
reth-primitives-traits = { workspace = true, features = ["test-utils"] }
|
||||
|
||||
@@ -1,60 +1,18 @@
|
||||
use alloy_consensus::Sealable;
|
||||
use alloy_consensus::{BlockHeader, Sealable};
|
||||
use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash, BlockNumberOrTag};
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rpc_types_engine::ForkchoiceState;
|
||||
use reth_node_api::{
|
||||
BuiltPayload, ConsensusEngineHandle, EngineApiMessageVersion, ExecutionPayload, NodePrimitives,
|
||||
PayloadTypes,
|
||||
BuiltPayload, ConsensusEngineHandle, EngineApiMessageVersion, NodePrimitives, PayloadTypes,
|
||||
};
|
||||
use reth_primitives_traits::{Block, SealedBlock};
|
||||
use reth_tracing::tracing::warn;
|
||||
use ringbuffer::{AllocRingBuffer, RingBuffer};
|
||||
use std::future::Future;
|
||||
use std::{collections::HashMap, future::Future};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks
|
||||
/// by number to fetch past finalized and safe blocks.
|
||||
#[auto_impl::auto_impl(&, Arc, Box)]
|
||||
pub trait BlockProvider: Send + Sync + 'static {
|
||||
/// The block type.
|
||||
type Block: Block;
|
||||
|
||||
/// Runs a block provider to send new blocks to the given sender.
|
||||
///
|
||||
/// Note: This is expected to be spawned in a separate task, and as such it should ignore
|
||||
/// errors.
|
||||
fn subscribe_blocks(&self, tx: mpsc::Sender<Self::Block>) -> impl Future<Output = ()> + Send;
|
||||
|
||||
/// Get a past block by number.
|
||||
fn get_block(
|
||||
&self,
|
||||
block_number: u64,
|
||||
) -> impl Future<Output = eyre::Result<Self::Block>> + Send;
|
||||
|
||||
/// Get previous block hash using previous block hash buffer. If it isn't available (buffer
|
||||
/// started more recently than `offset`), fetch it using `get_block`.
|
||||
fn get_or_fetch_previous_block(
|
||||
&self,
|
||||
previous_block_hashes: &AllocRingBuffer<B256>,
|
||||
current_block_number: u64,
|
||||
offset: usize,
|
||||
) -> impl Future<Output = eyre::Result<B256>> + Send {
|
||||
async move {
|
||||
if let Some(hash) = get_hash_at_offset(previous_block_hashes, offset) {
|
||||
return Ok(hash);
|
||||
}
|
||||
|
||||
// Return zero hash if the chain isn't long enough to have the block at the offset.
|
||||
let previous_block_number = match current_block_number.checked_sub(offset as u64) {
|
||||
Some(number) => number,
|
||||
None => return Ok(B256::default()),
|
||||
};
|
||||
let block = self.get_block(previous_block_number).await?;
|
||||
Ok(block.header().hash_slow())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Debug consensus client that sends FCUs and new payloads using recent blocks from an external
|
||||
/// provider like Etherscan or an RPC endpoint.
|
||||
/// Debug consensus client that sends FCUs and new payloads using recent blocks from a
|
||||
/// [`BlockProvider`].
|
||||
#[derive(Debug)]
|
||||
pub struct DebugConsensusClient<P: BlockProvider, T: PayloadTypes> {
|
||||
/// Handle to execution client.
|
||||
@@ -79,71 +37,365 @@ where
|
||||
/// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
|
||||
/// blocks.
|
||||
pub async fn run(self) {
|
||||
let mut previous_block_hashes = AllocRingBuffer::new(65);
|
||||
let mut block_stream = {
|
||||
let (tx, rx) = mpsc::channel::<P::Block>(64);
|
||||
let block_provider = self.block_provider.clone();
|
||||
tokio::spawn(async move {
|
||||
block_provider.subscribe_blocks(tx).await;
|
||||
});
|
||||
rx
|
||||
};
|
||||
let (tx, mut rx) = mpsc::channel(64);
|
||||
let block_provider = self.block_provider.clone();
|
||||
tokio::spawn(async move {
|
||||
block_provider.subscribe(tx).await;
|
||||
});
|
||||
|
||||
while let Some(block) = block_stream.recv().await {
|
||||
let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
|
||||
|
||||
let block_hash = payload.block_hash();
|
||||
let block_number = payload.block_number();
|
||||
|
||||
previous_block_hashes.enqueue(block_hash);
|
||||
|
||||
// Send new events to execution client
|
||||
while let Some(new_block) = rx.recv().await {
|
||||
let payload = T::block_to_payload(SealedBlock::new_unhashed(new_block.block));
|
||||
let _ = self.engine_handle.new_payload(payload).await;
|
||||
|
||||
// Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and
|
||||
// finalized block hashes.
|
||||
let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
|
||||
&previous_block_hashes,
|
||||
block_number,
|
||||
32,
|
||||
);
|
||||
let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
|
||||
&previous_block_hashes,
|
||||
block_number,
|
||||
64,
|
||||
);
|
||||
let (safe_block_hash, finalized_block_hash) =
|
||||
tokio::join!(safe_block_hash, finalized_block_hash);
|
||||
let (safe_block_hash, finalized_block_hash) = match (
|
||||
safe_block_hash,
|
||||
finalized_block_hash,
|
||||
) {
|
||||
(Ok(safe_block_hash), Ok(finalized_block_hash)) => {
|
||||
(safe_block_hash, finalized_block_hash)
|
||||
}
|
||||
(safe_block_hash, finalized_block_hash) => {
|
||||
warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let state = alloy_rpc_types_engine::ForkchoiceState {
|
||||
head_block_hash: block_hash,
|
||||
safe_block_hash,
|
||||
finalized_block_hash,
|
||||
};
|
||||
let _ = self
|
||||
.engine_handle
|
||||
.fork_choice_updated(state, None, EngineApiMessageVersion::V3)
|
||||
.fork_choice_updated(new_block.forkchoice_state, None, EngineApiMessageVersion::V3)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Looks up a block hash from the ring buffer at the given offset from the most recent entry.
|
||||
/// A new block event with its associated forkchoice state.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NewBlock<B> {
|
||||
/// The new block.
|
||||
pub block: B,
|
||||
/// The forkchoice state to submit alongside this block.
|
||||
pub forkchoice_state: ForkchoiceState,
|
||||
}
|
||||
|
||||
/// Supplies the consensus client with new blocks and their forkchoice state.
|
||||
#[auto_impl::auto_impl(&, Arc, Box)]
|
||||
pub trait BlockProvider: Send + Sync + 'static {
|
||||
/// The block type.
|
||||
type Block: Block;
|
||||
|
||||
/// Streams new blocks with their forkchoice state.
|
||||
fn subscribe(&self, tx: mpsc::Sender<NewBlock<Self::Block>>)
|
||||
-> impl Future<Output = ()> + Send;
|
||||
}
|
||||
|
||||
/// Provides raw blocks without forkchoice information.
|
||||
#[auto_impl::auto_impl(&, Arc, Box)]
|
||||
pub trait RawBlockProvider: Send + Sync + 'static {
|
||||
/// The block type.
|
||||
type Block: Block;
|
||||
|
||||
/// Streams new blocks.
|
||||
fn subscribe_blocks(&self, tx: mpsc::Sender<Self::Block>) -> impl Future<Output = ()> + Send;
|
||||
|
||||
/// Gets a block by number or tag (e.g. `Safe`, `Finalized`, `Number(u64)`).
|
||||
fn get_block(
|
||||
&self,
|
||||
block: BlockNumberOrTag,
|
||||
) -> impl Future<Output = eyre::Result<Self::Block>> + Send;
|
||||
}
|
||||
|
||||
/// Determines how the [`ForkchoiceProvider`] derives safe and finalized block hashes.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum ForkchoiceMode {
|
||||
/// Derives safe and finalized hashes from fixed offsets relative to the head.
|
||||
///
|
||||
/// Uses a ring buffer to look up past block hashes, falling back to
|
||||
/// [`RawBlockProvider::get_block`] for blocks outside the buffer.
|
||||
Offset {
|
||||
/// Offset from head for the safe block hash.
|
||||
safe: usize,
|
||||
/// Offset from head for the finalized block hash.
|
||||
finalized: usize,
|
||||
},
|
||||
/// Buffers incoming blocks and only yields them once they are finalized.
|
||||
///
|
||||
/// On each new block, the current finalized block number is fetched via
|
||||
/// [`BlockNumberOrTag::Finalized`]. All buffered blocks up to that number are then
|
||||
/// emitted with head=safe=finalized set to the block's own hash.
|
||||
///
|
||||
/// **Note:** Requires a provider that supports [`BlockNumberOrTag::Finalized`] lookups
|
||||
/// (e.g., [`crate::RpcBlockProvider`]). Not supported by [`crate::EtherscanBlockProvider`].
|
||||
Finalized,
|
||||
/// Fetches the safe and finalized blocks by their respective tags
|
||||
/// ([`BlockNumberOrTag::Safe`] and [`BlockNumberOrTag::Finalized`]) from the provider.
|
||||
///
|
||||
/// **Note:** Requires a provider that supports tag-based lookups
|
||||
/// (e.g., [`crate::RpcBlockProvider`]). Not supported by [`crate::EtherscanBlockProvider`].
|
||||
Tag,
|
||||
}
|
||||
|
||||
impl ForkchoiceMode {
|
||||
/// The default offset mode: safe = head - 32, finalized = head - 64.
|
||||
pub const DEFAULT_OFFSET: Self =
|
||||
Self::Offset { safe: EPOCH_SLOTS as usize, finalized: (EPOCH_SLOTS * 2) as usize };
|
||||
}
|
||||
|
||||
impl Default for ForkchoiceMode {
|
||||
fn default() -> Self {
|
||||
Self::DEFAULT_OFFSET
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps a [`RawBlockProvider`] and derives forkchoice state for each block
|
||||
/// according to the configured [`ForkchoiceMode`].
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ForkchoiceProvider<P> {
|
||||
/// Inner raw block provider.
|
||||
inner: P,
|
||||
/// How to derive safe and finalized hashes.
|
||||
mode: ForkchoiceMode,
|
||||
}
|
||||
|
||||
impl<P> ForkchoiceProvider<P> {
|
||||
/// Creates a new provider with the default offset mode (safe=32, finalized=64).
|
||||
pub fn new(inner: P) -> Self {
|
||||
Self { inner, mode: ForkchoiceMode::default() }
|
||||
}
|
||||
|
||||
/// Creates a new provider with the given mode.
|
||||
pub const fn with_mode(inner: P, mode: ForkchoiceMode) -> Self {
|
||||
Self { inner, mode }
|
||||
}
|
||||
}
|
||||
|
||||
impl<P> BlockProvider for ForkchoiceProvider<P>
|
||||
where
|
||||
P: RawBlockProvider + Clone,
|
||||
{
|
||||
type Block = P::Block;
|
||||
|
||||
async fn subscribe(&self, tx: mpsc::Sender<NewBlock<Self::Block>>) {
|
||||
let (block_tx, mut block_rx) = mpsc::channel::<P::Block>(64);
|
||||
let inner = self.inner.clone();
|
||||
tokio::spawn(async move {
|
||||
inner.subscribe_blocks(block_tx).await;
|
||||
});
|
||||
|
||||
match self.mode {
|
||||
ForkchoiceMode::Offset { safe: safe_offset, finalized: finalized_offset } => {
|
||||
let mut previous_block_hashes =
|
||||
AllocRingBuffer::new(safe_offset.max(finalized_offset) + 1);
|
||||
|
||||
while let Some(block) = block_rx.recv().await {
|
||||
let block_hash = block.header().hash_slow();
|
||||
let block_number = block.header().number();
|
||||
|
||||
previous_block_hashes.enqueue(BlockNumHash::new(block_number, block_hash));
|
||||
|
||||
let safe_block_hash = get_or_fetch_hash(
|
||||
&self.inner,
|
||||
&previous_block_hashes,
|
||||
block_number,
|
||||
safe_offset,
|
||||
);
|
||||
let finalized_block_hash = get_or_fetch_hash(
|
||||
&self.inner,
|
||||
&previous_block_hashes,
|
||||
block_number,
|
||||
finalized_offset,
|
||||
);
|
||||
|
||||
let (safe_block_hash, finalized_block_hash) =
|
||||
tokio::join!(safe_block_hash, finalized_block_hash);
|
||||
|
||||
let (safe_block_hash, finalized_block_hash) = match (
|
||||
safe_block_hash,
|
||||
finalized_block_hash,
|
||||
) {
|
||||
(Ok(safe), Ok(finalized)) => (safe, finalized),
|
||||
(safe_block_hash, finalized_block_hash) => {
|
||||
warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let new_block = NewBlock {
|
||||
block,
|
||||
forkchoice_state: ForkchoiceState {
|
||||
head_block_hash: block_hash,
|
||||
safe_block_hash,
|
||||
finalized_block_hash,
|
||||
},
|
||||
};
|
||||
|
||||
if tx.send(new_block).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
ForkchoiceMode::Finalized => {
|
||||
let mut buffer = FinalizedBlockBuffer::new();
|
||||
|
||||
while let Some(block) = block_rx.recv().await {
|
||||
buffer.insert(block);
|
||||
|
||||
let finalized = match self.inner.get_block(BlockNumberOrTag::Finalized).await {
|
||||
Ok(b) => BlockNumHash::new(b.header().number(), b.header().hash_slow()),
|
||||
Err(err) => {
|
||||
warn!(target: "consensus::debug-client", %err, "failed to fetch finalized block");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let finalized_blocks = buffer.drain_finalized(finalized);
|
||||
|
||||
for finalized_block in finalized_blocks {
|
||||
let hash = finalized_block.header().hash_slow();
|
||||
let new_block = NewBlock {
|
||||
block: finalized_block,
|
||||
forkchoice_state: ForkchoiceState {
|
||||
head_block_hash: hash,
|
||||
safe_block_hash: hash,
|
||||
finalized_block_hash: hash,
|
||||
},
|
||||
};
|
||||
|
||||
if tx.send(new_block).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ForkchoiceMode::Tag => {
|
||||
while let Some(block) = block_rx.recv().await {
|
||||
let block_hash = block.header().hash_slow();
|
||||
|
||||
let safe = self.inner.get_block(BlockNumberOrTag::Safe);
|
||||
let finalized = self.inner.get_block(BlockNumberOrTag::Finalized);
|
||||
let (safe, finalized) = tokio::join!(safe, finalized);
|
||||
|
||||
let (safe_block_hash, finalized_block_hash) = match (safe, finalized) {
|
||||
(Ok(safe), Ok(finalized)) => {
|
||||
(safe.header().hash_slow(), finalized.header().hash_slow())
|
||||
}
|
||||
(safe, finalized) => {
|
||||
warn!(target: "consensus::debug-client", ?safe, ?finalized, "failed to fetch safe or finalized block by tag");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let new_block = NewBlock {
|
||||
block,
|
||||
forkchoice_state: ForkchoiceState {
|
||||
head_block_hash: block_hash,
|
||||
safe_block_hash,
|
||||
finalized_block_hash,
|
||||
},
|
||||
};
|
||||
|
||||
if tx.send(new_block).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetches a block hash at the given offset, first checking the ring buffer, then falling back
|
||||
/// to the raw block provider.
|
||||
async fn get_or_fetch_hash<P: RawBlockProvider>(
|
||||
provider: &P,
|
||||
buffer: &AllocRingBuffer<BlockNumHash>,
|
||||
current_block_number: u64,
|
||||
offset: usize,
|
||||
) -> eyre::Result<B256> {
|
||||
let target_number = match current_block_number.checked_sub(offset as u64) {
|
||||
Some(number) => number,
|
||||
None => return Ok(B256::default()),
|
||||
};
|
||||
|
||||
if let Some(hash) = find_hash_by_number(buffer, target_number) {
|
||||
return Ok(hash);
|
||||
}
|
||||
|
||||
let block = provider.get_block(BlockNumberOrTag::Number(target_number)).await?;
|
||||
Ok(block.header().hash_slow())
|
||||
}
|
||||
|
||||
/// Finds a block hash by block number in the ring buffer, searching from newest to oldest.
|
||||
fn find_hash_by_number(buffer: &AllocRingBuffer<BlockNumHash>, target_number: u64) -> Option<B256> {
|
||||
buffer.iter().rev().find(|entry| entry.number == target_number).map(|entry| entry.hash)
|
||||
}
|
||||
|
||||
/// Buffers blocks and releases them once they become finalized.
|
||||
///
|
||||
/// Returns `None` if the buffer doesn't have enough entries to satisfy the offset.
|
||||
fn get_hash_at_offset(buffer: &AllocRingBuffer<B256>, offset: usize) -> Option<B256> {
|
||||
buffer.len().checked_sub(offset + 1).and_then(|index| buffer.get(index).copied())
|
||||
/// Blocks are indexed by their hash. When [`drain_finalized`](Self::drain_finalized) is called,
|
||||
/// it walks the `parent_hash` chain from the finalized block backwards, collecting all buffered
|
||||
/// blocks that belong to the finalized chain. Any remaining blocks at or below the finalized
|
||||
/// height are pruned as sidechain blocks.
|
||||
#[derive(Debug)]
|
||||
pub struct FinalizedBlockBuffer<B> {
|
||||
blocks: HashMap<B256, B>,
|
||||
/// The last finalized block's number and hash.
|
||||
last_finalized: Option<BlockNumHash>,
|
||||
}
|
||||
|
||||
impl<B: Block> FinalizedBlockBuffer<B> {
|
||||
/// Creates an empty buffer.
|
||||
pub fn new() -> Self {
|
||||
Self { blocks: HashMap::new(), last_finalized: None }
|
||||
}
|
||||
|
||||
/// Inserts a block into the buffer.
|
||||
pub fn insert(&mut self, block: B) {
|
||||
let hash = block.header().hash_slow();
|
||||
self.blocks.insert(hash, block);
|
||||
}
|
||||
|
||||
/// Walks the parent hash chain from `finalized` back to the last known finalized block,
|
||||
/// removes all matching blocks from the buffer, and returns them in ascending order
|
||||
/// (oldest first).
|
||||
///
|
||||
/// Any remaining buffered blocks at or below the finalized height are pruned as sidechain
|
||||
/// blocks that can no longer become canonical.
|
||||
///
|
||||
/// Returns an empty vec if the finalized hash is not in the buffer.
|
||||
pub fn drain_finalized(&mut self, finalized: BlockNumHash) -> Vec<B> {
|
||||
if !self.blocks.contains_key(&finalized.hash) {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
// Walk backwards from the finalized hash, collecting hashes of blocks to drain.
|
||||
let mut chain = Vec::new();
|
||||
let mut current = finalized.hash;
|
||||
|
||||
while self.blocks.contains_key(¤t) {
|
||||
chain.push(current);
|
||||
|
||||
// Stop if we've reached the previous finalized boundary.
|
||||
if self.last_finalized.as_ref().is_some_and(|last| last.hash == current) {
|
||||
break;
|
||||
}
|
||||
|
||||
let parent = self.blocks[¤t].header().parent_hash();
|
||||
current = parent;
|
||||
}
|
||||
|
||||
self.last_finalized = Some(finalized);
|
||||
|
||||
// Reverse so we return oldest-first, then remove from map.
|
||||
chain.reverse();
|
||||
let result: Vec<B> =
|
||||
chain.into_iter().filter_map(|hash| self.blocks.remove(&hash)).collect();
|
||||
|
||||
// Prune any sidechain blocks at or below the finalized height.
|
||||
self.blocks.retain(|_, b| b.header().number() > finalized.number);
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Returns the number of buffered blocks.
|
||||
pub fn len(&self) -> usize {
|
||||
self.blocks.len()
|
||||
}
|
||||
|
||||
/// Returns `true` if the buffer is empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.blocks.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: Block> Default for FinalizedBlockBuffer<B> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -151,47 +403,172 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_get_hash_at_offset() {
|
||||
let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
|
||||
fn test_find_hash_by_number() {
|
||||
let mut buffer: AllocRingBuffer<BlockNumHash> = AllocRingBuffer::new(65);
|
||||
|
||||
// Empty buffer returns None for any offset
|
||||
assert_eq!(get_hash_at_offset(&buffer, 0), None);
|
||||
assert_eq!(get_hash_at_offset(&buffer, 1), None);
|
||||
// Empty buffer returns None for any number
|
||||
assert_eq!(find_hash_by_number(&buffer, 0), None);
|
||||
assert_eq!(find_hash_by_number(&buffer, 1), None);
|
||||
|
||||
// Push hashes 0..65
|
||||
for i in 0..65u8 {
|
||||
buffer.enqueue(B256::with_last_byte(i));
|
||||
// Push entries for block numbers 0..65
|
||||
for i in 0..65u64 {
|
||||
buffer.enqueue(BlockNumHash::new(i, B256::with_last_byte(i as u8)));
|
||||
}
|
||||
|
||||
// offset=0 should return the most recent (64)
|
||||
assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(64)));
|
||||
// Look up the most recent (block 64)
|
||||
assert_eq!(find_hash_by_number(&buffer, 64), Some(B256::with_last_byte(64)));
|
||||
|
||||
// offset=32 (safe block) should return hash 32
|
||||
assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(32)));
|
||||
// Look up block 32
|
||||
assert_eq!(find_hash_by_number(&buffer, 32), Some(B256::with_last_byte(32)));
|
||||
|
||||
// offset=64 (finalized block) should return hash 0 (the oldest)
|
||||
assert_eq!(get_hash_at_offset(&buffer, 64), Some(B256::with_last_byte(0)));
|
||||
// Look up the oldest (block 0)
|
||||
assert_eq!(find_hash_by_number(&buffer, 0), Some(B256::with_last_byte(0)));
|
||||
|
||||
// offset=65 exceeds buffer, should return None
|
||||
assert_eq!(get_hash_at_offset(&buffer, 65), None);
|
||||
// Non-existent block number returns None
|
||||
assert_eq!(find_hash_by_number(&buffer, 100), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_hash_at_offset_insufficient_entries() {
|
||||
let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
|
||||
fn test_find_hash_by_number_insufficient_entries() {
|
||||
let mut buffer: AllocRingBuffer<BlockNumHash> = AllocRingBuffer::new(65);
|
||||
|
||||
// With only 1 entry, only offset=0 works
|
||||
buffer.enqueue(B256::with_last_byte(1));
|
||||
assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
|
||||
assert_eq!(get_hash_at_offset(&buffer, 1), None);
|
||||
assert_eq!(get_hash_at_offset(&buffer, 32), None);
|
||||
assert_eq!(get_hash_at_offset(&buffer, 64), None);
|
||||
// With only 1 entry, only that block number works
|
||||
buffer.enqueue(BlockNumHash::new(1, B256::with_last_byte(1)));
|
||||
assert_eq!(find_hash_by_number(&buffer, 1), Some(B256::with_last_byte(1)));
|
||||
assert_eq!(find_hash_by_number(&buffer, 0), None);
|
||||
assert_eq!(find_hash_by_number(&buffer, 32), None);
|
||||
assert_eq!(find_hash_by_number(&buffer, 64), None);
|
||||
|
||||
// With 33 entries, offset=32 works but offset=64 doesn't
|
||||
for i in 2..=33u8 {
|
||||
buffer.enqueue(B256::with_last_byte(i));
|
||||
// With 33 entries (blocks 1..=33), block 1 works but block 64 doesn't
|
||||
for i in 2..=33u64 {
|
||||
buffer.enqueue(BlockNumHash::new(i, B256::with_last_byte(i as u8)));
|
||||
}
|
||||
assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
|
||||
assert_eq!(get_hash_at_offset(&buffer, 64), None);
|
||||
assert_eq!(find_hash_by_number(&buffer, 1), Some(B256::with_last_byte(1)));
|
||||
assert_eq!(find_hash_by_number(&buffer, 64), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_hash_by_number_with_gaps() {
|
||||
let mut buffer: AllocRingBuffer<BlockNumHash> = AllocRingBuffer::new(10);
|
||||
|
||||
// Insert non-contiguous block numbers (simulating skipped blocks)
|
||||
buffer.enqueue(BlockNumHash::new(10, B256::with_last_byte(10)));
|
||||
buffer.enqueue(BlockNumHash::new(13, B256::with_last_byte(13)));
|
||||
buffer.enqueue(BlockNumHash::new(17, B256::with_last_byte(17)));
|
||||
|
||||
// Existing block numbers are found
|
||||
assert_eq!(find_hash_by_number(&buffer, 10), Some(B256::with_last_byte(10)));
|
||||
assert_eq!(find_hash_by_number(&buffer, 13), Some(B256::with_last_byte(13)));
|
||||
assert_eq!(find_hash_by_number(&buffer, 17), Some(B256::with_last_byte(17)));
|
||||
|
||||
// Skipped block numbers return None
|
||||
assert_eq!(find_hash_by_number(&buffer, 11), None);
|
||||
assert_eq!(find_hash_by_number(&buffer, 12), None);
|
||||
assert_eq!(find_hash_by_number(&buffer, 15), None);
|
||||
}
|
||||
|
||||
/// Creates a test block with the given number and parent hash.
|
||||
fn test_block(number: u64, parent_hash: B256) -> reth_ethereum_primitives::Block {
|
||||
use reth_primitives_traits::{block::TestBlock, header::test_utils::TestHeader};
|
||||
let mut block = reth_ethereum_primitives::Block::default();
|
||||
block.header_mut().set_block_number(number);
|
||||
block.header_mut().set_parent_hash(parent_hash);
|
||||
block
|
||||
}
|
||||
|
||||
fn num_hash(block: &reth_ethereum_primitives::Block) -> BlockNumHash {
|
||||
BlockNumHash::new(block.header().number(), block.header().hash_slow())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_finalized_buffer_empty() {
|
||||
let mut buf = FinalizedBlockBuffer::<reth_ethereum_primitives::Block>::new();
|
||||
assert!(buf.is_empty());
|
||||
assert!(buf.drain_finalized(BlockNumHash::new(0, B256::ZERO)).is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_finalized_buffer_drain_chain() {
|
||||
let mut buf = FinalizedBlockBuffer::new();
|
||||
|
||||
let b1 = test_block(1, B256::ZERO);
|
||||
let b1_nh = num_hash(&b1);
|
||||
let b2 = test_block(2, b1_nh.hash);
|
||||
let b2_nh = num_hash(&b2);
|
||||
let b3 = test_block(3, b2_nh.hash);
|
||||
let b3_nh = num_hash(&b3);
|
||||
|
||||
buf.insert(b1);
|
||||
buf.insert(b2);
|
||||
buf.insert(b3);
|
||||
assert_eq!(buf.len(), 3);
|
||||
|
||||
// Finalize up to block 2 — should drain blocks 1 and 2
|
||||
let drained = buf.drain_finalized(b2_nh);
|
||||
assert_eq!(drained.len(), 2);
|
||||
assert_eq!(drained[0].header().number(), 1);
|
||||
assert_eq!(drained[1].header().number(), 2);
|
||||
|
||||
// Block 3 remains
|
||||
assert_eq!(buf.len(), 1);
|
||||
|
||||
// Finalize block 3
|
||||
let drained = buf.drain_finalized(b3_nh);
|
||||
assert_eq!(drained.len(), 1);
|
||||
assert_eq!(drained[0].header().number(), 3);
|
||||
assert!(buf.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_finalized_buffer_prunes_sidechains() {
|
||||
let mut buf = FinalizedBlockBuffer::new();
|
||||
|
||||
// Canonical chain: b1 -> b2
|
||||
let b1 = test_block(1, B256::ZERO);
|
||||
let b1_nh = num_hash(&b1);
|
||||
let b2 = test_block(2, b1_nh.hash);
|
||||
let b2_nh = num_hash(&b2);
|
||||
|
||||
// Fork: b1 -> b2_fork (different block at height 2)
|
||||
let b2_fork = test_block(2, B256::with_last_byte(0xaa));
|
||||
let b2_fork_nh = num_hash(&b2_fork);
|
||||
assert_ne!(b2_nh.hash, b2_fork_nh.hash);
|
||||
|
||||
// Block 3 on top of canonical b2
|
||||
let b3 = test_block(3, b2_nh.hash);
|
||||
|
||||
buf.insert(b1);
|
||||
buf.insert(b2);
|
||||
buf.insert(b2_fork);
|
||||
buf.insert(b3);
|
||||
assert_eq!(buf.len(), 4);
|
||||
|
||||
// Finalize canonical b2 — drains b1 and b2, prunes b2_fork (height <= 2)
|
||||
let drained = buf.drain_finalized(b2_nh);
|
||||
assert_eq!(drained.len(), 2);
|
||||
// Only b3 (height 3) remains, b2_fork was pruned
|
||||
assert_eq!(buf.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_finalized_buffer_unknown_hash_no_change() {
|
||||
let mut buf = FinalizedBlockBuffer::new();
|
||||
buf.insert(test_block(1, B256::ZERO));
|
||||
|
||||
// Unknown hash does not modify the buffer at all
|
||||
let drained = buf.drain_finalized(BlockNumHash::new(99, B256::with_last_byte(0xff)));
|
||||
assert!(drained.is_empty());
|
||||
assert_eq!(buf.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_finalized_buffer_unknown_hash_keeps_higher_blocks() {
|
||||
let mut buf = FinalizedBlockBuffer::new();
|
||||
buf.insert(test_block(5, B256::ZERO));
|
||||
|
||||
// Unknown hash does not modify the buffer at all
|
||||
let drained = buf.drain_finalized(BlockNumHash::new(3, B256::with_last_byte(0xff)));
|
||||
assert!(drained.is_empty());
|
||||
assert_eq!(buf.len(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,5 +15,8 @@
|
||||
mod client;
|
||||
mod providers;
|
||||
|
||||
pub use client::{BlockProvider, DebugConsensusClient};
|
||||
pub use client::{
|
||||
BlockProvider, DebugConsensusClient, FinalizedBlockBuffer, ForkchoiceMode, ForkchoiceProvider,
|
||||
NewBlock, RawBlockProvider,
|
||||
};
|
||||
pub use providers::{EtherscanBlockProvider, RpcBlockProvider};
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::BlockProvider;
|
||||
use crate::RawBlockProvider;
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_eips::BlockNumberOrTag;
|
||||
use alloy_json_rpc::{Response, ResponsePayload};
|
||||
@@ -9,7 +9,7 @@ use std::{sync::Arc, time::Duration};
|
||||
use tokio::{sync::mpsc, time::interval};
|
||||
|
||||
/// Block provider that fetches new blocks from Etherscan API.
|
||||
#[derive(derive_more::Debug, Clone)]
|
||||
#[derive(derive_more::Debug)]
|
||||
pub struct EtherscanBlockProvider<RpcBlock, PrimitiveBlock> {
|
||||
http_client: Client,
|
||||
base_url: String,
|
||||
@@ -20,6 +20,19 @@ pub struct EtherscanBlockProvider<RpcBlock, PrimitiveBlock> {
|
||||
convert: Arc<dyn Fn(RpcBlock) -> PrimitiveBlock + Send + Sync>,
|
||||
}
|
||||
|
||||
impl<RpcBlock, PrimitiveBlock> Clone for EtherscanBlockProvider<RpcBlock, PrimitiveBlock> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
http_client: self.http_client.clone(),
|
||||
base_url: self.base_url.clone(),
|
||||
api_key: self.api_key.clone(),
|
||||
chain_id: self.chain_id,
|
||||
interval: self.interval,
|
||||
convert: self.convert.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<RpcBlock, PrimitiveBlock> EtherscanBlockProvider<RpcBlock, PrimitiveBlock>
|
||||
where
|
||||
RpcBlock: Serialize + DeserializeOwned,
|
||||
@@ -88,7 +101,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<RpcBlock, PrimitiveBlock> BlockProvider for EtherscanBlockProvider<RpcBlock, PrimitiveBlock>
|
||||
impl<RpcBlock, PrimitiveBlock> RawBlockProvider for EtherscanBlockProvider<RpcBlock, PrimitiveBlock>
|
||||
where
|
||||
RpcBlock: Serialize + DeserializeOwned + 'static,
|
||||
PrimitiveBlock: reth_primitives_traits::Block + 'static,
|
||||
@@ -125,7 +138,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_block(&self, block_number: u64) -> eyre::Result<Self::Block> {
|
||||
self.load_block(BlockNumberOrTag::Number(block_number)).await
|
||||
async fn get_block(&self, block: BlockNumberOrTag) -> eyre::Result<Self::Block> {
|
||||
self.load_block(block).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::BlockProvider;
|
||||
use crate::RawBlockProvider;
|
||||
use alloy_eips::BlockNumberOrTag;
|
||||
use alloy_provider::{ConnectionConfig, Network, Provider, ProviderBuilder, WebSocketConfig};
|
||||
use alloy_transport::TransportResult;
|
||||
use futures::{Stream, StreamExt};
|
||||
@@ -9,7 +10,7 @@ use tokio::sync::mpsc::Sender;
|
||||
|
||||
/// Block provider that fetches new blocks from an RPC endpoint using a connection that supports
|
||||
/// RPC subscriptions.
|
||||
#[derive(derive_more::Debug, Clone)]
|
||||
#[derive(derive_more::Debug)]
|
||||
pub struct RpcBlockProvider<N: Network, PrimitiveBlock> {
|
||||
#[debug(skip)]
|
||||
provider: Arc<dyn Provider<N>>,
|
||||
@@ -18,6 +19,16 @@ pub struct RpcBlockProvider<N: Network, PrimitiveBlock> {
|
||||
convert: Arc<dyn Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync>,
|
||||
}
|
||||
|
||||
impl<N: Network, PrimitiveBlock> Clone for RpcBlockProvider<N, PrimitiveBlock> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
provider: self.provider.clone(),
|
||||
url: self.url.clone(),
|
||||
convert: self.convert.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
|
||||
/// Create a new RPC block provider with the given RPC URL.
|
||||
pub async fn new(
|
||||
@@ -66,7 +77,7 @@ impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: Network, PrimitiveBlock> BlockProvider for RpcBlockProvider<N, PrimitiveBlock>
|
||||
impl<N: Network, PrimitiveBlock> RawBlockProvider for RpcBlockProvider<N, PrimitiveBlock>
|
||||
where
|
||||
PrimitiveBlock: Block + 'static,
|
||||
{
|
||||
@@ -79,10 +90,11 @@ where
|
||||
target: "consensus::debug-client",
|
||||
%err,
|
||||
url=%self.url,
|
||||
"Failed to subscribe to blocks",
|
||||
"Failed to subscribe to blocks, retrying in 1s",
|
||||
);
|
||||
}) else {
|
||||
return
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
continue
|
||||
};
|
||||
|
||||
while let Some(res) = stream.next().await {
|
||||
@@ -112,13 +124,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_block(&self, block_number: u64) -> eyre::Result<Self::Block> {
|
||||
async fn get_block(&self, block: BlockNumberOrTag) -> eyre::Result<Self::Block> {
|
||||
let block = self
|
||||
.provider
|
||||
.get_block_by_number(block_number.into())
|
||||
.get_block_by_number(block)
|
||||
.full()
|
||||
.await?
|
||||
.ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
|
||||
.ok_or_else(|| eyre::eyre!("block not found for {block}"))?;
|
||||
Ok((self.convert)(block))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,8 @@ use alloy_provider::network::AnyNetwork;
|
||||
use jsonrpsee::core::{DeserializeOwned, Serialize};
|
||||
use reth_chainspec::EthChainSpec;
|
||||
use reth_consensus_debug_client::{
|
||||
BlockProvider, DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider,
|
||||
BlockProvider, DebugConsensusClient, EtherscanBlockProvider, ForkchoiceMode,
|
||||
ForkchoiceProvider, RpcBlockProvider,
|
||||
};
|
||||
use reth_engine_local::{LocalMiner, MiningMode};
|
||||
use reth_node_api::{
|
||||
@@ -114,9 +115,11 @@ impl<L> DebugNodeLauncher<L> {
|
||||
|
||||
/// Type alias for the default debug block provider. We use etherscan provider to satisfy the
|
||||
/// bounds.
|
||||
pub type DefaultDebugBlockProvider<N> = EtherscanBlockProvider<
|
||||
<<N as FullNodeTypes>::Types as DebugNode<N>>::RpcBlock,
|
||||
BlockTy<<N as FullNodeTypes>::Types>,
|
||||
pub type DefaultDebugBlockProvider<N> = ForkchoiceProvider<
|
||||
EtherscanBlockProvider<
|
||||
<<N as FullNodeTypes>::Types as DebugNode<N>>::RpcBlock,
|
||||
BlockTy<<N as FullNodeTypes>::Types>,
|
||||
>,
|
||||
>;
|
||||
|
||||
/// Future for the [`DebugNodeLauncher`].
|
||||
@@ -132,6 +135,7 @@ where
|
||||
map_attributes:
|
||||
Option<Box<dyn Fn(PayloadAttrTy<N::Types>) -> PayloadAttrTy<N::Types> + Send + Sync>>,
|
||||
debug_block_provider: Option<B>,
|
||||
forkchoice_mode: ForkchoiceMode,
|
||||
mining_mode: Option<MiningMode<N::Pool>>,
|
||||
}
|
||||
|
||||
@@ -153,6 +157,7 @@ where
|
||||
local_payload_attributes_builder: Some(Box::new(builder)),
|
||||
map_attributes: None,
|
||||
debug_block_provider: self.debug_block_provider,
|
||||
forkchoice_mode: self.forkchoice_mode,
|
||||
mining_mode: self.mining_mode,
|
||||
}
|
||||
}
|
||||
@@ -168,10 +173,20 @@ where
|
||||
local_payload_attributes_builder: None,
|
||||
map_attributes: Some(Box::new(f)),
|
||||
debug_block_provider: self.debug_block_provider,
|
||||
forkchoice_mode: self.forkchoice_mode,
|
||||
mining_mode: self.mining_mode,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the [`ForkchoiceMode`] used by the default RPC and Etherscan block providers.
|
||||
///
|
||||
/// This has no effect when a custom block provider is set via
|
||||
/// [`with_debug_block_provider`](Self::with_debug_block_provider).
|
||||
pub const fn with_forkchoice_mode(mut self, mode: ForkchoiceMode) -> Self {
|
||||
self.forkchoice_mode = mode;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets a custom [`MiningMode`] for the local miner in dev mode.
|
||||
///
|
||||
/// This overrides the default mining mode that is derived from the node configuration
|
||||
@@ -198,6 +213,7 @@ where
|
||||
local_payload_attributes_builder: self.local_payload_attributes_builder,
|
||||
map_attributes: self.map_attributes,
|
||||
debug_block_provider: Some(provider),
|
||||
forkchoice_mode: self.forkchoice_mode,
|
||||
mining_mode: self.mining_mode,
|
||||
}
|
||||
}
|
||||
@@ -209,6 +225,7 @@ where
|
||||
local_payload_attributes_builder,
|
||||
map_attributes,
|
||||
debug_block_provider,
|
||||
forkchoice_mode,
|
||||
mining_mode,
|
||||
} = self;
|
||||
|
||||
@@ -242,6 +259,7 @@ where
|
||||
N::Types::rpc_to_primitive_block(rpc_block)
|
||||
})
|
||||
.await?;
|
||||
let block_provider = ForkchoiceProvider::with_mode(block_provider, forkchoice_mode);
|
||||
|
||||
let rpc_consensus_client = DebugConsensusClient::new(
|
||||
handle.node.add_ons_handle.beacon_engine_handle.clone(),
|
||||
@@ -272,6 +290,7 @@ where
|
||||
chain.id(),
|
||||
N::Types::rpc_to_primitive_block,
|
||||
);
|
||||
let block_provider = ForkchoiceProvider::with_mode(block_provider, forkchoice_mode);
|
||||
let rpc_consensus_client = DebugConsensusClient::new(
|
||||
handle.node.add_ons_handle.beacon_engine_handle.clone(),
|
||||
Arc::new(block_provider),
|
||||
@@ -358,6 +377,7 @@ where
|
||||
local_payload_attributes_builder: None,
|
||||
map_attributes: None,
|
||||
debug_block_provider: None,
|
||||
forkchoice_mode: ForkchoiceMode::default(),
|
||||
mining_mode: None,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user