mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
snapv2
...
alexey/fix
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e42b42d5e |
@@ -1,11 +1,12 @@
|
||||
use alloy_consensus::Sealable;
|
||||
use alloy_consensus::{BlockHeader, Sealable};
|
||||
use alloy_primitives::B256;
|
||||
use alloy_rpc_types_engine::PayloadStatusEnum;
|
||||
use reth_node_api::{
|
||||
BuiltPayload, ConsensusEngineHandle, EngineApiMessageVersion, ExecutionPayload, NodePrimitives,
|
||||
PayloadTypes,
|
||||
};
|
||||
use reth_primitives_traits::{Block, SealedBlock};
|
||||
use reth_tracing::tracing::warn;
|
||||
use reth_tracing::tracing::{debug, warn};
|
||||
use ringbuffer::{AllocRingBuffer, RingBuffer};
|
||||
use std::future::Future;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -90,15 +91,20 @@ where
|
||||
};
|
||||
|
||||
while let Some(block) = block_stream.recv().await {
|
||||
let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
|
||||
let block_number = block.header().number();
|
||||
|
||||
let block_hash = payload.block_hash();
|
||||
let block_number = payload.block_number();
|
||||
// Submit the block, backfilling missing parents if the engine returns Syncing.
|
||||
if let Err(err) =
|
||||
self.submit_block_with_backfill(block, &mut previous_block_hashes).await
|
||||
{
|
||||
warn!(target: "consensus::debug-client", %block_number, %err, "failed to submit block");
|
||||
continue;
|
||||
}
|
||||
|
||||
previous_block_hashes.enqueue(block_hash);
|
||||
|
||||
// Send new events to execution client
|
||||
let _ = self.engine_handle.new_payload(payload).await;
|
||||
let block_hash = match previous_block_hashes.back() {
|
||||
Some(hash) => *hash,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
// Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and
|
||||
// finalized block hashes.
|
||||
@@ -137,6 +143,164 @@ where
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Submits a block via `newPayload`. If the engine returns `Syncing` (parent state missing),
|
||||
/// fetches and submits missing ancestor blocks oldest-to-newest, then retries the original
|
||||
/// block.
|
||||
///
|
||||
/// Returns `Ok(())` only if the block was ultimately accepted (`Valid`/`Accepted`).
|
||||
/// Returns `Err` if the block could not be accepted, so the caller can skip FCU.
|
||||
async fn submit_block_with_backfill(
|
||||
&self,
|
||||
block: P::Block,
|
||||
previous_block_hashes: &mut AllocRingBuffer<B256>,
|
||||
) -> eyre::Result<()> {
|
||||
let block_number = block.header().number();
|
||||
let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
|
||||
let block_hash = payload.block_hash();
|
||||
|
||||
let status = self
|
||||
.engine_handle
|
||||
.new_payload(payload.clone())
|
||||
.await
|
||||
.map_err(|e| eyre::eyre!("newPayload failed: {e}"))?;
|
||||
|
||||
match status.status {
|
||||
PayloadStatusEnum::Valid | PayloadStatusEnum::Accepted => {
|
||||
previous_block_hashes.enqueue(block_hash);
|
||||
Ok(())
|
||||
}
|
||||
PayloadStatusEnum::Syncing => {
|
||||
debug!(
|
||||
target: "consensus::debug-client",
|
||||
%block_number,
|
||||
%block_hash,
|
||||
"newPayload returned Syncing, backfilling missing parents"
|
||||
);
|
||||
|
||||
if block_number == 0 {
|
||||
return Err(eyre::eyre!("genesis block returned Syncing"));
|
||||
}
|
||||
|
||||
// Walk backwards from this block's parent to find and submit missing ancestors.
|
||||
// Cap the backfill depth to avoid unbounded fetching.
|
||||
const MAX_BACKFILL_DEPTH: u64 = 64;
|
||||
let start = block_number.saturating_sub(1);
|
||||
let end = block_number.saturating_sub(MAX_BACKFILL_DEPTH);
|
||||
|
||||
let mut ancestors = Vec::new();
|
||||
for num in (end..=start).rev() {
|
||||
match self.block_provider.get_block(num).await {
|
||||
Ok(ancestor) => ancestors.push(ancestor),
|
||||
Err(err) => {
|
||||
warn!(
|
||||
target: "consensus::debug-client",
|
||||
%num, %err, "failed to fetch ancestor block for backfill"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Submit each ancestor immediately to probe whether the engine knows it.
|
||||
// This avoids fetching all the way back when only a few blocks are missing.
|
||||
let ancestor_payload = T::block_to_payload(SealedBlock::new_unhashed(
|
||||
ancestors.last().unwrap().clone(),
|
||||
));
|
||||
let ancestor_status = self
|
||||
.engine_handle
|
||||
.new_payload(ancestor_payload)
|
||||
.await
|
||||
.map_err(|e| eyre::eyre!("ancestor newPayload failed: {e}"))?;
|
||||
|
||||
match ancestor_status.status {
|
||||
PayloadStatusEnum::Valid | PayloadStatusEnum::Accepted => {
|
||||
// Found a block the engine accepts, no need to go further back.
|
||||
break;
|
||||
}
|
||||
PayloadStatusEnum::Invalid { .. } => {
|
||||
warn!(
|
||||
target: "consensus::debug-client",
|
||||
%num,
|
||||
?ancestor_status,
|
||||
"ancestor returned Invalid during backfill"
|
||||
);
|
||||
return Err(eyre::eyre!(
|
||||
"ancestor block {num} returned Invalid during backfill"
|
||||
));
|
||||
}
|
||||
PayloadStatusEnum::Syncing => {
|
||||
// Keep walking back.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Submit ancestors oldest-to-newest (reverse of collection order).
|
||||
// Skip the last one since we already submitted it in the probe loop above.
|
||||
// Track hashes for ringbuffer continuity.
|
||||
let mut accepted_hashes = Vec::new();
|
||||
if ancestors.len() > 1 {
|
||||
for ancestor in ancestors.into_iter().rev().skip(1) {
|
||||
let ancestor_payload =
|
||||
T::block_to_payload(SealedBlock::new_unhashed(ancestor));
|
||||
let hash = ancestor_payload.block_hash();
|
||||
let res = self
|
||||
.engine_handle
|
||||
.new_payload(ancestor_payload)
|
||||
.await
|
||||
.map_err(|e| eyre::eyre!("forward newPayload failed: {e}"))?;
|
||||
|
||||
if matches!(
|
||||
res.status,
|
||||
PayloadStatusEnum::Valid | PayloadStatusEnum::Accepted
|
||||
) {
|
||||
accepted_hashes.push(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Retry the original block.
|
||||
let retry_status = self
|
||||
.engine_handle
|
||||
.new_payload(payload)
|
||||
.await
|
||||
.map_err(|e| eyre::eyre!("retry newPayload failed: {e}"))?;
|
||||
|
||||
if matches!(
|
||||
retry_status.status,
|
||||
PayloadStatusEnum::Valid | PayloadStatusEnum::Accepted
|
||||
) {
|
||||
debug!(
|
||||
target: "consensus::debug-client",
|
||||
%block_number,
|
||||
backfilled = accepted_hashes.len(),
|
||||
"block accepted after backfill"
|
||||
);
|
||||
|
||||
// Enqueue backfilled ancestor hashes to maintain ringbuffer continuity.
|
||||
for hash in accepted_hashes {
|
||||
previous_block_hashes.enqueue(hash);
|
||||
}
|
||||
previous_block_hashes.enqueue(block_hash);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(eyre::eyre!(
|
||||
"block {block_number} still not accepted after backfill: {:?}",
|
||||
retry_status.status
|
||||
))
|
||||
}
|
||||
}
|
||||
PayloadStatusEnum::Invalid { .. } => {
|
||||
warn!(
|
||||
target: "consensus::debug-client",
|
||||
%block_number,
|
||||
%block_hash,
|
||||
?status,
|
||||
"newPayload returned Invalid"
|
||||
);
|
||||
Err(eyre::eyre!("block {block_number} returned Invalid"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Looks up a block hash from the ring buffer at the given offset from the most recent entry.
|
||||
|
||||
Reference in New Issue
Block a user