mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-04 12:05:12 -05:00
feat: handle disconnected blocks (#9981)
This commit is contained in:
@@ -28,7 +28,8 @@ use reth_primitives::{
|
||||
SealedBlockWithSenders, SealedHeader, B256, U256,
|
||||
};
|
||||
use reth_provider::{
|
||||
BlockReader, ExecutionOutcome, ProviderError, StateProviderFactory, StateRootProvider,
|
||||
BlockReader, ExecutionOutcome, ProviderError, StateProviderBox, StateProviderFactory,
|
||||
StateRootProvider,
|
||||
};
|
||||
use reth_revm::database::StateProviderDatabase;
|
||||
use reth_rpc_types::{
|
||||
@@ -103,6 +104,22 @@ impl TreeState {
|
||||
.map(|executed_block| executed_block.block.clone())
|
||||
}
|
||||
|
||||
/// Returns all available blocks for the given hash that lead back to the canonical chain, from
|
||||
/// newest to oldest. And the parent hash of the oldest block that is missing from the buffer.
|
||||
///
|
||||
/// Returns `None` if the block for the given hash is not found.
|
||||
fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlock>)> {
|
||||
let block = self.blocks_by_hash.get(&hash).cloned()?;
|
||||
let mut parent_hash = block.block().parent_hash;
|
||||
let mut blocks = vec![block];
|
||||
while let Some(executed) = self.blocks_by_hash.get(&parent_hash) {
|
||||
parent_hash = executed.block.parent_hash;
|
||||
blocks.push(executed.clone());
|
||||
}
|
||||
|
||||
Some((parent_hash, blocks))
|
||||
}
|
||||
|
||||
/// Insert executed block into the state.
|
||||
fn insert_executed(&mut self, executed: ExecutedBlock) {
|
||||
let hash = executed.block.hash();
|
||||
@@ -826,21 +843,35 @@ where
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
/// Return state provider with reference to in-memory blocks that overlay database state.
|
||||
/// Returns the state provider for the requested block hash.
|
||||
///
|
||||
/// This merges the state of all blocks that are part of the chain that the requested block is
|
||||
/// the head of. This includes all blocks that connect back to the canonical block on disk.
|
||||
// TODO: return error if the chain has gaps
|
||||
fn state_provider(&self, hash: B256) -> ProviderResult<MemoryOverlayStateProvider> {
|
||||
let mut in_memory = Vec::new();
|
||||
let mut parent_hash = hash;
|
||||
while let Some(executed) = self.state.tree_state.blocks_by_hash.get(&parent_hash) {
|
||||
parent_hash = executed.block.parent_hash;
|
||||
in_memory.push(executed.clone());
|
||||
/// the head of and are not yet persisted on disk. This includes all blocks that connect back to
|
||||
/// a canonical block on disk.
|
||||
///
|
||||
/// Returns `None` if the state for the requested hash is not found, this happens if the
|
||||
/// requested state belongs to a block that is not connected to the canonical chain.
|
||||
///
|
||||
/// Returns an error if we failed to fetch the state from the database.
|
||||
fn state_provider(&self, hash: B256) -> ProviderResult<Option<StateProviderBox>> {
|
||||
if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
|
||||
trace!(target: "engine", %hash, "found canonical state for block in memory");
|
||||
// the block leads back to the canonical chain
|
||||
let historical = self.provider.state_by_block_hash(historical)?;
|
||||
return Ok(Some(Box::new(MemoryOverlayStateProvider::new(blocks, historical))))
|
||||
}
|
||||
|
||||
let historical = self.provider.state_by_block_hash(parent_hash)?;
|
||||
Ok(MemoryOverlayStateProvider::new(in_memory, historical))
|
||||
// the hash could belong to an unknown block or a persisted block
|
||||
if let Some(header) = self.provider.header(&hash)? {
|
||||
trace!(target: "engine", %hash, number = %header.number, "found canonical state for block in database");
|
||||
// the block is known and persisted
|
||||
let historical = self.provider.state_by_block_hash(hash)?;
|
||||
return Ok(Some(historical))
|
||||
}
|
||||
|
||||
trace!(target: "engine", %hash, "no canonical state found for block");
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Return the parent hash of the lowest buffered ancestor for the requested block, if there
|
||||
@@ -1259,7 +1290,24 @@ where
|
||||
// validate block consensus rules
|
||||
self.validate_block(&block)?;
|
||||
|
||||
let state_provider = self.state_provider(block.parent_hash)?;
|
||||
let Some(state_provider) = self.state_provider(block.parent_hash)? else {
|
||||
// we don't have the state required to execute this block, buffering it and find the
|
||||
// missing parent block
|
||||
let missing_ancestor = self
|
||||
.state
|
||||
.buffer
|
||||
.lowest_ancestor(&block.parent_hash)
|
||||
.map(|block| block.parent_num_hash())
|
||||
.unwrap_or_else(|| block.parent_num_hash());
|
||||
|
||||
self.state.buffer.insert_block(block);
|
||||
|
||||
return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
|
||||
head: self.state.tree_state.current_canonical_head,
|
||||
missing_ancestor,
|
||||
}))
|
||||
};
|
||||
|
||||
let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
|
||||
|
||||
let block_number = block.number;
|
||||
@@ -1983,6 +2031,45 @@ mod tests {
|
||||
assert!(resp.payload_status.is_syncing());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_disconnected_payload() {
|
||||
let s = include_str!("../../test-data/holesky/2.rlp");
|
||||
let data = Bytes::from_str(s).unwrap();
|
||||
let block = Block::decode(&mut data.as_ref()).unwrap();
|
||||
let sealed = block.seal_slow();
|
||||
let hash = sealed.hash();
|
||||
let payload = block_to_payload_v1(sealed.clone());
|
||||
|
||||
let mut test_harness = TestHarness::new(HOLESKY.clone());
|
||||
|
||||
let outcome = test_harness.tree.on_new_payload(payload.into(), None).unwrap();
|
||||
assert!(outcome.outcome.is_syncing());
|
||||
|
||||
// ensure block is buffered
|
||||
let buffered = test_harness.tree.state.buffer.block(&hash).unwrap();
|
||||
assert_eq!(buffered.block, sealed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_disconnected_block() {
|
||||
let s = include_str!("../../test-data/holesky/2.rlp");
|
||||
let data = Bytes::from_str(s).unwrap();
|
||||
let block = Block::decode(&mut data.as_ref()).unwrap();
|
||||
let sealed = block.seal_slow();
|
||||
let hash = sealed.hash();
|
||||
|
||||
let mut test_harness = TestHarness::new(HOLESKY.clone());
|
||||
|
||||
let outcome = test_harness.tree.insert_block_without_senders(sealed.clone()).unwrap();
|
||||
assert_eq!(
|
||||
outcome,
|
||||
InsertPayloadOk::Inserted(BlockStatus::Disconnected {
|
||||
head: test_harness.tree.state.tree_state.current_canonical_head,
|
||||
missing_ancestor: sealed.parent_num_hash()
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_holesky_payload() {
|
||||
let s = include_str!("../../test-data/holesky/1.rlp");
|
||||
|
||||
Reference in New Issue
Block a user