mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
refactor(db): use hashed state as canonical state representation (#21115)
Co-authored-by: Amp <amp@ampcode.com> Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
7ff78ca082
commit
121160d248
@@ -20,7 +20,7 @@ use reth_node_types::{BlockTy, NodeTypes};
|
||||
use reth_payload_builder::PayloadBuilderHandle;
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, ProviderNodeTypes},
|
||||
ProviderFactory,
|
||||
ProviderFactory, StorageSettingsCache,
|
||||
};
|
||||
use reth_prune::PrunerWithFactory;
|
||||
use reth_stages_api::{MetricEventsSender, Pipeline};
|
||||
@@ -94,6 +94,7 @@ where
|
||||
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
|
||||
|
||||
let downloader = BasicBlockDownloader::new(client, consensus.clone());
|
||||
let use_hashed_state = provider.cached_storage_settings().use_hashed_state;
|
||||
|
||||
let persistence_handle =
|
||||
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
|
||||
@@ -111,6 +112,7 @@ where
|
||||
engine_kind,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
use_hashed_state,
|
||||
);
|
||||
|
||||
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
|
||||
|
||||
@@ -143,6 +143,13 @@ test-utils = [
|
||||
"reth-evm-ethereum/test-utils",
|
||||
"reth-tasks/test-utils",
|
||||
]
|
||||
rocksdb = [
|
||||
"reth-provider/rocksdb",
|
||||
"reth-prune/rocksdb",
|
||||
"reth-stages?/rocksdb",
|
||||
"reth-e2e-test-utils/rocksdb",
|
||||
]
|
||||
edge = ["rocksdb"]
|
||||
|
||||
[[test]]
|
||||
name = "e2e_testsuite"
|
||||
|
||||
@@ -351,6 +351,14 @@ impl<S: StateProvider, const PREWARM: bool> StateProvider for CachedStateProvide
|
||||
self.state_provider.storage(account, storage_key)
|
||||
}
|
||||
}
|
||||
|
||||
fn storage_by_hashed_key(
|
||||
&self,
|
||||
address: Address,
|
||||
hashed_storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
self.state_provider.storage_by_hashed_key(address, hashed_storage_key)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: BytecodeReader, const PREWARM: bool> BytecodeReader for CachedStateProvider<S, PREWARM> {
|
||||
|
||||
@@ -199,6 +199,17 @@ impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
|
||||
self.record_storage_fetch(start.elapsed());
|
||||
res
|
||||
}
|
||||
|
||||
fn storage_by_hashed_key(
|
||||
&self,
|
||||
address: Address,
|
||||
hashed_storage_key: StorageKey,
|
||||
) -> ProviderResult<Option<StorageValue>> {
|
||||
let start = Instant::now();
|
||||
let res = self.state_provider.storage_by_hashed_key(address, hashed_storage_key);
|
||||
self.record_storage_fetch(start.elapsed());
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {
|
||||
|
||||
@@ -32,7 +32,7 @@ use reth_provider::{
|
||||
BlockExecutionOutput, BlockExecutionResult, BlockReader, ChangeSetReader,
|
||||
DatabaseProviderFactory, HashedPostStateProvider, ProviderError, StageCheckpointReader,
|
||||
StateProviderBox, StateProviderFactory, StateReader, StorageChangeSetReader,
|
||||
TransactionVariant,
|
||||
StorageSettingsCache, TransactionVariant,
|
||||
};
|
||||
use reth_revm::database::StateProviderDatabase;
|
||||
use reth_stages_api::ControlFlow;
|
||||
@@ -271,6 +271,9 @@ where
|
||||
evm_config: C,
|
||||
/// Changeset cache for in-memory trie changesets
|
||||
changeset_cache: ChangesetCache,
|
||||
/// Whether the node uses hashed state as canonical storage (v2 mode).
|
||||
/// Cached at construction to avoid threading `StorageSettingsCache` bounds everywhere.
|
||||
use_hashed_state: bool,
|
||||
}
|
||||
|
||||
impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
|
||||
@@ -296,6 +299,7 @@ where
|
||||
.field("engine_kind", &self.engine_kind)
|
||||
.field("evm_config", &self.evm_config)
|
||||
.field("changeset_cache", &self.changeset_cache)
|
||||
.field("use_hashed_state", &self.use_hashed_state)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -313,7 +317,8 @@ where
|
||||
P::Provider: BlockReader<Block = N::Block, Header = N::BlockHeader>
|
||||
+ StageCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader,
|
||||
+ StorageChangeSetReader
|
||||
+ StorageSettingsCache,
|
||||
C: ConfigureEvm<Primitives = N> + 'static,
|
||||
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
|
||||
V: EngineValidator<T>,
|
||||
@@ -334,6 +339,7 @@ where
|
||||
engine_kind: EngineApiKind,
|
||||
evm_config: C,
|
||||
changeset_cache: ChangesetCache,
|
||||
use_hashed_state: bool,
|
||||
) -> Self {
|
||||
let (incoming_tx, incoming) = crossbeam_channel::unbounded();
|
||||
|
||||
@@ -355,6 +361,7 @@ where
|
||||
engine_kind,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
use_hashed_state,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -375,6 +382,7 @@ where
|
||||
kind: EngineApiKind,
|
||||
evm_config: C,
|
||||
changeset_cache: ChangesetCache,
|
||||
use_hashed_state: bool,
|
||||
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
|
||||
{
|
||||
let best_block_number = provider.best_block_number().unwrap_or(0);
|
||||
@@ -407,6 +415,7 @@ where
|
||||
kind,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
use_hashed_state,
|
||||
);
|
||||
let incoming = task.incoming_tx.clone();
|
||||
spawn_os_thread("engine", || task.run());
|
||||
@@ -2379,7 +2388,12 @@ where
|
||||
|
||||
self.update_reorg_metrics(old.len(), old_first);
|
||||
self.reinsert_reorged_blocks(new.clone());
|
||||
self.reinsert_reorged_blocks(old.clone());
|
||||
|
||||
// When use_hashed_state is enabled, skip reinserting the old chain — the
|
||||
// bundle state references plain state reverts which don't exist.
|
||||
if !self.use_hashed_state {
|
||||
self.reinsert_reorged_blocks(old.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// update the tracked in-memory state with the new chain
|
||||
|
||||
@@ -1541,6 +1541,7 @@ mod tests {
|
||||
providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
|
||||
BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory, LatestStateProvider,
|
||||
PruneCheckpointReader, StageCheckpointReader, StateProviderBox, StorageChangeSetReader,
|
||||
StorageSettingsCache,
|
||||
};
|
||||
use reth_trie::MultiProof;
|
||||
use reth_trie_db::ChangesetCache;
|
||||
@@ -1562,6 +1563,7 @@ mod tests {
|
||||
+ PruneCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader
|
||||
+ StorageSettingsCache
|
||||
+ BlockNumReader,
|
||||
> + Clone
|
||||
+ Send
|
||||
@@ -1581,7 +1583,10 @@ mod tests {
|
||||
fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
|
||||
where
|
||||
F: DatabaseProviderFactory<
|
||||
Provider: BlockReader + StageCheckpointReader + PruneCheckpointReader,
|
||||
Provider: BlockReader
|
||||
+ StageCheckpointReader
|
||||
+ PruneCheckpointReader
|
||||
+ reth_provider::StorageSettingsCache,
|
||||
> + Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
|
||||
@@ -38,7 +38,7 @@ use reth_provider::{
|
||||
providers::OverlayStateProviderFactory, BlockExecutionOutput, BlockNumReader, BlockReader,
|
||||
ChangeSetReader, DatabaseProviderFactory, DatabaseProviderROFactory, HashedPostStateProvider,
|
||||
ProviderError, PruneCheckpointReader, StageCheckpointReader, StateProvider,
|
||||
StateProviderFactory, StateReader, StorageChangeSetReader,
|
||||
StateProviderFactory, StateReader, StorageChangeSetReader, StorageSettingsCache,
|
||||
};
|
||||
use reth_revm::db::{states::bundle_state::BundleRetention, State};
|
||||
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot};
|
||||
@@ -146,7 +146,8 @@ where
|
||||
+ PruneCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader
|
||||
+ BlockNumReader,
|
||||
+ BlockNumReader
|
||||
+ StorageSettingsCache,
|
||||
> + BlockReader<Header = N::BlockHeader>
|
||||
+ ChangeSetReader
|
||||
+ BlockNumReader
|
||||
@@ -1526,7 +1527,8 @@ where
|
||||
+ PruneCheckpointReader
|
||||
+ ChangeSetReader
|
||||
+ StorageChangeSetReader
|
||||
+ BlockNumReader,
|
||||
+ BlockNumReader
|
||||
+ StorageSettingsCache,
|
||||
> + BlockReader<Header = N::BlockHeader>
|
||||
+ StateProviderFactory
|
||||
+ StateReader
|
||||
|
||||
@@ -221,6 +221,7 @@ impl TestHarness {
|
||||
EngineApiKind::Ethereum,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
provider.cached_storage_settings().use_hashed_state,
|
||||
);
|
||||
|
||||
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
|
||||
|
||||
@@ -2,13 +2,15 @@
|
||||
|
||||
mod fcu_finalized_blocks;
|
||||
|
||||
use alloy_rpc_types_engine::PayloadStatusEnum;
|
||||
use eyre::Result;
|
||||
use reth_chainspec::{ChainSpecBuilder, MAINNET};
|
||||
use reth_e2e_test_utils::testsuite::{
|
||||
actions::{
|
||||
CaptureBlock, CompareNodeChainTips, CreateFork, ExpectFcuStatus, MakeCanonical,
|
||||
ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, ReorgTo, SelectActiveNode,
|
||||
SendNewPayloads, UpdateBlockInfo, ValidateCanonicalTag, WaitForSync,
|
||||
BlockReference, CaptureBlock, CompareNodeChainTips, CreateFork, ExpectFcuStatus,
|
||||
MakeCanonical, ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, ReorgTo,
|
||||
SelectActiveNode, SendForkchoiceUpdate, SendNewPayloads, SetForkBase, UpdateBlockInfo,
|
||||
ValidateCanonicalTag, WaitForSync,
|
||||
},
|
||||
setup::{NetworkSetup, Setup},
|
||||
TestBuilder,
|
||||
@@ -39,6 +41,14 @@ fn default_engine_tree_setup() -> Setup<EthEngineTypes> {
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a v2 storage mode setup for engine tree e2e tests.
|
||||
///
|
||||
/// v2 mode uses keccak256-hashed slot keys in static file changesets and rocksdb history
|
||||
/// instead of plain keys in MDBX.
|
||||
fn v2_engine_tree_setup() -> Setup<EthEngineTypes> {
|
||||
default_engine_tree_setup().with_storage_v2()
|
||||
}
|
||||
|
||||
/// Test that verifies forkchoice update and canonical chain insertion functionality.
|
||||
#[tokio::test]
|
||||
async fn test_engine_tree_fcu_canon_chain_insertion_e2e() -> Result<()> {
|
||||
@@ -334,3 +344,152 @@ async fn test_engine_tree_live_sync_transition_eventually_canonical_e2e() -> Res
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ==================== v2 storage mode variants ====================
|
||||
|
||||
/// v2 variant: Verifies forkchoice update and canonical chain insertion in v2 storage mode.
|
||||
///
|
||||
/// Exercises the full `save_blocks` → `write_state` → static file changeset path with hashed keys.
|
||||
#[tokio::test]
|
||||
async fn test_engine_tree_fcu_canon_chain_insertion_v2_e2e() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let test = TestBuilder::new()
|
||||
.with_setup(v2_engine_tree_setup())
|
||||
.with_action(ProduceBlocks::<EthEngineTypes>::new(1))
|
||||
.with_action(MakeCanonical::new())
|
||||
.with_action(ProduceBlocks::<EthEngineTypes>::new(3))
|
||||
.with_action(MakeCanonical::new());
|
||||
|
||||
test.run::<EthereumNode>().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// v2 variant: Verifies forkchoice update with a reorg where all blocks are already available.
|
||||
///
|
||||
/// Exercises `write_state_reverts` path with hashed changeset keys during CL-driven reorgs.
|
||||
#[tokio::test]
|
||||
async fn test_engine_tree_fcu_reorg_with_all_blocks_v2_e2e() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let test = TestBuilder::new()
|
||||
.with_setup(v2_engine_tree_setup())
|
||||
.with_action(ProduceBlocks::<EthEngineTypes>::new(5))
|
||||
.with_action(MakeCanonical::new())
|
||||
.with_action(CreateFork::<EthEngineTypes>::new(2, 3))
|
||||
.with_action(CaptureBlock::new("fork_tip"))
|
||||
.with_action(ReorgTo::<EthEngineTypes>::new_from_tag("fork_tip"));
|
||||
|
||||
test.run::<EthereumNode>().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// v2 variant: Verifies progressive canonical chain extension in v2 storage mode.
|
||||
#[tokio::test]
|
||||
async fn test_engine_tree_fcu_extends_canon_chain_v2_e2e() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
let test = TestBuilder::new()
|
||||
.with_setup(v2_engine_tree_setup())
|
||||
.with_action(ProduceBlocks::<EthEngineTypes>::new(1))
|
||||
.with_action(MakeCanonical::new())
|
||||
.with_action(ProduceBlocks::<EthEngineTypes>::new(10))
|
||||
.with_action(CaptureBlock::new("target_block"))
|
||||
.with_action(ReorgTo::<EthEngineTypes>::new_from_tag("target_block"))
|
||||
.with_action(MakeCanonical::new());
|
||||
|
||||
test.run::<EthereumNode>().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates a 2-node setup for disk-level reorg testing.
|
||||
///
|
||||
/// Uses unconnected nodes so fork blocks can be produced independently on Node 1 and then
|
||||
/// sent to Node 0 via newPayload only (no FCU), keeping Node 0's persisted chain intact
|
||||
/// until the final `ReorgTo` triggers `find_disk_reorg`.
|
||||
fn disk_reorg_setup(storage_v2: bool) -> Setup<EthEngineTypes> {
|
||||
let mut setup = Setup::default()
|
||||
.with_chain_spec(Arc::new(
|
||||
ChainSpecBuilder::default()
|
||||
.chain(MAINNET.chain)
|
||||
.genesis(
|
||||
serde_json::from_str(include_str!(
|
||||
"../../../../e2e-test-utils/src/testsuite/assets/genesis.json"
|
||||
))
|
||||
.unwrap(),
|
||||
)
|
||||
.cancun_activated()
|
||||
.build(),
|
||||
))
|
||||
.with_network(NetworkSetup::multi_node_unconnected(2))
|
||||
.with_tree_config(
|
||||
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
|
||||
);
|
||||
if storage_v2 {
|
||||
setup = setup.with_storage_v2();
|
||||
}
|
||||
setup
|
||||
}
|
||||
|
||||
/// Builds a disk-level reorg test scenario.
|
||||
///
|
||||
/// 1. Both nodes receive 3 shared blocks
|
||||
/// 2. Node 0 extends to 10 blocks locally (persisted to disk)
|
||||
/// 3. Node 1 builds an 8-block fork from block 3 (its canonical head)
|
||||
/// 4. Fork blocks are sent to Node 0 via newPayload (no FCU, old chain stays on disk)
|
||||
/// 5. FCU to fork tip on Node 0 triggers `find_disk_reorg` → `RemoveBlocksAbove(3)`
|
||||
fn disk_reorg_test(storage_v2: bool) -> TestBuilder<EthEngineTypes> {
|
||||
TestBuilder::new()
|
||||
.with_setup(disk_reorg_setup(storage_v2))
|
||||
.with_action(SelectActiveNode::new(0))
|
||||
.with_action(ProduceBlocks::<EthEngineTypes>::new(3))
|
||||
.with_action(MakeCanonical::new())
|
||||
.with_action(ProduceBlocksLocally::<EthEngineTypes>::new(7))
|
||||
.with_action(MakeCanonical::with_active_node())
|
||||
.with_action(SelectActiveNode::new(1))
|
||||
.with_action(SetForkBase::new(3))
|
||||
.with_action(ProduceBlocksLocally::<EthEngineTypes>::new(8))
|
||||
.with_action(MakeCanonical::with_active_node())
|
||||
.with_action(CaptureBlock::new("fork_tip"))
|
||||
.with_action(
|
||||
SendNewPayloads::<EthEngineTypes>::new()
|
||||
.with_source_node(1)
|
||||
.with_target_node(0)
|
||||
.with_start_block(4)
|
||||
.with_total_blocks(8),
|
||||
)
|
||||
.with_action(
|
||||
SendForkchoiceUpdate::<EthEngineTypes>::new(
|
||||
BlockReference::Tag("fork_tip".into()),
|
||||
BlockReference::Tag("fork_tip".into()),
|
||||
BlockReference::Tag("fork_tip".into()),
|
||||
)
|
||||
.with_expected_status(PayloadStatusEnum::Valid)
|
||||
.with_node_idx(0),
|
||||
)
|
||||
}
|
||||
|
||||
/// Verifies disk-level reorg in v1 (plain key) storage mode.
|
||||
///
|
||||
/// Confirms `find_disk_reorg()` detects persisted blocks on the wrong fork and calls
|
||||
/// `RemoveBlocksAbove` to truncate, then re-persists the correct fork chain.
|
||||
#[tokio::test]
|
||||
async fn test_engine_tree_disk_reorg_v1_e2e() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
disk_reorg_test(false).run::<EthereumNode>().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// v2 variant: Verifies disk-level reorg in v2 storage mode.
|
||||
///
|
||||
/// Same scenario as v1 but with hashed changeset keys in static files and rocksdb history.
|
||||
/// Exercises `find_disk_reorg()` → `RemoveBlocksAbove` with v2 hashed key format.
|
||||
#[tokio::test]
|
||||
async fn test_engine_tree_disk_reorg_v2_e2e() -> Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
disk_reorg_test(true).run::<EthereumNode>().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user