perf: make Chain use DeferredTrieData (#21137)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Dan Cline
2026-01-17 01:05:35 +00:00
committed by GitHub
parent b96a30821f
commit c617d25c36
38 changed files with 751 additions and 463 deletions

39
Cargo.lock generated
View File

@@ -7774,6 +7774,30 @@ dependencies = [
"tracing",
]
[[package]]
name = "reth-chain"
version = "1.10.0"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-primitives",
"arbitrary",
"bincode 1.3.3",
"metrics",
"parking_lot",
"rand 0.9.2",
"reth-ethereum-primitives",
"reth-execution-types",
"reth-metrics",
"reth-primitives-traits",
"reth-trie",
"reth-trie-common",
"revm",
"serde",
"serde_with",
"tracing",
]
[[package]]
name = "reth-chain-state"
version = "1.10.0"
@@ -7789,6 +7813,7 @@ dependencies = [
"parking_lot",
"pin-project",
"rand 0.9.2",
"reth-chain",
"reth-chainspec",
"reth-errors",
"reth-ethereum-primitives",
@@ -8927,7 +8952,6 @@ dependencies = [
name = "reth-execution-types"
version = "1.10.0"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-evm",
"alloy-primitives",
@@ -8981,6 +9005,7 @@ dependencies = [
"reth-trie-common",
"rmp-serde",
"secp256k1 0.30.0",
"serde_with",
"tempfile",
"thiserror 2.0.17",
"tokio",
@@ -8995,6 +9020,7 @@ dependencies = [
"alloy-eips",
"eyre",
"futures-util",
"reth-chain",
"reth-chainspec",
"reth-config",
"reth-consensus",
@@ -9002,7 +9028,6 @@ dependencies = [
"reth-db-common",
"reth-ethereum-primitives",
"reth-evm-ethereum",
"reth-execution-types",
"reth-exex",
"reth-network",
"reth-node-api",
@@ -9028,9 +9053,9 @@ dependencies = [
"arbitrary",
"bincode 1.3.3",
"rand 0.9.2",
"reth-chain",
"reth-chain-state",
"reth-ethereum-primitives",
"reth-execution-types",
"reth-primitives-traits",
"serde",
"serde_with",
@@ -9773,6 +9798,7 @@ dependencies = [
"op-alloy-consensus",
"op-alloy-rpc-types-engine",
"op-revm",
"reth-chain",
"reth-chainspec",
"reth-evm",
"reth-execution-errors",
@@ -10223,6 +10249,7 @@ dependencies = [
"parking_lot",
"rand 0.9.2",
"rayon",
"reth-chain",
"reth-chain-state",
"reth-chainspec",
"reth-codecs",
@@ -10712,12 +10739,12 @@ dependencies = [
"metrics",
"rand 0.9.2",
"reqwest",
"reth-chain",
"reth-chain-state",
"reth-chainspec",
"reth-errors",
"reth-ethereum-primitives",
"reth-evm",
"reth-execution-types",
"reth-metrics",
"reth-primitives-traits",
"reth-revm",
@@ -10790,6 +10817,7 @@ dependencies = [
"rand 0.9.2",
"rayon",
"reqwest",
"reth-chain",
"reth-chainspec",
"reth-codecs",
"reth-config",
@@ -10805,7 +10833,6 @@ dependencies = [
"reth-etl",
"reth-evm",
"reth-evm-ethereum",
"reth-execution-types",
"reth-exex",
"reth-fs-util",
"reth-network-p2p",
@@ -10952,6 +10979,7 @@ dependencies = [
"alloy-primitives",
"alloy-rpc-types-engine",
"auto_impl",
"reth-chain",
"reth-chainspec",
"reth-db-api",
"reth-db-models",
@@ -11110,6 +11138,7 @@ dependencies = [
"proptest",
"proptest-arbitrary-interop",
"rand 0.9.2",
"reth-chain",
"reth-chain-state",
"reth-chainspec",
"reth-eth-wire-types",

View File

@@ -44,6 +44,7 @@ members = [
"crates/ethereum/primitives/",
"crates/ethereum/reth/",
"crates/etl/",
"crates/evm/chain",
"crates/evm/evm",
"crates/evm/execution-errors",
"crates/evm/execution-types",
@@ -387,6 +388,7 @@ reth-etl = { path = "crates/etl" }
reth-evm = { path = "crates/evm/evm", default-features = false }
reth-evm-ethereum = { path = "crates/ethereum/evm", default-features = false }
reth-optimism-evm = { path = "crates/optimism/evm", default-features = false }
reth-chain = { path = "crates/evm/chain" }
reth-execution-errors = { path = "crates/evm/execution-errors", default-features = false }
reth-execution-types = { path = "crates/evm/execution-types", default-features = false }
reth-exex = { path = "crates/exex/exex" }

View File

@@ -13,6 +13,7 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-chainspec.workspace = true
reth-errors.workspace = true
reth-execution-types.workspace = true
@@ -65,6 +66,7 @@ serde = [
"alloy-primitives/serde",
"parking_lot/serde",
"rand?/serde",
"reth-chain/serde",
"reth-ethereum-primitives/serde",
"reth-execution-types/serde",
"reth-primitives-traits/serde",

View File

@@ -8,9 +8,10 @@ use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
use alloy_eips::{BlockHashOrNumber, BlockNumHash};
use alloy_primitives::{map::HashMap, BlockNumber, TxHash, B256};
use parking_lot::RwLock;
use reth_chain::Chain;
use reth_chainspec::ChainInfo;
use reth_ethereum_primitives::EthPrimitives;
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
use reth_metrics::{metrics::Gauge, Metrics};
use reth_primitives_traits::{
BlockBody as _, IndexedTx, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
@@ -944,6 +945,9 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
}
/// Converts a slice of executed blocks into a [`Chain`].
///
/// Uses [`ExecutedBlock::trie_data_handle`] to avoid blocking on deferred trie computations.
/// The trie data will be computed lazily when actually needed by consumers.
fn blocks_to_chain(blocks: &[ExecutedBlock<N>]) -> Chain<N> {
match blocks {
[] => Chain::default(),
@@ -954,8 +958,7 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
first.execution_outcome().clone(),
first.block_number(),
)),
first.trie_updates(),
first.hashed_state(),
first.trie_data_handle(),
);
for exec in rest {
chain.append_block(
@@ -964,8 +967,7 @@ impl<N: NodePrimitives<SignedTx: SignedTransaction>> NewCanonicalChain<N> {
exec.execution_outcome().clone(),
exec.block_number(),
)),
exec.trie_updates(),
exec.hashed_state(),
exec.trie_data_handle(),
);
}
chain
@@ -1560,17 +1562,30 @@ mod tests {
..Default::default()
};
assert_eq!(
chain_commit.to_chain_notification(),
CanonStateNotification::Commit {
new: Arc::new(Chain::new(
vec![block0.recovered_block().clone(), block1.recovered_block().clone()],
commit_execution_outcome,
expected_trie_updates,
expected_hashed_state
))
}
);
// Get the notification and verify
let notification = chain_commit.to_chain_notification();
let CanonStateNotification::Commit { new } = notification else {
panic!("Expected Commit notification");
};
// Compare blocks
let expected_blocks: Vec<_> =
vec![block0.recovered_block().clone(), block1.recovered_block().clone()];
let actual_blocks: Vec<_> = new.blocks().values().cloned().collect();
assert_eq!(actual_blocks, expected_blocks);
// Compare execution outcome
assert_eq!(*new.execution_outcome(), commit_execution_outcome);
// Compare trie data by waiting on deferred data
for (block_num, expected_updates) in &expected_trie_updates {
let actual = new.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.trie_updates, *expected_updates);
}
for (block_num, expected_state) in &expected_hashed_state {
let actual = new.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.hashed_state, *expected_state);
}
// Test reorg notification
let chain_reorg = NewCanonicalChain::Reorg {
@@ -1607,22 +1622,48 @@ mod tests {
..Default::default()
};
assert_eq!(
chain_reorg.to_chain_notification(),
CanonStateNotification::Reorg {
old: Arc::new(Chain::new(
vec![block1.recovered_block().clone(), block2.recovered_block().clone()],
reorg_execution_outcome.clone(),
old_trie_updates,
old_hashed_state
)),
new: Arc::new(Chain::new(
vec![block1a.recovered_block().clone(), block2a.recovered_block().clone()],
reorg_execution_outcome,
new_trie_updates,
new_hashed_state
))
}
);
// Get the notification and verify
let notification = chain_reorg.to_chain_notification();
let CanonStateNotification::Reorg { old, new } = notification else {
panic!("Expected Reorg notification");
};
// Compare old chain blocks
let expected_old_blocks: Vec<_> =
vec![block1.recovered_block().clone(), block2.recovered_block().clone()];
let actual_old_blocks: Vec<_> = old.blocks().values().cloned().collect();
assert_eq!(actual_old_blocks, expected_old_blocks);
// Compare old chain execution outcome
assert_eq!(*old.execution_outcome(), reorg_execution_outcome);
// Compare old chain trie data
for (block_num, expected_updates) in &old_trie_updates {
let actual = old.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.trie_updates, *expected_updates);
}
for (block_num, expected_state) in &old_hashed_state {
let actual = old.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.hashed_state, *expected_state);
}
// Compare new chain blocks
let expected_new_blocks: Vec<_> =
vec![block1a.recovered_block().clone(), block2a.recovered_block().clone()];
let actual_new_blocks: Vec<_> = new.blocks().values().cloned().collect();
assert_eq!(actual_new_blocks, expected_new_blocks);
// Compare new chain execution outcome
assert_eq!(*new.execution_outcome(), reorg_execution_outcome);
// Compare new chain trie data
for (block_num, expected_updates) in &new_trie_updates {
let actual = new.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.trie_updates, *expected_updates);
}
for (block_num, expected_state) in &new_hashed_state {
let actual = new.trie_data_at(*block_num).unwrap().wait_cloned();
assert_eq!(actual.hashed_state, *expected_state);
}
}
}

View File

@@ -11,8 +11,8 @@
mod in_memory;
pub use in_memory::*;
mod deferred_trie;
pub use deferred_trie::*;
// Re-export deferred_trie types from reth_chain
pub use reth_chain::{AnchoredTrieInput, ComputedTrieData, DeferredTrieData};
mod lazy_overlay;
pub use lazy_overlay::*;

View File

@@ -2,7 +2,7 @@
use alloy_eips::{eip2718::Encodable2718, BlockNumHash};
use derive_more::{Deref, DerefMut};
use reth_execution_types::{BlockReceipts, Chain};
use reth_chain::{BlockReceipts, Chain};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
use reth_storage_api::NodePrimitivesProvider;
use std::{
@@ -80,7 +80,7 @@ impl<N: NodePrimitives> Stream for CanonStateNotificationStream<N> {
///
/// The notification contains at least one [`Chain`] with the imported segment. If some blocks were
/// reverted (e.g. during a reorg), the old chain is also returned.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound = ""))]
pub enum CanonStateNotification<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
@@ -280,14 +280,13 @@ mod tests {
vec![block1.clone(), block2.clone()],
ExecutionOutcome::default(),
BTreeMap::new(),
BTreeMap::new(),
));
// Create a commit notification
let notification = CanonStateNotification::Commit { new: chain.clone() };
// Test that `committed` returns the correct chain
assert_eq!(notification.committed(), chain);
// Test that `committed` returns the correct chain (compare Arc pointers)
assert!(Arc::ptr_eq(&notification.committed(), &chain));
// Test that `reverted` returns None for `Commit`
assert!(notification.reverted().is_none());
@@ -319,24 +318,22 @@ mod tests {
vec![block1.clone()],
ExecutionOutcome::default(),
BTreeMap::new(),
BTreeMap::new(),
));
let new_chain = Arc::new(Chain::new(
vec![block2.clone(), block3.clone()],
ExecutionOutcome::default(),
BTreeMap::new(),
BTreeMap::new(),
));
// Create a reorg notification
let notification =
CanonStateNotification::Reorg { old: old_chain.clone(), new: new_chain.clone() };
// Test that `reverted` returns the old chain
assert_eq!(notification.reverted(), Some(old_chain));
// Test that `reverted` returns the old chain (compare Arc pointers)
assert!(Arc::ptr_eq(&notification.reverted().unwrap(), &old_chain));
// Test that `committed` returns the new chain
assert_eq!(notification.committed(), new_chain);
// Test that `committed` returns the new chain (compare Arc pointers)
assert!(Arc::ptr_eq(&notification.committed(), &new_chain));
// Test that `tip` returns the tip of the new chain (last block in the new chain)
assert_eq!(*notification.tip(), block3);
@@ -391,7 +388,6 @@ mod tests {
vec![block1.clone(), block2.clone()],
execution_outcome,
BTreeMap::new(),
BTreeMap::new(),
));
// Create a commit notification containing the new chain segment.
@@ -449,12 +445,8 @@ mod tests {
ExecutionOutcome { receipts: old_receipts, ..Default::default() };
// Create an old chain segment to be reverted, containing `old_block1`.
let old_chain: Arc<Chain> = Arc::new(Chain::new(
vec![old_block1.clone()],
old_execution_outcome,
BTreeMap::new(),
BTreeMap::new(),
));
let old_chain: Arc<Chain> =
Arc::new(Chain::new(vec![old_block1.clone()], old_execution_outcome, BTreeMap::new()));
// Define block2 for the new chain segment, which will be committed.
let mut body = BlockBody::<TransactionSigned>::default();
@@ -482,12 +474,8 @@ mod tests {
ExecutionOutcome { receipts: new_receipts, ..Default::default() };
// Create a new chain segment to be committed, containing `new_block1`.
let new_chain = Arc::new(Chain::new(
vec![new_block1.clone()],
new_execution_outcome,
BTreeMap::new(),
BTreeMap::new(),
));
let new_chain =
Arc::new(Chain::new(vec![new_block1.clone()], new_execution_outcome, BTreeMap::new()));
// Create a reorg notification with both reverted (old) and committed (new) chain segments.
let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };

View File

@@ -9,11 +9,12 @@ use alloy_signer::SignerSync;
use alloy_signer_local::PrivateKeySigner;
use core::marker::PhantomData;
use rand::Rng;
use reth_chain::Chain;
use reth_chainspec::{ChainSpec, EthereumHardfork, MIN_TRANSACTION_GAS};
use reth_ethereum_primitives::{
Block, BlockBody, EthPrimitives, Receipt, Transaction, TransactionSigned,
};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
use reth_primitives_traits::{
proofs::{calculate_receipt_root, calculate_transaction_root, calculate_withdrawals_root},
Account, NodePrimitives, Recovered, RecoveredBlock, SealedBlock, SealedHeader,

View File

@@ -0,0 +1,69 @@
[package]
name = "reth-chain"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Chain and deferred trie data types for reth."
[lints]
workspace = true
[dependencies]
reth-ethereum-primitives.workspace = true
reth-execution-types = { workspace = true, features = ["std"] }
reth-metrics.workspace = true
reth-primitives-traits.workspace = true
reth-trie.workspace = true
reth-trie-common.workspace = true
# alloy
alloy-consensus.workspace = true
alloy-primitives.workspace = true
alloy-eips.workspace = true
serde = { workspace = true, optional = true }
serde_with = { workspace = true, optional = true }
metrics.workspace = true
parking_lot.workspace = true
tracing.workspace = true
[dev-dependencies]
reth-primitives-traits = { workspace = true, features = ["test-utils", "arbitrary"] }
reth-ethereum-primitives = { workspace = true, features = ["arbitrary"] }
alloy-primitives = { workspace = true, features = ["rand", "arbitrary"] }
alloy-consensus = { workspace = true, features = ["arbitrary"] }
arbitrary.workspace = true
bincode.workspace = true
rand.workspace = true
revm.workspace = true
[features]
default = []
serde = [
"dep:serde",
"alloy-eips/serde",
"alloy-primitives/serde",
"reth-primitives-traits/serde",
"alloy-consensus/serde",
"reth-trie/serde",
"reth-trie-common/serde",
"reth-ethereum-primitives/serde",
"reth-execution-types/serde",
"rand/serde",
"revm/serde",
"parking_lot/serde",
]
serde-bincode-compat = [
"serde",
"reth-trie-common/serde-bincode-compat",
"reth-primitives-traits/serde-bincode-compat",
"serde_with",
"alloy-eips/serde-bincode-compat",
"alloy-consensus/serde-bincode-compat",
"reth-ethereum-primitives/serde-bincode-compat",
"reth-execution-types/serde-bincode-compat",
]

View File

@@ -1,16 +1,16 @@
//! Contains [Chain], a chain of blocks and their final state.
use crate::ExecutionOutcome;
use alloc::{borrow::Cow, collections::BTreeMap, sync::Arc, vec::Vec};
use crate::DeferredTrieData;
use alloy_consensus::{transaction::Recovered, BlockHeader};
use alloy_eips::{eip1898::ForkBlock, eip2718::Encodable2718, BlockNumHash};
use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash};
use core::{fmt, ops::RangeInclusive};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::{
transaction::signed::SignedTransaction, Block, BlockBody, IndexedTx, NodePrimitives,
RecoveredBlock, SealedHeader,
};
use reth_trie_common::{updates::TrieUpdatesSorted, HashedPostStateSorted};
use std::{borrow::Cow, collections::BTreeMap, fmt, ops::RangeInclusive, sync::Arc, vec::Vec};
/// A chain of blocks and their final state.
///
@@ -22,8 +22,7 @@ use reth_trie_common::{updates::TrieUpdatesSorted, HashedPostStateSorted};
/// # Warning
///
/// A chain of blocks should not be empty.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug)]
pub struct Chain<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
/// All blocks in this chain.
blocks: BTreeMap<BlockNumber, RecoveredBlock<N::Block>>,
@@ -34,10 +33,12 @@ pub struct Chain<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
///
/// Additionally, it includes the individual state changes that led to the current state.
execution_outcome: ExecutionOutcome<N::Receipt>,
/// State trie updates for each block in the chain, keyed by block number.
trie_updates: BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
/// Hashed post state for each block in the chain, keyed by block number.
hashed_state: BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
/// Deferred trie data for each block in the chain, keyed by block number.
///
/// Contains handles to lazily-computed sorted trie updates and hashed state.
/// This allows Chain to be constructed without blocking on expensive trie
/// computations - the data is only materialized when actually needed.
trie_data: BTreeMap<BlockNumber, DeferredTrieData>,
}
type ChainTxReceiptMeta<'a, N> = (
@@ -52,8 +53,7 @@ impl<N: NodePrimitives> Default for Chain<N> {
Self {
blocks: Default::default(),
execution_outcome: Default::default(),
trie_updates: Default::default(),
hashed_state: Default::default(),
trie_data: Default::default(),
}
}
}
@@ -67,27 +67,24 @@ impl<N: NodePrimitives> Chain<N> {
pub fn new(
blocks: impl IntoIterator<Item = RecoveredBlock<N::Block>>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_updates: BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
hashed_state: BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
trie_data: BTreeMap<BlockNumber, DeferredTrieData>,
) -> Self {
let blocks =
blocks.into_iter().map(|b| (b.header().number(), b)).collect::<BTreeMap<_, _>>();
debug_assert!(!blocks.is_empty(), "Chain should have at least one block");
Self { blocks, execution_outcome, trie_updates, hashed_state }
Self { blocks, execution_outcome, trie_data }
}
/// Create new Chain from a single block and its state.
pub fn from_block(
block: RecoveredBlock<N::Block>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_updates: Arc<TrieUpdatesSorted>,
hashed_state: Arc<HashedPostStateSorted>,
trie_data: DeferredTrieData,
) -> Self {
let block_number = block.header().number();
let trie_updates_map = BTreeMap::from([(block_number, trie_updates)]);
let hashed_state_map = BTreeMap::from([(block_number, hashed_state)]);
Self::new([block], execution_outcome, trie_updates_map, hashed_state_map)
let trie_data_map = BTreeMap::from([(block_number, trie_data)]);
Self::new([block], execution_outcome, trie_data_map)
}
/// Get the blocks in this chain.
@@ -105,37 +102,62 @@ impl<N: NodePrimitives> Chain<N> {
self.blocks.values().map(|block| block.clone_sealed_header())
}
/// Get all deferred trie data for this chain.
///
/// Returns handles to lazily-computed sorted trie updates and hashed state.
/// [`DeferredTrieData`] allows `Chain` to be constructed without blocking on
/// expensive trie computations - the data is only materialized when actually needed
/// via [`DeferredTrieData::wait_cloned`] or similar methods.
///
/// This method does **not** block. To access the computed trie data, call
/// [`DeferredTrieData::wait_cloned`] on individual entries, which will block
/// if the background computation has not yet completed.
pub const fn trie_data(&self) -> &BTreeMap<BlockNumber, DeferredTrieData> {
&self.trie_data
}
/// Get deferred trie data for a specific block number.
///
/// Returns a handle to the lazily-computed trie data. This method does **not** block.
/// Call [`DeferredTrieData::wait_cloned`] on the result to wait for and retrieve
/// the computed data, which will block if computation is still in progress.
pub fn trie_data_at(&self, block_number: BlockNumber) -> Option<&DeferredTrieData> {
self.trie_data.get(&block_number)
}
/// Get all trie updates for this chain.
pub const fn trie_updates(&self) -> &BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>> {
&self.trie_updates
///
/// Note: This blocks on deferred trie data for all blocks in the chain.
/// Prefer using [`trie_data`](Self::trie_data) when possible to avoid blocking.
pub fn trie_updates(&self) -> BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>> {
self.trie_data.iter().map(|(num, data)| (*num, data.wait_cloned().trie_updates)).collect()
}
/// Get trie updates for a specific block number.
pub fn trie_updates_at(&self, block_number: BlockNumber) -> Option<&Arc<TrieUpdatesSorted>> {
self.trie_updates.get(&block_number)
///
/// Note: This waits for deferred trie data if not already computed.
pub fn trie_updates_at(&self, block_number: BlockNumber) -> Option<Arc<TrieUpdatesSorted>> {
self.trie_data.get(&block_number).map(|data| data.wait_cloned().trie_updates)
}
/// Remove all trie updates for this chain.
pub fn clear_trie_updates(&mut self) {
self.trie_updates.clear();
/// Remove all trie data for this chain.
pub fn clear_trie_data(&mut self) {
self.trie_data.clear();
}
/// Get all hashed states for this chain.
pub const fn hashed_state(&self) -> &BTreeMap<BlockNumber, Arc<HashedPostStateSorted>> {
&self.hashed_state
///
/// Note: This blocks on deferred trie data for all blocks in the chain.
/// Prefer using [`trie_data`](Self::trie_data) when possible to avoid blocking.
pub fn hashed_state(&self) -> BTreeMap<BlockNumber, Arc<HashedPostStateSorted>> {
self.trie_data.iter().map(|(num, data)| (*num, data.wait_cloned().hashed_state)).collect()
}
/// Get hashed state for a specific block number.
pub fn hashed_state_at(
&self,
block_number: BlockNumber,
) -> Option<&Arc<HashedPostStateSorted>> {
self.hashed_state.get(&block_number)
}
/// Remove all hashed states for this chain.
pub fn clear_hashed_state(&mut self) {
self.hashed_state.clear();
///
/// Note: This waits for deferred trie data if not already computed.
pub fn hashed_state_at(&self, block_number: BlockNumber) -> Option<Arc<HashedPostStateSorted>> {
self.trie_data.get(&block_number).map(|data| data.wait_cloned().hashed_state)
}
/// Get execution outcome of this chain
@@ -183,23 +205,16 @@ impl<N: NodePrimitives> Chain<N> {
/// Destructure the chain into its inner components:
/// 1. The blocks contained in the chain.
/// 2. The execution outcome representing the final state.
/// 3. The trie updates map.
/// 4. The hashed state map.
/// 3. The deferred trie data map.
#[allow(clippy::type_complexity)]
pub fn into_inner(
self,
) -> (
ChainBlocks<'static, N::Block>,
ExecutionOutcome<N::Receipt>,
BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
BTreeMap<BlockNumber, DeferredTrieData>,
) {
(
ChainBlocks { blocks: Cow::Owned(self.blocks) },
self.execution_outcome,
self.trie_updates,
self.hashed_state,
)
(ChainBlocks { blocks: Cow::Owned(self.blocks) }, self.execution_outcome, self.trie_data)
}
/// Destructure the chain into its inner components:
@@ -329,14 +344,12 @@ impl<N: NodePrimitives> Chain<N> {
&mut self,
block: RecoveredBlock<N::Block>,
execution_outcome: ExecutionOutcome<N::Receipt>,
trie_updates: Arc<TrieUpdatesSorted>,
hashed_state: Arc<HashedPostStateSorted>,
trie_data: DeferredTrieData,
) {
let block_number = block.header().number();
self.blocks.insert(block_number, block);
self.execution_outcome.extend(execution_outcome);
self.trie_updates.insert(block_number, trie_updates);
self.hashed_state.insert(block_number, hashed_state);
self.trie_data.insert(block_number, trie_data);
}
/// Merge two chains by appending the given chain into the current one.
@@ -355,8 +368,7 @@ impl<N: NodePrimitives> Chain<N> {
// Insert blocks from other chain
self.blocks.extend(other.blocks);
self.execution_outcome.extend(other.execution_outcome);
self.trie_updates.extend(other.trie_updates);
self.hashed_state.extend(other.hashed_state);
self.trie_data.extend(other.trie_data);
Ok(())
}
@@ -459,7 +471,7 @@ impl<B: Block<Body: BlockBody<Transaction: SignedTransaction>>> ChainBlocks<'_,
impl<B: Block> IntoIterator for ChainBlocks<'_, B> {
type Item = (BlockNumber, RecoveredBlock<B>);
type IntoIter = alloc::collections::btree_map::IntoIter<BlockNumber, RecoveredBlock<B>>;
type IntoIter = std::collections::btree_map::IntoIter<BlockNumber, RecoveredBlock<B>>;
fn into_iter(self) -> Self::IntoIter {
self.blocks.into_owned().into_iter()
@@ -477,25 +489,95 @@ pub struct BlockReceipts<T = reth_ethereum_primitives::Receipt> {
pub timestamp: u64,
}
#[cfg(feature = "serde")]
mod chain_serde {
use super::*;
use crate::ComputedTrieData;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
/// Serializable representation of Chain that waits for deferred trie data.
#[derive(Serialize, Deserialize)]
#[serde(bound = "")]
struct ChainRepr<N: NodePrimitives> {
blocks: BTreeMap<BlockNumber, RecoveredBlock<N::Block>>,
execution_outcome: ExecutionOutcome<N::Receipt>,
#[serde(default)]
trie_updates: BTreeMap<BlockNumber, Arc<TrieUpdatesSorted>>,
#[serde(default)]
hashed_state: BTreeMap<BlockNumber, Arc<HashedPostStateSorted>>,
}
impl<N: NodePrimitives> Serialize for Chain<N> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// Wait for deferred trie data for serialization
let trie_updates: BTreeMap<_, _> = self
.trie_data
.iter()
.map(|(num, data)| (*num, data.wait_cloned().trie_updates))
.collect();
let hashed_state: BTreeMap<_, _> = self
.trie_data
.iter()
.map(|(num, data)| (*num, data.wait_cloned().hashed_state))
.collect();
let repr = ChainRepr::<N> {
blocks: self.blocks.clone(),
execution_outcome: self.execution_outcome.clone(),
trie_updates,
hashed_state,
};
repr.serialize(serializer)
}
}
impl<'de, N: NodePrimitives> Deserialize<'de> for Chain<N> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let repr = ChainRepr::<N>::deserialize(deserializer)?;
// Convert to ready DeferredTrieData handles
let trie_data = repr
.trie_updates
.into_iter()
.map(|(num, trie_updates)| {
let hashed_state = repr.hashed_state.get(&num).cloned().unwrap_or_default();
let computed = ComputedTrieData::without_trie_input(hashed_state, trie_updates);
(num, DeferredTrieData::ready(computed))
})
.collect();
Ok(Self { blocks: repr.blocks, execution_outcome: repr.execution_outcome, trie_data })
}
}
}
/// Bincode-compatible [`Chain`] serde implementation.
#[cfg(feature = "serde-bincode-compat")]
pub(super) mod serde_bincode_compat {
use crate::{serde_bincode_compat, ExecutionOutcome};
use alloc::{borrow::Cow, collections::BTreeMap, sync::Arc};
use alloy_primitives::BlockNumber;
use reth_ethereum_primitives::EthPrimitives;
use reth_execution_types::{
serde_bincode_compat as exec_serde_bincode_compat, ExecutionOutcome,
};
use reth_primitives_traits::{
serde_bincode_compat::{RecoveredBlock, SerdeBincodeCompat},
Block, NodePrimitives,
};
use serde::{ser::SerializeMap, Deserialize, Deserializer, Serialize, Serializer};
use serde_with::{DeserializeAs, SerializeAs};
use std::{borrow::Cow, collections::BTreeMap, sync::Arc};
/// Bincode-compatible [`super::Chain`] serde implementation.
///
/// Intended to use with the [`serde_with::serde_as`] macro in the following way:
/// ```rust
/// use reth_execution_types::{serde_bincode_compat, Chain};
/// use reth_chain::{serde_bincode_compat, Chain};
/// use serde::{Deserialize, Serialize};
/// use serde_with::serde_as;
///
@@ -515,7 +597,7 @@ pub(super) mod serde_bincode_compat {
>,
{
blocks: RecoveredBlocks<'a, N::Block>,
execution_outcome: serde_bincode_compat::ExecutionOutcome<'a, N::Receipt>,
execution_outcome: exec_serde_bincode_compat::ExecutionOutcome<'a, N::Receipt>,
#[serde(default, rename = "trie_updates_legacy")]
_trie_updates_legacy:
Option<reth_trie_common::serde_bincode_compat::updates::TrieUpdates<'a>>,
@@ -571,31 +653,6 @@ pub(super) mod serde_bincode_compat {
}
}
impl<'a, N> From<&'a super::Chain<N>> for Chain<'a, N>
where
N: NodePrimitives<
Block: Block<Header: SerdeBincodeCompat, Body: SerdeBincodeCompat> + 'static,
>,
{
fn from(value: &'a super::Chain<N>) -> Self {
Self {
blocks: RecoveredBlocks(Cow::Borrowed(&value.blocks)),
execution_outcome: value.execution_outcome.as_repr(),
_trie_updates_legacy: None,
trie_updates: value
.trie_updates
.iter()
.map(|(k, v)| (*k, v.as_ref().into()))
.collect(),
hashed_state: value
.hashed_state
.iter()
.map(|(k, v)| (*k, v.as_ref().into()))
.collect(),
}
}
}
impl<'a, N> From<Chain<'a, N>> for super::Chain<N>
where
N: NodePrimitives<
@@ -603,19 +660,26 @@ pub(super) mod serde_bincode_compat {
>,
{
fn from(value: Chain<'a, N>) -> Self {
use crate::{ComputedTrieData, DeferredTrieData};
let trie_updates: BTreeMap<_, _> =
value.trie_updates.into_iter().map(|(k, v)| (k, Arc::new(v.into()))).collect();
let hashed_state: BTreeMap<_, _> =
value.hashed_state.into_iter().map(|(k, v)| (k, Arc::new(v.into()))).collect();
let trie_data = trie_updates
.into_iter()
.map(|(num, trie_updates)| {
let hashed_state = hashed_state.get(&num).cloned().unwrap_or_default();
let computed = ComputedTrieData::without_trie_input(hashed_state, trie_updates);
(num, DeferredTrieData::ready(computed))
})
.collect();
Self {
blocks: value.blocks.0.into_owned(),
execution_outcome: ExecutionOutcome::from_repr(value.execution_outcome),
trie_updates: value
.trie_updates
.into_iter()
.map(|(k, v)| (k, Arc::new(v.into())))
.collect(),
hashed_state: value
.hashed_state
.into_iter()
.map(|(k, v)| (k, Arc::new(v.into())))
.collect(),
trie_data,
}
}
}
@@ -630,7 +694,31 @@ pub(super) mod serde_bincode_compat {
where
S: Serializer,
{
Chain::from(source).serialize(serializer)
use reth_trie_common::serde_bincode_compat as trie_serde;
// Wait for deferred trie data and collect into maps we can borrow from
let trie_updates_data: BTreeMap<BlockNumber, _> =
source.trie_data.iter().map(|(k, v)| (*k, v.wait_cloned().trie_updates)).collect();
let hashed_state_data: BTreeMap<BlockNumber, _> =
source.trie_data.iter().map(|(k, v)| (*k, v.wait_cloned().hashed_state)).collect();
// Now create the serde-compatible struct borrowing from the collected data
let chain: Chain<'_, N> = Chain {
blocks: RecoveredBlocks(Cow::Borrowed(&source.blocks)),
execution_outcome: source.execution_outcome.as_repr(),
_trie_updates_legacy: None,
trie_updates: trie_updates_data
.iter()
.map(|(k, v)| (*k, trie_serde::updates::TrieUpdatesSorted::from(v.as_ref())))
.collect(),
hashed_state: hashed_state_data
.iter()
.map(|(k, v)| {
(*k, trie_serde::hashed_state::HashedPostStateSorted::from(v.as_ref()))
})
.collect(),
};
chain.serialize(serializer)
}
}
@@ -659,10 +747,10 @@ pub(super) mod serde_bincode_compat {
#[test]
fn test_chain_bincode_roundtrip() {
use alloc::collections::BTreeMap;
use std::collections::BTreeMap;
#[serde_as]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
struct Data {
#[serde_as(as = "serde_bincode_compat::Chain")]
chain: Chain,
@@ -676,13 +764,14 @@ pub(super) mod serde_bincode_compat {
.unwrap()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
),
};
let encoded = bincode::serialize(&data).unwrap();
let decoded: Data = bincode::deserialize(&encoded).unwrap();
assert_eq!(decoded, data);
// Note: Can't compare directly because DeferredTrieData doesn't implement PartialEq
assert_eq!(decoded.chain.blocks, data.chain.blocks);
assert_eq!(decoded.chain.execution_outcome, data.chain.execution_outcome);
}
}
}
@@ -776,12 +865,8 @@ mod tests {
let mut block_state_extended = execution_outcome1;
block_state_extended.extend(execution_outcome2);
let chain: Chain = Chain::new(
vec![block1.clone(), block2.clone()],
block_state_extended,
BTreeMap::new(),
BTreeMap::new(),
);
let chain: Chain =
Chain::new(vec![block1.clone(), block2.clone()], block_state_extended, BTreeMap::new());
// return tip state
assert_eq!(

View File

@@ -8,6 +8,7 @@ use reth_trie::{
use std::{
fmt,
sync::{Arc, LazyLock},
vec::Vec,
};
use tracing::instrument;

View File

@@ -0,0 +1,30 @@
//! Chain and deferred trie data types for reth.
//!
//! This crate contains the [`Chain`] type representing a chain of blocks and their final state,
//! as well as [`DeferredTrieData`] for handling asynchronously computed trie data.
#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg))]
// The [`Chain`] type and its associated implementations.
mod chain;
pub use chain::*;
// Asynchronously computed trie data support ([`DeferredTrieData`]).
mod deferred_trie;
pub use deferred_trie::*;
/// Bincode-compatible serde implementations for chain types.
///
/// `bincode` crate doesn't work with optionally serializable serde fields, but some of the
/// chain types require optional serialization for RPC compatibility. This module makes it so
/// that all fields are serialized.
///
/// Read more: <https://github.com/bincode-org/bincode/issues/326>
#[cfg(feature = "serde-bincode-compat")]
pub mod serde_bincode_compat {
    pub use super::chain::serde_bincode_compat::*;
}

View File

@@ -19,7 +19,6 @@ revm.workspace = true
# alloy
alloy-evm.workspace = true
alloy-consensus.workspace = true
alloy-primitives.workspace = true
alloy-eips.workspace = true
@@ -45,7 +44,6 @@ serde = [
"alloy-eips/serde",
"alloy-primitives/serde",
"reth-primitives-traits/serde",
"alloy-consensus/serde",
"reth-trie-common/serde",
"reth-ethereum-primitives/serde",
]
@@ -55,7 +53,6 @@ serde-bincode-compat = [
"reth-primitives-traits/serde-bincode-compat",
"serde_with",
"alloy-eips/serde-bincode-compat",
"alloy-consensus/serde-bincode-compat",
"reth-ethereum-primitives/serde-bincode-compat",
]
std = [
@@ -64,7 +61,6 @@ std = [
"revm/std",
"serde?/std",
"reth-primitives-traits/std",
"alloy-consensus/std",
"serde_with?/std",
"derive_more/std",
"reth-ethereum-primitives/std",

View File

@@ -564,8 +564,8 @@ pub(super) mod serde_bincode_compat {
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::TxType;
use alloy_primitives::{bytes, Address, LogData, B256};
use reth_ethereum_primitives::TxType;
#[test]
fn test_initialization() {

View File

@@ -11,9 +11,6 @@
extern crate alloc;
mod chain;
pub use chain::*;
mod execute;
pub use execute::*;
@@ -29,5 +26,5 @@ pub use execution_outcome::*;
/// Read more: <https://github.com/bincode-org/bincode/issues/326>
#[cfg(feature = "serde-bincode-compat")]
pub mod serde_bincode_compat {
pub use super::{chain::serde_bincode_compat::*, execution_outcome::serde_bincode_compat::*};
pub use super::execution_outcome::serde_bincode_compat::*;
}

View File

@@ -48,6 +48,7 @@ itertools = { workspace = true, features = ["use_std"] }
metrics.workspace = true
parking_lot.workspace = true
rmp-serde.workspace = true
serde_with.workspace = true
thiserror.workspace = true
tracing.workspace = true

View File

@@ -149,7 +149,7 @@ where
executor.into_state().take_bundle(),
results,
);
let chain = Chain::new(blocks, outcome, BTreeMap::new(), BTreeMap::new());
let chain = Chain::new(blocks, outcome, BTreeMap::new());
Ok(chain)
}
}

View File

@@ -796,21 +796,20 @@ mod tests {
block1.set_block_number(10);
let notification1 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block1.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};
// Push the first notification
exex_manager.push_notification(notification1.clone());
exex_manager.push_notification(notification1);
// Verify the buffer contains the notification with the correct ID
assert_eq!(exex_manager.buffer.len(), 1);
assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
// Compare by tip block since ExExNotification doesn't implement PartialEq
assert_eq!(
*exex_manager.buffer.front().unwrap().1.committed_chain().unwrap().tip(),
block1
);
assert_eq!(exex_manager.next_id, 1);
// Push another notification
@@ -819,22 +818,20 @@ mod tests {
block2.set_block_number(20);
let notification2 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block2.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
};
exex_manager.push_notification(notification2.clone());
exex_manager.push_notification(notification2);
// Verify the buffer contains both notifications with correct IDs
assert_eq!(exex_manager.buffer.len(), 2);
assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
assert_eq!(
*exex_manager.buffer.front().unwrap().1.committed_chain().unwrap().tip(),
block1
);
assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1);
assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2);
assert_eq!(*exex_manager.buffer.get(1).unwrap().1.committed_chain().unwrap().tip(), block2);
assert_eq!(exex_manager.next_id, 2);
}
@@ -867,12 +864,7 @@ mod tests {
block1.set_block_number(10);
let notification1 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block1.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};
exex_manager.push_notification(notification1.clone());
@@ -1100,7 +1092,6 @@ mod tests {
vec![Default::default()],
Default::default(),
Default::default(),
Default::default(),
)),
};
@@ -1166,10 +1157,10 @@ mod tests {
block2.set_block_number(11);
// Setup a notification
let expected_block: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![Default::default()],
Default::default(),
vec![expected_block.clone()],
Default::default(),
Default::default(),
)),
@@ -1181,7 +1172,8 @@ mod tests {
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
// Compare by tip block since ExExNotification doesn't implement PartialEq
assert_eq!(*received_notification.committed_chain().unwrap().tip(), expected_block);
}
Poll::Pending => panic!("Notification send is pending"),
Poll::Ready(Err(e)) => panic!("Failed to send notification: {e:?}"),
@@ -1216,12 +1208,7 @@ mod tests {
block1.set_block_number(10);
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block1.clone()],
Default::default(),
Default::default(),
Default::default(),
)),
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
@@ -1278,7 +1265,9 @@ mod tests {
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
// Compare by checking that both are reorgs with empty chains
assert!(received_notification.committed_chain().is_some());
assert!(received_notification.reverted_chain().is_some());
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail")
@@ -1318,7 +1307,9 @@ mod tests {
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
// Compare by checking that it's a revert with empty chain
assert!(received_notification.reverted_chain().is_some());
assert!(received_notification.committed_chain().is_none());
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail")
@@ -1371,16 +1362,10 @@ mod tests {
vec![genesis_block.clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block.clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), BTreeMap::new())),
};
let (finalized_headers_tx, rx) = watch::channel(None);
@@ -1397,34 +1382,38 @@ mod tests {
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
exex_manager
.handle()
.send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
exex_manager.handle().send(ExExNotificationSource::Pipeline, genesis_notification)?;
exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification)?;
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(
notifications.try_poll_next_unpin(&mut cx)?,
Poll::Ready(Some(genesis_notification))
);
// Check genesis notification received
let poll_result = notifications.try_poll_next_unpin(&mut cx)?;
if let Poll::Ready(Some(n)) = poll_result {
assert_eq!(*n.committed_chain().unwrap().tip(), genesis_block);
} else {
panic!("Expected genesis notification");
}
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(
notifications.try_poll_next_unpin(&mut cx)?,
Poll::Ready(Some(notification.clone()))
);
// Check block notification received
let poll_result = notifications.try_poll_next_unpin(&mut cx)?;
if let Poll::Ready(Some(n)) = poll_result {
assert_eq!(*n.committed_chain().unwrap().tip(), block);
} else {
panic!("Expected block notification");
}
// WAL shouldn't contain the genesis notification, because it's finalized
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
std::slice::from_ref(&notification)
);
let wal_notifications =
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?;
assert_eq!(wal_notifications.len(), 1);
assert_eq!(*wal_notifications[0].committed_chain().unwrap().tip(), block);
finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
std::slice::from_ref(&notification)
);
let wal_notifications =
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?;
assert_eq!(wal_notifications.len(), 1);
assert_eq!(*wal_notifications[0].committed_chain().unwrap().tip(), block);
// Send a `FinishedHeight` event with a non-canonical block
events_tx
@@ -1435,10 +1424,10 @@ mod tests {
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
// non-canonical block
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?,
std::slice::from_ref(&notification)
);
let wal_notifications =
exex_manager.wal.iter_notifications()?.collect::<WalResult<Vec<_>>>()?;
assert_eq!(wal_notifications.len(), 1);
assert_eq!(*wal_notifications[0].committed_chain().unwrap().tip(), block);
// Send a `FinishedHeight` event with a canonical block
events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
@@ -1446,7 +1435,7 @@ mod tests {
finalized_headers_tx.send(Some(block.clone_sealed_header()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL is finalized
assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
assert!(exex_manager.wal.iter_notifications()?.next().is_none());
Ok(())
}
@@ -1492,12 +1481,7 @@ mod tests {
let mut make_notif = |id: u64| {
let block = random_block(&mut rng, id, BlockParams::default()).try_recover().unwrap();
ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block],
Default::default(),
Default::default(),
Default::default(),
)),
new: Arc::new(Chain::new(vec![block], Default::default(), Default::default())),
}
};

View File

@@ -449,7 +449,7 @@ mod tests {
use crate::Wal;
use alloy_consensus::Header;
use alloy_eips::BlockNumHash;
use eyre::OptionExt;
use futures::StreamExt;
use reth_db_common::init::init_genesis;
use reth_ethereum_primitives::Block;
@@ -491,17 +491,17 @@ mod tests {
let exex_head =
ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
let expected_block = random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.try_recover()?;
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.try_recover()?],
vec![expected_block.clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
@@ -519,23 +519,16 @@ mod tests {
.with_head(exex_head);
// First notification is the backfill of missing blocks from the canonical chain
assert_eq!(
notifications.next().await.transpose()?,
Some(ExExNotification::ChainCommitted {
new: Arc::new(
BackfillJobFactory::new(
notifications.evm_config.clone(),
notifications.provider.clone()
)
.backfill(1..=1)
.next()
.ok_or_eyre("failed to backfill")??
)
})
);
let backfill_notification = notifications.next().await.transpose()?;
assert!(backfill_notification.is_some());
// Verify it's a commit notification with the expected block range
let backfill_chain = backfill_notification.unwrap().committed_chain().unwrap();
assert_eq!(backfill_chain.first().header().number(), 1);
// Second notification is the actual notification that we sent before
assert_eq!(notifications.next().await.transpose()?, Some(notification));
let received = notifications.next().await.transpose()?;
assert!(received.is_some());
assert_eq!(*received.unwrap().committed_chain().unwrap().tip(), expected_block);
Ok(())
}
@@ -556,21 +549,21 @@ mod tests {
let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash };
let exex_head = ExExHead { block: node_head };
let expected_block = Block {
header: Header {
parent_hash: node_head.hash,
number: node_head.number + 1,
..Default::default()
},
..Default::default()
}
.seal_slow()
.try_recover()?;
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![Block {
header: Header {
parent_hash: node_head.hash,
number: node_head.number + 1,
..Default::default()
},
..Default::default()
}
.seal_slow()
.try_recover()?],
vec![expected_block.clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
@@ -588,7 +581,8 @@ mod tests {
.with_head(exex_head);
let new_notification = notifications.next().await.transpose()?;
assert_eq!(new_notification, Some(notification));
assert!(new_notification.is_some());
assert_eq!(*new_notification.unwrap().committed_chain().unwrap().tip(), expected_block);
Ok(())
}
@@ -618,7 +612,7 @@ mod tests {
let provider_rw = provider.database_provider_rw()?;
provider_rw.insert_block(&node_head_block)?;
provider_rw.commit()?;
let node_head_notification = ExExNotification::ChainCommitted {
let _node_head_notification = ExExNotification::ChainCommitted {
new: Arc::new(
BackfillJobFactory::new(EthEvmConfig::mainnet(), provider.clone())
.backfill(node_head.number..=node_head.number)
@@ -633,28 +627,24 @@ mod tests {
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
);
let exex_head = ExExHead { block: exex_head_block.num_hash() };
let exex_head_recovered = exex_head_block.clone().try_recover()?;
let exex_head_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![exex_head_block.clone().try_recover()?],
vec![exex_head_recovered.clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
wal.commit(&exex_head_notification)?;
let new_block = random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.try_recover()?;
let new_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.try_recover()?],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(vec![new_block.clone()], Default::default(), BTreeMap::new())),
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
@@ -672,15 +662,25 @@ mod tests {
// First notification is the revert of the ExEx head block to get back to the canonical
// chain
let revert_notification = notifications.next().await.transpose()?;
assert!(revert_notification.is_some());
// Verify it's a revert with the exex_head block
assert_eq!(
notifications.next().await.transpose()?,
Some(exex_head_notification.into_inverted())
*revert_notification.unwrap().reverted_chain().unwrap().tip(),
exex_head_recovered
);
// Second notification is the backfilled block from the canonical chain to get back to the
// canonical tip
assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
let backfill_notification = notifications.next().await.transpose()?;
assert!(backfill_notification.is_some());
assert_eq!(
backfill_notification.unwrap().committed_chain().unwrap().tip().header().number(),
node_head.number
);
// Third notification is the actual notification that we sent before
assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
let received = notifications.next().await.transpose()?;
assert!(received.is_some());
assert_eq!(*received.unwrap().committed_chain().unwrap().tip(), new_block);
Ok(())
}
@@ -706,12 +706,12 @@ mod tests {
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
);
let exex_head_recovered = exex_head_block.clone().try_recover()?;
let exex_head_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![exex_head_block.clone().try_recover()?],
vec![exex_head_recovered.clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
wal.commit(&exex_head_notification)?;
@@ -721,18 +721,14 @@ mod tests {
block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
};
let new_block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), ..Default::default() },
)
.try_recover()?;
let new_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), ..Default::default() },
)
.try_recover()?],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(vec![new_block.clone()], Default::default(), BTreeMap::new())),
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
@@ -750,13 +746,17 @@ mod tests {
// First notification is the revert of the ExEx head block to get back to the canonical
// chain
let revert_notification = notifications.next().await.transpose()?;
assert!(revert_notification.is_some());
assert_eq!(
notifications.next().await.transpose()?,
Some(exex_head_notification.into_inverted())
*revert_notification.unwrap().reverted_chain().unwrap().tip(),
exex_head_recovered
);
// Second notification is the actual notification that we sent before
assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
let received = notifications.next().await.transpose()?;
assert!(received.is_some());
assert_eq!(*received.unwrap().committed_chain().unwrap().tip(), new_block);
Ok(())
}

View File

@@ -255,6 +255,36 @@ mod tests {
})
}
/// Structurally compares two slices of [`ExExNotification`]s.
///
/// `ExExNotification` no longer implements `PartialEq` (chains now carry
/// [`DeferredTrieData`], which is not comparable), so equality is approximated by
/// comparing the committed and reverted chains of each pair of notifications by
/// tip hash and block map.
///
/// Returns `false` on a length mismatch or on the first differing pair.
fn notifications_equal(a: &[ExExNotification], b: &[ExExNotification]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Single comparison used for both the committed and reverted sides; the two
    // options must agree on presence, and present chains must match on tip hash
    // and block map.
    let chains_eq = |c1, c2| match (c1, c2) {
        (Some(c1), Some(c2)) => {
            c1.tip().hash() == c2.tip().hash() && c1.blocks() == c2.blocks()
        }
        (None, None) => true,
        _ => false,
    };
    a.iter().zip(b.iter()).all(|(n1, n2)| {
        chains_eq(n1.committed_chain(), n2.committed_chain()) &&
            chains_eq(n1.reverted_chain(), n2.reverted_chain())
    })
}
/// Panics with a descriptive message when `actual` and `expected` differ
/// according to [`notifications_equal`].
fn assert_notifications_eq(actual: Vec<ExExNotification>, expected: Vec<ExExNotification>) {
    if !notifications_equal(&actual, &expected) {
        panic!("notifications mismatch:\nactual: {actual:?}\nexpected: {expected:?}");
    }
}
fn sort_committed_blocks(
committed_blocks: Vec<(B256, u32, CachedBlock)>,
) -> Vec<(B256, u32, CachedBlock)> {
@@ -304,37 +334,24 @@ mod tests {
vec![blocks[0].clone(), blocks[1].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
let reverted_notification = ExExNotification::ChainReverted {
old: Arc::new(Chain::new(
vec![blocks[1].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), BTreeMap::new())),
};
let committed_notification_2 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block_1_reorged.clone(), blocks[2].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
let reorged_notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::new(
vec![blocks[2].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), BTreeMap::new())),
new: Arc::new(Chain::new(
vec![block_2_reorged.clone(), blocks[3].clone()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
};
@@ -371,7 +388,7 @@ mod tests {
wal.inner.block_cache().committed_blocks_sorted(),
committed_notification_1_cache_committed_blocks
);
assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
assert_notifications_eq(read_notifications(&wal)?, vec![committed_notification_1.clone()]);
// Second notification (revert block 1)
wal.commit(&reverted_notification)?;
@@ -385,9 +402,9 @@ mod tests {
wal.inner.block_cache().committed_blocks_sorted(),
committed_notification_1_cache_committed_blocks
);
assert_eq!(
assert_notifications_eq(
read_notifications(&wal)?,
vec![committed_notification_1.clone(), reverted_notification.clone()]
vec![committed_notification_1.clone(), reverted_notification.clone()],
);
// Third notification (commit block 1, 2)
@@ -430,13 +447,13 @@ mod tests {
.concat()
)
);
assert_eq!(
assert_notifications_eq(
read_notifications(&wal)?,
vec![
committed_notification_1.clone(),
reverted_notification.clone(),
committed_notification_2.clone()
]
committed_notification_2.clone(),
],
);
// Fourth notification (revert block 2, commit block 2, 3)
@@ -481,14 +498,14 @@ mod tests {
.concat()
)
);
assert_eq!(
assert_notifications_eq(
read_notifications(&wal)?,
vec![
committed_notification_1,
reverted_notification,
committed_notification_2.clone(),
reorged_notification.clone()
]
reorged_notification.clone(),
],
);
// Now, finalize the WAL up to the block 1. Block 1 was in the third notification that also
@@ -510,9 +527,9 @@ mod tests {
.concat()
)
);
assert_eq!(
assert_notifications_eq(
read_notifications(&wal)?,
vec![committed_notification_2.clone(), reorged_notification.clone()]
vec![committed_notification_2.clone(), reorged_notification.clone()],
);
// Re-open the WAL and verify that the cache population works correctly
@@ -531,7 +548,10 @@ mod tests {
.concat()
)
);
assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]);
assert_notifications_eq(
read_notifications(&wal)?,
vec![committed_notification_2, reorged_notification],
);
Ok(())
}

View File

@@ -163,12 +163,16 @@ where
let file_path = self.file_path(file_id);
debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");
// Serialize using the bincode- and msgpack-compatible serde wrapper
let notification =
reth_exex_types::serde_bincode_compat::ExExNotification::<N>::from(notification);
// Serialize using the bincode- and msgpack-compatible serde wrapper via SerializeAs
reth_fs_util::atomic_write_file(&file_path, |file| {
rmp_serde::encode::write(file, &notification)
use serde_with::SerializeAs;
let mut buf = Vec::new();
reth_exex_types::serde_bincode_compat::ExExNotification::<'_, N>::serialize_as(
notification,
&mut rmp_serde::Serializer::new(&mut buf),
)
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?;
std::io::Write::write_all(file, &buf)
})?;
Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len())
@@ -224,8 +228,10 @@ mod tests {
// Get expected data
let expected_notification = get_test_notification_data().unwrap();
// Compare by tip block since ExExNotification doesn't implement PartialEq
assert_eq!(
&notification, &expected_notification,
*notification.committed_chain().unwrap().tip(),
*expected_notification.committed_chain().unwrap().tip(),
"Decoded notification should match expected static data"
);
}
@@ -241,28 +247,18 @@ mod tests {
let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?;
let notification = ExExNotification::ChainReorged {
new: Arc::new(Chain::new(
vec![new_block],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
old: Arc::new(Chain::new(
vec![old_block],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(vec![new_block.clone()], Default::default(), BTreeMap::new())),
old: Arc::new(Chain::new(vec![old_block.clone()], Default::default(), BTreeMap::new())),
};
// Do a round trip serialization and deserialization
let file_id = 0;
storage.write_notification(file_id, &notification)?;
let deserialized_notification = storage.read_notification(file_id)?;
assert_eq!(
deserialized_notification.map(|(notification, _)| notification),
Some(notification)
);
// Compare by chain tips since ExExNotification doesn't implement PartialEq
let deserialized = deserialized_notification.map(|(n, _)| n).unwrap();
assert_eq!(*deserialized.committed_chain().unwrap().tip(), new_block);
assert_eq!(*deserialized.reverted_chain().unwrap().tip(), old_block);
Ok(())
}
@@ -280,10 +276,14 @@ mod tests {
let notification = get_test_notification_data()?;
// Serialize the notification
let notification_compat =
reth_exex_types::serde_bincode_compat::ExExNotification::from(&notification);
let encoded = rmp_serde::encode::to_vec(&notification_compat)?;
// Create a temp storage and write the notification using the existing serialization path
let temp_dir = tempfile::tempdir()?;
let storage = Storage::new(&temp_dir)?;
storage.write_notification(0, &notification)?;
// Read it back as raw bytes
let temp_path = temp_dir.path().join("0.wal");
let encoded = std::fs::read(&temp_path)?;
// Write to test-data directory
let test_data_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("test-data");
@@ -346,13 +346,18 @@ mod tests {
)]),
};
let trie_data =
reth_chain_state::DeferredTrieData::ready(reth_chain_state::ComputedTrieData {
hashed_state: Arc::new(hashed_state.into_sorted()),
trie_updates: Arc::new(trie_updates.into_sorted()),
anchored_trie_input: None,
});
let notification: ExExNotification<reth_ethereum_primitives::EthPrimitives> =
ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block],
Default::default(),
BTreeMap::from([(block_number, Arc::new(trie_updates.into_sorted()))]),
BTreeMap::from([(block_number, Arc::new(hashed_state.into_sorted()))]),
BTreeMap::from([(block_number, trie_data)]),
)),
};
Ok(notification)

View File

@@ -12,13 +12,13 @@ workspace = true
[dependencies]
## reth
reth-chain.workspace = true
reth-chainspec.workspace = true
reth-config.workspace = true
reth-consensus = { workspace = true, features = ["test-utils"] }
reth-db = { workspace = true, features = ["test-utils"] }
reth-db-common.workspace = true
reth-evm-ethereum = { workspace = true, features = ["test-utils"] }
reth-execution-types.workspace = true
reth-exex.workspace = true
reth-payload-builder.workspace = true
reth-network.workspace = true

View File

@@ -17,6 +17,7 @@ use std::{
use alloy_eips::BlockNumHash;
use futures_util::FutureExt;
use reth_chain::Chain;
use reth_chainspec::{ChainSpec, MAINNET};
use reth_consensus::test_utils::TestConsensus;
use reth_db::{
@@ -28,7 +29,6 @@ use reth_db::{
use reth_db_common::init::init_genesis;
use reth_ethereum_primitives::{EthPrimitives, TransactionSigned};
use reth_evm_ethereum::MockEvmConfig;
use reth_execution_types::Chain;
use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications, Wal};
use reth_network::{config::rng_secret_key, NetworkConfigBuilder, NetworkManager};
use reth_node_api::{

View File

@@ -13,8 +13,8 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-chain-state.workspace = true
reth-execution-types.workspace = true
reth-primitives-traits.workspace = true
# reth
@@ -36,7 +36,7 @@ rand.workspace = true
default = []
serde = [
"dep:serde",
"reth-execution-types/serde",
"reth-chain/serde",
"alloy-eips/serde",
"alloy-primitives/serde",
"rand/serde",
@@ -45,7 +45,7 @@ serde = [
"reth-chain-state/serde",
]
serde-bincode-compat = [
"reth-execution-types/serde-bincode-compat",
"reth-chain/serde-bincode-compat",
"serde_with",
"alloy-eips/serde-bincode-compat",
"reth-primitives-traits/serde-bincode-compat",

View File

@@ -1,12 +1,13 @@
use std::sync::Arc;
use reth_chain::Chain;
use reth_chain_state::CanonStateNotification;
use reth_execution_types::Chain;
use reth_primitives_traits::NodePrimitives;
/// Notifications sent to an `ExEx`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound = ""))]
pub enum ExExNotification<N: NodePrimitives = reth_chain_state::EthPrimitives> {
/// Chain got committed without a reorg, and only the new chain is returned.
ChainCommitted {
@@ -73,7 +74,7 @@ impl<P: NodePrimitives> From<CanonStateNotification<P>> for ExExNotification<P>
/// Bincode-compatible [`ExExNotification`] serde implementation.
#[cfg(all(feature = "serde", feature = "serde-bincode-compat"))]
pub(super) mod serde_bincode_compat {
use reth_execution_types::serde_bincode_compat::Chain;
use reth_chain::serde_bincode_compat::Chain;
use reth_primitives_traits::NodePrimitives;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::{DeserializeAs, SerializeAs};
@@ -124,28 +125,6 @@ pub(super) mod serde_bincode_compat {
},
}
impl<'a, N> From<&'a super::ExExNotification<N>> for ExExNotification<'a, N>
where
N: NodePrimitives,
{
fn from(value: &'a super::ExExNotification<N>) -> Self {
match value {
super::ExExNotification::ChainCommitted { new } => {
ExExNotification::ChainCommitted { new: Chain::from(new.as_ref()) }
}
super::ExExNotification::ChainReorged { old, new } => {
ExExNotification::ChainReorged {
old: Chain::from(old.as_ref()),
new: Chain::from(new.as_ref()),
}
}
super::ExExNotification::ChainReverted { old } => {
ExExNotification::ChainReverted { old: Chain::from(old.as_ref()) }
}
}
}
}
impl<'a, N> From<ExExNotification<'a, N>> for super::ExExNotification<N>
where
N: NodePrimitives,
@@ -176,7 +155,41 @@ pub(super) mod serde_bincode_compat {
where
S: Serializer,
{
ExExNotification::from(source).serialize(serializer)
// Helper that uses Chain's SerializeAs for bincode-compatible serialization
struct ChainWrapper<'a, N: NodePrimitives>(&'a reth_chain::Chain<N>);
impl<N: NodePrimitives> Serialize for ChainWrapper<'_, N> {
fn serialize<S2>(&self, serializer: S2) -> Result<S2::Ok, S2::Error>
where
S2: Serializer,
{
Chain::<'_, N>::serialize_as(self.0, serializer)
}
}
// Create an enum that matches the ExExNotification structure but uses ChainWrapper
#[derive(Serialize)]
#[serde(bound = "")]
#[allow(clippy::enum_variant_names)]
enum Repr<'a, N: NodePrimitives> {
ChainCommitted { new: ChainWrapper<'a, N> },
ChainReorged { old: ChainWrapper<'a, N>, new: ChainWrapper<'a, N> },
ChainReverted { old: ChainWrapper<'a, N> },
}
match source {
super::ExExNotification::ChainCommitted { new } => {
Repr::ChainCommitted { new: ChainWrapper(new.as_ref()) }.serialize(serializer)
}
super::ExExNotification::ChainReorged { old, new } => Repr::ChainReorged {
old: ChainWrapper(old.as_ref()),
new: ChainWrapper(new.as_ref()),
}
.serialize(serializer),
super::ExExNotification::ChainReverted { old } => {
Repr::ChainReverted { old: ChainWrapper(old.as_ref()) }.serialize(serializer)
}
}
}
}
@@ -197,7 +210,7 @@ pub(super) mod serde_bincode_compat {
use super::super::{serde_bincode_compat, ExExNotification};
use arbitrary::Arbitrary;
use rand::Rng;
use reth_execution_types::Chain;
use reth_chain::Chain;
use reth_primitives_traits::RecoveredBlock;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@@ -206,7 +219,7 @@ pub(super) mod serde_bincode_compat {
#[test]
fn test_exex_notification_bincode_roundtrip() {
#[serde_as]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
struct Data {
#[serde_as(
as = "serde_bincode_compat::ExExNotification<'_, reth_ethereum_primitives::EthPrimitives>"
@@ -216,28 +229,34 @@ pub(super) mod serde_bincode_compat {
let mut bytes = [0u8; 1024];
rand::rng().fill(bytes.as_mut_slice());
let old_block: reth_primitives_traits::RecoveredBlock<reth_ethereum_primitives::Block> =
RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes)).unwrap();
let new_block: reth_primitives_traits::RecoveredBlock<reth_ethereum_primitives::Block> =
RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes)).unwrap();
let data = Data {
notification: ExExNotification::ChainReorged {
old: Arc::new(Chain::new(
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
.unwrap()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
new: Arc::new(Chain::new(
vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes))
.unwrap()],
Default::default(),
BTreeMap::new(),
BTreeMap::new(),
)),
old: Arc::new(Chain::new(vec![old_block], Default::default(), BTreeMap::new())),
new: Arc::new(Chain::new(vec![new_block], Default::default(), BTreeMap::new())),
},
};
let encoded = bincode::serialize(&data).unwrap();
let decoded: Data = bincode::deserialize(&encoded).unwrap();
assert_eq!(decoded, data);
// Compare fields individually since Chain doesn't implement PartialEq
match (&decoded.notification, &data.notification) {
(
ExExNotification::ChainReorged { old: decoded_old, new: decoded_new },
ExExNotification::ChainReorged { old: expected_old, new: expected_new },
) => {
assert_eq!(decoded_old.blocks(), expected_old.blocks());
assert_eq!(decoded_old.execution_outcome(), expected_old.execution_outcome());
assert_eq!(decoded_new.blocks(), expected_new.blocks());
assert_eq!(decoded_new.execution_outcome(), expected_new.execution_outcome());
}
_ => panic!("Expected ChainReorged variant"),
}
}
}
}

View File

@@ -44,6 +44,7 @@ op-revm.workspace = true
thiserror.workspace = true
[dev-dependencies]
reth-chain.workspace = true
reth-evm = { workspace = true, features = ["test-utils"] }
reth-revm = { workspace = true, features = ["test-utils"] }
alloy-genesis.workspace = true

View File

@@ -295,11 +295,10 @@ mod tests {
use alloy_genesis::Genesis;
use alloy_primitives::{bytes, map::HashMap, Address, LogData, B256};
use op_revm::OpSpecId;
use reth_chain::Chain;
use reth_chainspec::ChainSpec;
use reth_evm::execute::ProviderError;
use reth_execution_types::{
AccountRevertInit, BundleStateInit, Chain, ExecutionOutcome, RevertsInit,
};
use reth_execution_types::{AccountRevertInit, BundleStateInit, ExecutionOutcome, RevertsInit};
use reth_optimism_chainspec::{OpChainSpec, BASE_MAINNET};
use reth_optimism_primitives::{OpBlock, OpPrimitives, OpReceipt};
use reth_primitives_traits::{Account, RecoveredBlock};
@@ -529,12 +528,8 @@ mod tests {
// Create a Chain object with a BTreeMap of blocks mapped to their block numbers,
// including block1_hash and block2_hash, and the execution_outcome
let chain: Chain<OpPrimitives> = Chain::new(
[block1, block2],
execution_outcome.clone(),
BTreeMap::new(),
BTreeMap::new(),
);
let chain: Chain<OpPrimitives> =
Chain::new([block1, block2], execution_outcome.clone(), BTreeMap::new());
// Assert that the proper receipt vector is returned for block1_hash
assert_eq!(chain.receipts_by_block_hash(block1_hash), Some(vec![&receipt1]));

View File

@@ -12,11 +12,11 @@ description = "Types supporting implementation of 'eth' namespace RPC server API
workspace = true
[dependencies]
reth-chain.workspace = true
reth-chainspec.workspace = true
reth-chain-state.workspace = true
reth-errors.workspace = true
reth-evm.workspace = true
reth-execution-types.workspace = true
reth-metrics.workspace = true
reth-ethereum-primitives = { workspace = true, features = ["rpc"] }
reth-primitives-traits = { workspace = true, features = ["rpc-compat"] }

View File

@@ -5,9 +5,9 @@ use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::B256;
use futures::{stream::FuturesOrdered, Stream, StreamExt};
use reth_chain::Chain;
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::Chain;
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
use reth_storage_api::{BlockReader, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};

View File

@@ -13,6 +13,7 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-chainspec = { workspace = true, optional = true }
reth-codecs.workspace = true
reth-config.workspace = true
@@ -29,7 +30,6 @@ reth-fs-util.workspace = true
reth-network-p2p.workspace = true
reth-primitives-traits = { workspace = true, features = ["serde-bincode-compat"] }
reth-provider.workspace = true
reth-execution-types.workspace = true
reth-ethereum-primitives = { workspace = true, optional = true }
reth-prune.workspace = true
reth-prune-types.workspace = true

View File

@@ -2,11 +2,11 @@ use crate::stages::MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD;
use alloy_consensus::BlockHeader;
use alloy_primitives::BlockNumber;
use num_traits::Zero;
use reth_chain::Chain;
use reth_config::config::ExecutionConfig;
use reth_consensus::FullConsensus;
use reth_db::{static_file::HeaderMask, tables};
use reth_evm::{execute::Executor, metrics::ExecutorMetrics, ConfigureEvm};
use reth_execution_types::Chain;
use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource};
use reth_primitives_traits::{format_gas_throughput, BlockBody, NodePrimitives};
use reth_provider::{
@@ -423,7 +423,6 @@ where
blocks,
state.clone(),
BTreeMap::new(),
BTreeMap::new(),
));
if previous_input.is_some() {
@@ -525,7 +524,6 @@ where
blocks,
bundle_state_with_receipts,
BTreeMap::new(),
BTreeMap::new(),
));
debug_assert!(

View File

@@ -13,6 +13,7 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-chainspec.workspace = true
reth-execution-types.workspace = true
reth-ethereum-primitives = { workspace = true, features = ["reth-codec"] }
@@ -86,6 +87,15 @@ tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
[features]
rocksdb = ["dep:rocksdb"]
serde-bincode-compat = [
"reth-chain/serde-bincode-compat",
"alloy-consensus/serde-bincode-compat",
"alloy-eips/serde-bincode-compat",
"reth-ethereum-primitives/serde-bincode-compat",
"reth-execution-types/serde-bincode-compat",
"reth-primitives-traits/serde-bincode-compat",
"reth-storage-api/serde-bincode-compat",
]
test-utils = [
"reth-db/test-utils",
"reth-nippy-jar/test-utils",

View File

@@ -35,11 +35,20 @@ pub mod test_utils;
pub mod either_writer;
pub use either_writer::*;
#[cfg(feature = "serde-bincode-compat")]
pub use reth_chain::serde_bincode_compat;
pub use reth_chain::{
AnchoredTrieInput, BlockReceipts, Chain, ChainBlocks, ComputedTrieData, DeferredTrieData,
DisplayBlocksChain,
};
pub use reth_chain_state::{
CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream,
CanonStateNotifications, CanonStateSubscriptions,
};
pub use reth_execution_types::*;
pub use reth_execution_types::{
AccountRevertInit, BlockExecutionOutput, BlockExecutionResult, BundleStateInit, ChangedAccount,
ExecutionOutcome, RevertsInit,
};
/// Re-export `OriginalValuesKnown`
pub use revm_database::states::OriginalValuesKnown;
// reexport traits to avoid breaking changes

View File

@@ -782,6 +782,7 @@ mod tests {
use alloy_primitives::{BlockNumber, TxNumber, B256};
use itertools::Itertools;
use rand::Rng;
use reth_chain::Chain;
use reth_chain_state::{
test_utils::TestBlockBuilder, CanonStateNotification, CanonStateSubscriptions,
CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain,
@@ -790,9 +791,7 @@ mod tests {
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_errors::ProviderError;
use reth_ethereum_primitives::{Block, Receipt};
use reth_execution_types::{
BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome,
};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
use reth_primitives_traits::{RecoveredBlock, SealedBlock, SignerRecoverable};
use reth_storage_api::{
BlockBodyIndicesProvider, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
@@ -1348,33 +1347,33 @@ mod tests {
// Send and receive commit notifications.
let block_2 = test_block_builder.generate_random_block(1, block_hash_1).try_recover()?;
let chain = Chain::new(
vec![block_2],
ExecutionOutcome::default(),
BTreeMap::new(),
BTreeMap::new(),
);
let chain = Chain::new(vec![block_2.clone()], ExecutionOutcome::default(), BTreeMap::new());
let commit = CanonStateNotification::Commit { new: Arc::new(chain.clone()) };
in_memory_state.notify_canon_state(commit.clone());
let (notification_1, notification_2) = tokio::join!(rx_1.recv(), rx_2.recv());
assert_eq!(notification_1, Ok(commit.clone()));
assert_eq!(notification_2, Ok(commit.clone()));
// Verify both subscribers received commit notifications with matching tip
let n1 = notification_1.unwrap();
let n2 = notification_2.unwrap();
assert_eq!(*n1.tip(), block_2);
assert_eq!(*n2.tip(), block_2);
// Send and receive re-org notifications.
let block_3 = test_block_builder.generate_random_block(1, block_hash_1).try_recover()?;
let block_4 = test_block_builder.generate_random_block(2, block_3.hash()).try_recover()?;
let new_chain = Chain::new(
vec![block_3, block_4],
vec![block_3, block_4.clone()],
ExecutionOutcome::default(),
BTreeMap::new(),
BTreeMap::new(),
);
let re_org =
CanonStateNotification::Reorg { old: Arc::new(chain), new: Arc::new(new_chain) };
in_memory_state.notify_canon_state(re_org.clone());
let (notification_1, notification_2) = tokio::join!(rx_1.recv(), rx_2.recv());
assert_eq!(notification_1, Ok(re_org.clone()));
assert_eq!(notification_2, Ok(re_org.clone()));
// Verify both subscribers received reorg notifications with matching tip
let n1 = notification_1.unwrap();
let n2 = notification_2.unwrap();
assert_eq!(*n1.tip(), block_4);
assert_eq!(*n2.tip(), block_4);
Ok(())
}

View File

@@ -33,6 +33,7 @@ use alloy_primitives::{
use itertools::Itertools;
use parking_lot::RwLock;
use rayon::slice::ParallelSliceMut;
use reth_chain::Chain;
use reth_chain_state::{ComputedTrieData, ExecutedBlock};
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
use reth_db_api::{
@@ -47,7 +48,7 @@ use reth_db_api::{
transaction::{DbTx, DbTxMut},
BlockNumberList, PlainAccountState, PlainStorageState,
};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
use reth_primitives_traits::{
Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
@@ -3076,7 +3077,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
// Update pipeline progress
self.update_pipeline_stages(block, true)?;
Ok(Chain::new(blocks, execution_state, BTreeMap::new(), BTreeMap::new()))
Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
}
fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {

View File

@@ -13,6 +13,7 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-db-models.workspace = true
reth-chainspec.workspace = true
reth-db-api = { workspace = true, optional = true }
@@ -60,6 +61,7 @@ db-api = [
]
serde = [
"reth-chain/serde",
"reth-ethereum-primitives/serde",
"reth-db-models/serde",
"reth-execution-types/serde",
@@ -78,6 +80,7 @@ serde-bincode-compat = [
"reth-execution-types/serde-bincode-compat",
"reth-primitives-traits/serde-bincode-compat",
"reth-trie-common/serde-bincode-compat",
"reth-chain/serde-bincode-compat",
"reth-ethereum-primitives/serde-bincode-compat",
"alloy-eips/serde-bincode-compat",
"alloy-consensus/serde-bincode-compat",

View File

@@ -1,8 +1,9 @@
use crate::NodePrimitivesProvider;
use alloc::vec::Vec;
use alloy_primitives::BlockNumber;
use reth_chain::Chain;
use reth_db_models::StoredBlockBodyIndices;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
use reth_storage_errors::provider::ProviderResult;
use reth_trie_common::HashedPostStateSorted;

View File

@@ -13,6 +13,7 @@ workspace = true
[dependencies]
# reth
reth-chain.workspace = true
reth-chain-state.workspace = true
reth-ethereum-primitives.workspace = true
reth-chainspec.workspace = true
@@ -90,6 +91,7 @@ serde = [
"revm-primitives/serde",
"reth-primitives-traits/serde",
"reth-ethereum-primitives/serde",
"reth-chain/serde",
"reth-chain-state/serde",
"reth-storage-api/serde",
]

View File

@@ -3,7 +3,7 @@
use alloy_consensus::Typed2718;
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{BlockNumber, B256};
use reth_execution_types::ChainBlocks;
use reth_chain::ChainBlocks;
use reth_primitives_traits::{Block, BlockBody, SignedTransaction};
use std::collections::BTreeMap;
@@ -91,8 +91,8 @@ mod tests {
use super::*;
use alloy_consensus::{Header, Signed};
use alloy_primitives::Signature;
use reth_chain::Chain;
use reth_ethereum_primitives::Transaction;
use reth_execution_types::Chain;
use reth_primitives_traits::{RecoveredBlock, SealedBlock, SealedHeader};
#[test]
@@ -175,8 +175,7 @@ mod tests {
);
// Extract blocks from the chain
let chain: Chain =
Chain::new(vec![block1, block2], Default::default(), BTreeMap::new(), BTreeMap::new());
let chain: Chain = Chain::new(vec![block1, block2], Default::default(), BTreeMap::new());
let blocks = chain.into_inner().0;
// Add new chain blocks to the tracker