revert: undo Chain crate, add LazyTrieData to trie-common (#21155)

This commit is contained in:
Matthias Seitz
2026-01-17 16:57:09 +01:00
committed by GitHub
parent 1ea574417f
commit 40bc9d3860
39 changed files with 601 additions and 830 deletions

39
Cargo.lock generated
View File

@@ -7774,30 +7774,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "reth-chain"
version = "1.10.0"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-primitives",
"arbitrary",
"bincode 1.3.3",
"metrics",
"parking_lot",
"rand 0.9.2",
"reth-ethereum-primitives",
"reth-execution-types",
"reth-metrics",
"reth-primitives-traits",
"reth-trie",
"reth-trie-common",
"revm",
"serde",
"serde_with",
"tracing",
]
[[package]]
name = "reth-chain-state"
version = "1.10.0"
@@ -7813,7 +7789,6 @@ dependencies = [
"parking_lot",
"pin-project",
"rand 0.9.2",
"reth-chain",
"reth-chainspec",
"reth-errors",
"reth-ethereum-primitives",
@@ -8952,6 +8927,7 @@ dependencies = [
name = "reth-execution-types"
version = "1.10.0"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-evm",
"alloy-primitives",
@@ -9005,7 +8981,6 @@ dependencies = [
"reth-trie-common",
"rmp-serde",
"secp256k1 0.30.0",
"serde_with",
"tempfile",
"thiserror 2.0.17",
"tokio",
@@ -9020,7 +8995,6 @@ dependencies = [
"alloy-eips",
"eyre",
"futures-util",
"reth-chain",
"reth-chainspec",
"reth-config",
"reth-consensus",
@@ -9028,6 +9002,7 @@ dependencies = [
"reth-db-common",
"reth-ethereum-primitives",
"reth-evm-ethereum",
"reth-execution-types",
"reth-exex",
"reth-network",
"reth-node-api",
@@ -9053,9 +9028,9 @@ dependencies = [
"arbitrary",
"bincode 1.3.3",
"rand 0.9.2",
"reth-chain",
"reth-chain-state",
"reth-ethereum-primitives",
"reth-execution-types",
"reth-primitives-traits",
"serde",
"serde_with",
@@ -9798,7 +9773,6 @@ dependencies = [
"op-alloy-consensus",
"op-alloy-rpc-types-engine",
"op-revm",
"reth-chain",
"reth-chainspec",
"reth-evm",
"reth-execution-errors",
@@ -10249,7 +10223,6 @@ dependencies = [
"parking_lot",
"rand 0.9.2",
"rayon",
"reth-chain",
"reth-chain-state",
"reth-chainspec",
"reth-codecs",
@@ -10739,12 +10712,12 @@ dependencies = [
"metrics",
"rand 0.9.2",
"reqwest",
"reth-chain",
"reth-chain-state",
"reth-chainspec",
"reth-errors",
"reth-ethereum-primitives",
"reth-evm",
"reth-execution-types",
"reth-metrics",
"reth-primitives-traits",
"reth-revm",
@@ -10817,7 +10790,6 @@ dependencies = [
"rand 0.9.2",
"rayon",
"reqwest",
"reth-chain",
"reth-chainspec",
"reth-codecs",
"reth-config",
@@ -10833,6 +10805,7 @@ dependencies = [
"reth-etl",
"reth-evm",
"reth-evm-ethereum",
"reth-execution-types",
"reth-exex",
"reth-fs-util",
"reth-network-p2p",
@@ -10979,7 +10952,6 @@ dependencies = [
"alloy-primitives",
"alloy-rpc-types-engine",
"auto_impl",
"reth-chain",
"reth-chainspec",
"reth-db-api",
"reth-db-models",
@@ -11138,7 +11110,6 @@ dependencies = [
"proptest",
"proptest-arbitrary-interop",
"rand 0.9.2",
"reth-chain",
"reth-chain-state",
"reth-chainspec",
"reth-eth-wire-types",

View File

@@ -44,7 +44,6 @@ members = [
"crates/ethereum/primitives/",
"crates/ethereum/reth/",
"crates/etl/",
"crates/evm/chain",
"crates/evm/evm",
"crates/evm/execution-errors",
"crates/evm/execution-types",
@@ -388,7 +387,6 @@ reth-etl = { path = "crates/etl" }
reth-evm = { path = "crates/evm/evm", default-features = false }
reth-evm-ethereum = { path = "crates/ethereum/evm", default-features = false }
reth-optimism-evm = { path = "crates/optimism/evm", default-features = false }
reth-chain = { path = "crates/evm/chain" }
reth-execution-errors = { path = "crates/evm/execution-errors", default-features = false }
reth-execution-types = { path = "crates/evm/execution-types", default-features = false }
reth-exex = { path = "crates/exex/exex" }

View File

@@ -13,7 +13,6 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-chainspec.workspace = true
reth-errors.workspace = true
reth-execution-types.workspace = true
@@ -66,7 +65,6 @@ serde = [
"alloy-primitives/serde",
"parking_lot/serde",
"rand?/serde",
"reth-chain/serde",
"reth-ethereum-primitives/serde",
"reth-execution-types/serde",
"reth-primitives-traits/serde",

View File

@@ -8,7 +8,6 @@ use reth_trie::{
use std::{
fmt,
sync::{Arc, LazyLock},
vec::Vec,
};
use tracing::instrument;
@@ -23,6 +22,72 @@ pub struct DeferredTrieData {
state: Arc<Mutex<DeferredState>>,
}
/// Sorted trie data computed for an executed block.
/// These represent the complete set of sorted trie data required to persist
/// block state for, and generate proofs on top of, a block.
#[derive(Clone, Debug, Default)]
pub struct ComputedTrieData {
/// Sorted hashed post-state produced by execution.
pub hashed_state: Arc<HashedPostStateSorted>,
/// Sorted trie updates produced by state root computation.
pub trie_updates: Arc<TrieUpdatesSorted>,
/// Trie input bundled with its anchor hash, if available.
pub anchored_trie_input: Option<AnchoredTrieInput>,
}
/// Trie input bundled with its anchor hash.
///
/// The `trie_input` contains the **cumulative** overlay of all in-memory ancestor blocks,
/// not just this block's changes. Child blocks reuse the parent's overlay in O(1) by
/// cloning the Arc-wrapped data.
///
/// The `anchor_hash` is metadata indicating which persisted base state this overlay
/// sits on top of. It is CRITICAL for overlay reuse decisions: an overlay built on top
/// of Anchor A cannot be reused for a block anchored to Anchor B, as it would result
/// in an incorrect state.
#[derive(Clone, Debug)]
pub struct AnchoredTrieInput {
/// The persisted ancestor hash this trie input is anchored to.
pub anchor_hash: B256,
/// Cumulative trie input overlay from all in-memory ancestors.
pub trie_input: Arc<TrieInputSorted>,
}
/// Metrics for deferred trie computation.
#[derive(Metrics)]
#[metrics(scope = "sync.block_validation")]
struct DeferredTrieMetrics {
/// Number of times deferred trie data was ready (async task completed first).
deferred_trie_async_ready: Counter,
/// Number of times deferred trie data required synchronous computation (fallback path).
deferred_trie_sync_fallback: Counter,
}
static DEFERRED_TRIE_METRICS: LazyLock<DeferredTrieMetrics> =
LazyLock::new(DeferredTrieMetrics::default);
/// Internal state for deferred trie data.
enum DeferredState {
/// Data is not yet available; raw inputs stored for fallback computation.
/// Wrapped in `Option` to allow taking ownership during computation.
Pending(Option<PendingInputs>),
/// Data has been computed and is ready.
Ready(ComputedTrieData),
}
/// Inputs kept while a deferred trie computation is pending.
#[derive(Clone, Debug)]
struct PendingInputs {
/// Unsorted hashed post-state from execution.
hashed_state: Arc<HashedPostState>,
/// Unsorted trie updates from state root computation.
trie_updates: Arc<TrieUpdates>,
/// The persisted ancestor hash this trie input is anchored to.
anchor_hash: B256,
/// Deferred trie data from ancestor blocks for merging.
ancestors: Vec<DeferredTrieData>,
}
impl fmt::Debug for DeferredTrieData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.lock();
@@ -233,19 +298,6 @@ impl DeferredTrieData {
}
}
/// Sorted trie data computed for an executed block.
/// These represent the complete set of sorted trie data required to persist
/// block state for, and generate proofs on top of, a block.
#[derive(Clone, Debug, Default)]
pub struct ComputedTrieData {
/// Sorted hashed post-state produced by execution.
pub hashed_state: Arc<HashedPostStateSorted>,
/// Sorted trie updates produced by state root computation.
pub trie_updates: Arc<TrieUpdatesSorted>,
/// Trie input bundled with its anchor hash, if available.
pub anchored_trie_input: Option<AnchoredTrieInput>,
}
impl ComputedTrieData {
/// Construct a bundle that includes trie input anchored to a persisted ancestor.
pub const fn with_trie_input(
@@ -287,59 +339,6 @@ impl ComputedTrieData {
}
}
/// Trie input bundled with its anchor hash.
///
/// The `trie_input` contains the **cumulative** overlay of all in-memory ancestor blocks,
/// not just this block's changes. Child blocks reuse the parent's overlay in O(1) by
/// cloning the Arc-wrapped data.
///
/// The `anchor_hash` is metadata indicating which persisted base state this overlay
/// sits on top of. It is CRITICAL for overlay reuse decisions: an overlay built on top
/// of Anchor A cannot be reused for a block anchored to Anchor B, as it would result
/// in an incorrect state.
#[derive(Clone, Debug)]
pub struct AnchoredTrieInput {
/// The persisted ancestor hash this trie input is anchored to.
pub anchor_hash: B256,
/// Cumulative trie input overlay from all in-memory ancestors.
pub trie_input: Arc<TrieInputSorted>,
}
/// Metrics for deferred trie computation.
#[derive(Metrics)]
#[metrics(scope = "sync.block_validation")]
struct DeferredTrieMetrics {
/// Number of times deferred trie data was ready (async task completed first).
deferred_trie_async_ready: Counter,
/// Number of times deferred trie data required synchronous computation (fallback path).
deferred_trie_sync_fallback: Counter,
}
static DEFERRED_TRIE_METRICS: LazyLock<DeferredTrieMetrics> =
LazyLock::new(DeferredTrieMetrics::default);
/// Internal state for deferred trie data.
enum DeferredState {
/// Data is not yet available; raw inputs stored for fallback computation.
/// Wrapped in `Option` to allow taking ownership during computation.
Pending(Option<PendingInputs>),
/// Data has been computed and is ready.
Ready(ComputedTrieData),
}
/// Inputs kept while a deferred trie computation is pending.
#[derive(Clone, Debug)]
struct PendingInputs {
/// Unsorted hashed post-state from execution.
hashed_state: Arc<HashedPostState>,
/// Unsorted trie updates from state root computation.
trie_updates: Arc<TrieUpdates>,
/// The persisted ancestor hash this trie input is anchored to.
anchor_hash: B256,
/// Deferred trie data from ancestor blocks for merging.
ancestors: Vec<DeferredTrieData>,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -8,17 +8,16 @@ use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
use alloy_eips::{BlockHashOrNumber, BlockNumHash};
use alloy_primitives::{map::HashMap, BlockNumber, TxHash, B256};
use parking_lot::RwLock;
use reth_chain::Chain;
use reth_chainspec::ChainInfo;
use reth_ethereum_primitives::EthPrimitives;
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
use reth_metrics::{metrics::Gauge, Metrics};
use reth_primitives_traits::{
BlockBody as _, IndexedTx, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
SignedTransaction,
};
use reth_storage_api::StateProviderBox;
use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, TrieInputSorted};
use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, LazyTrieData, TrieInputSorted};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tokio::sync::{broadcast, watch};
@@ -945,29 +944,26 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
}
/// Converts a slice of executed blocks into a [`Chain`].
///
/// Uses [`ExecutedBlock::trie_data_handle`] to avoid blocking on deferred trie computations.
/// The trie data will be computed lazily when actually needed by consumers.
fn blocks_to_chain(blocks: &[ExecutedBlock<N>]) -> Chain<N> {
match blocks {
[] => Chain::default(),
[first, rest @ ..] => {
let mut chain = Chain::from_block(
first.recovered_block().clone(),
ExecutionOutcome::single(
first.block_number(),
ExecutionOutcome::from((
first.execution_outcome().clone(),
),
first.trie_data_handle(),
first.block_number(),
)),
LazyTrieData::ready(first.hashed_state(), first.trie_updates()),
);
for exec in rest {
chain.append_block(
exec.recovered_block().clone(),
ExecutionOutcome::single(
exec.block_number(),
ExecutionOutcome::from((
exec.execution_outcome().clone(),
),
exec.trie_data_handle(),
exec.block_number(),
)),
LazyTrieData::ready(exec.hashed_state(), exec.trie_updates()),
);
}
chain
@@ -1544,15 +1540,12 @@ mod tests {
// Test commit notification
let chain_commit = NewCanonicalChain::Commit { new: vec![block0.clone(), block1.clone()] };
// Build expected trie updates map
let mut expected_trie_updates = BTreeMap::new();
expected_trie_updates.insert(0, block0.trie_updates());
expected_trie_updates.insert(1, block1.trie_updates());
// Build expected hashed state map
let mut expected_hashed_state = BTreeMap::new();
expected_hashed_state.insert(0, block0.hashed_state());
expected_hashed_state.insert(1, block1.hashed_state());
// Build expected trie data map
let mut expected_trie_data = BTreeMap::new();
expected_trie_data
.insert(0, LazyTrieData::ready(block0.hashed_state(), block0.trie_updates()));
expected_trie_data
.insert(1, LazyTrieData::ready(block1.hashed_state(), block1.trie_updates()));
// Build expected execution outcome (first_block matches first block number)
let commit_execution_outcome = ExecutionOutcome {
@@ -1562,30 +1555,16 @@ mod tests {
..Default::default()
};
// Get the notification and verify
let notification = chain_commit.to_chain_notification();
let CanonStateNotification::Commit { new } = notification else {
panic!("Expected Commit notification");
};
// Compare blocks
let expected_blocks: Vec<_> =
vec![block0.recovered_block().clone(), block1.recovered_block().clone()];
let actual_blocks: Vec<_> = new.blocks().values().cloned().collect();
assert_eq!(actual_blocks, expected_blocks);
// Compare execution outcome
assert_eq!(*new.execution_outcome(), commit_execution_outcome);
// Compare trie data by waiting on deferred data
for (block_num, expected_updates) in &expected_trie_updates {
let actual = new.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.trie_updates, *expected_updates);
}
for (block_num, expected_state) in &expected_hashed_state {
let actual = new.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.hashed_state, *expected_state);
}
assert_eq!(
chain_commit.to_chain_notification(),
CanonStateNotification::Commit {
new: Arc::new(Chain::new(
vec![block0.recovered_block().clone(), block1.recovered_block().clone()],
commit_execution_outcome,
expected_trie_data,
))
}
);
// Test reorg notification
let chain_reorg = NewCanonicalChain::Reorg {
@@ -1593,25 +1572,17 @@ mod tests {
old: vec![block1.clone(), block2.clone()],
};
// Build expected trie updates for old chain
let mut old_trie_updates = BTreeMap::new();
old_trie_updates.insert(1, block1.trie_updates());
old_trie_updates.insert(2, block2.trie_updates());
// Build expected trie data for old chain
let mut old_trie_data = BTreeMap::new();
old_trie_data.insert(1, LazyTrieData::ready(block1.hashed_state(), block1.trie_updates()));
old_trie_data.insert(2, LazyTrieData::ready(block2.hashed_state(), block2.trie_updates()));
// Build expected trie updates for new chain
let mut new_trie_updates = BTreeMap::new();
new_trie_updates.insert(1, block1a.trie_updates());
new_trie_updates.insert(2, block2a.trie_updates());
// Build expected hashed state for old chain
let mut old_hashed_state = BTreeMap::new();
old_hashed_state.insert(1, block1.hashed_state());
old_hashed_state.insert(2, block2.hashed_state());
// Build expected hashed state for new chain
let mut new_hashed_state = BTreeMap::new();
new_hashed_state.insert(1, block1a.hashed_state());
new_hashed_state.insert(2, block2a.hashed_state());
// Build expected trie data for new chain
let mut new_trie_data = BTreeMap::new();
new_trie_data
.insert(1, LazyTrieData::ready(block1a.hashed_state(), block1a.trie_updates()));
new_trie_data
.insert(2, LazyTrieData::ready(block2a.hashed_state(), block2a.trie_updates()));
// Build expected execution outcome for reorg chains (first_block matches first block
// number)
@@ -1622,48 +1593,20 @@ mod tests {
..Default::default()
};
// Get the notification and verify
let notification = chain_reorg.to_chain_notification();
let CanonStateNotification::Reorg { old, new } = notification else {
panic!("Expected Reorg notification");
};
// Compare old chain blocks
let expected_old_blocks: Vec<_> =
vec![block1.recovered_block().clone(), block2.recovered_block().clone()];
let actual_old_blocks: Vec<_> = old.blocks().values().cloned().collect();
assert_eq!(actual_old_blocks, expected_old_blocks);
// Compare old chain execution outcome
assert_eq!(*old.execution_outcome(), reorg_execution_outcome);
// Compare old chain trie data
for (block_num, expected_updates) in &old_trie_updates {
let actual = old.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.trie_updates, *expected_updates);
}
for (block_num, expected_state) in &old_hashed_state {
let actual = old.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.hashed_state, *expected_state);
}
// Compare new chain blocks
let expected_new_blocks: Vec<_> =
vec![block1a.recovered_block().clone(), block2a.recovered_block().clone()];
let actual_new_blocks: Vec<_> = new.blocks().values().cloned().collect();
assert_eq!(actual_new_blocks, expected_new_blocks);
// Compare new chain execution outcome
assert_eq!(*new.execution_outcome(), reorg_execution_outcome);
// Compare new chain trie data
for (block_num, expected_updates) in &new_trie_updates {
let actual = new.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.trie_updates, *expected_updates);
}
for (block_num, expected_state) in &new_hashed_state {
let actual = new.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.hashed_state, *expected_state);
}
assert_eq!(
chain_reorg.to_chain_notification(),
CanonStateNotification::Reorg {
old: Arc::new(Chain::new(
vec![block1.recovered_block().clone(), block2.recovered_block().clone()],
reorg_execution_outcome.clone(),
old_trie_data,
)),
new: Arc::new(Chain::new(
vec![block1a.recovered_block().clone(), block2a.recovered_block().clone()],
reorg_execution_outcome,
new_trie_data,
))
}
);
}
}

View File

@@ -11,8 +11,8 @@
mod in_memory;
pub use in_memory::*;
// Re-export deferred_trie types from reth_chain
pub use reth_chain::{AnchoredTrieInput, ComputedTrieData, DeferredTrieData};
mod deferred_trie;
pub use deferred_trie::*;
mod lazy_overlay;
pub use lazy_overlay::*;

View File

@@ -2,7 +2,7 @@
use alloy_eips::{eip2718::Encodable2718, BlockNumHash};
use derive_more::{Deref, DerefMut};
use reth_chain::{BlockReceipts, Chain};
use reth_execution_types::{BlockReceipts, Chain};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
use reth_storage_api::NodePrimitivesProvider;
use std::{
@@ -80,7 +80,7 @@ impl<N: NodePrimitives> Stream for CanonStateNotificationStream<N> {
///
/// The notification contains at least one [`Chain`] with the imported segment. If some blocks were
/// reverted (e.g. during a reorg), the old chain is also returned.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound = ""))]
pub enum CanonStateNotification<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
@@ -285,8 +285,8 @@ mod tests {
// Create a commit notification
let notification = CanonStateNotification::Commit { new: chain.clone() };
// Test that `committed` returns the correct chain (compare Arc pointers)
assert!(Arc::ptr_eq(&notification.committed(), &chain));
// Test that `committed` returns the correct chain
assert_eq!(notification.committed(), chain);
// Test that `reverted` returns None for `Commit`
assert!(notification.reverted().is_none());
@@ -329,11 +329,11 @@ mod tests {
let notification =
CanonStateNotification::Reorg { old: old_chain.clone(), new: new_chain.clone() };
// Test that `reverted` returns the old chain (compare Arc pointers)
assert!(Arc::ptr_eq(&notification.reverted().unwrap(), &old_chain));
// Test that `reverted` returns the old chain
assert_eq!(notification.reverted(), Some(old_chain));
// Test that `committed` returns the new chain (compare Arc pointers)
assert!(Arc::ptr_eq(&notification.committed(), &new_chain));
// Test that `committed` returns the new chain
assert_eq!(notification.committed(), new_chain);
// Test that `tip` returns the tip of the new chain (last block in the new chain)
assert_eq!(*notification.tip(), block3);

View File

@@ -9,12 +9,11 @@ use alloy_signer::SignerSync;
use alloy_signer_local::PrivateKeySigner;
use core::marker::PhantomData;
use rand::Rng;
use reth_chain::Chain;
use reth_chainspec::{ChainSpec, EthereumHardfork, MIN_TRANSACTION_GAS};
use reth_ethereum_primitives::{
Block, BlockBody, EthPrimitives, Receipt, Transaction, TransactionSigned,
};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
use reth_primitives_traits::{
proofs::{calculate_receipt_root, calculate_transaction_root, calculate_withdrawals_root},
Account, NodePrimitives, Recovered, RecoveredBlock, SealedBlock, SealedHeader,

View File

@@ -1,69 +0,0 @@
[package]
name = "reth-chain"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Chain and deferred trie data types for reth."
[lints]
workspace = true
[dependencies]
reth-ethereum-primitives.workspace = true
reth-execution-types = { workspace = true, features = ["std"] }
reth-metrics.workspace = true
reth-primitives-traits.workspace = true
reth-trie.workspace = true
reth-trie-common.workspace = true
# alloy
alloy-consensus.workspace = true
alloy-primitives.workspace = true
alloy-eips.workspace = true
serde = { workspace = true, optional = true }
serde_with = { workspace = true, optional = true }
metrics.workspace = true
parking_lot.workspace = true
tracing.workspace = true
[dev-dependencies]
reth-primitives-traits = { workspace = true, features = ["test-utils", "arbitrary"] }
reth-ethereum-primitives = { workspace = true, features = ["arbitrary"] }
alloy-primitives = { workspace = true, features = ["rand", "arbitrary"] }
alloy-consensus = { workspace = true, features = ["arbitrary"] }
arbitrary.workspace = true
bincode.workspace = true
rand.workspace = true
revm.workspace = true
[features]
default = []
serde = [
"dep:serde",
"alloy-eips/serde",
"alloy-primitives/serde",
"reth-primitives-traits/serde",
"alloy-consensus/serde",
"reth-trie/serde",
"reth-trie-common/serde",
"reth-ethereum-primitives/serde",
"reth-execution-types/serde",
"rand/serde",
"revm/serde",
"parking_lot/serde",
]
serde-bincode-compat = [
"serde",
"reth-trie-common/serde-bincode-compat",
"reth-primitives-traits/serde-bincode-compat",
"serde_with",
"alloy-eips/serde-bincode-compat",
"alloy-consensus/serde-bincode-compat",
"reth-ethereum-primitives/serde-bincode-compat",
"reth-execution-types/serde-bincode-compat",
]

View File

@@ -1,30 +0,0 @@
//! Chain and deferred trie data types for reth.
//!
//! This crate contains the [`Chain`] type representing a chain of blocks and their final state,
//! as well as [`DeferredTrieData`] for handling asynchronously computed trie data.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
mod chain;
pub use chain::*;
mod deferred_trie;
pub use deferred_trie::*;
/// Bincode-compatible serde implementations for chain types.
///
/// `bincode` crate doesn't work with optionally serializable serde fields, but some of the
/// chain types require optional serialization for RPC compatibility. This module makes so that
/// all fields are serialized.
///
/// Read more: <https://github.com/bincode-org/bincode/issues/326>
#[cfg(feature = "serde-bincode-compat")]
pub mod serde_bincode_compat {
pub use super::chain::serde_bincode_compat::*;
}

View File

@@ -19,6 +19,7 @@ revm.workspace = true
# alloy
alloy-evm.workspace = true
alloy-consensus.workspace = true
alloy-primitives.workspace = true
alloy-eips.workspace = true
@@ -44,6 +45,7 @@ serde = [
"alloy-eips/serde",
"alloy-primitives/serde",
"reth-primitives-traits/serde",
"alloy-consensus/serde",
"reth-trie-common/serde",
"reth-ethereum-primitives/serde",
]
@@ -53,6 +55,7 @@ serde-bincode-compat = [
"reth-primitives-traits/serde-bincode-compat",
"serde_with",
"alloy-eips/serde-bincode-compat",
"alloy-consensus/serde-bincode-compat",
"reth-ethereum-primitives/serde-bincode-compat",
]
std = [
@@ -61,6 +64,7 @@ std = [
"revm/std",
"serde?/std",
"reth-primitives-traits/std",
"alloy-consensus/std",
"serde_with?/std",
"derive_more/std",
"reth-ethereum-primitives/std",

View File

@@ -1,16 +1,16 @@
//! Contains [Chain], a chain of blocks and their final state.
use crate::DeferredTrieData;
use crate::ExecutionOutcome;
use alloc::{borrow::Cow, collections::BTreeMap, vec::Vec};
use alloy_consensus::{transaction::Recovered, BlockHeader};
use alloy_eips::{eip1898::ForkBlock, eip2718::Encodable2718, BlockNumHash};
use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash};
use reth_execution_types::ExecutionOutcome;
use core::{fmt, ops::RangeInclusive};
use reth_primitives_traits::{
transaction::signed::SignedTransaction, Block, BlockBody, IndexedTx, NodePrimitives,
RecoveredBlock, SealedHeader,
};
use reth_trie_common::{updates::TrieUpdatesSorted, HashedPostStateSorted};
use std::{borrow::Cow, collections::BTreeMap, fmt, ops::RangeInclusive, sync::Arc, vec::Vec};
use reth_trie_common::LazyTrieData;
/// A chain of blocks and their final state.
///
@@ -22,7 +22,8 @@ use std::{borrow::Cow, collections::BTreeMap, fmt, ops::RangeInclusive, sync::Ar
/// # Warning
///
/// A chain of blocks should not be empty.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Chain<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
/// All blocks in this chain.
blocks: BTreeMap<BlockNumber, RecoveredBlock<N::Block>>,
@@ -33,12 +34,10 @@ pub struct Chain<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
///
/// Additionally, it includes the individual state changes that led to the current state.
execution_outcome: ExecutionOutcome<N::Receipt>,
/// Deferred trie data for each block in the chain, keyed by block number.
/// Lazy trie data for each block in the chain, keyed by block number.
///
/// Contains handles to lazily-computed sorted trie updates and hashed state.
/// This allows Chain to be constructed without blocking on expensive trie
/// computations - the data is only materialized when actually needed.
trie_data: BTreeMap<BlockNumber, DeferredTrieData>,
/// Contains handles to lazily-initialized sorted trie updates and hashed state.
trie_data: BTreeMap<BlockNumber, LazyTrieData>,
}
type ChainTxReceiptMeta<'a, N> = (
@@ -67,7 +66,7 @@ impl<N: NodePrimitives> Chain<N> {
pub fn new(
blocks: impl IntoIterator<Item = RecoveredBlock<N::Block>>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_data: BTreeMap<BlockNumber, DeferredTrieData>,
trie_data: BTreeMap<BlockNumber, LazyTrieData>,
) -> Self {
let blocks =
blocks.into_iter().map(|b| (b.header().number(), b)).collect::<BTreeMap<_, _>>();
@@ -80,11 +79,10 @@ impl<N: NodePrimitives> Chain<N> {
pub fn from_block(
block: RecoveredBlock<N::Block>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_data: DeferredTrieData,
trie_data: LazyTrieData,
) -> Self {
let block_number = block.header().number();
let trie_data_map = BTreeMap::from([(block_number, trie_data)]);
Self::new([block], execution_outcome, trie_data_map)
Self::new([block], execution_outcome, BTreeMap::from([(block_number, trie_data)]))
}
/// Get the blocks in this chain.
@@ -102,64 +100,21 @@ impl<N: NodePrimitives> Chain<N> {
self.blocks.values().map(|block| block.clone_sealed_header())
}
/// Get all deferred trie data for this chain.
///
/// Returns handles to lazily-computed sorted trie updates and hashed state.
/// [`DeferredTrieData`] allows `Chain` to be constructed without blocking on
/// expensive trie computations - the data is only materialized when actually needed
/// via [`DeferredTrieData::wait_cloned`] or similar methods.
///
/// This method does **not** block. To access the computed trie data, call
/// [`DeferredTrieData::wait_cloned`] on individual entries, which will block
/// if the background computation has not yet completed.
pub const fn trie_data(&self) -> &BTreeMap<BlockNumber, DeferredTrieData> {
/// Get all trie data for this chain.
pub const fn trie_data(&self) -> &BTreeMap<BlockNumber, LazyTrieData> {
&self.trie_data
}
/// Get deferred trie data for a specific block number.
///
/// Returns a handle to the lazily-computed trie data. This method does **not** block.
/// Call [`DeferredTrieData::wait_cloned`] on the result to wait for and retrieve
/// the computed data, which will block if computation is still in progress.
pub fn trie_data_at(&self, block_number: BlockNumber) -> Option<&DeferredTrieData> {
/// Get trie data for a specific block number.
pub fn trie_data_at(&self, block_number: BlockNumber) -> Option<&LazyTrieData> {
self.trie_data.get(&block_number)
}
/// Get all trie updates for this chain.
///
/// Note: This blocks on deferred trie data for all blocks in the chain.
/// Prefer using [`trie_data`](Self::trie_data) when possible to avoid blocking.
pub fn trie_updates(&self) -> BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>> {
self.trie_data.iter().map(|(num, data)| (*num, data.wait_cloned().trie_updates)).collect()
}
/// Get trie updates for a specific block number.
///
/// Note: This waits for deferred trie data if not already computed.
pub fn trie_updates_at(&self, block_number: BlockNumber) -> Option<Arc<TrieUpdatesSorted>> {
self.trie_data.get(&block_number).map(|data| data.wait_cloned().trie_updates)
}
/// Remove all trie data for this chain.
pub fn clear_trie_data(&mut self) {
self.trie_data.clear();
}
/// Get all hashed states for this chain.
///
/// Note: This blocks on deferred trie data for all blocks in the chain.
/// Prefer using [`trie_data`](Self::trie_data) when possible to avoid blocking.
pub fn hashed_state(&self) -> BTreeMap<BlockNumber, Arc<HashedPostStateSorted>> {
self.trie_data.iter().map(|(num, data)| (*num, data.wait_cloned().hashed_state)).collect()
}
/// Get hashed state for a specific block number.
///
/// Note: This waits for deferred trie data if not already computed.
pub fn hashed_state_at(&self, block_number: BlockNumber) -> Option<Arc<HashedPostStateSorted>> {
self.trie_data.get(&block_number).map(|data| data.wait_cloned().hashed_state)
}
/// Get execution outcome of this chain
pub const fn execution_outcome(&self) -> &ExecutionOutcome<N::Receipt> {
&self.execution_outcome
@@ -205,14 +160,14 @@ impl<N: NodePrimitives> Chain<N> {
/// Destructure the chain into its inner components:
/// 1. The blocks contained in the chain.
/// 2. The execution outcome representing the final state.
/// 3. The deferred trie data map.
/// 3. The trie data map.
#[allow(clippy::type_complexity)]
pub fn into_inner(
self,
) -> (
ChainBlocks<'static, N::Block>,
ExecutionOutcome<N::Receipt>,
BTreeMap<BlockNumber, DeferredTrieData>,
BTreeMap<BlockNumber, LazyTrieData>,
) {
(ChainBlocks { blocks: Cow::Owned(self.blocks) }, self.execution_outcome, self.trie_data)
}
@@ -344,7 +299,7 @@ impl<N: NodePrimitives> Chain<N> {
&mut self,
block: RecoveredBlock<N::Block>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_data: DeferredTrieData,
trie_data: LazyTrieData,
) {
let block_number = block.header().number();
self.blocks.insert(block_number, block);
@@ -471,7 +426,7 @@ impl<B: Block<Body: BlockBody<Transaction: SignedTransaction>>> ChainBlocks<'_,
impl<B: Block> IntoIterator for ChainBlocks<'_, B> {
type Item = (BlockNumber, RecoveredBlock<B>);
type IntoIter = std::collections::btree_map::IntoIter<BlockNumber, RecoveredBlock<B>>;
type IntoIter = alloc::collections::btree_map::IntoIter<BlockNumber, RecoveredBlock<B>>;
fn into_iter(self) -> Self::IntoIter {
self.blocks.into_owned().into_iter()
@@ -489,95 +444,25 @@ pub struct BlockReceipts<T = reth_ethereum_primitives::Receipt> {
pub timestamp: u64,
}
#[cfg(feature = "serde")]
mod chain_serde {
use super::*;
use crate::ComputedTrieData;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
/// Serializable representation of Chain that waits for deferred trie data.
#[derive(Serialize, Deserialize)]
#[serde(bound = "")]
struct ChainRepr<N: NodePrimitives> {
blocks: BTreeMap<BlockNumber, RecoveredBlock<N::Block>>,
execution_outcome: ExecutionOutcome<N::Receipt>,
#[serde(default)]
trie_updates: BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
#[serde(default)]
hashed_state: BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
}
impl<N: NodePrimitives> Serialize for Chain<N> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// Wait for deferred trie data for serialization
let trie_updates: BTreeMap<_, _> = self
.trie_data
.iter()
.map(|(num, data)| (*num, data.wait_cloned().trie_updates))
.collect();
let hashed_state: BTreeMap<_, _> = self
.trie_data
.iter()
.map(|(num, data)| (*num, data.wait_cloned().hashed_state))
.collect();
let repr = ChainRepr::<N> {
blocks: self.blocks.clone(),
execution_outcome: self.execution_outcome.clone(),
trie_updates,
hashed_state,
};
repr.serialize(serializer)
}
}
impl<'de, N: NodePrimitives> Deserialize<'de> for Chain<N> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let repr = ChainRepr::<N>::deserialize(deserializer)?;
// Convert to ready DeferredTrieData handles
let trie_data = repr
.trie_updates
.into_iter()
.map(|(num, trie_updates)| {
let hashed_state = repr.hashed_state.get(&num).cloned().unwrap_or_default();
let computed = ComputedTrieData::without_trie_input(hashed_state, trie_updates);
(num, DeferredTrieData::ready(computed))
})
.collect();
Ok(Self { blocks: repr.blocks, execution_outcome: repr.execution_outcome, trie_data })
}
}
}
/// Bincode-compatible [`Chain`] serde implementation.
#[cfg(feature = "serde-bincode-compat")]
pub(super) mod serde_bincode_compat {
use crate::{serde_bincode_compat, ExecutionOutcome};
use alloc::{borrow::Cow, collections::BTreeMap, sync::Arc};
use alloy_primitives::BlockNumber;
use reth_ethereum_primitives::EthPrimitives;
use reth_execution_types::{
serde_bincode_compat as exec_serde_bincode_compat, ExecutionOutcome,
};
use reth_primitives_traits::{
serde_bincode_compat::{RecoveredBlock, SerdeBincodeCompat},
Block, NodePrimitives,
};
use serde::{ser::SerializeMap, Deserialize, Deserializer, Serialize, Serializer};
use serde_with::{DeserializeAs, SerializeAs};
use std::{borrow::Cow, collections::BTreeMap, sync::Arc};
/// Bincode-compatible [`super::Chain`] serde implementation.
///
/// Intended to use with the [`serde_with::serde_as`] macro in the following way:
/// ```rust
/// use reth_chain::{serde_bincode_compat, Chain};
/// use reth_execution_types::{serde_bincode_compat, Chain};
/// use serde::{Deserialize, Serialize};
/// use serde_with::serde_as;
///
@@ -597,7 +482,7 @@ pub(super) mod serde_bincode_compat {
>,
{
blocks: RecoveredBlocks<'a, N::Block>,
execution_outcome: exec_serde_bincode_compat::ExecutionOutcome<'a, N::Receipt>,
execution_outcome: serde_bincode_compat::ExecutionOutcome<'a, N::Receipt>,
#[serde(default, rename = "trie_updates_legacy")]
_trie_updates_legacy:
Option<reth_trie_common::serde_bincode_compat::updates::TrieUpdates<'a>>,
@@ -653,6 +538,31 @@ pub(super) mod serde_bincode_compat {
}
}
impl<'a, N> From<&'a super::Chain<N>> for Chain<'a, N>
where
N: NodePrimitives<
Block: Block<Header: SerdeBincodeCompat, Body: SerdeBincodeCompat> + 'static,
>,
{
fn from(value: &'a super::Chain<N>) -> Self {
Self {
blocks: RecoveredBlocks(Cow::Borrowed(&value.blocks)),
execution_outcome: value.execution_outcome.as_repr(),
_trie_updates_legacy: None,
trie_updates: value
.trie_data
.iter()
.map(|(k, v)| (*k, v.get().trie_updates.as_ref().into()))
.collect(),
hashed_state: value
.trie_data
.iter()
.map(|(k, v)| (*k, v.get().hashed_state.as_ref().into()))
.collect(),
}
}
}
impl<'a, N> From<Chain<'a, N>> for super::Chain<N>
where
N: NodePrimitives<
@@ -660,19 +570,17 @@ pub(super) mod serde_bincode_compat {
>,
{
fn from(value: Chain<'a, N>) -> Self {
use crate::{ComputedTrieData, DeferredTrieData};
use reth_trie_common::LazyTrieData;
let trie_updates: BTreeMap<_, _> =
value.trie_updates.into_iter().map(|(k, v)| (k, Arc::new(v.into()))).collect();
let hashed_state: BTreeMap<_, _> =
let hashed_state_map: BTreeMap<_, _> =
value.hashed_state.into_iter().map(|(k, v)| (k, Arc::new(v.into()))).collect();
let trie_data = trie_updates
let trie_data: BTreeMap<BlockNumber, LazyTrieData> = value
.trie_updates
.into_iter()
.map(|(num, trie_updates)| {
let hashed_state = hashed_state.get(&num).cloned().unwrap_or_default();
let computed = ComputedTrieData::without_trie_input(hashed_state, trie_updates);
(num, DeferredTrieData::ready(computed))
.map(|(k, v)| {
let hashed_state = hashed_state_map.get(&k).cloned().unwrap_or_default();
(k, LazyTrieData::ready(hashed_state, Arc::new(v.into())))
})
.collect();
@@ -694,31 +602,7 @@ pub(super) mod serde_bincode_compat {
where
S: Serializer,
{
use reth_trie_common::serde_bincode_compat as trie_serde;
// Wait for deferred trie data and collect into maps we can borrow from
let trie_updates_data: BTreeMap<BlockNumber, _> =
source.trie_data.iter().map(|(k, v)| (*k, v.wait_cloned().trie_updates)).collect();
let hashed_state_data: BTreeMap<BlockNumber, _> =
source.trie_data.iter().map(|(k, v)| (*k, v.wait_cloned().hashed_state)).collect();
// Now create the serde-compatible struct borrowing from the collected data
let chain: Chain<'_, N> = Chain {
blocks: RecoveredBlocks(Cow::Borrowed(&source.blocks)),
execution_outcome: source.execution_outcome.as_repr(),
_trie_updates_legacy: None,
trie_updates: trie_updates_data
.iter()
.map(|(k, v)| (*k, trie_serde::updates::TrieUpdatesSorted::from(v.as_ref())))
.collect(),
hashed_state: hashed_state_data
.iter()
.map(|(k, v)| {
(*k, trie_serde::hashed_state::HashedPostStateSorted::from(v.as_ref()))
})
.collect(),
};
chain.serialize(serializer)
Chain::from(source).serialize(serializer)
}
}
@@ -747,10 +631,10 @@ pub(super) mod serde_bincode_compat {
#[test]
fn test_chain_bincode_roundtrip() {
use std::collections::BTreeMap;
use alloc::collections::BTreeMap;
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct Data {
#[serde_as(as = "serde_bincode_compat::Chain")]
chain: Chain,
@@ -769,9 +653,7 @@ pub(super) mod serde_bincode_compat {
let encoded = bincode::serialize(&data).unwrap();
let decoded: Data = bincode::deserialize(&encoded).unwrap();
// Note: Can't compare directly because DeferredTrieData doesn't implement PartialEq
assert_eq!(decoded.chain.blocks, data.chain.blocks);
assert_eq!(decoded.chain.execution_outcome, data.chain.execution_outcome);
assert_eq!(decoded, data);
}
}
}

View File

@@ -564,8 +564,8 @@ pub(super) mod serde_bincode_compat {
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::TxType;
use alloy_primitives::{bytes, Address, LogData, B256};
use reth_ethereum_primitives::TxType;
#[test]
fn test_initialization() {

View File

@@ -11,6 +11,9 @@
extern crate alloc;
mod chain;
pub use chain::*;
mod execute;
pub use execute::*;
@@ -26,5 +29,5 @@ pub use execution_outcome::*;
/// Read more: <https://github.com/bincode-org/bincode/issues/326>
#[cfg(feature = "serde-bincode-compat")]
pub mod serde_bincode_compat {
pub use super::execution_outcome::serde_bincode_compat::*;
pub use super::{chain::serde_bincode_compat::*, execution_outcome::serde_bincode_compat::*};
}

View File

@@ -48,7 +48,6 @@ itertools = { workspace = true, features = ["use_std"] }
metrics.workspace = true
parking_lot.workspace = true
rmp-serde.workspace = true
serde_with.workspace = true
thiserror.workspace = true
tracing.workspace = true

View File

@@ -694,7 +694,6 @@ mod tests {
BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant,
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
use std::collections::BTreeMap;
fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
let (tx, rx) = watch::channel(None);
@@ -800,16 +799,12 @@ mod tests {
};
// Push the first notification
exex_manager.push_notification(notification1);
exex_manager.push_notification(notification1.clone());
// Verify the buffer contains the notification with the correct ID
assert_eq!(exex_manager.buffer.len(), 1);
assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
// Compare by tip block since ExExNotification doesn't implement PartialEq
assert_eq!(
*exex_manager.buffer.front().unwrap().1.committed_chain().unwrap().tip(),
block1
);
assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
assert_eq!(exex_manager.next_id, 1);
// Push another notification
@@ -821,17 +816,14 @@ mod tests {
new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
};
exex_manager.push_notification(notification2);
exex_manager.push_notification(notification2.clone());
// Verify the buffer contains both notifications with correct IDs
assert_eq!(exex_manager.buffer.len(), 2);
assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
assert_eq!(
*exex_manager.buffer.front().unwrap().1.committed_chain().unwrap().tip(),
block1
);
assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1);
assert_eq!(*exex_manager.buffer.get(1).unwrap().1.committed_chain().unwrap().tip(), block2);
assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2);
assert_eq!(exex_manager.next_id, 2);
}
@@ -1157,10 +1149,9 @@ mod tests {
block2.set_block_number(11);
// Setup a notification
let expected_block: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![expected_block.clone()],
vec![Default::default()],
Default::default(),
Default::default(),
)),
@@ -1172,8 +1163,7 @@ mod tests {
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap().unwrap();
// Compare by tip block since ExExNotification doesn't implement PartialEq
assert_eq!(*received_notification.committed_chain().unwrap().tip(), expected_block);
assert_eq!(received_notification, notification);
}
Poll::Pending => panic!("Notification send is pending"),
Poll::Ready(Err(e)) => panic!("Failed to send notification: {e:?}"),
@@ -1265,9 +1255,7 @@ mod tests {
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap().unwrap();
// Compare by checking that both are reorgs with empty chains
assert!(received_notification.committed_chain().is_some());
assert!(received_notification.reverted_chain().is_some());
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail")
@@ -1307,9 +1295,7 @@ mod tests {
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap().unwrap();
// Compare by checking that it's a revert with empty chain
assert!(received_notification.reverted_chain().is_some());
assert!(received_notification.committed_chain().is_none());
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail")
@@ -1361,11 +1347,11 @@ mod tests {
new: Arc::new(Chain::new(
vec![genesis_block.clone()],
Default::default(),
BTreeMap::new(),
Default::default(),
)),
};
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), BTreeMap::new())),
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), Default::default())),
};
let (finalized_headers_tx, rx) = watch::channel(None);
@@ -1382,38 +1368,34 @@ mod tests {
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
exex_manager.handle().send(ExExNotificationSource::Pipeline, genesis_notification)?;
exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification)?;
exex_manager
.handle()
.send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
// Check genesis notification received
let poll_result = notifications.try_poll_next_unpin(&mut cx)?;
if let Poll::Ready(Some(n)) = poll_result {
assert_eq!(*n.committed_chain().unwrap().tip(), genesis_block);
} else {
panic!("Expected genesis notification");
}
assert_eq!(
notifications.try_poll_next_unpin(&mut cx)?,
Poll::Ready(Some(genesis_notification))
);
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
// Check block notification received
let poll_result = notifications.try_poll_next_unpin(&mut cx)?;
if let Poll::Ready(Some(n)) = poll_result {
assert_eq!(*n.committed_chain().unwrap().tip(), block);
} else {
panic!("Expected block notification");
}
assert_eq!(
notifications.try_poll_next_unpin(&mut cx)?,
Poll::Ready(Some(notification.clone()))
);
// WAL shouldn't contain the genesis notification, because it's finalized
let wal_notifications =
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?;
assert_eq!(wal_notifications.len(), 1);
assert_eq!(*wal_notifications[0].committed_chain().unwrap().tip(), block);
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
std::slice::from_ref(&notification)
);
finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
let wal_notifications =
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?;
assert_eq!(wal_notifications.len(), 1);
assert_eq!(*wal_notifications[0].committed_chain().unwrap().tip(), block);
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
std::slice::from_ref(&notification)
);
// Send a `FinishedHeight` event with a non-canonical block
events_tx
@@ -1424,10 +1406,10 @@ mod tests {
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
// non-canonical block
let wal_notifications =
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?;
assert_eq!(wal_notifications.len(), 1);
assert_eq!(*wal_notifications[0].committed_chain().unwrap().tip(), block);
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
std::slice::from_ref(&notification)
);
// Send a `FinishedHeight` event with a canonical block
events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
@@ -1435,7 +1417,7 @@ mod tests {
finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL is finalized
assert!(exex_manager.wal.iter_notifications()?.next().is_none());
assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
Ok(())
}

View File

@@ -449,7 +449,7 @@ mod tests {
use crate::Wal;
use alloy_consensus::Header;
use alloy_eips::BlockNumHash;
use eyre::OptionExt;
use futures::StreamExt;
use reth_db_common::init::init_genesis;
use reth_ethereum_primitives::Block;
@@ -491,15 +491,14 @@ mod tests {
let exex_head =
ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
let expected_block = random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.try_recover()?;
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![expected_block.clone()],
vec![random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.try_recover()?],
Default::default(),
BTreeMap::new(),
)),
@@ -519,16 +518,23 @@ mod tests {
.with_head(exex_head);
// First notification is the backfill of missing blocks from the canonical chain
let backfill_notification = notifications.next().await.transpose()?;
assert!(backfill_notification.is_some());
// Verify it's a commit notification with the expected block range
let backfill_chain = backfill_notification.unwrap().committed_chain().unwrap();
assert_eq!(backfill_chain.first().header().number(), 1);
assert_eq!(
notifications.next().await.transpose()?,
Some(ExExNotification::ChainCommitted {
new: Arc::new(
BackfillJobFactory::new(
notifications.evm_config.clone(),
notifications.provider.clone()
)
.backfill(1..=1)
.next()
.ok_or_eyre("failed to backfill")??
)
})
);
// Second notification is the actual notification that we sent before
let received = notifications.next().await.transpose()?;
assert!(received.is_some());
assert_eq!(*received.unwrap().committed_chain().unwrap().tip(), expected_block);
assert_eq!(notifications.next().await.transpose()?, Some(notification));
Ok(())
}
@@ -549,19 +555,18 @@ mod tests {
let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
let exex_head = ExExHead { block: node_head };
let expected_block = Block {
header: Header {
parent_hash: node_head.hash,
number: node_head.number + 1,
..Default::default()
},
..Default::default()
}
.seal_slow()
.try_recover()?;
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![expected_block.clone()],
vec![Block {
header: Header {
parent_hash: node_head.hash,
number: node_head.number + 1,
..Default::default()
},
..Default::default()
}
.seal_slow()
.try_recover()?],
Default::default(),
BTreeMap::new(),
)),
@@ -581,8 +586,7 @@ mod tests {
.with_head(exex_head);
let new_notification = notifications.next().await.transpose()?;
assert!(new_notification.is_some());
assert_eq!(*new_notification.unwrap().committed_chain().unwrap().tip(), expected_block);
assert_eq!(new_notification, Some(notification));
Ok(())
}
@@ -612,7 +616,7 @@ mod tests {
let provider_rw = provider.database_provider_rw()?;
provider_rw.insert_block(&node_head_block)?;
provider_rw.commit()?;
let _node_head_notification = ExExNotification::ChainCommitted {
let node_head_notification = ExExNotification::ChainCommitted {
new: Arc::new(
BackfillJobFactory::new(EthEvmConfig::mainnet(), provider.clone())
.backfill(node_head.number..=node_head.number)
@@ -627,24 +631,26 @@ mod tests {
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
);
let exex_head = ExExHead { block: exex_head_block.num_hash() };
let exex_head_recovered = exex_head_block.clone().try_recover()?;
let exex_head_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![exex_head_recovered.clone()],
vec![exex_head_block.clone().try_recover()?],
Default::default(),
BTreeMap::new(),
)),
};
wal.commit(&exex_head_notification)?;
let new_block = random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.try_recover()?;
let new_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![new_block.clone()], Default::default(), BTreeMap::new())),
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.try_recover()?],
Default::default(),
BTreeMap::new(),
)),
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
@@ -662,25 +668,15 @@ mod tests {
// First notification is the revert of the ExEx head block to get back to the canonical
// chain
let revert_notification = notifications.next().await.transpose()?;
assert!(revert_notification.is_some());
// Verify it's a revert with the exex_head block
assert_eq!(
*revert_notification.unwrap().reverted_chain().unwrap().tip(),
exex_head_recovered
notifications.next().await.transpose()?,
Some(exex_head_notification.into_inverted())
);
// Second notification is the backfilled block from the canonical chain to get back to the
// canonical tip
let backfill_notification = notifications.next().await.transpose()?;
assert!(backfill_notification.is_some());
assert_eq!(
backfill_notification.unwrap().committed_chain().unwrap().tip().header().number(),
node_head.number
);
assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
// Third notification is the actual notification that we sent before
let received = notifications.next().await.transpose()?;
assert!(received.is_some());
assert_eq!(*received.unwrap().committed_chain().unwrap().tip(), new_block);
assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
Ok(())
}
@@ -706,10 +702,9 @@ mod tests {
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
);
let exex_head_recovered = exex_head_block.clone().try_recover()?;
let exex_head_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![exex_head_recovered.clone()],
vec![exex_head_block.clone().try_recover()?],
Default::default(),
BTreeMap::new(),
)),
@@ -721,14 +716,17 @@ mod tests {
block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
};
let new_block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), ..Default::default() },
)
.try_recover()?;
let new_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![new_block.clone()], Default::default(), BTreeMap::new())),
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), ..Default::default() },
)
.try_recover()?],
Default::default(),
BTreeMap::new(),
)),
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
@@ -746,17 +744,13 @@ mod tests {
// First notification is the revert of the ExEx head block to get back to the canonical
// chain
let revert_notification = notifications.next().await.transpose()?;
assert!(revert_notification.is_some());
assert_eq!(
*revert_notification.unwrap().reverted_chain().unwrap().tip(),
exex_head_recovered
notifications.next().await.transpose()?,
Some(exex_head_notification.into_inverted())
);
// Second notification is the actual notification that we sent before
let received = notifications.next().await.transpose()?;
assert!(received.is_some());
assert_eq!(*received.unwrap().committed_chain().unwrap().tip(), new_block);
assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
Ok(())
}

View File

@@ -255,36 +255,6 @@ mod tests {
})
}
fn notifications_equal(a: &[ExExNotification], b: &[ExExNotification]) -> bool {
if a.len() != b.len() {
return false;
}
a.iter().zip(b.iter()).all(|(n1, n2)| {
let committed_eq = match (n1.committed_chain(), n2.committed_chain()) {
(Some(c1), Some(c2)) => {
c1.tip().hash() == c2.tip().hash() && c1.blocks() == c2.blocks()
}
(None, None) => true,
_ => false,
};
let reverted_eq = match (n1.reverted_chain(), n2.reverted_chain()) {
(Some(c1), Some(c2)) => {
c1.tip().hash() == c2.tip().hash() && c1.blocks() == c2.blocks()
}
(None, None) => true,
_ => false,
};
committed_eq && reverted_eq
})
}
fn assert_notifications_eq(actual: Vec<ExExNotification>, expected: Vec<ExExNotification>) {
assert!(
notifications_equal(&actual, &expected),
"notifications mismatch:\nactual: {actual:?}\nexpected: {expected:?}"
);
}
fn sort_committed_blocks(
committed_blocks: Vec<(B256, u32, CachedBlock)>,
) -> Vec<(B256, u32, CachedBlock)> {
@@ -388,7 +358,7 @@ mod tests {
wal.inner.block_cache().committed_blocks_sorted(),
committed_notification_1_cache_committed_blocks
);
assert_notifications_eq(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
// Second notification (revert block 1)
wal.commit(&reverted_notification)?;
@@ -402,9 +372,9 @@ mod tests {
wal.inner.block_cache().committed_blocks_sorted(),
committed_notification_1_cache_committed_blocks
);
assert_notifications_eq(
assert_eq!(
read_notifications(&wal)?,
vec![committed_notification_1.clone(), reverted_notification.clone()],
vec![committed_notification_1.clone(), reverted_notification.clone()]
);
// Third notification (commit block 1, 2)
@@ -447,13 +417,13 @@ mod tests {
.concat()
)
);
assert_notifications_eq(
assert_eq!(
read_notifications(&wal)?,
vec![
committed_notification_1.clone(),
reverted_notification.clone(),
committed_notification_2.clone(),
],
committed_notification_2.clone()
]
);
// Fourth notification (revert block 2, commit block 2, 3)
@@ -498,14 +468,14 @@ mod tests {
.concat()
)
);
assert_notifications_eq(
assert_eq!(
read_notifications(&wal)?,
vec![
committed_notification_1,
reverted_notification,
committed_notification_2.clone(),
reorged_notification.clone(),
],
reorged_notification.clone()
]
);
// Now, finalize the WAL up to the block 1. Block 1 was in the third notification that also
@@ -527,9 +497,9 @@ mod tests {
.concat()
)
);
assert_notifications_eq(
assert_eq!(
read_notifications(&wal)?,
vec![committed_notification_2.clone(), reorged_notification.clone()],
vec![committed_notification_2.clone(), reorged_notification.clone()]
);
// Re-open the WAL and verify that the cache population works correctly
@@ -548,10 +518,7 @@ mod tests {
.concat()
)
);
assert_notifications_eq(
read_notifications(&wal)?,
vec![committed_notification_2, reorged_notification],
);
assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]);
Ok(())
}

View File

@@ -163,16 +163,12 @@ where
let file_path = self.file_path(file_id);
debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");
// Serialize using the bincode- and msgpack-compatible serde wrapper via SerializeAs
// Serialize using the bincode- and msgpack-compatible serde wrapper
let notification =
reth_exex_types::serde_bincode_compat::ExExNotification::<N>::from(notification);
reth_fs_util::atomic_write_file(&file_path, |file| {
use serde_with::SerializeAs;
let mut buf = Vec::new();
reth_exex_types::serde_bincode_compat::ExExNotification::<'_, N>::serialize_as(
notification,
&mut rmp_serde::Serializer::new(&mut buf),
)
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?;
std::io::Write::write_all(file, &buf)
rmp_serde::encode::write(file, &notification)
})?;
Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len())
@@ -193,7 +189,7 @@ mod tests {
use reth_testing_utils::generators::{self, random_block};
use reth_trie_common::{
updates::{StorageTrieUpdates, TrieUpdates},
BranchNodeCompact, HashedPostState, HashedStorage, Nibbles,
BranchNodeCompact, HashedPostState, HashedStorage, LazyTrieData, Nibbles,
};
use std::{collections::BTreeMap, fs::File, sync::Arc};
@@ -228,10 +224,8 @@ mod tests {
// Get expected data
let expected_notification = get_test_notification_data().unwrap();
// Compare by tip block since ExExNotification doesn't implement PartialEq
assert_eq!(
*notification.committed_chain().unwrap().tip(),
*expected_notification.committed_chain().unwrap().tip(),
&notification, &expected_notification,
"Decoded notification should match expected static data"
);
}
@@ -247,18 +241,18 @@ mod tests {
let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
let notification = ExExNotification::ChainReorged {
new: Arc::new(Chain::new(vec![new_block.clone()], Default::default(), BTreeMap::new())),
old: Arc::new(Chain::new(vec![old_block.clone()], Default::default(), BTreeMap::new())),
new: Arc::new(Chain::new(vec![new_block], Default::default(), BTreeMap::new())),
old: Arc::new(Chain::new(vec![old_block], Default::default(), BTreeMap::new())),
};
// Do a round trip serialization and deserialization
let file_id = 0;
storage.write_notification(file_id, &notification)?;
let deserialized_notification = storage.read_notification(file_id)?;
// Compare by chain tips since ExExNotification doesn't implement PartialEq
let deserialized = deserialized_notification.map(|(n, _)| n).unwrap();
assert_eq!(*deserialized.committed_chain().unwrap().tip(), new_block);
assert_eq!(*deserialized.reverted_chain().unwrap().tip(), old_block);
assert_eq!(
deserialized_notification.map(|(notification, _)| notification),
Some(notification)
);
Ok(())
}
@@ -276,14 +270,10 @@ mod tests {
let notification = get_test_notification_data()?;
// Create a temp storage and write the notification using the existing serialization path
let temp_dir = tempfile::tempdir()?;
let storage = Storage::new(&temp_dir)?;
storage.write_notification(0, &notification)?;
// Read it back as raw bytes
let temp_path = temp_dir.path().join("0.wal");
let encoded = std::fs::read(&temp_path)?;
// Serialize the notification
let notification_compat =
reth_exex_types::serde_bincode_compat::ExExNotification::from(&notification);
let encoded = rmp_serde::encode::to_vec(&notification_compat)?;
// Write to test-data directory
let test_data_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("test-data");
@@ -346,12 +336,11 @@ mod tests {
)]),
};
let trie_data =
reth_chain_state::DeferredTrieData::ready(reth_chain_state::ComputedTrieData {
hashed_state: Arc::new(hashed_state.into_sorted()),
trie_updates: Arc::new(trie_updates.into_sorted()),
anchored_trie_input: None,
});
let trie_data = LazyTrieData::ready(
Arc::new(hashed_state.into_sorted()),
Arc::new(trie_updates.into_sorted()),
);
let notification: ExExNotification<reth_ethereum_primitives::EthPrimitives> =
ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(

View File

@@ -12,13 +12,13 @@ workspace = true
[dependencies]
## reth
reth-chain.workspace = true
reth-chainspec.workspace = true
reth-config.workspace = true
reth-consensus = { workspace = true, features = ["test-utils"] }
reth-db = { workspace = true, features = ["test-utils"] }
reth-db-common.workspace = true
reth-evm-ethereum = { workspace = true, features = ["test-utils"] }
reth-execution-types.workspace = true
reth-exex.workspace = true
reth-payload-builder.workspace = true
reth-network.workspace = true

View File

@@ -17,7 +17,6 @@ use std::{
use alloy_eips::BlockNumHash;
use futures_util::FutureExt;
use reth_chain::Chain;
use reth_chainspec::{ChainSpec, MAINNET};
use reth_consensus::test_utils::TestConsensus;
use reth_db::{
@@ -29,6 +28,7 @@ use reth_db::{
use reth_db_common::init::init_genesis;
use reth_ethereum_primitives::{EthPrimitives, TransactionSigned};
use reth_evm_ethereum::MockEvmConfig;
use reth_execution_types::Chain;
use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications, Wal};
use reth_network::{config::rng_secret_key, NetworkConfigBuilder, NetworkManager};
use reth_node_api::{

View File

@@ -13,8 +13,8 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-chain-state.workspace = true
reth-execution-types.workspace = true
reth-primitives-traits.workspace = true
# reth
@@ -36,7 +36,7 @@ rand.workspace = true
default = []
serde = [
"dep:serde",
"reth-chain/serde",
"reth-execution-types/serde",
"alloy-eips/serde",
"alloy-primitives/serde",
"rand/serde",
@@ -45,7 +45,7 @@ serde = [
"reth-chain-state/serde",
]
serde-bincode-compat = [
"reth-chain/serde-bincode-compat",
"reth-execution-types/serde-bincode-compat",
"serde_with",
"alloy-eips/serde-bincode-compat",
"reth-primitives-traits/serde-bincode-compat",

View File

@@ -1,13 +1,12 @@
use std::sync::Arc;
use reth_chain::Chain;
use reth_chain_state::CanonStateNotification;
use reth_execution_types::Chain;
use reth_primitives_traits::NodePrimitives;
/// Notifications sent to an `ExEx`.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound = ""))]
pub enum ExExNotification<N: NodePrimitives = reth_chain_state::EthPrimitives> {
/// Chain got committed without a reorg, and only the new chain is returned.
ChainCommitted {
@@ -74,7 +73,7 @@ impl<P: NodePrimitives> From<CanonStateNotification<P>> for ExExNotification<P>
/// Bincode-compatible [`ExExNotification`] serde implementation.
#[cfg(all(feature = "serde", feature = "serde-bincode-compat"))]
pub(super) mod serde_bincode_compat {
use reth_chain::serde_bincode_compat::Chain;
use reth_execution_types::serde_bincode_compat::Chain;
use reth_primitives_traits::NodePrimitives;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::{DeserializeAs, SerializeAs};
@@ -125,6 +124,28 @@ pub(super) mod serde_bincode_compat {
},
}
impl<'a, N> From<&'a super::ExExNotification<N>> for ExExNotification<'a, N>
where
N: NodePrimitives,
{
fn from(value: &'a super::ExExNotification<N>) -> Self {
match value {
super::ExExNotification::ChainCommitted { new } => {
ExExNotification::ChainCommitted { new: Chain::from(new.as_ref()) }
}
super::ExExNotification::ChainReorged { old, new } => {
ExExNotification::ChainReorged {
old: Chain::from(old.as_ref()),
new: Chain::from(new.as_ref()),
}
}
super::ExExNotification::ChainReverted { old } => {
ExExNotification::ChainReverted { old: Chain::from(old.as_ref()) }
}
}
}
}
impl<'a, N> From<ExExNotification<'a, N>> for super::ExExNotification<N>
where
N: NodePrimitives,
@@ -155,41 +176,7 @@ pub(super) mod serde_bincode_compat {
where
S: Serializer,
{
// Helper that uses Chain's SerializeAs for bincode-compatible serialization
struct ChainWrapper<'a, N: NodePrimitives>(&'a reth_chain::Chain<N>);
impl<N: NodePrimitives> Serialize for ChainWrapper<'_, N> {
fn serialize<S2>(&self, serializer: S2) -> Result<S2::Ok, S2::Error>
where
S2: Serializer,
{
Chain::<'_, N>::serialize_as(self.0, serializer)
}
}
// Create an enum that matches the ExExNotification structure but uses ChainWrapper
#[derive(Serialize)]
#[serde(bound = "")]
#[allow(clippy::enum_variant_names)]
enum Repr<'a, N: NodePrimitives> {
ChainCommitted { new: ChainWrapper<'a, N> },
ChainReorged { old: ChainWrapper<'a, N>, new: ChainWrapper<'a, N> },
ChainReverted { old: ChainWrapper<'a, N> },
}
match source {
super::ExExNotification::ChainCommitted { new } => {
Repr::ChainCommitted { new: ChainWrapper(new.as_ref()) }.serialize(serializer)
}
super::ExExNotification::ChainReorged { old, new } => Repr::ChainReorged {
old: ChainWrapper(old.as_ref()),
new: ChainWrapper(new.as_ref()),
}
.serialize(serializer),
super::ExExNotification::ChainReverted { old } => {
Repr::ChainReverted { old: ChainWrapper(old.as_ref()) }.serialize(serializer)
}
}
ExExNotification::from(source).serialize(serializer)
}
}
@@ -210,7 +197,7 @@ pub(super) mod serde_bincode_compat {
use super::super::{serde_bincode_compat, ExExNotification};
use arbitrary::Arbitrary;
use rand::Rng;
use reth_chain::Chain;
use reth_execution_types::Chain;
use reth_primitives_traits::RecoveredBlock;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@@ -219,7 +206,7 @@ pub(super) mod serde_bincode_compat {
#[test]
fn test_exex_notification_bincode_roundtrip() {
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct Data {
#[serde_as(
as = "serde_bincode_compat::ExExNotification<'_, reth_ethereum_primitives::EthPrimitives>"
@@ -229,34 +216,26 @@ pub(super) mod serde_bincode_compat {
let mut bytes = [0u8; 1024];
rand::rng().fill(bytes.as_mut_slice());
let old_block: reth_primitives_traits::RecoveredBlock<reth_ethereum_primitives::Block> =
RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes)).unwrap();
let new_block: reth_primitives_traits::RecoveredBlock<reth_ethereum_primitives::Block> =
RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes)).unwrap();
let data = Data {
notification: ExExNotification::ChainReorged {
old: Arc::new(Chain::new(vec![old_block], Default::default(), BTreeMap::new())),
new: Arc::new(Chain::new(vec![new_block], Default::default(), BTreeMap::new())),
old: Arc::new(Chain::new(
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
.unwrap()],
Default::default(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
.unwrap()],
Default::default(),
BTreeMap::new(),
)),
},
};
let encoded = bincode::serialize(&data).unwrap();
let decoded: Data = bincode::deserialize(&encoded).unwrap();
// Compare fields individually since Chain doesn't implement PartialEq
match (&decoded.notification, &data.notification) {
(
ExExNotification::ChainReorged { old: decoded_old, new: decoded_new },
ExExNotification::ChainReorged { old: expected_old, new: expected_new },
) => {
assert_eq!(decoded_old.blocks(), expected_old.blocks());
assert_eq!(decoded_old.execution_outcome(), expected_old.execution_outcome());
assert_eq!(decoded_new.blocks(), expected_new.blocks());
assert_eq!(decoded_new.execution_outcome(), expected_new.execution_outcome());
}
_ => panic!("Expected ChainReorged variant"),
}
assert_eq!(decoded, data);
}
}
}

View File

@@ -44,7 +44,6 @@ op-revm.workspace = true
thiserror.workspace = true
[dev-dependencies]
reth-chain.workspace = true
reth-evm = { workspace = true, features = ["test-utils"] }
reth-revm = { workspace = true, features = ["test-utils"] }
alloy-genesis.workspace = true

View File

@@ -295,10 +295,11 @@ mod tests {
use alloy_genesis::Genesis;
use alloy_primitives::{bytes, map::HashMap, Address, LogData, B256};
use op_revm::OpSpecId;
use reth_chain::Chain;
use reth_chainspec::ChainSpec;
use reth_evm::execute::ProviderError;
use reth_execution_types::{AccountRevertInit, BundleStateInit, ExecutionOutcome, RevertsInit};
use reth_execution_types::{
AccountRevertInit, BundleStateInit, Chain, ExecutionOutcome, RevertsInit,
};
use reth_optimism_chainspec::{OpChainSpec, BASE_MAINNET};
use reth_optimism_primitives::{OpBlock, OpPrimitives, OpReceipt};
use reth_primitives_traits::{Account, RecoveredBlock};

View File

@@ -12,11 +12,11 @@ description = "Types supporting implementation of 'eth' namespace RPC server API
workspace = true
[dependencies]
reth-chain.workspace = true
reth-chainspec.workspace = true
reth-chain-state.workspace = true
reth-errors.workspace = true
reth-evm.workspace = true
reth-execution-types.workspace = true
reth-metrics.workspace = true
reth-ethereum-primitives = { workspace = true, features = ["rpc"] }
reth-primitives-traits = { workspace = true, features = ["rpc-compat"] }

View File

@@ -5,9 +5,9 @@ use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::B256;
use futures::{stream::FuturesOrdered, Stream, StreamExt};
use reth_chain::Chain;
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::Chain;
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
use reth_storage_api::{BlockReader, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};

View File

@@ -13,7 +13,6 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-chainspec = { workspace = true, optional = true }
reth-codecs.workspace = true
reth-config.workspace = true
@@ -30,6 +29,7 @@ reth-fs-util.workspace = true
reth-network-p2p.workspace = true
reth-primitives-traits = { workspace = true, features = ["serde-bincode-compat"] }
reth-provider.workspace = true
reth-execution-types.workspace = true
reth-ethereum-primitives = { workspace = true, optional = true }
reth-prune.workspace = true
reth-prune-types.workspace = true

View File

@@ -2,11 +2,11 @@ use crate::stages::MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD;
use alloy_consensus::BlockHeader;
use alloy_primitives::BlockNumber;
use num_traits::Zero;
use reth_chain::Chain;
use reth_config::config::ExecutionConfig;
use reth_consensus::FullConsensus;
use reth_db::{static_file::HeaderMask, tables};
use reth_evm::{execute::Executor, metrics::ExecutorMetrics, ConfigureEvm};
use reth_execution_types::Chain;
use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource};
use reth_primitives_traits::{format_gas_throughput, BlockBody, NodePrimitives};
use reth_provider::{

View File

@@ -13,7 +13,6 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-chainspec.workspace = true
reth-execution-types.workspace = true
reth-ethereum-primitives = { workspace = true, features = ["reth-codec"] }
@@ -87,15 +86,6 @@ tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
[features]
rocksdb = ["dep:rocksdb"]
serde-bincode-compat = [
"reth-chain/serde-bincode-compat",
"alloy-consensus/serde-bincode-compat",
"alloy-eips/serde-bincode-compat",
"reth-ethereum-primitives/serde-bincode-compat",
"reth-execution-types/serde-bincode-compat",
"reth-primitives-traits/serde-bincode-compat",
"reth-storage-api/serde-bincode-compat",
]
test-utils = [
"reth-db/test-utils",
"reth-nippy-jar/test-utils",

View File

@@ -35,20 +35,11 @@ pub mod test_utils;
pub mod either_writer;
pub use either_writer::*;
#[cfg(feature = "serde-bincode-compat")]
pub use reth_chain::serde_bincode_compat;
pub use reth_chain::{
AnchoredTrieInput, BlockReceipts, Chain, ChainBlocks, ComputedTrieData, DeferredTrieData,
DisplayBlocksChain,
};
pub use reth_chain_state::{
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
CanonStateNotifications, CanonStateSubscriptions,
};
pub use reth_execution_types::{
AccountRevertInit, BlockExecutionOutput, BlockExecutionResult, BundleStateInit, ChangedAccount,
ExecutionOutcome, RevertsInit,
};
pub use reth_execution_types::*;
/// Re-export `OriginalValuesKnown`
pub use revm_database::states::OriginalValuesKnown;
// reexport traits to avoid breaking changes

View File

@@ -782,7 +782,6 @@ mod tests {
use alloy_primitives::{BlockNumber, TxNumber, B256};
use itertools::Itertools;
use rand::Rng;
use reth_chain::Chain;
use reth_chain_state::{
test_utils::TestBlockBuilder, CanonStateNotification, CanonStateSubscriptions,
CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain,
@@ -791,7 +790,9 @@ mod tests {
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_errors::ProviderError;
use reth_ethereum_primitives::{Block, Receipt};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
use reth_execution_types::{
BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome,
};
use reth_primitives_traits::{RecoveredBlock, SealedBlock, SignerRecoverable};
use reth_storage_api::{
BlockBodyIndicesProvider, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
@@ -1347,33 +1348,24 @@ mod tests {
// Send and receive commit notifications.
let block_2 = test_block_builder.generate_random_block(1, block_hash_1).try_recover()?;
let chain = Chain::new(vec![block_2.clone()], ExecutionOutcome::default(), BTreeMap::new());
let chain = Chain::new(vec![block_2], ExecutionOutcome::default(), BTreeMap::new());
let commit = CanonStateNotification::Commit { new: Arc::new(chain.clone()) };
in_memory_state.notify_canon_state(commit.clone());
let (notification_1, notification_2) = tokio::join!(rx_1.recv(), rx_2.recv());
// Verify both subscribers received commit notifications with matching tip
let n1 = notification_1.unwrap();
let n2 = notification_2.unwrap();
assert_eq!(*n1.tip(), block_2);
assert_eq!(*n2.tip(), block_2);
assert_eq!(notification_1, Ok(commit.clone()));
assert_eq!(notification_2, Ok(commit.clone()));
// Send and receive re-org notifications.
let block_3 = test_block_builder.generate_random_block(1, block_hash_1).try_recover()?;
let block_4 = test_block_builder.generate_random_block(2, block_3.hash()).try_recover()?;
let new_chain = Chain::new(
vec![block_3, block_4.clone()],
ExecutionOutcome::default(),
BTreeMap::new(),
);
let new_chain =
Chain::new(vec![block_3, block_4], ExecutionOutcome::default(), BTreeMap::new());
let re_org =
CanonStateNotification::Reorg { old: Arc::new(chain), new: Arc::new(new_chain) };
in_memory_state.notify_canon_state(re_org.clone());
let (notification_1, notification_2) = tokio::join!(rx_1.recv(), rx_2.recv());
// Verify both subscribers received reorg notifications with matching tip
let n1 = notification_1.unwrap();
let n2 = notification_2.unwrap();
assert_eq!(*n1.tip(), block_4);
assert_eq!(*n2.tip(), block_4);
assert_eq!(notification_1, Ok(re_org.clone()));
assert_eq!(notification_2, Ok(re_org.clone()));
Ok(())
}

View File

@@ -33,7 +33,6 @@ use alloy_primitives::{
use itertools::Itertools;
use parking_lot::RwLock;
use rayon::slice::ParallelSliceMut;
use reth_chain::Chain;
use reth_chain_state::{ComputedTrieData, ExecutedBlock};
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
use reth_db_api::{
@@ -48,7 +47,7 @@ use reth_db_api::{
transaction::{DbTx, DbTxMut},
BlockNumberList, PlainAccountState, PlainStorageState,
};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
use reth_primitives_traits::{
Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,

View File

@@ -13,7 +13,6 @@ workspace = true
[dependencies]
# reth
reth-chain = { workspace = true, optional = true }
reth-db-models.workspace = true
reth-chainspec.workspace = true
reth-db-api = { workspace = true, optional = true }
@@ -38,7 +37,6 @@ serde_json = { workspace = true, optional = true }
[features]
default = ["std"]
std = [
"dep:reth-chain",
"reth-chainspec/std",
"alloy-consensus/std",
"alloy-eips/std",
@@ -62,7 +60,6 @@ db-api = [
]
serde = [
"reth-chain?/serde",
"reth-ethereum-primitives/serde",
"reth-db-models/serde",
"reth-execution-types/serde",
@@ -81,7 +78,6 @@ serde-bincode-compat = [
"reth-execution-types/serde-bincode-compat",
"reth-primitives-traits/serde-bincode-compat",
"reth-trie-common/serde-bincode-compat",
"reth-chain?/serde-bincode-compat",
"reth-ethereum-primitives/serde-bincode-compat",
"alloy-eips/serde-bincode-compat",
"alloy-consensus/serde-bincode-compat",

View File

@@ -1,9 +1,8 @@
use crate::NodePrimitivesProvider;
use alloc::vec::Vec;
use alloy_primitives::BlockNumber;
use reth_chain::Chain;
use reth_db_models::StoredBlockBodyIndices;
use reth_execution_types::ExecutionOutcome;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
use reth_storage_errors::provider::ProviderResult;
use reth_trie_common::HashedPostStateSorted;

View File

@@ -13,7 +13,6 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-chain-state.workspace = true
reth-ethereum-primitives.workspace = true
reth-chainspec.workspace = true
@@ -91,7 +90,6 @@ serde = [
"revm-primitives/serde",
"reth-primitives-traits/serde",
"reth-ethereum-primitives/serde",
"reth-chain/serde",
"reth-chain-state/serde",
"reth-storage-api/serde",
]

View File

@@ -3,7 +3,7 @@
use alloy_consensus::Typed2718;
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{BlockNumber, B256};
use reth_chain::ChainBlocks;
use reth_execution_types::ChainBlocks;
use reth_primitives_traits::{Block, BlockBody, SignedTransaction};
use std::collections::BTreeMap;
@@ -91,8 +91,8 @@ mod tests {
use super::*;
use alloy_consensus::{Header, Signed};
use alloy_primitives::Signature;
use reth_chain::Chain;
use reth_ethereum_primitives::Transaction;
use reth_execution_types::Chain;
use reth_primitives_traits::{RecoveredBlock, SealedBlock, SealedHeader};
#[test]

View File

@@ -0,0 +1,194 @@
//! Lazy initialization wrapper for trie data.
//!
//! Provides a no-std compatible [`LazyTrieData`] type for lazily initialized
//! trie-related data containing sorted hashed state and trie updates.
use crate::{updates::TrieUpdatesSorted, HashedPostStateSorted};
use alloc::sync::Arc;
use core::fmt;
use reth_primitives_traits::sync::OnceLock;
/// Container for sorted trie data: hashed state and trie updates.
///
/// This bundles both [`HashedPostStateSorted`] and [`TrieUpdatesSorted`] together
/// for convenient passing and storage.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SortedTrieData {
/// Sorted hashed post-state produced by execution.
pub hashed_state: Arc<HashedPostStateSorted>,
/// Sorted trie updates produced by state root computation.
pub trie_updates: Arc<TrieUpdatesSorted>,
}
impl SortedTrieData {
/// Creates a new [`SortedTrieData`] with the given values.
pub const fn new(
hashed_state: Arc<HashedPostStateSorted>,
trie_updates: Arc<TrieUpdatesSorted>,
) -> Self {
Self { hashed_state, trie_updates }
}
}
/// Lazily initialized trie data containing sorted hashed state and trie updates.
///
/// This is a no-std compatible wrapper that supports two modes:
/// 1. **Ready mode**: Data is available immediately (created via `ready()`)
/// 2. **Deferred mode**: Data is computed on first access (created via `deferred()`)
///
/// In deferred mode, the computation runs on the first call to `get()`, `hashed_state()`,
/// or `trie_updates()`, and results are cached for subsequent calls.
///
/// Cloning is cheap (Arc clone) and clones share the cached state.
pub struct LazyTrieData {
/// Cached sorted trie data, computed on first access.
data: Arc<OnceLock<SortedTrieData>>,
/// Optional deferred computation function.
compute: Option<Arc<dyn Fn() -> SortedTrieData + Send + Sync>>,
}
impl Clone for LazyTrieData {
fn clone(&self) -> Self {
Self { data: Arc::clone(&self.data), compute: self.compute.clone() }
}
}
impl fmt::Debug for LazyTrieData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LazyTrieData")
.field("data", &if self.data.get().is_some() { "initialized" } else { "pending" })
.finish()
}
}
impl PartialEq for LazyTrieData {
fn eq(&self, other: &Self) -> bool {
self.get() == other.get()
}
}
impl Eq for LazyTrieData {}
impl LazyTrieData {
/// Creates a new [`LazyTrieData`] that is already initialized with the given values.
pub fn ready(
hashed_state: Arc<HashedPostStateSorted>,
trie_updates: Arc<TrieUpdatesSorted>,
) -> Self {
let data = OnceLock::new();
let _ = data.set(SortedTrieData::new(hashed_state, trie_updates));
Self { data: Arc::new(data), compute: None }
}
/// Creates a new [`LazyTrieData`] from pre-computed [`SortedTrieData`].
pub fn from_sorted(sorted: SortedTrieData) -> Self {
let data = OnceLock::new();
let _ = data.set(sorted);
Self { data: Arc::new(data), compute: None }
}
/// Creates a new [`LazyTrieData`] with a deferred computation function.
///
/// The computation will run on the first call to `get()`, `hashed_state()`,
/// or `trie_updates()`. Results are cached for subsequent calls.
pub fn deferred(compute: impl Fn() -> SortedTrieData + Send + Sync + 'static) -> Self {
Self { data: Arc::new(OnceLock::new()), compute: Some(Arc::new(compute)) }
}
/// Returns a reference to the sorted trie data, computing if necessary.
///
/// # Panics
///
/// Panics if created via `deferred()` and the computation function was not provided.
pub fn get(&self) -> &SortedTrieData {
self.data.get_or_init(|| {
self.compute.as_ref().expect("LazyTrieData::get called before initialization")()
})
}
/// Returns a clone of the hashed state Arc.
///
/// If not initialized, computes from the deferred source or panics.
pub fn hashed_state(&self) -> Arc<HashedPostStateSorted> {
Arc::clone(&self.get().hashed_state)
}
/// Returns a clone of the trie updates Arc.
///
/// If not initialized, computes from the deferred source or panics.
pub fn trie_updates(&self) -> Arc<TrieUpdatesSorted> {
Arc::clone(&self.get().trie_updates)
}
/// Returns a clone of the [`SortedTrieData`].
///
/// If not initialized, computes from the deferred source or panics.
pub fn sorted_trie_data(&self) -> SortedTrieData {
self.get().clone()
}
}
#[cfg(feature = "serde")]
impl serde::Serialize for LazyTrieData {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.get().serialize(serializer)
}
}
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for LazyTrieData {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let data = SortedTrieData::deserialize(deserializer)?;
Ok(Self::from_sorted(data))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lazy_ready_is_initialized() {
let lazy = LazyTrieData::ready(
Arc::new(HashedPostStateSorted::default()),
Arc::new(TrieUpdatesSorted::default()),
);
let _ = lazy.hashed_state();
let _ = lazy.trie_updates();
}
#[test]
fn test_lazy_clone_shares_state() {
let lazy1 = LazyTrieData::ready(
Arc::new(HashedPostStateSorted::default()),
Arc::new(TrieUpdatesSorted::default()),
);
let lazy2 = lazy1.clone();
// Both point to the same data
assert!(Arc::ptr_eq(&lazy1.hashed_state(), &lazy2.hashed_state()));
assert!(Arc::ptr_eq(&lazy1.trie_updates(), &lazy2.trie_updates()));
}
#[test]
fn test_lazy_deferred() {
let lazy = LazyTrieData::deferred(SortedTrieData::default);
assert!(lazy.hashed_state().is_empty());
assert!(lazy.trie_updates().is_empty());
}
#[test]
fn test_lazy_from_sorted() {
let sorted = SortedTrieData::default();
let lazy = LazyTrieData::from_sorted(sorted);
assert!(lazy.hashed_state().is_empty());
assert!(lazy.trie_updates().is_empty());
}
}

View File

@@ -11,6 +11,10 @@
extern crate alloc;
/// Lazy initialization wrapper for trie data.
mod lazy;
pub use lazy::{LazyTrieData, SortedTrieData};
/// In-memory hashed state.
mod hashed_state;
pub use hashed_state::*;