Compare commits

...

1 Commits

Author SHA1 Message Date
Derek Cofausper
3e42b42d5e fix(consensus): handle Syncing response in DebugConsensusClient with parent backfill
Previously, DebugConsensusClient discarded newPayload responses entirely.
On fast-block-time chains (~700ms), any gap (WS drop, startup lag) caused
a permanent Syncing loop since buffered blocks couldn't serve as parents
for subsequent blocks.

Now the client checks newPayload status. On Syncing, it walks backwards
up to 64 blocks via the block provider, submits missing ancestors
oldest-to-newest, and retries the original block. FCU is only sent when
the block is accepted.

Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019cd3e4-3413-7178-ae0c-3159b96980e9
2026-03-09 18:58:41 +00:00

View File

@@ -1,11 +1,12 @@
use alloy_consensus::Sealable;
use alloy_consensus::{BlockHeader, Sealable};
use alloy_primitives::B256;
use alloy_rpc_types_engine::PayloadStatusEnum;
use reth_node_api::{
BuiltPayload, ConsensusEngineHandle, EngineApiMessageVersion, ExecutionPayload, NodePrimitives,
PayloadTypes,
};
use reth_primitives_traits::{Block, SealedBlock};
use reth_tracing::tracing::warn;
use reth_tracing::tracing::{debug, warn};
use ringbuffer::{AllocRingBuffer, RingBuffer};
use std::future::Future;
use tokio::sync::mpsc;
@@ -90,15 +91,20 @@ where
};
while let Some(block) = block_stream.recv().await {
let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
let block_number = block.header().number();
let block_hash = payload.block_hash();
let block_number = payload.block_number();
// Submit the block, backfilling missing parents if the engine returns Syncing.
if let Err(err) =
self.submit_block_with_backfill(block, &mut previous_block_hashes).await
{
warn!(target: "consensus::debug-client", %block_number, %err, "failed to submit block");
continue;
}
previous_block_hashes.enqueue(block_hash);
// Send new events to execution client
let _ = self.engine_handle.new_payload(payload).await;
let block_hash = match previous_block_hashes.back() {
Some(hash) => *hash,
None => continue,
};
// Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and
// finalized block hashes.
@@ -137,6 +143,164 @@ where
.await;
}
}
/// Submits `block` via `newPayload`, backfilling missing ancestors if the engine returns
/// `Syncing` (parent state missing).
///
/// On `Syncing`, walks backwards from the block's parent (at most `MAX_BACKFILL_DEPTH` blocks),
/// fetching each ancestor from the block provider and submitting it right away, until one is
/// accepted. The remaining fetched ancestors are then submitted oldest-to-newest and the
/// original block is retried.
///
/// On success, the hash of every accepted ancestor (including the oldest one, which is accepted
/// during the backwards walk) followed by the block's own hash is enqueued into
/// `previous_block_hashes`, oldest first, so the ring buffer has no gap.
///
/// # Errors
///
/// Returns `Err` if a `newPayload` call fails, if the block or a probed ancestor is reported
/// `Invalid`, if the genesis block returns `Syncing`, or if the block is still not accepted after
/// the backfill attempt. Nothing is enqueued in those cases, so the caller can skip FCU.
async fn submit_block_with_backfill(
    &self,
    block: P::Block,
    previous_block_hashes: &mut AllocRingBuffer<B256>,
) -> eyre::Result<()> {
    let block_number = block.header().number();
    let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
    let block_hash = payload.block_hash();

    // The payload is cloned because it may have to be re-submitted after a backfill.
    let status = self
        .engine_handle
        .new_payload(payload.clone())
        .await
        .map_err(|e| eyre::eyre!("newPayload failed: {e}"))?;

    match status.status {
        PayloadStatusEnum::Valid | PayloadStatusEnum::Accepted => {
            previous_block_hashes.enqueue(block_hash);
            Ok(())
        }
        PayloadStatusEnum::Syncing => {
            debug!(
                target: "consensus::debug-client",
                %block_number,
                %block_hash,
                "newPayload returned Syncing, backfilling missing parents"
            );

            if block_number == 0 {
                return Err(eyre::eyre!("genesis block returned Syncing"));
            }

            // Walk backwards from this block's parent to find and submit missing ancestors.
            // Cap the backfill depth to avoid unbounded fetching.
            const MAX_BACKFILL_DEPTH: u64 = 64;
            let start = block_number.saturating_sub(1);
            let end = block_number.saturating_sub(MAX_BACKFILL_DEPTH);

            // Fetched ancestors, newest first.
            let mut ancestors = Vec::new();
            // Hashes of ancestors the engine accepted, oldest first.
            let mut accepted_hashes = Vec::new();

            for num in (end..=start).rev() {
                let ancestor = match self.block_provider.get_block(num).await {
                    Ok(ancestor) => ancestor,
                    Err(err) => {
                        warn!(
                            target: "consensus::debug-client",
                            %num, %err, "failed to fetch ancestor block for backfill"
                        );
                        break;
                    }
                };

                // Submit each ancestor immediately to probe whether the engine knows it.
                // This avoids fetching all the way back when only a few blocks are missing.
                // The block itself is kept so it can be re-submitted in the forward pass.
                let ancestor_payload =
                    T::block_to_payload(SealedBlock::new_unhashed(ancestor.clone()));
                let ancestor_hash = ancestor_payload.block_hash();
                ancestors.push(ancestor);

                let ancestor_status = self
                    .engine_handle
                    .new_payload(ancestor_payload)
                    .await
                    .map_err(|e| eyre::eyre!("ancestor newPayload failed: {e}"))?;

                match ancestor_status.status {
                    PayloadStatusEnum::Valid | PayloadStatusEnum::Accepted => {
                        // Found a block the engine accepts, no need to go further back.
                        // Record its hash: the forward pass below skips this block, so this
                        // is the only place it can be tracked for ring buffer continuity.
                        accepted_hashes.push(ancestor_hash);
                        break;
                    }
                    PayloadStatusEnum::Invalid { .. } => {
                        warn!(
                            target: "consensus::debug-client",
                            %num,
                            ?ancestor_status,
                            "ancestor returned Invalid during backfill"
                        );
                        return Err(eyre::eyre!(
                            "ancestor block {num} returned Invalid during backfill"
                        ));
                    }
                    PayloadStatusEnum::Syncing => {
                        // Keep walking back.
                    }
                }
            }

            // Submit ancestors oldest-to-newest (reverse of collection order).
            // Skip the oldest one since it was already submitted in the probe loop above;
            // `skip(1)` yields nothing when zero or one ancestors were fetched.
            for ancestor in ancestors.into_iter().rev().skip(1) {
                let ancestor_payload = T::block_to_payload(SealedBlock::new_unhashed(ancestor));
                let hash = ancestor_payload.block_hash();
                let res = self
                    .engine_handle
                    .new_payload(ancestor_payload)
                    .await
                    .map_err(|e| eyre::eyre!("forward newPayload failed: {e}"))?;
                if matches!(
                    res.status,
                    PayloadStatusEnum::Valid | PayloadStatusEnum::Accepted
                ) {
                    accepted_hashes.push(hash);
                }
            }

            // Retry the original block.
            let retry_status = self
                .engine_handle
                .new_payload(payload)
                .await
                .map_err(|e| eyre::eyre!("retry newPayload failed: {e}"))?;

            if matches!(
                retry_status.status,
                PayloadStatusEnum::Valid | PayloadStatusEnum::Accepted
            ) {
                debug!(
                    target: "consensus::debug-client",
                    %block_number,
                    backfilled = accepted_hashes.len(),
                    "block accepted after backfill"
                );
                // Enqueue backfilled ancestor hashes (oldest first) before the block itself to
                // maintain ringbuffer continuity.
                for hash in accepted_hashes {
                    previous_block_hashes.enqueue(hash);
                }
                previous_block_hashes.enqueue(block_hash);
                Ok(())
            } else {
                Err(eyre::eyre!(
                    "block {block_number} still not accepted after backfill: {:?}",
                    retry_status.status
                ))
            }
        }
        PayloadStatusEnum::Invalid { .. } => {
            warn!(
                target: "consensus::debug-client",
                %block_number,
                %block_hash,
                ?status,
                "newPayload returned Invalid"
            );
            Err(eyre::eyre!("block {block_number} returned Invalid"))
        }
    }
}
}
/// Looks up a block hash from the ring buffer at the given offset from the most recent entry.