Compare commits

..

11 Commits

Author SHA1 Message Date
Brian Picciano
d6324d63e2 chore: release 1.11.3 2026-03-12 12:34:39 +01:00
Brian Picciano
5f3ade1bfe fix(trie): Reset proof v2 calculator on error (#22781)
Co-authored-by: Amp <amp@ampcode.com>
2026-03-12 10:09:18 +00:00
Derek Cofausper
b053f6fafe cherry-pick: fix don't produce both updates and removals for trie nodes (#22507)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:25 +00:00
Derek Cofausper
2a58e7a077 cherry-pick: install rayon panic handler (37f5b3a)
Co-Authored-By: Arsenii Kulikov <62447812+klkvr@users.noreply.github.com>
2026-03-12 02:30:17 +00:00
Emma Jamieson-Hoare
793a3d5fb3 fix missing import 2026-03-10 11:44:07 +00:00
Emma Jamieson-Hoare
89ae1af694 chore: upgrade to 1.11.2 2026-03-10 10:48:03 +00:00
Alexey Shekhirin
9c33fb5d45 fix(engine): reset execution cache hash on clear (#22895)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 10:46:09 +00:00
Alexey Shekhirin
bef3d7b4d1 fix lockfile 2026-02-23 18:36:44 +00:00
Emma Jamieson-Hoare
e918c17af9 chore: release 1.11.1
Amp-Thread-ID: https://ampcode.com/threads/T-019c8ba4-fd85-736b-9d2d-e878d350a91b
Co-authored-by: Amp <amp@ampcode.com>
2026-02-23 18:02:14 +00:00
Arsenii Kulikov
fcc170d53c fix: properly reveal trie nodes (#22415) 2026-02-23 17:58:13 +00:00
Arsenii Kulikov
c685842ba2 fix: overlay preparation on tokio (#22492) 2026-02-23 17:57:51 +00:00
79 changed files with 1170 additions and 1743 deletions

View File

@@ -1,5 +0,0 @@
---
reth-transaction-pool: minor
---
Added `consensus_ref` method to `PoolTransaction` trait for borrowing consensus transactions without cloning.

View File

@@ -1,6 +0,0 @@
---
reth-rpc-eth-api: minor
reth-rpc-server-types: minor
---
Added `eth_getStorageValues` RPC method for batch storage slot retrieval across multiple addresses.

View File

@@ -1,5 +0,0 @@
---
reth-storage-api: patch
---
Added `Arc` to `auto_impl` derive for storage-api traits to support automatic `Arc` wrapper implementations.

View File

@@ -0,0 +1,5 @@
---
reth-trie-sparse: patch
---
Fixed a bug where trie nodes could appear in both `updated_nodes` and `removed_nodes` simultaneously by removing entries from `removed_nodes` when a node is inserted as updated.

View File

@@ -0,0 +1,5 @@
---
reth-trie: patch
---
Fixed a potential panic in `ProofCalculator` by clearing internal computation state (`branch_stack`, `child_stack`, `branch_path`, etc.) after errors, preventing stale state from causing `usize` underflow panics when the calculator is reused. Added a test verifying correct behavior after simulated mid-computation errors.

View File

@@ -124,7 +124,7 @@ jobs:
- uses: rui314/setup-mold@v1
- uses: dtolnay/rust-toolchain@master
with:
toolchain: "1.93" # MSRV
toolchain: "1.88" # MSRV
- uses: mozilla-actions/sccache-action@v0.0.9
- uses: Swatinem/rust-cache@v2
with:

570
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
[workspace.package]
version = "1.11.0"
version = "1.11.3"
edition = "2024"
rust-version = "1.93"
rust-version = "1.88"
license = "MIT OR Apache-2.0"
homepage = "https://paradigmxyz.github.io/reth"
repository = "https://github.com/paradigmxyz/reth"
@@ -27,6 +27,7 @@ members = [
"crates/engine/invalid-block-hooks/",
"crates/engine/local",
"crates/engine/primitives/",
"crates/engine/service",
"crates/engine/tree/",
"crates/engine/util/",
"crates/era",
@@ -348,6 +349,7 @@ reth-ecies = { path = "crates/net/ecies" }
reth-engine-local = { path = "crates/engine/local" }
reth-engine-primitives = { path = "crates/engine/primitives", default-features = false }
reth-engine-tree = { path = "crates/engine/tree" }
reth-engine-service = { path = "crates/engine/service" }
reth-engine-util = { path = "crates/engine/util" }
reth-era = { path = "crates/era" }
reth-era-downloader = { path = "crates/era-downloader" }
@@ -447,14 +449,18 @@ revm-inspectors = "0.34.2"
# eth
alloy-dyn-abi = "1.5.6"
alloy-primitives = { version = "1.5.6", default-features = false, features = ["map-foldhash"] }
alloy-primitives = { version = "1.5.6", default-features = false, features = [
"map-foldhash",
] }
alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.5", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-evm = { version = "0.27.2", default-features = false }
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
alloy-rlp = { version = "0.3.13", default-features = false, features = [
"core-net",
] }
alloy-trie = { version = "0.9.4", default-features = false }
alloy-hardforks = "0.4.5"
@@ -466,10 +472,15 @@ alloy-genesis = { version = "1.6.3", default-features = false }
alloy-json-rpc = { version = "1.6.3", default-features = false }
alloy-network = { version = "1.6.3", default-features = false }
alloy-network-primitives = { version = "1.6.3", default-features = false }
alloy-provider = { version = "1.6.3", features = ["reqwest", "debug-api"], default-features = false }
alloy-provider = { version = "1.6.3", features = [
"reqwest",
"debug-api",
], default-features = false }
alloy-pubsub = { version = "1.6.3", default-features = false }
alloy-rpc-client = { version = "1.6.3", default-features = false }
alloy-rpc-types = { version = "1.6.3", features = ["eth"], default-features = false }
alloy-rpc-types = { version = "1.6.3", features = [
"eth",
], default-features = false }
alloy-rpc-types-admin = { version = "1.6.3", default-features = false }
alloy-rpc-types-anvil = { version = "1.6.3", default-features = false }
alloy-rpc-types-beacon = { version = "1.6.3", default-features = false }
@@ -483,7 +494,9 @@ alloy-serde = { version = "1.6.3", default-features = false }
alloy-signer = { version = "1.6.3", default-features = false }
alloy-signer-local = { version = "1.6.3", default-features = false }
alloy-transport = { version = "1.6.3" }
alloy-transport-http = { version = "1.6.3", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-http = { version = "1.6.3", features = [
"reqwest-rustls-tls",
], default-features = false }
alloy-transport-ipc = { version = "1.6.3", default-features = false }
alloy-transport-ws = { version = "1.6.3", default-features = false }
@@ -502,7 +515,10 @@ either = { version = "1.15.0", default-features = false }
arrayvec = { version = "0.7.6", default-features = false }
aquamarine = "0.6"
auto_impl = "1"
backon = { version = "1.2", default-features = false, features = ["std-blocking-sleep", "tokio-sleep"] }
backon = { version = "1.2", default-features = false, features = [
"std-blocking-sleep",
"tokio-sleep",
] }
bincode = "1.3"
bitflags = "2.4"
boyer-moore-magiclen = "0.2.16"
@@ -524,9 +540,13 @@ itertools = { version = "0.14", default-features = false }
linked_hash_set = "0.1"
lz4 = "1.28.1"
modular-bitfield = "0.13.1"
notify = { version = "8.0.0", default-features = false, features = ["macos_fsevent"] }
notify = { version = "8.0.0", default-features = false, features = [
"macos_fsevent",
] }
nybbles = { version = "0.4.8", default-features = false }
once_cell = { version = "1.19", default-features = false, features = ["critical-section"] }
once_cell = { version = "1.19", default-features = false, features = [
"critical-section",
] }
parking_lot = "0.12"
paste = "1.0"
rand = "0.9"
@@ -545,7 +565,9 @@ strum_macros = "0.27"
syn = "2.0"
thiserror = { version = "2.0.0", default-features = false }
tar = "0.4.44"
tracing = { version = "0.1.0", default-features = false, features = ["attributes"] }
tracing = { version = "0.1.0", default-features = false, features = [
"attributes",
] }
tracing-appender = "0.2"
url = { version = "2.3", default-features = false }
zstd = "0.13"
@@ -583,7 +605,11 @@ futures-util = { version = "0.3", default-features = false }
hyper = "1.3"
hyper-util = "0.1.5"
pin-project = "1.0.12"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "rustls-tls-native-roots", "stream"] }
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
"rustls-tls-native-roots",
"stream",
] }
tracing-futures = "0.2"
tower = "0.5"
tower-http = "0.6"
@@ -608,7 +634,10 @@ proptest-arbitrary-interop = "0.1.0"
# crypto
enr = { version = "0.13", default-features = false }
k256 = { version = "0.13", default-features = false, features = ["ecdsa"] }
secp256k1 = { version = "0.30", default-features = false, features = ["global-context", "recovery"] }
secp256k1 = { version = "0.30", default-features = false, features = [
"global-context",
"recovery",
] }
# rand 8 for secp256k1
rand_08 = { package = "rand", version = "0.8" }
@@ -663,7 +692,6 @@ cipher = "0.4.3"
comfy-table = "7.0"
concat-kdf = "0.1.0"
crossbeam-channel = "0.5.13"
crossbeam-utils = "0.8"
crossterm = "0.29.0"
csv = "1.3.0"
ctrlc = "3.4"

View File

@@ -19,11 +19,10 @@ pre-build = [
image = "ubuntu:24.04"
pre-build = [
"apt update",
"apt install --yes gcc gcc-riscv64-linux-gnu g++-riscv64-linux-gnu libclang-dev make",
"apt install --yes gcc gcc-riscv64-linux-gnu libclang-dev make",
]
env.passthrough = [
"CARGO_TARGET_RISCV64GC_UNKNOWN_LINUX_GNU_LINKER=riscv64-linux-gnu-gcc",
"CXX_riscv64gc_unknown_linux_gnu=riscv64-linux-gnu-g++",
]
[build.env]

View File

@@ -93,7 +93,7 @@ When updating this, also update:
- .github/workflows/lint.yml
-->
The Minimum Supported Rust Version (MSRV) of this project is [1.93.0](https://blog.rust-lang.org/2026/01/22/Rust-1.93.0/).
The Minimum Supported Rust Version (MSRV) of this project is [1.88.0](https://blog.rust-lang.org/2025/06/26/Rust-1.88.0/).
See the docs for detailed instructions on how to [build from source](https://reth.rs/installation/source/).

View File

@@ -312,6 +312,11 @@ impl DeferredTrieData {
/// Given that invariant, circular wait dependencies are impossible.
#[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
pub fn wait_cloned(&self) -> ComputedTrieData {
#[cfg(feature = "rayon")]
debug_assert!(
rayon::current_thread_index().is_none(),
"wait_cloned must not be called from a rayon worker thread"
);
let mut state = self.state.lock();
match &mut *state {
// If the deferred trie data is ready, return the cached result.

View File

@@ -285,6 +285,7 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
// (We can't just use `upsert` method with a dup cursor, it's not properly
// supported)
let nibbles = StoredNibblesSubKey(path);
let entry = StorageTrieEntry { nibbles: nibbles.clone(), node };
if storage_trie_cursor
.seek_by_key_subkey(account, nibbles.clone())?
.filter(|v| v.nibbles == nibbles)
@@ -292,7 +293,6 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
{
storage_trie_cursor.delete_current()?;
}
let entry = StorageTrieEntry { nibbles, node };
storage_trie_cursor.upsert(account, &entry)?;
}
Output::Progress(path) => {

View File

@@ -0,0 +1,47 @@
[package]
name = "reth-engine-service"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
[lints]
workspace = true
[dependencies]
# reth
reth-consensus.workspace = true
reth-engine-tree.workspace = true
reth-evm.workspace = true
reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-node-types.workspace = true
reth-chainspec.workspace = true
reth-engine-primitives.workspace = true
reth-trie-db.workspace = true
# async
futures.workspace = true
pin-project.workspace = true
# misc
[dev-dependencies]
reth-engine-tree = { workspace = true, features = ["test-utils"] }
reth-ethereum-consensus.workspace = true
reth-ethereum-engine-primitives.workspace = true
reth-evm-ethereum.workspace = true
reth-exex-types.workspace = true
reth-primitives-traits.workspace = true
reth-node-ethereum.workspace = true
reth-trie-db.workspace = true
alloy-eips.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true

View File

@@ -0,0 +1,12 @@
//! Engine service implementation.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
/// Engine Service
pub mod service;

View File

@@ -0,0 +1,229 @@
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use reth_chainspec::EthChainSpec;
use reth_consensus::FullConsensus;
use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
use reth_engine_tree::{
backfill::PipelineSync,
download::BasicBlockDownloader,
engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, EngineValidator, TreeConfig},
};
pub use reth_engine_tree::{
chain::{ChainEvent, ChainOrchestrator},
engine::EngineApiEvent,
};
use reth_evm::ConfigureEvm;
use reth_network_p2p::BlockClient;
use reth_node_types::{BlockTy, NodeTypes};
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory, StorageSettingsCache,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_tasks::TaskSpawner;
use reth_trie_db::ChangesetCache;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
/// Alias for consensus engine stream.
pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
/// Alias for chain orchestrator.
type EngineServiceType<N, Client> = ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<
EngineApiRequest<<N as NodeTypes>::Payload, <N as NodeTypes>::Primitives>,
<N as NodeTypes>::Primitives,
>,
EngineMessageStream<<N as NodeTypes>::Payload>,
BasicBlockDownloader<Client, BlockTy<N>>,
>,
PipelineSync<N>,
>;
/// The type that drives the chain forward and communicates progress.
#[pin_project]
#[expect(missing_debug_implementations)]
// TODO(mattsse): remove hidden once fixed : <https://github.com/rust-lang/rust/issues/135363>
// otherwise rustdoc fails to resolve the alias
#[doc(hidden)]
pub struct EngineService<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient<Block = BlockTy<N>> + 'static,
{
orchestrator: EngineServiceType<N, Client>,
}
impl<N, Client> EngineService<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient<Block = BlockTy<N>> + 'static,
{
/// Constructor for `EngineService`.
#[expect(clippy::too_many_arguments)]
pub fn new<V, C>(
consensus: Arc<dyn FullConsensus<N::Primitives>>,
chain_spec: Arc<N::ChainSpec>,
client: Client,
incoming_requests: EngineMessageStream<N::Payload>,
pipeline: Pipeline<N>,
pipeline_task_spawner: Box<dyn TaskSpawner>,
provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
payload_builder: PayloadBuilderHandle<N::Payload>,
payload_validator: V,
tree_config: TreeConfig,
sync_metrics_tx: MetricEventsSender,
evm_config: C,
changeset_cache: ChangesetCache,
) -> Self
where
V: EngineValidator<N::Payload>,
C: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let engine_kind =
if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
blockchain_db,
consensus,
payload_validator,
persistence_handle,
payload_builder,
canonical_in_memory_state,
tree_config,
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) }
}
/// Returns a mutable reference to the orchestrator.
pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
&mut self.orchestrator
}
}
impl<N, Client> Stream for EngineService<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient<Block = BlockTy<N>> + 'static,
{
type Item = ChainEvent<ConsensusEngineEvent<N::Primitives>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut orchestrator = self.project().orchestrator;
StreamExt::poll_next_unpin(&mut orchestrator, cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_engine_primitives::{BeaconEngineMessage, NoopInvalidBlockHook};
use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::BasicEngineValidator};
use reth_ethereum_consensus::EthBeaconConsensus;
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm_ethereum::EthEvmConfig;
use reth_exex_types::FinishedExExHeight;
use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_node_ethereum::EthereumEngineValidator;
use reth_primitives_traits::SealedHeader;
use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
};
use reth_prune::Pruner;
use reth_tasks::TokioTaskExecutor;
use reth_trie_db::ChangesetCache;
use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, watch};
use tokio_stream::wrappers::UnboundedReceiverStream;
#[test]
fn eth_chain_orchestrator_build() {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
let client = TestFullBlockClient::default();
let (_tx, rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
let incoming_requests = UnboundedReceiverStream::new(rx);
let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
let blockchain_db =
BlockchainProvider::with_latest(provider_factory.clone(), SealedHeader::default())
.unwrap();
let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());
let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs);
let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx);
let evm_config = EthEvmConfig::new(chain_spec.clone());
let changeset_cache = ChangesetCache::new();
let engine_validator = BasicEngineValidator::new(
blockchain_db.clone(),
consensus.clone(),
evm_config.clone(),
engine_payload_validator,
TreeConfig::default(),
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
);
let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
let (tx, _rx) = unbounded_channel();
let _eth_service = EngineService::new(
consensus,
chain_spec,
client,
Box::pin(incoming_requests),
pipeline,
pipeline_task_spawner,
provider_factory,
blockchain_db,
pruner,
PayloadBuilderHandle::new(tx),
engine_validator,
TreeConfig::default(),
sync_metrics_tx,
evm_config,
changeset_cache,
);
}
}

View File

@@ -29,7 +29,7 @@ reth-provider.workspace = true
reth-prune.workspace = true
reth-revm = { workspace = true, features = ["optional-balance-check"] }
reth-stages-api.workspace = true
reth-tasks = { workspace = true, features = ["rayon"] }
reth-tasks.workspace = true
reth-trie-parallel.workspace = true
reth-trie-sparse = { workspace = true, features = ["std", "metrics"] }
reth-trie.workspace = true

View File

@@ -1,110 +0,0 @@
//! Engine orchestrator launch helper.
//!
//! Provides [`build_engine_orchestrator`](crate::launch::build_engine_orchestrator) which wires
//! together all engine components and returns a
//! [`ChainOrchestrator`](crate::chain::ChainOrchestrator) ready to be polled as a `Stream`.
use crate::{
backfill::PipelineSync,
chain::ChainOrchestrator,
download::BasicBlockDownloader,
engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, EngineValidator, TreeConfig},
};
use futures::Stream;
use reth_consensus::FullConsensus;
use reth_engine_primitives::BeaconEngineMessage;
use reth_evm::ConfigureEvm;
use reth_network_p2p::BlockClient;
use reth_payload_builder::PayloadBuilderHandle;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory, StorageSettingsCache,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_tasks::TaskSpawner;
use reth_trie_db::ChangesetCache;
use std::sync::Arc;
/// Builds the engine [`ChainOrchestrator`] that drives the chain forward.
///
/// This spawns and wires together the following components:
///
/// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync.
/// - **[`PersistenceHandle`]** — spawns the persistence service on a background thread for writing
/// blocks and performing pruning outside the critical consensus path.
/// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests
/// (`newPayload`, `forkchoiceUpdated`) and maintains the in-memory chain state.
/// - **[`EngineApiRequestHandler`]** + **[`EngineHandler`]** — glue that routes incoming CL
/// messages to the tree handler and manages download requests.
/// - **[`PipelineSync`]** — wraps the staged sync [`Pipeline`] for backfill sync when the node
/// needs to catch up over large block ranges.
///
/// The returned orchestrator implements [`Stream`] and yields
/// [`ChainEvent`]s.
///
/// [`ChainEvent`]: crate::chain::ChainEvent
#[expect(clippy::too_many_arguments, clippy::type_complexity)]
pub fn build_engine_orchestrator<N, Client, S, V, C>(
engine_kind: EngineApiKind,
consensus: Arc<dyn FullConsensus<N::Primitives>>,
client: Client,
incoming_requests: S,
pipeline: Pipeline<N>,
pipeline_task_spawner: Box<dyn TaskSpawner>,
provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
payload_builder: PayloadBuilderHandle<N::Payload>,
payload_validator: V,
tree_config: TreeConfig,
sync_metrics_tx: MetricEventsSender,
evm_config: C,
changeset_cache: ChangesetCache,
) -> ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<EngineApiRequest<N::Payload, N::Primitives>, N::Primitives>,
S,
BasicBlockDownloader<Client, <N::Primitives as NodePrimitives>::Block>,
>,
PipelineSync<N>,
>
where
N: ProviderNodeTypes,
Client: BlockClient<Block = <N::Primitives as NodePrimitives>::Block> + 'static,
S: Stream<Item = BeaconEngineMessage<N::Payload>> + Send + Sync + Unpin + 'static,
V: EngineValidator<N::Payload>,
C: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
let downloader = BasicBlockDownloader::new(client, consensus.clone());
let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
blockchain_db,
consensus,
payload_validator,
persistence_handle,
payload_builder,
canonical_in_memory_state,
tree_config,
engine_kind,
evm_config,
changeset_cache,
use_hashed_state,
);
let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
ChainOrchestrator::new(handler, backfill_sync)
}

View File

@@ -100,8 +100,6 @@ pub mod chain;
pub mod download;
/// Engine Api chain handler support.
pub mod engine;
/// Engine orchestrator launch helper.
pub mod launch;
/// Metrics support.
pub mod metrics;
/// The background writer service, coordinating write operations on static files and the database.

View File

@@ -119,7 +119,7 @@ where
Ok(())
}
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(%new_tip_num))]
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(new_tip_num))]
fn on_remove_blocks_above(
&self,
new_tip_num: u64,

View File

@@ -845,8 +845,10 @@ impl SavedCache {
self.caches.update_metrics(&self.metrics);
}
/// Clears all caches, resetting them to empty state.
pub(crate) fn clear(&self) {
/// Clears all caches, resetting them to empty state,
/// and updates the hash of the block this cache belongs to.
pub(crate) fn clear_with_hash(&mut self, hash: B256) {
self.hash = hash;
self.caches.clear();
}
}

View File

@@ -1411,7 +1411,7 @@ where
// Spawn a background task to trigger computation so it's ready when the next payload
// arrives.
if let Some(overlay) = self.state.tree_state.prepare_canonical_overlay() {
rayon::spawn(move || {
tokio::task::spawn_blocking(move || {
let _ = overlay.get();
});
}
@@ -2602,7 +2602,7 @@ where
/// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
/// `InsertPayloadOk::AlreadySeen` if the block already exists, or
/// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(?block_id))]
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
fn insert_block_or_payload<Input, Err>(
&mut self,
block_id: BlockWithParent,

View File

@@ -33,7 +33,7 @@ use reth_provider::{
StateProviderFactory, StateReader,
};
use reth_revm::{db::BundleState, state::EvmState};
use reth_tasks::{ForEachOrdered, Runtime};
use reth_tasks::Runtime;
use reth_trie::{hashed_cursor::HashedCursorFactory, trie_cursor::TrieCursorFactory};
use reth_trie_parallel::{
proof_task::{ProofTaskCtx, ProofWorkerHandle},
@@ -43,13 +43,14 @@ use reth_trie_sparse::{
ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie,
};
use std::{
collections::BTreeMap,
ops::Not,
sync::{
atomic::AtomicBool,
mpsc::{self, channel},
Arc,
},
time::Instant,
time::{Duration, Instant},
};
use tracing::{debug, debug_span, instrument, warn, Span};
@@ -96,7 +97,6 @@ pub const SPARSE_TRIE_MAX_VALUES_SHRINK_CAPACITY: usize = 1_000_000;
/// Blocks with fewer transactions than this skip prewarming, since the fixed overhead of spawning
/// prewarm workers exceeds the execution time saved.
pub const SMALL_BLOCK_TX_THRESHOLD: usize = 5;
/// Type alias for [`PayloadHandle`] returned by payload processor spawn methods.
type IteratorPayloadHandle<Evm, I, N> = PayloadHandle<
WithTxEnv<TxEnvFor<Evm>, <I as ExecutableTxIterator<Evm>>::Recovered>,
@@ -248,23 +248,42 @@ where
let (to_sparse_trie, sparse_trie_rx) = channel();
let (to_multi_proof, from_multi_proof) = crossbeam_channel::unbounded();
// Extract V2 proofs flag early so we can pass it to prewarm
let v2_proofs_enabled = !config.disable_proof_v2();
let parent_state_root = env.parent_state_root;
let transaction_count = env.transaction_count;
let prewarm_handle = self.spawn_caching_with(
env,
prewarm_rx,
provider_builder.clone(),
Some(to_multi_proof.clone()),
bal,
v2_proofs_enabled,
);
// Create and spawn the storage proof task.
// Capture parent_state_root before env is moved into spawn_caching_with
let parent_state_root = env.parent_state_root;
// Handle BAL-based optimization if available
let prewarm_handle = if let Some(bal) = bal {
// When BAL is present, use BAL prewarming and send BAL to multiproof
debug!(target: "engine::tree::payload_processor", "BAL present, using BAL prewarming");
// The prewarm task converts the BAL to HashedPostState and sends it on
// to_multi_proof after slot prefetching completes.
self.spawn_caching_with(
env,
prewarm_rx,
provider_builder.clone(),
Some(to_multi_proof.clone()),
Some(bal),
v2_proofs_enabled,
)
} else {
// Normal path: spawn with transaction prewarming
self.spawn_caching_with(
env,
prewarm_rx,
provider_builder.clone(),
Some(to_multi_proof.clone()),
None,
v2_proofs_enabled,
)
};
// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let halve_workers = transaction_count <= Self::SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD;
let proof_handle =
ProofWorkerHandle::new(&self.executor, task_ctx, halve_workers, v2_proofs_enabled);
let proof_handle = ProofWorkerHandle::new(&self.executor, task_ctx, v2_proofs_enabled);
if config.disable_trie_cache() {
let multi_proof_task = MultiProofTask::new(
@@ -344,10 +363,6 @@ where
}
}
/// Transaction count threshold below which proof workers are halved, since fewer transactions
/// produce fewer state changes and most workers would be idle overhead.
const SMALL_BLOCK_PROOF_WORKER_TX_THRESHOLD: usize = 30;
/// Transaction count threshold below which sequential signature recovery is used.
///
/// For blocks with fewer than this many transactions, the rayon parallel iterator overhead
@@ -359,11 +374,8 @@ where
/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
///
/// For blocks with fewer than [`Self::SMALL_BLOCK_TX_THRESHOLD`] transactions, uses
/// sequential iteration to avoid rayon overhead. For larger blocks, uses rayon parallel
/// iteration with [`ForEachOrdered`] to recover signatures in parallel while streaming
/// results to execution in the original transaction order.
/// sequential iteration to avoid rayon overhead.
#[expect(clippy::type_complexity)]
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
&self,
transactions: I,
@@ -372,8 +384,9 @@ where
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
) {
let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count);
let (execute_tx, execute_rx) = mpsc::sync_channel(transaction_count);
let (ooo_tx, ooo_rx) = mpsc::channel();
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
if transaction_count == 0 {
// Empty block — nothing to do.
@@ -387,7 +400,7 @@ where
);
self.executor.spawn_blocking(move || {
let (transactions, convert) = transactions.into_parts();
for tx in transactions {
for (idx, tx) in transactions.into_iter().enumerate() {
let tx = convert.convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
@@ -396,42 +409,57 @@ where
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = execute_tx.send(tx);
let _ = ooo_tx.send((idx, tx));
}
});
} else {
// Parallel path — recover signatures in parallel on rayon, stream results
// to execution in order via `for_each_ordered`.
// Parallel path — spawn on rayon for parallel signature recovery.
rayon::spawn(move || {
let (transactions, convert) = transactions.into_parts();
transactions
.into_par_iter()
.map(|tx| {
transactions.into_par_iter().enumerate().for_each_with(
ooo_tx,
|ooo_tx, (idx, tx)| {
let tx = convert.convert(tx);
tx.map(|tx| {
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
let tx = WithTxEnv { tx_env, tx: Arc::new(tx) };
// Send to prewarming out of order — order doesn't matter there.
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
tx
})
})
.for_each_ordered(|tx| {
let _ = execute_tx.send(tx);
});
}
let _ = ooo_tx.send((idx, tx));
},
);
});
}
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to the execution task in order.
self.executor.spawn_blocking(move || {
let mut next_for_execution = 0;
let mut queue = BTreeMap::new();
while let Ok((idx, tx)) = ooo_rx.recv() {
if next_for_execution == idx {
let _ = execute_tx.send(tx);
next_for_execution += 1;
while let Some(entry) = queue.first_entry()
&& *entry.key() == next_for_execution
{
let _ = execute_tx.send(entry.remove());
next_for_execution += 1;
}
} else {
queue.insert(idx, tx);
}
}
});
(prewarm_rx, execute_rx)
}
/// Spawn prewarming optionally wired to the multiproof task for target updates.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor",
skip_all,
fields(bal=%bal.is_some(), %v2_proofs_enabled)
)]
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
@@ -470,6 +498,7 @@ where
self.prewarm_max_concurrency,
);
// spawn pre-warm task
{
let to_prewarm_task = to_prewarm_task.clone();
self.executor.spawn_blocking(move || {
@@ -897,7 +926,7 @@ impl PayloadExecutionCache {
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip(self))]
pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
let start = Instant::now();
let cache = self.inner.read();
let mut cache = self.inner.write();
let elapsed = start.elapsed();
self.metrics.execution_cache_wait_duration.record(elapsed.as_secs_f64());
@@ -905,7 +934,7 @@ impl PayloadExecutionCache {
warn!(blocked_for=?elapsed, "Blocked waiting for execution cache mutex");
}
if let Some(c) = cache.as_ref() {
if let Some(c) = cache.as_mut() {
let cached_hash = c.executed_block_hash();
// Check that the cache hash matches the parent hash of the current block. It won't
// match in case it's a fork block.
@@ -926,13 +955,13 @@ impl PayloadExecutionCache {
);
if available {
// If the has is available (no other threads are using it), but has a mismatching
// parent hash, we can just clear it and keep using without re-creating from
// scratch.
if !hash_matches {
c.clear();
// Fork block: clear and update the hash on the ORIGINAL before cloning.
// This prevents the canonical chain from matching on the stale hash
// and picking up polluted data if the fork block fails.
c.clear_with_hash(parent_hash);
}
return Some(c.clone())
return Some(c.clone());
} else if hash_matches {
self.metrics.execution_cache_in_use.increment(1);
}
@@ -943,10 +972,25 @@ impl PayloadExecutionCache {
None
}
/// Clears the tracked cache
#[expect(unused)]
pub(crate) fn clear(&self) {
self.inner.write().take();
/// Waits until the execution cache becomes available for use.
///
/// This acquires a write lock to ensure exclusive access, then immediately releases it.
/// This is useful for synchronization before starting payload processing.
///
/// Returns the time spent waiting for the lock.
pub fn wait_for_availability(&self) -> Duration {
let start = Instant::now();
// Acquire write lock to wait for any current holders to finish
let _guard = self.inner.write();
let elapsed = start.elapsed();
if elapsed.as_millis() > 5 {
debug!(
target: "engine::tree::payload_processor",
blocked_for=?elapsed,
"Waited for execution cache to become available"
);
}
elapsed
}
/// Updates the cache with a closure that has exclusive access to the guard.
@@ -1099,10 +1143,18 @@ mod tests {
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(hash)));
// When the parent hash doesn't match, the cache is cleared and returned for reuse
// When the parent hash doesn't match (fork block), the cache is cleared,
// hash updated on the original, and clone returned for reuse
let different_hash = B256::from([4u8; 32]);
let cache = execution_cache.get_cache_for(different_hash);
assert!(cache.is_some(), "cache should be returned for reuse after clearing")
assert!(cache.is_some(), "cache should be returned for reuse after clearing");
drop(cache);
// The stored cache now has the fork block's parent hash.
// Canonical chain looking for original hash sees a mismatch → clears and reuses.
let original = execution_cache.get_cache_for(hash);
assert!(original.is_some(), "canonical chain gets cache back via mismatch+clear");
}
#[test]
@@ -1326,4 +1378,61 @@ mod tests {
"State root mismatch: task={root_from_task}, base={root_from_regular}"
);
}
/// Tests the full prewarm lifecycle for a fork block:
///
/// 1. Cache is at canonical block 4.
/// 2. Fork block (parent = block 2) checks out the cache via `get_cache_for`, simulating what
/// `PrewarmCacheTask` does when it receives a `SavedCache`.
/// 3. Prewarm populates the shared cache with fork-specific state.
/// 4. While the prewarm clone is alive, the cache is unavailable (`usage_guard` > 1).
/// 5. Prewarm drops without calling `save_cache` (fork block was invalid).
/// 6. Canonical block 5 (parent = block 4) must get a cache with correct hash and no stale fork
/// data.
#[test]
fn fork_prewarm_dropped_without_save_does_not_corrupt_cache() {
let execution_cache = PayloadExecutionCache::default();
// Canonical chain at block 4.
let block4_hash = B256::from([4u8; 32]);
execution_cache.update_with_guard(|slot| *slot = Some(make_saved_cache(block4_hash)));
// Fork block arrives with parent = block 2. Prewarm task checks out the cache.
// This simulates PrewarmCacheTask receiving a SavedCache clone from get_cache_for.
let fork_parent = B256::from([2u8; 32]);
let prewarm_cache = execution_cache.get_cache_for(fork_parent);
assert!(prewarm_cache.is_some(), "prewarm should obtain cache for fork block");
let prewarm_cache = prewarm_cache.unwrap();
assert_eq!(prewarm_cache.executed_block_hash(), fork_parent);
// Prewarm populates cache with fork-specific state (ancestor data for block 2).
// Since ExecutionCache uses Arc<Inner>, this data is shared with the stored original.
let fork_addr = Address::from([0xBB; 20]);
let fork_key = B256::from([0xCC; 32]);
prewarm_cache.cache().insert_storage(fork_addr, fork_key, Some(U256::from(999)));
// While prewarm holds the clone, the usage_guard count > 1 → cache is in use.
let during_prewarm = execution_cache.get_cache_for(block4_hash);
assert!(
during_prewarm.is_none(),
"cache must be unavailable while prewarm holds a reference"
);
// Fork block fails — prewarm task drops without calling save_cache/update_with_guard.
drop(prewarm_cache);
// Canonical block 5 arrives (parent = block 4).
// Stored hash = fork_parent (our fix), so get_cache_for sees a mismatch,
// clears the stale fork data, and returns a cache with hash = block4_hash.
let block5_cache = execution_cache.get_cache_for(block4_hash);
assert!(
block5_cache.is_some(),
"canonical chain must get cache after fork prewarm is dropped"
);
assert_eq!(
block5_cache.as_ref().unwrap().executed_block_hash(),
block4_hash,
"cache must carry the canonical parent hash, not the fork parent"
);
}
}

View File

@@ -771,11 +771,6 @@ impl MultiProofTask {
fn on_prefetch_proof(&mut self, mut targets: VersionedMultiProofTargets) -> u64 {
// Remove already fetched proof targets to avoid redundant work.
targets.retain_difference(&self.fetched_proof_targets);
if targets.is_empty() {
return 0;
}
extend_multiproof_targets(&mut self.fetched_proof_targets, &targets);
// For Legacy multiproofs, make sure all target accounts have an `AddedRemovedKeySet` in the
@@ -894,10 +889,6 @@ impl MultiProofTask {
state_updates += 1;
}
if not_fetched_state_update.is_empty() {
return state_updates;
}
// Clone+Arc MultiAddedRemovedKeys for sharing with the dispatched multiproof tasks
let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
account: self.multi_added_removed_keys.account.clone(),
@@ -1582,7 +1573,7 @@ mod tests {
let changeset_cache = ChangesetCache::new();
let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(overlay_factory);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false, false);
let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();
@@ -2065,7 +2056,7 @@ mod tests {
panic!("Expected PrefetchProofs message");
};
assert!(proofs_requested >= 1);
assert_eq!(proofs_requested, 1);
}
/// Verifies that different message types arriving mid-batch are not lost and preserve order.

View File

@@ -42,7 +42,7 @@ use std::{
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, channel, Receiver, Sender, SyncSender},
mpsc::{self, channel, Receiver, Sender},
Arc,
},
time::Instant,
@@ -154,6 +154,8 @@ where
self.executor.spawn_blocking(move || {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
let (done_tx, done_rx) = mpsc::channel();
// When transaction_count is 0, it means the count is unknown. In this case, spawn
// max workers to handle potentially many transactions in parallel rather
// than bottlenecking on a single worker.
@@ -164,8 +166,6 @@ where
transaction_count.min(max_concurrency)
};
let (done_tx, done_rx) = mpsc::sync_channel(workers_needed);
// Spawn workers
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone());
@@ -236,33 +236,36 @@ where
if let Some(saved_cache) = saved_cache {
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Detach the published cache so readers see None during the update.
// This is necessary because ExecutionCache is Arc-shared: mutating
// it via insert_state would be visible through the old SavedCache.
// Perform all cache operations atomically under the lock
execution_cache.update_with_guard(|cached| {
cached.take();
});
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);
// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
debug!(target: "engine::caching", "cleared execution cache on update error");
} else {
new_cache.update_metrics();
let valid = valid_block_rx.recv().is_ok();
execution_cache.update_with_guard(|cached| {
if valid {
*cached = Some(new_cache);
} else {
debug!(target: "engine::caching", "cleared execution cache on invalid block");
}
});
}
if valid_block_rx.recv().is_ok() {
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
*cached = Some(new_cache);
} else {
// Block was invalid; caches were already mutated by insert_state above,
// so we must clear to prevent using polluted state
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on invalid block");
}
});
let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
@@ -309,11 +312,11 @@ where
return;
}
let (done_tx, done_rx) = mpsc::channel();
// Calculate number of workers needed (at most max_concurrency)
let workers_needed = total_slots.min(self.max_concurrency);
let (done_tx, done_rx) = mpsc::sync_channel(workers_needed);
// Calculate slots per worker
let slots_per_worker = total_slots / workers_needed;
let remainder = total_slots % workers_needed;
@@ -582,7 +585,7 @@ where
self,
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: SyncSender<()>,
done_tx: Sender<()>,
) where
Tx: ExecutableTxFor<Evm>,
{
@@ -657,7 +660,7 @@ where
workers_needed: usize,
task_executor: &Runtime,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
done_tx: SyncSender<()>,
done_tx: Sender<()>,
) -> CrossbeamSender<IndexedTransaction<Tx>>
where
Tx: ExecutableTxFor<Evm> + Send + 'static,
@@ -695,7 +698,7 @@ where
executor: &Runtime,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: SyncSender<()>,
done_tx: Sender<()>,
) {
let ctx = self.clone();
let span = debug_span!(
@@ -721,7 +724,7 @@ where
self,
bal: Arc<BlockAccessList>,
range: Range<usize>,
done_tx: SyncSender<()>,
done_tx: Sender<()>,
) {
let Self { saved_cache, provider, metrics, .. } = self;

View File

@@ -593,24 +593,18 @@ where
self.process_leaf_updates(true)?;
for (address, mut new) in self.new_storage_updates.drain() {
match self.storage_updates.entry(address) {
Entry::Vacant(entry) => {
entry.insert(new); // insert the whole map at once, no per-slot loop
}
Entry::Occupied(mut entry) => {
let updates = entry.get_mut();
for (slot, new) in new.drain() {
match updates.entry(slot) {
Entry::Occupied(mut slot_entry) => {
if new.is_changed() {
slot_entry.insert(new);
}
}
Entry::Vacant(slot_entry) => {
slot_entry.insert(new);
}
let updates = self.storage_updates.entry(address).or_default();
for (slot, new) in new.drain() {
match updates.entry(slot) {
Entry::Occupied(mut entry) => {
// Only overwrite existing entries with new values
if new.is_changed() {
entry.insert(new);
}
}
Entry::Vacant(entry) => {
entry.insert(new);
}
}
}
}

View File

@@ -10,11 +10,11 @@ use crate::tree::{
EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle, StateProviderBuilder,
StateProviderDatabase, TreeConfig,
};
use alloy_consensus::{proofs::calculate_receipt_root, transaction::Either, TxHashRef, TxReceipt};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
use alloy_evm::Evm;
use alloy_primitives::{Bloom, B256};
use alloy_primitives::B256;
use crate::tree::payload_processor::receipt_root_task::{IndexedReceipt, ReceiptRootTaskHandle};
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock, LazyOverlay};
@@ -53,16 +53,6 @@ use std::{
};
use tracing::{debug, debug_span, error, info, instrument, trace, warn};
/// Blocks with at most this many transactions compute the receipt root inline to avoid
/// background task overhead.
const SMALL_BLOCK_RECEIPT_ROOT_TX_THRESHOLD: usize = 50;
const SMALL_BLOCK_STATE_ROOT_TX_THRESHOLD: usize = 50;
enum ReceiptRootResult {
Precomputed(ReceiptRootBloom),
Pending(tokio::sync::oneshot::Receiver<ReceiptRootBloom>),
}
/// Context providing access to tree state during validation.
///
/// This context is provided to the [`EngineValidator`] and includes the state of the tree's
@@ -300,7 +290,7 @@ where
// Validate block consensus rules which includes header validation
if let Err(consensus_err) = self.validate_block_inner(&block) {
// Header validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into());
return Err(InsertBlockError::new(block, consensus_err.into()).into())
}
// Also validate against the parent
@@ -308,7 +298,7 @@ where
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
// Parent validation error takes precedence over execution error
return Err(InsertBlockError::new(block, consensus_err.into()).into());
return Err(InsertBlockError::new(block, consensus_err.into()).into())
}
// No header validation errors, return the original execution error
@@ -348,7 +338,7 @@ where
Ok(val) => val,
Err(e) => {
let block = self.convert_to_block(input)?;
return Err(InsertBlockError::new(block, e.into()).into());
return Err(InsertBlockError::new(block, e.into()).into())
}
}
};
@@ -381,7 +371,7 @@ where
self.convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into());
.into())
};
let mut state_provider = ensure_ok!(provider_builder.build());
drop(_enter);
@@ -394,7 +384,7 @@ where
self.convert_to_block(input)?,
ProviderError::HeaderNotFound(parent_hash.into()).into(),
)
.into());
.into())
};
let evm_env = debug_span!(target: "engine::tree::payload_validator", "evm env")
@@ -411,7 +401,7 @@ where
};
// Plan the strategy used for state root computation.
let strategy = self.plan_state_root_computation(input.transaction_count());
let strategy = self.plan_state_root_computation();
debug!(
target: "engine::tree::payload_validator",
@@ -465,7 +455,7 @@ where
// Execute the block and handle any execution errors.
// The receipt root task is spawned before execution and receives receipts incrementally
// as transactions complete, allowing parallel computation during execution.
let (output, senders, receipt_root_result) =
let (output, senders, receipt_root_rx) =
match self.execute_block(state_provider, env, &input, &mut handle) {
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
@@ -486,18 +476,15 @@ where
let block = self.convert_to_block(input)?.with_senders(senders);
// Wait for the receipt root computation to complete.
let receipt_root_bloom = match receipt_root_result {
ReceiptRootResult::Precomputed(receipt_root_bloom) => Some(receipt_root_bloom),
ReceiptRootResult::Pending(receipt_root_rx) => receipt_root_rx
.blocking_recv()
.inspect_err(|_| {
tracing::error!(
target: "engine::tree::payload_validator",
"Receipt root task dropped sender without result, receipt root calculation likely aborted"
);
})
.ok(),
};
let receipt_root_bloom = receipt_root_rx
.blocking_recv()
.inspect_err(|_| {
tracing::error!(
target: "engine::tree::payload_validator",
"Receipt root task dropped sender without result, receipt root calculation likely aborted"
);
})
.ok();
let hashed_state = ensure_ok_post_block!(
self.validate_post_execution(
@@ -630,7 +617,7 @@ where
)
.into(),
)
.into());
.into())
}
if let Some(valid_block_tx) = valid_block_tx {
@@ -669,12 +656,12 @@ where
fn validate_block_inner(&self, block: &SealedBlock<N::Block>) -> Result<(), ConsensusError> {
if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {}: {e}", block.hash());
return Err(e);
return Err(e)
}
if let Err(e) = self.consensus.validate_block_pre_execution(block) {
error!(target: "engine::tree::payload_validator", ?block, "Failed to validate block {}: {e}", block.hash());
return Err(e);
return Err(e)
}
Ok(())
@@ -684,7 +671,7 @@ where
///
/// This method orchestrates block execution:
/// 1. Sets up the EVM with state database and precompile caching
/// 2. Spawns a background task for incremental receipt root computation (if needed)
/// 2. Spawns a background task for incremental receipt root computation
/// 3. Executes transactions with metrics collection via state hooks
/// 4. Merges state transitions and records execution metrics
#[instrument(level = "debug", target = "engine::tree::payload_validator", skip_all)]
@@ -696,7 +683,11 @@ where
input: &BlockOrPayload<T>,
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
) -> Result<
(BlockExecutionOutput<N::Receipt>, Vec<Address>, ReceiptRootResult),
(
BlockExecutionOutput<N::Receipt>,
Vec<Address>,
tokio::sync::oneshot::Receiver<(B256, alloy_primitives::Bloom)>,
),
InsertBlockErrorKind,
>
where
@@ -740,16 +731,10 @@ where
// Spawn background task to compute receipt root and logs bloom incrementally.
// Unbounded channel is used since tx count bounds capacity anyway (max ~30k txs per block).
let receipts_len = input.transaction_count();
let compute_receipt_root_inline = receipts_len <= SMALL_BLOCK_RECEIPT_ROOT_TX_THRESHOLD;
let (receipt_tx, result_rx) = if compute_receipt_root_inline {
(None, None)
} else {
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
(Some(receipt_tx), Some(result_rx))
};
let (receipt_tx, receipt_rx) = crossbeam_channel::unbounded();
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let task_handle = ReceiptRootTaskHandle::new(receipt_rx, result_tx);
self.payload_processor.executor().spawn_blocking(move || task_handle.run(receipts_len));
let transaction_count = input.transaction_count();
let executor = executor.with_state_hook(Some(Box::new(handle.state_hook())));
@@ -761,7 +746,7 @@ where
executor,
transaction_count,
handle.iter_transactions(),
receipt_tx.as_ref(),
&receipt_tx,
)?;
drop(receipt_tx);
@@ -778,21 +763,11 @@ where
let output = BlockExecutionOutput { result, state: db.take_bundle() };
let receipt_root_result = if compute_receipt_root_inline {
ReceiptRootResult::Precomputed(Self::compute_receipt_root_bloom(
&output.result.receipts,
))
} else {
ReceiptRootResult::Pending(
result_rx.expect("receipt root receiver missing when task spawned"),
)
};
let execution_duration = execution_start.elapsed();
self.metrics.record_block_execution(&output, execution_duration);
debug!(target: "engine::tree::payload_validator", elapsed = ?execution_duration, "Executed block");
Ok((output, senders, receipt_root_result))
Ok((output, senders, result_rx))
}
/// Executes transactions and collects senders, streaming receipts to a background task.
@@ -809,7 +784,7 @@ where
mut executor: E,
transaction_count: usize,
transactions: impl Iterator<Item = Result<Tx, Err>>,
receipt_tx: Option<&crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>>,
receipt_tx: &crossbeam_channel::Sender<IndexedReceipt<N::Receipt>>,
) -> Result<(E, Vec<Address>), BlockExecutionError>
where
E: BlockExecutor<Receipt = N::Receipt>,
@@ -856,15 +831,13 @@ where
executor.execute_transaction(tx)?;
self.metrics.record_transaction_execution(tx_start.elapsed());
if let Some(receipt_tx) = receipt_tx {
let current_len = executor.receipts().len();
if current_len > last_sent_len {
last_sent_len = current_len;
// Send the latest receipt to the background task for incremental root computation.
if let Some(receipt) = executor.receipts().last() {
let tx_index = current_len - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
let current_len = executor.receipts().len();
if current_len > last_sent_len {
last_sent_len = current_len;
// Send the latest receipt to the background task for incremental root computation.
if let Some(receipt) = executor.receipts().last() {
let tx_index = current_len - 1;
let _ = receipt_tx.send(IndexedReceipt::new(tx_index, receipt.clone()));
}
}
}
@@ -1100,7 +1073,7 @@ where
trace!(target: "engine::tree::payload_validator", block=?block.num_hash(), "Validating block consensus");
// validate block consensus rules
if let Err(e) = self.validate_block_inner(block) {
return Err(e.into());
return Err(e.into())
}
// now validate against the parent
@@ -1109,7 +1082,7 @@ where
self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
{
warn!(target: "engine::tree::payload_validator", ?block, "Failed to validate header {} against parent: {e}", block.hash());
return Err(e.into());
return Err(e.into())
}
drop(_enter);
@@ -1122,7 +1095,7 @@ where
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into());
return Err(err.into())
}
drop(_enter);
@@ -1137,7 +1110,7 @@ where
{
// call post-block hook
self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
return Err(err.into());
return Err(err.into())
}
// record post-execution validation duration
@@ -1149,16 +1122,6 @@ where
Ok(hashed_state)
}
fn compute_receipt_root_bloom(receipts: &[N::Receipt]) -> ReceiptRootBloom {
let receipts_with_bloom =
receipts.iter().map(TxReceipt::with_bloom_ref).collect::<Vec<_>>();
let receipts_root = calculate_receipt_root(&receipts_with_bloom);
let logs_bloom = receipts_with_bloom
.iter()
.fold(Bloom::ZERO, |bloom, receipt| bloom | receipt.bloom_ref());
(receipts_root, logs_bloom)
}
/// Spawns a payload processor task based on the state root strategy.
///
/// This method determines how to execute the block and compute its state root based on
@@ -1179,7 +1142,7 @@ where
level = "debug",
target = "engine::tree::payload_validator",
skip_all,
fields(?strategy)
fields(strategy)
)]
fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
&mut self,
@@ -1255,7 +1218,7 @@ where
self.provider.clone(),
historical,
Some(blocks),
)));
)))
}
// Check if the block is persisted
@@ -1263,7 +1226,7 @@ where
debug!(target: "engine::tree::payload_validator", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
// For persisted blocks, we create a builder that will fetch state directly from the
// database
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)));
return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
}
debug!(target: "engine::tree::payload_validator", %hash, "no canonical state found for block");
@@ -1274,12 +1237,7 @@ where
///
/// Note: Use state root task only if prefix sets are empty, otherwise proof generation is
/// too expensive because it requires walking all paths in every proof.
fn plan_state_root_computation(&self, transaction_count: usize) -> StateRootStrategy {
// Small blocks are faster without spawning parallel state root tasks.
if transaction_count > 0 && transaction_count <= SMALL_BLOCK_STATE_ROOT_TX_THRESHOLD {
return StateRootStrategy::Synchronous;
}
const fn plan_state_root_computation(&self) -> StateRootStrategy {
if self.config.state_root_fallback() {
StateRootStrategy::Synchronous
} else if self.config.use_state_root_task() {
@@ -1300,7 +1258,7 @@ where
) {
if state.invalid_headers.get(&block.hash()).is_some() {
// we already marked this block as invalid
return;
return
}
self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
}

View File

@@ -425,9 +425,17 @@ impl TotalDifficulty {
/// Convert to an [`Entry`]
pub fn to_entry(&self) -> Entry {
// era1 spec: `total-difficulty = { type: 0x0600, data: SSZ uint256 }` (little-endian)
let data = self.value.to_le_bytes::<32>().to_vec();
Entry::new(TOTAL_DIFFICULTY, data)
let mut data = [0u8; 32];
let be_bytes = self.value.to_be_bytes_vec();
if be_bytes.len() <= 32 {
data[32 - be_bytes.len()..].copy_from_slice(&be_bytes);
} else {
data.copy_from_slice(&be_bytes[be_bytes.len() - 32..]);
}
Entry::new(TOTAL_DIFFICULTY, data.to_vec())
}
/// Create from an [`Entry`]
@@ -446,8 +454,8 @@ impl TotalDifficulty {
)));
}
// era1 spec: `total-difficulty = { type: 0x0600, data: SSZ uint256 }` (little-endian)
let value = U256::from_le_slice(&entry.data);
// Convert 32-byte array to U256
let value = U256::from_be_slice(&entry.data);
Ok(Self { value })
}
@@ -600,19 +608,6 @@ mod tests {
assert_eq!(recovered.value, value);
}
#[test]
fn test_total_difficulty_ssz_le_encoding() {
// Verify that total-difficulty is encoded as SSZ uint256 (little-endian).
// See https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md
let value = U256::from(1u64);
let td = TotalDifficulty::new(value);
let entry = td.to_entry();
// Little-endian: least significant byte first [1, 0, 0, ..., 0]
assert_eq!(entry.data[0], 1, "First byte must be 1 (little-endian)");
assert_eq!(entry.data[31], 0, "Last byte must be 0 (little-endian)");
}
#[test]
fn test_compression_roundtrip() {
let rlp_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

View File

@@ -1,6 +1,6 @@
use alloy_primitives::{Address, B256, U256};
use reth_primitives_traits::{Account, Bytecode};
use revm::database::{states::BundleState, BundleAccount};
use revm::database::BundleState;
pub use alloy_evm::block::BlockExecutionResult;
@@ -37,11 +37,6 @@ impl<T> BlockExecutionOutput<T> {
self.state.account(address).map(|a| a.info.as_ref().map(Into::into))
}
/// Returns the state [`BundleAccount`] for the given address.
pub fn account_state(&self, address: &Address) -> Option<&BundleAccount> {
self.state.account(address)
}
/// Get storage if value is known.
///
/// This means that depending on status we can potentially return `U256::ZERO`.

View File

@@ -10,7 +10,6 @@ use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
use reth_stages_api::ExecutionStageThresholds;
use reth_tracing::tracing::debug;
use std::{
collections::VecDeque,
fmt::Debug,
pin::Pin,
sync::Arc,
@@ -287,9 +286,6 @@ where
backfill_job: Option<StreamBackfillJob<E, P, Chain<E::Primitives>>>,
/// Custom thresholds for the backfill job, if set.
backfill_thresholds: Option<ExecutionStageThresholds>,
/// Notifications that arrived during backfill and need to be delivered after it completes.
/// These are notifications for blocks beyond the backfill range that we must not drop.
pending_notifications: VecDeque<ExExNotification<E::Primitives>>,
}
impl<P, E> ExExNotificationsWithHead<P, E>
@@ -316,7 +312,6 @@ where
pending_check_backfill: true,
backfill_job: None,
backfill_thresholds: None,
pending_notifications: VecDeque::new(),
}
}
@@ -453,34 +448,6 @@ where
// 3. If backfill is in progress yield new notifications
if let Some(backfill_job) = &mut this.backfill_job {
debug!(target: "exex::notifications", "Polling backfill job");
// Drain the notification channel to prevent backpressure from stalling the
// ExExManager. During backfill, the ExEx is not consuming from the channel,
// so the capacity-1 channel fills up, which blocks the manager's PollSender,
// which fills the manager's 1024-entry buffer, which blocks all upstream
// senders. Notifications for blocks covered by the backfill range are
// discarded (they'll be re-delivered by the backfill job), while
// notifications beyond the backfill range are buffered for delivery after the
// backfill completes.
while let Poll::Ready(Some(notification)) = this.notifications.poll_recv(cx) {
// Always buffer revert-containing notifications (ChainReverted,
// ChainReorged) because the backfill job only re-delivers
// ChainCommitted from the database. Discarding a reorg here would
// leave the ExEx unaware of the fork switch.
if notification.reverted_chain().is_some() {
this.pending_notifications.push_back(notification);
continue;
}
if let Some(committed) = notification.committed_chain() &&
committed.tip().number() <= this.initial_local_head.number
{
// Covered by backfill range, safe to discard
continue;
}
// Beyond the backfill range — buffer for delivery after backfill
this.pending_notifications.push_back(notification);
}
if let Some(chain) = ready!(backfill_job.poll_next_unpin(cx)).transpose()? {
debug!(target: "exex::notifications", range = ?chain.range(), "Backfill job returned a chain");
return Poll::Ready(Some(Ok(ExExNotification::ChainCommitted {
@@ -492,18 +459,13 @@ where
this.backfill_job = None;
}
// 4. Deliver any notifications that were buffered during backfill
if let Some(notification) = this.pending_notifications.pop_front() {
return Poll::Ready(Some(Ok(notification)))
}
// 5. Otherwise advance the regular event stream
// 4. Otherwise advance the regular event stream
loop {
let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
return Poll::Ready(None)
};
// 6. In case the exex is ahead of the new tip, we must skip it
// 5. In case the exex is ahead of the new tip, we must skip it
if let Some(committed) = notification.committed_chain() {
// inclusive check because we should start with `exex.head + 1`
if this.initial_exex_head.block.number >= committed.tip().number() {
@@ -827,135 +789,4 @@ mod tests {
Ok(())
}
/// Regression test for <https://github.com/paradigmxyz/reth/issues/19665>.
///
/// During backfill, `poll_next` must drain the notification channel so that
/// the upstream `ExExManager` is never blocked by a full channel. Without
/// the drain loop the capacity-1 channel stays full for the entire backfill
/// duration, which stalls the manager's `PollSender` and eventually blocks
/// all upstream senders once the 1024-entry buffer fills up.
///
/// The key assertion is the `try_send` after the first `poll_next`: it
/// proves the channel was drained during the backfill poll. Without the
/// fix this `try_send` fails because the notification is still sitting in
/// the channel.
#[tokio::test]
async fn exex_notifications_backfill_drains_channel() -> eyre::Result<()> {
let mut rng = generators::rng();
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?;
let genesis_block = provider_factory
.block(genesis_hash.into())?
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
let provider = BlockchainProvider::new(provider_factory.clone())?;
// Insert block 1 into the DB so there's something to backfill
let node_head_block = random_block(
&mut rng,
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
)
.try_recover()?;
let node_head = node_head_block.num_hash();
let provider_rw = provider_factory.provider_rw()?;
provider_rw.insert_block(&node_head_block)?;
provider_rw.commit()?;
// ExEx head is at genesis — backfill will run for block 1
let exex_head =
ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } };
// Notification for a block AFTER the backfill range (block 2).
let post_backfill_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
node_head.number + 1,
BlockParams { parent: Some(node_head.hash), ..Default::default() },
)
.try_recover()?],
Default::default(),
BTreeMap::new(),
)),
};
// Another notification (block 3) used to probe channel capacity.
let probe_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![random_block(
&mut rng,
node_head.number + 2,
BlockParams { parent: None, ..Default::default() },
)
.try_recover()?],
Default::default(),
BTreeMap::new(),
)),
};
let (notifications_tx, notifications_rx) = mpsc::channel(1);
// Fill the capacity-1 channel.
notifications_tx.send(post_backfill_notification.clone()).await?;
// Confirm the channel is full — this is the precondition that causes the
// stall in production: the ExExManager's PollSender would block here.
assert!(
notifications_tx.try_send(probe_notification.clone()).is_err(),
"channel should be full before backfill poll"
);
let mut notifications = ExExNotificationsWithoutHead::new(
node_head,
provider,
EthEvmConfig::mainnet(),
notifications_rx,
wal.handle(),
)
.with_head(exex_head);
// Poll once — this returns the backfill result for block 1. Crucially,
// the drain loop in poll_next runs in this same call, consuming the
// notification from the channel and buffering it.
let backfill_result = notifications.next().await.transpose()?;
assert_eq!(
backfill_result,
Some(ExExNotification::ChainCommitted {
new: Arc::new(
BackfillJobFactory::new(
notifications.evm_config.clone(),
notifications.provider.clone()
)
.backfill(1..=1)
.next()
.ok_or_eyre("failed to backfill")??
)
})
);
// KEY ASSERTION: the channel was drained during the backfill poll above.
// Without the drain loop this try_send fails because the original
// notification is still occupying the capacity-1 channel.
assert!(
notifications_tx.try_send(probe_notification.clone()).is_ok(),
"channel should have been drained during backfill poll"
);
// The first buffered notification (block 2) was drained from the channel
// during backfill and is delivered now.
let buffered = notifications.next().await.transpose()?;
assert_eq!(buffered, Some(post_backfill_notification));
// The probe notification (block 3) that we just sent is delivered next.
let probe = notifications.next().await.transpose()?;
assert_eq!(probe, Some(probe_notification));
Ok(())
}
}

View File

@@ -1,27 +0,0 @@
//! Implements the `GetBlockAccessLists` and `BlockAccessLists` message types.
use alloc::vec::Vec;
use alloy_primitives::{Bytes, B256};
use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper};
use reth_codecs_derive::add_arbitrary_tests;
/// A request for block access lists from the given block hashes.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(rlp)]
pub struct GetBlockAccessLists(
/// The block hashes to request block access lists for.
pub Vec<B256>,
);
/// Response for [`GetBlockAccessLists`] containing one BAL per requested block hash.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(rlp)]
pub struct BlockAccessLists(
/// The requested block access lists as opaque bytes. Unavailable entries are represented by
/// empty byte slices.
pub Vec<Bytes>,
);

View File

@@ -169,10 +169,7 @@ impl NewPooledTransactionHashes {
matches!(version, EthVersion::Eth67 | EthVersion::Eth66)
}
Self::Eth68(_) => {
matches!(
version,
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71
)
matches!(version, EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70)
}
}
}

View File

@@ -110,11 +110,6 @@ impl Capability {
Self::eth(EthVersion::Eth70)
}
/// Returns the [`EthVersion::Eth71`] capability.
pub const fn eth_71() -> Self {
Self::eth(EthVersion::Eth71)
}
/// Whether this is eth v66 protocol.
#[inline]
pub fn is_eth_v66(&self) -> bool {
@@ -145,12 +140,6 @@ impl Capability {
self.name == "eth" && self.version == 70
}
/// Whether this is eth v71.
#[inline]
pub fn is_eth_v71(&self) -> bool {
self.name == "eth" && self.version == 71
}
/// Whether this is any eth version.
#[inline]
pub fn is_eth(&self) -> bool {
@@ -158,8 +147,7 @@ impl Capability {
self.is_eth_v67() ||
self.is_eth_v68() ||
self.is_eth_v69() ||
self.is_eth_v70() ||
self.is_eth_v71()
self.is_eth_v70()
}
}
@@ -179,7 +167,7 @@ impl From<EthVersion> for Capability {
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> arbitrary::Arbitrary<'a> for Capability {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
let version = u.int_in_range(66..=71)?; // Valid eth protocol versions are 66-71
let version = u.int_in_range(66..=70)?; // Valid eth protocol versions are 66-70
// Only generate valid eth protocol name for now since it's the only supported protocol
Ok(Self::new_static("eth", version))
}
@@ -195,7 +183,6 @@ pub struct Capabilities {
eth_68: bool,
eth_69: bool,
eth_70: bool,
eth_71: bool,
}
impl Capabilities {
@@ -207,7 +194,6 @@ impl Capabilities {
eth_68: value.iter().any(Capability::is_eth_v68),
eth_69: value.iter().any(Capability::is_eth_v69),
eth_70: value.iter().any(Capability::is_eth_v70),
eth_71: value.iter().any(Capability::is_eth_v71),
inner: value,
}
}
@@ -226,7 +212,7 @@ impl Capabilities {
/// Whether the peer supports `eth` sub-protocol.
#[inline]
pub const fn supports_eth(&self) -> bool {
self.eth_71 || self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66
self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66
}
/// Whether this peer supports eth v66 protocol.
@@ -258,12 +244,6 @@ impl Capabilities {
pub const fn supports_eth_v70(&self) -> bool {
self.eth_70
}
/// Whether this peer supports eth v71 protocol.
#[inline]
pub const fn supports_eth_v71(&self) -> bool {
self.eth_71
}
}
impl From<Vec<Capability>> for Capabilities {
@@ -288,7 +268,6 @@ impl Decodable for Capabilities {
eth_68: inner.iter().any(Capability::is_eth_v68),
eth_69: inner.iter().any(Capability::is_eth_v69),
eth_70: inner.iter().any(Capability::is_eth_v70),
eth_71: inner.iter().any(Capability::is_eth_v71),
inner,
})
}

View File

@@ -38,9 +38,6 @@ pub use state::*;
pub mod receipts;
pub use receipts::*;
pub mod block_access_lists;
pub use block_access_lists::*;
pub mod disconnect_reason;
pub use disconnect_reason::*;

View File

@@ -1,4 +1,4 @@
//! Implements Ethereum wire protocol for versions 66 through 71.
//! Implements Ethereum wire protocol for versions 66 through 70.
//! Defines structs/enums for messages, request-response pairs, and broadcasts.
//! Handles compatibility with [`EthVersion`].
//!
@@ -7,10 +7,10 @@
//! Reference: [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md).
use super::{
broadcast::NewBlockHashes, BlockAccessLists, BlockBodies, BlockHeaders, GetBlockAccessLists,
GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
GetReceipts70, NewPooledTransactionHashes66, NewPooledTransactionHashes68, NodeData,
PooledTransactions, Receipts, Status, StatusEth69, Transactions,
broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
Transactions,
};
use crate::{
status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
@@ -168,18 +168,6 @@ impl<N: NetworkPrimitives> ProtocolMessage<N> {
}
EthMessage::BlockRangeUpdate(BlockRangeUpdate::decode(buf)?)
}
EthMessageID::GetBlockAccessLists => {
if version < EthVersion::Eth71 {
return Err(MessageError::Invalid(version, EthMessageID::GetBlockAccessLists))
}
EthMessage::GetBlockAccessLists(RequestPair::decode(buf)?)
}
EthMessageID::BlockAccessLists => {
if version < EthVersion::Eth71 {
return Err(MessageError::Invalid(version, EthMessageID::BlockAccessLists))
}
EthMessage::BlockAccessLists(RequestPair::decode(buf)?)
}
EthMessageID::Other(_) => {
let raw_payload = Bytes::copy_from_slice(buf);
buf.advance(raw_payload.len());
@@ -262,8 +250,6 @@ impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMes
///
/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts.
/// requests/responses.
///
/// The `eth/71` draft extends eth/70 with block access list request/response messages.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
@@ -324,8 +310,6 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps
/// a [`RequestPair`], but with a custom inline encoding.
GetReceipts70(RequestPair<GetReceipts70>),
/// Represents a `GetBlockAccessLists` request-response pair for eth/71.
GetBlockAccessLists(RequestPair<GetBlockAccessLists>),
/// Represents a Receipts request-response pair.
#[cfg_attr(
feature = "serde",
@@ -348,8 +332,6 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// request id. The type still wraps a [`RequestPair`], but with a custom
/// inline encoding.
Receipts70(RequestPair<Receipts70<N::Receipt>>),
/// Represents a `BlockAccessLists` request-response pair for eth/71.
BlockAccessLists(RequestPair<BlockAccessLists>),
/// Represents a `BlockRangeUpdate` message broadcast to the network.
#[cfg_attr(
feature = "serde",
@@ -382,8 +364,6 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts,
Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts,
Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
Self::GetBlockAccessLists(_) => EthMessageID::GetBlockAccessLists,
Self::BlockAccessLists(_) => EthMessageID::BlockAccessLists,
Self::Other(msg) => EthMessageID::Other(msg.id as u8),
}
}
@@ -396,7 +376,6 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::GetBlockHeaders(_) |
Self::GetReceipts(_) |
Self::GetReceipts70(_) |
Self::GetBlockAccessLists(_) |
Self::GetPooledTransactions(_) |
Self::GetNodeData(_)
)
@@ -410,7 +389,6 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::Receipts(_) |
Self::Receipts69(_) |
Self::Receipts70(_) |
Self::BlockAccessLists(_) |
Self::BlockHeaders(_) |
Self::BlockBodies(_) |
Self::NodeData(_)
@@ -465,11 +443,9 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
Self::NodeData(data) => data.encode(out),
Self::GetReceipts(request) => request.encode(out),
Self::GetReceipts70(request) => request.encode(out),
Self::GetBlockAccessLists(request) => request.encode(out),
Self::Receipts(receipts) => receipts.encode(out),
Self::Receipts69(receipt69) => receipt69.encode(out),
Self::Receipts70(receipt70) => receipt70.encode(out),
Self::BlockAccessLists(block_access_lists) => block_access_lists.encode(out),
Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
Self::Other(unknown) => out.put_slice(&unknown.payload),
}
@@ -492,11 +468,9 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
Self::NodeData(data) => data.length(),
Self::GetReceipts(request) => request.length(),
Self::GetReceipts70(request) => request.length(),
Self::GetBlockAccessLists(request) => request.length(),
Self::Receipts(receipts) => receipts.length(),
Self::Receipts69(receipt69) => receipt69.length(),
Self::Receipts70(receipt70) => receipt70.length(),
Self::BlockAccessLists(block_access_lists) => block_access_lists.length(),
Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
Self::Other(unknown) => unknown.length(),
}
@@ -585,14 +559,6 @@ pub enum EthMessageID {
///
/// Introduced in Eth69
BlockRangeUpdate = 0x11,
/// Requests block access lists.
///
/// Introduced in Eth71
GetBlockAccessLists = 0x12,
/// Represents block access lists.
///
/// Introduced in Eth71
BlockAccessLists = 0x13,
/// Represents unknown message types.
Other(u8),
}
@@ -617,17 +583,13 @@ impl EthMessageID {
Self::GetReceipts => 0x0f,
Self::Receipts => 0x10,
Self::BlockRangeUpdate => 0x11,
Self::GetBlockAccessLists => 0x12,
Self::BlockAccessLists => 0x13,
Self::Other(value) => *value, // Return the stored `u8`
}
}
/// Returns the max value for the given version.
pub const fn max(version: EthVersion) -> u8 {
if version.is_eth71() {
Self::BlockAccessLists.to_u8()
} else if version.is_eth69_or_newer() {
if version as u8 >= EthVersion::Eth69 as u8 {
Self::BlockRangeUpdate.to_u8()
} else {
Self::Receipts.to_u8()
@@ -672,8 +634,6 @@ impl Decodable for EthMessageID {
0x0f => Self::GetReceipts,
0x10 => Self::Receipts,
0x11 => Self::BlockRangeUpdate,
0x12 => Self::GetBlockAccessLists,
0x13 => Self::BlockAccessLists,
unknown => Self::Other(*unknown),
};
buf.advance(1);
@@ -702,8 +662,6 @@ impl TryFrom<usize> for EthMessageID {
0x0f => Ok(Self::GetReceipts),
0x10 => Ok(Self::Receipts),
0x11 => Ok(Self::BlockRangeUpdate),
0x12 => Ok(Self::GetBlockAccessLists),
0x13 => Ok(Self::BlockAccessLists),
_ => Err("Invalid message ID"),
}
}
@@ -784,9 +742,8 @@ where
mod tests {
use super::MessageError;
use crate::{
message::RequestPair, BlockAccessLists, EthMessage, EthMessageID, EthNetworkPrimitives,
EthVersion, GetBlockAccessLists, GetNodeData, NodeData, ProtocolMessage,
RawCapabilityMessage,
message::RequestPair, EthMessage, EthMessageID, EthNetworkPrimitives, EthVersion,
GetNodeData, NodeData, ProtocolMessage, RawCapabilityMessage,
};
use alloy_primitives::hex;
use alloy_rlp::{Decodable, Encodable, Error};
@@ -827,60 +784,6 @@ mod tests {
assert!(matches!(msg, Err(MessageError::Invalid(..))));
}
#[test]
fn test_bal_message_version_gating() {
let get_block_access_lists =
EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(RequestPair {
request_id: 1337,
message: GetBlockAccessLists(vec![]),
});
let buf = encode(ProtocolMessage {
message_type: EthMessageID::GetBlockAccessLists,
message: get_block_access_lists,
});
let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
EthVersion::Eth70,
&mut &buf[..],
);
assert!(matches!(
msg,
Err(MessageError::Invalid(EthVersion::Eth70, EthMessageID::GetBlockAccessLists))
));
let block_access_lists =
EthMessage::<EthNetworkPrimitives>::BlockAccessLists(RequestPair {
request_id: 1337,
message: BlockAccessLists(vec![]),
});
let buf = encode(ProtocolMessage {
message_type: EthMessageID::BlockAccessLists,
message: block_access_lists,
});
let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
EthVersion::Eth70,
&mut &buf[..],
);
assert!(matches!(
msg,
Err(MessageError::Invalid(EthVersion::Eth70, EthMessageID::BlockAccessLists))
));
}
#[test]
fn test_bal_message_eth71_roundtrip() {
let msg = ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(
RequestPair { request_id: 42, message: GetBlockAccessLists(vec![]) },
));
let encoded = encode(msg.clone());
let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
EthVersion::Eth71,
&mut &encoded[..],
)
.unwrap();
assert_eq!(decoded, msg);
}
#[test]
fn request_pair_encode() {
let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };

View File

@@ -29,8 +29,6 @@ pub enum EthVersion {
Eth69 = 69,
/// The `eth` protocol version 70.
Eth70 = 70,
/// The `eth` protocol version 71.
Eth71 = 71,
}
impl EthVersion {
@@ -64,19 +62,9 @@ impl EthVersion {
pub const fn is_eth70(&self) -> bool {
matches!(self, Self::Eth70)
}
/// Returns true if the version is eth/71
pub const fn is_eth71(&self) -> bool {
matches!(self, Self::Eth71)
}
/// Returns true if the version is eth/69 or newer.
pub const fn is_eth69_or_newer(&self) -> bool {
matches!(self, Self::Eth69 | Self::Eth70 | Self::Eth71)
}
}
/// RLP encodes `EthVersion` as a single byte (66-71).
/// RLP encodes `EthVersion` as a single byte (66-69).
impl Encodable for EthVersion {
fn encode(&self, out: &mut dyn BufMut) {
(*self as u8).encode(out)
@@ -88,7 +76,7 @@ impl Encodable for EthVersion {
}
/// RLP decodes a single byte into `EthVersion`.
/// Returns error if byte is not a valid version (66-71).
/// Returns error if byte is not a valid version (66-69).
impl Decodable for EthVersion {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let version = u8::decode(buf)?;
@@ -116,7 +104,6 @@ impl TryFrom<&str> for EthVersion {
"68" => Ok(Self::Eth68),
"69" => Ok(Self::Eth69),
"70" => Ok(Self::Eth70),
"71" => Ok(Self::Eth71),
_ => Err(ParseVersionError(s.to_string())),
}
}
@@ -142,7 +129,6 @@ impl TryFrom<u8> for EthVersion {
68 => Ok(Self::Eth68),
69 => Ok(Self::Eth69),
70 => Ok(Self::Eth70),
71 => Ok(Self::Eth71),
_ => Err(ParseVersionError(u.to_string())),
}
}
@@ -173,7 +159,6 @@ impl From<EthVersion> for &'static str {
EthVersion::Eth68 => "68",
EthVersion::Eth69 => "69",
EthVersion::Eth70 => "70",
EthVersion::Eth71 => "71",
}
}
}
@@ -231,7 +216,6 @@ mod tests {
assert_eq!(EthVersion::Eth68, EthVersion::try_from("68").unwrap());
assert_eq!(EthVersion::Eth69, EthVersion::try_from("69").unwrap());
assert_eq!(EthVersion::Eth70, EthVersion::try_from("70").unwrap());
assert_eq!(EthVersion::Eth71, EthVersion::try_from("71").unwrap());
}
#[test]
@@ -241,7 +225,6 @@ mod tests {
assert_eq!(EthVersion::Eth68, "68".parse().unwrap());
assert_eq!(EthVersion::Eth69, "69".parse().unwrap());
assert_eq!(EthVersion::Eth70, "70".parse().unwrap());
assert_eq!(EthVersion::Eth71, "71".parse().unwrap());
}
#[test]
@@ -252,7 +235,6 @@ mod tests {
EthVersion::Eth68,
EthVersion::Eth69,
EthVersion::Eth70,
EthVersion::Eth71,
];
for version in versions {
@@ -271,7 +253,6 @@ mod tests {
(68_u8, Ok(EthVersion::Eth68)),
(69_u8, Ok(EthVersion::Eth69)),
(70_u8, Ok(EthVersion::Eth70)),
(71_u8, Ok(EthVersion::Eth71)),
(65_u8, Err(RlpError::Custom("invalid eth version"))),
];

View File

@@ -294,8 +294,7 @@ mod tests {
use alloy_primitives::B256;
use alloy_rlp::Encodable;
use reth_eth_wire_types::{
message::RequestPair, GetAccountRangeMessage, GetBlockAccessLists, GetBlockHeaders,
HeadersDirection,
message::RequestPair, GetAccountRangeMessage, GetBlockHeaders, HeadersDirection,
};
// Helper to create eth message and its bytes
@@ -420,40 +419,4 @@ mod tests {
let snap_boundary_result = inner.decode_message(snap_boundary_bytes);
assert!(snap_boundary_result.is_err());
}
#[test]
fn test_eth70_message_id_0x12_is_snap() {
let inner = EthSnapStreamInner::<EthNetworkPrimitives>::new(EthVersion::Eth70);
let snap_msg = SnapProtocolMessage::GetAccountRange(GetAccountRangeMessage {
request_id: 1,
root_hash: B256::default(),
starting_hash: B256::default(),
limit_hash: B256::default(),
response_bytes: 1000,
});
let encoded = inner.encode_snap_message(snap_msg);
assert_eq!(encoded[0], EthMessageID::message_count(EthVersion::Eth70));
let decoded = inner.decode_message(BytesMut::from(&encoded[..])).unwrap();
assert!(matches!(decoded, EthSnapMessage::Snap(_)));
}
#[test]
fn test_eth71_message_id_0x12_is_eth() {
let inner = EthSnapStreamInner::<EthNetworkPrimitives>::new(EthVersion::Eth71);
let eth_msg = EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(RequestPair {
request_id: 1,
message: GetBlockAccessLists(vec![B256::ZERO]),
});
let protocol_msg = ProtocolMessage::from(eth_msg.clone());
let mut buf = Vec::new();
protocol_msg.encode(&mut buf);
let decoded = inner.decode_message(BytesMut::from(&buf[..])).unwrap();
let EthSnapMessage::Eth(decoded_eth) = decoded else {
panic!("expected eth message");
};
assert_eq!(decoded_eth, eth_msg);
}
}

View File

@@ -84,7 +84,5 @@ mod tests {
assert_eq!(Protocol::eth(EthVersion::Eth67).messages(), 17);
assert_eq!(Protocol::eth(EthVersion::Eth68).messages(), 17);
assert_eq!(Protocol::eth(EthVersion::Eth69).messages(), 18);
assert_eq!(Protocol::eth(EthVersion::Eth70).messages(), 18);
assert_eq!(Protocol::eth(EthVersion::Eth71).messages(), 20);
}
}

View File

@@ -1,11 +1,10 @@
//! API related to listening for network events.
use reth_eth_wire_types::{
message::RequestPair, BlockAccessLists, BlockBodies, BlockHeaders, Capabilities,
DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
GetReceipts70, NetworkPrimitives, NodeData, PooledTransactions, Receipts, Receipts69,
Receipts70, UnifiedStatus,
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData,
PooledTransactions, Receipts, Receipts69, Receipts70, UnifiedStatus,
};
use reth_ethereum_forks::ForkId;
use reth_network_p2p::error::{RequestError, RequestResult};
@@ -253,15 +252,6 @@ pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The channel to send the response for receipts.
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
},
/// Requests block access lists from the peer.
///
/// The response should be sent through the channel.
GetBlockAccessLists {
/// The request for block access lists.
request: GetBlockAccessLists,
/// The channel to send the response for block access lists.
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
},
}
// === impl PeerRequest ===
@@ -282,19 +272,9 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
};
}
/// Returns true if this request is supported for the negotiated eth protocol version.
#[inline]
pub fn is_supported_by_eth_version(&self, version: EthVersion) -> bool {
match self {
Self::GetBlockAccessLists { .. } => version >= EthVersion::Eth71,
_ => true,
}
}
/// Returns the [`EthMessage`] for this type
pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
match self {
@@ -319,12 +299,6 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
Self::GetReceipts70 { request, .. } => {
EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() })
}
Self::GetBlockAccessLists { request, .. } => {
EthMessage::GetBlockAccessLists(RequestPair {
request_id,
message: request.clone(),
})
}
}
}
@@ -375,18 +349,3 @@ impl<R> fmt::Debug for PeerRequestSender<R> {
f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_block_access_lists_version_support() {
let (tx, _rx) = oneshot::channel();
let req: PeerRequest<EthNetworkPrimitives> =
PeerRequest::GetBlockAccessLists { request: GetBlockAccessLists(vec![]), response: tx };
assert!(!req.is_supported_by_eth_version(EthVersion::Eth70));
assert!(req.is_supported_by_eth_version(EthVersion::Eth71));
}
}

View File

@@ -6,13 +6,12 @@ use crate::{
};
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::Bytes;
use alloy_rlp::Encodable;
use futures::StreamExt;
use reth_eth_wire::{
BlockAccessLists, BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockAccessLists,
GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts, GetReceipts70, HeadersDirection,
NetworkPrimitives, NodeData, Receipts, Receipts69, Receipts70,
BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetReceipts, GetReceipts70, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
Receipts69, Receipts70,
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::error::RequestResult;
@@ -282,19 +281,6 @@ where
let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
}
/// Handles [`GetBlockAccessLists`] queries.
///
/// For now this returns one empty BAL per requested hash.
fn on_block_access_lists_request(
&self,
_peer_id: PeerId,
request: GetBlockAccessLists,
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
) {
let access_lists = request.0.into_iter().map(|_| Bytes::new()).collect();
let _ = response.send(Ok(BlockAccessLists(access_lists)));
}
#[inline]
fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
where
@@ -366,9 +352,6 @@ where
IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
this.on_receipts70_request(peer_id, request, response)
}
IncomingEthRequest::GetBlockAccessLists { peer_id, request, response } => {
this.on_block_access_lists_request(peer_id, request, response)
}
}
},
);
@@ -454,15 +437,4 @@ pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The channel sender for the response containing Receipts70.
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
},
/// Request Block Access Lists from the peer.
///
/// The response should be sent through the channel.
GetBlockAccessLists {
/// The ID of the peer to request block access lists from.
peer_id: PeerId,
/// The requested block hashes.
request: GetBlockAccessLists,
/// The channel sender for the response containing block access lists.
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
},
}

View File

@@ -551,13 +551,6 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
response,
})
}
PeerRequest::GetBlockAccessLists { request, response } => {
self.delegate_eth_request(IncomingEthRequest::GetBlockAccessLists {
peer_id,
request,
response,
})
}
PeerRequest::GetPooledTransactions { request, response } => {
self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
peer_id,

View File

@@ -3,7 +3,7 @@
//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use crate::types::{BlockAccessLists, Receipts69, Receipts70};
use crate::types::{Receipts69, Receipts70};
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
use alloy_primitives::{Bytes, B256};
use futures::FutureExt;
@@ -121,11 +121,6 @@ pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The receiver channel for the response to a receipts request.
response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
},
/// Represents a response to a request for block access lists.
BlockAccessLists {
/// The receiver channel for the response to a block access lists request.
response: oneshot::Receiver<RequestResult<BlockAccessLists>>,
},
}
// === impl PeerResponse ===
@@ -165,10 +160,6 @@ impl<N: NetworkPrimitives> PeerResponse<N> {
Ok(res) => PeerResponseResult::Receipts70(res),
Err(err) => PeerResponseResult::Receipts70(Err(err.into())),
},
Self::BlockAccessLists { response } => match ready!(response.poll_unpin(cx)) {
Ok(res) => PeerResponseResult::BlockAccessLists(res),
Err(err) => PeerResponseResult::BlockAccessLists(Err(err.into())),
},
};
Poll::Ready(res)
}
@@ -191,8 +182,6 @@ pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
/// Represents a result containing receipts or an error for eth/70.
Receipts70(RequestResult<Receipts70<N::Receipt>>),
/// Represents a result containing block access lists or an error.
BlockAccessLists(RequestResult<BlockAccessLists>),
}
// === impl PeerResponseResult ===
@@ -237,13 +226,6 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
}
Err(err) => Err(err),
},
Self::BlockAccessLists(resp) => match resp {
Ok(res) => {
let request = RequestPair { request_id: id, message: res };
Ok(EthMessage::BlockAccessLists(request))
}
Err(err) => Err(err),
},
}
}
@@ -257,7 +239,6 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
Self::Receipts(res) => res.as_ref().err(),
Self::Receipts69(res) => res.as_ref().err(),
Self::Receipts70(res) => res.as_ref().err(),
Self::BlockAccessLists(res) => res.as_ref().err(),
}
}

View File

@@ -282,12 +282,6 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
EthMessage::Receipts70(resp) => {
on_response!(resp, GetReceipts70)
}
EthMessage::GetBlockAccessLists(req) => {
on_request!(req, BlockAccessLists, GetBlockAccessLists)
}
EthMessage::BlockAccessLists(resp) => {
on_response!(resp, GetBlockAccessLists)
}
EthMessage::BlockRangeUpdate(msg) => {
// Validate that earliest <= latest according to the spec
if msg.earliest > msg.latest {
@@ -322,22 +316,9 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
/// Handle an internal peer request that will be sent to the remote.
fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
let version = self.conn.version();
if !Self::is_request_supported_for_version(&request, version) {
debug!(
target: "net",
?request,
peer_id=?self.remote_peer_id,
?version,
"Request not supported for negotiated eth version",
);
request.send_err_response(RequestError::UnsupportedCapability);
return;
}
let request_id = self.next_id();
trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
let msg = request.create_request_message(request_id).map_versioned(version);
let msg = request.create_request_message(request_id).map_versioned(self.conn.version());
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest {
@@ -348,11 +329,6 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
self.inflight_requests.insert(request_id, req);
}
#[inline]
fn is_request_supported_for_version(request: &PeerRequest<N>, version: EthVersion) -> bool {
request.is_supported_by_eth_version(version)
}
/// Handle a message received from the internal network
fn on_internal_peer_message(&mut self, msg: PeerMessage<N>) {
match msg {
@@ -962,9 +938,9 @@ mod tests {
use reth_chainspec::MAINNET;
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
handshake::EthHandshake, EthNetworkPrimitives, EthStream, GetBlockAccessLists,
GetBlockBodies, HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream,
UnauthedP2PStream, UnifiedStatus,
handshake::EthHandshake, EthNetworkPrimitives, EthStream, GetBlockBodies,
HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
UnifiedStatus,
};
use reth_ethereum_forks::EthereumHardfork;
use reth_network_peers::pk2id;
@@ -1264,22 +1240,6 @@ mod tests {
}
}
#[test]
fn test_reject_bal_request_for_eth70() {
let (tx, _rx) = oneshot::channel();
let request: PeerRequest<EthNetworkPrimitives> =
PeerRequest::GetBlockAccessLists { request: GetBlockAccessLists(vec![]), response: tx };
assert!(!ActiveSession::<EthNetworkPrimitives>::is_request_supported_for_version(
&request,
EthVersion::Eth70
));
assert!(ActiveSession::<EthNetworkPrimitives>::is_request_supported_for_version(
&request,
EthVersion::Eth71
));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_keep_alive() {
let mut builder = SessionBuilder::default();

View File

@@ -908,7 +908,7 @@ pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
}
/// Starts the authentication process for a connection initiated by a remote peer.
#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id = ?remote_peer_id))]
#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id))]
#[expect(clippy::too_many_arguments)]
async fn start_pending_outbound_session<N: NetworkPrimitives>(
handshake: Arc<dyn EthRlpxHandshake>,

View File

@@ -1949,7 +1949,7 @@ impl PooledTransactionsHashesBuilder {
fn new(version: EthVersion) -> Self {
match version {
EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => {
Self::Eth68(Default::default())
}
}

View File

@@ -24,6 +24,7 @@ reth-db-common.workspace = true
reth-downloaders.workspace = true
reth-engine-local.workspace = true
reth-engine-primitives.workspace = true
reth-engine-service.workspace = true
reth-engine-tree.workspace = true
reth-engine-util.workspace = true
reth-evm.workspace = true

View File

@@ -11,10 +11,10 @@ use crate::{
use alloy_consensus::BlockHeader;
use futures::{stream_select, FutureExt, StreamExt};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::{
chain::{ChainEvent, FromOrchestrator},
engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
launch::build_engine_orchestrator,
chain::FromOrchestrator,
engine::{EngineApiRequest, EngineRequestHandler},
tree::TreeConfig,
};
use reth_engine_util::EngineMessageStreamExt;
@@ -219,15 +219,9 @@ impl EngineNodeLauncher {
// during this run.
.maybe_store_messages(node_config.debug.engine_api_store.clone());
let engine_kind = if ctx.chain_spec().is_optimism() {
EngineApiKind::OpStack
} else {
EngineApiKind::Ethereum
};
let mut orchestrator = build_engine_orchestrator(
engine_kind,
let mut engine_service = EngineService::new(
consensus.clone(),
ctx.chain_spec(),
network_client.clone(),
Box::pin(consensus_engine_stream),
pipeline,
@@ -296,7 +290,7 @@ impl EngineNodeLauncher {
if let Some(initial_target) = initial_target {
debug!(target: "reth::cli", %initial_target, "start backfill sync");
// network_handle's sync state is already initialized at Syncing
orchestrator.start_backfill_sync(initial_target);
engine_service.orchestrator_mut().start_backfill_sync(initial_target);
} else if startup_sync_state_idle {
network_handle.update_sync_state(SyncState::Idle);
}
@@ -309,7 +303,7 @@ impl EngineNodeLauncher {
// the CL
loop {
tokio::select! {
event = orchestrator.next() => {
event = engine_service.next() => {
let Some(event) = event else { break };
debug!(target: "reth::cli", "Event: {event}");
match event {
@@ -359,13 +353,13 @@ impl EngineNodeLauncher {
payload = built_payloads.select_next_some() => {
if let Some(executed_block) = payload.executed_block() {
debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
}
}
shutdown_req = &mut shutdown_rx => {
if let Ok(req) = shutdown_req {
debug!(target: "reth::cli", "received engine shutdown request");
orchestrator.handler_mut().handler_mut().on_event(
engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
FromOrchestrator::Terminate { tx: req.done_tx }.into()
);
}

View File

@@ -354,7 +354,7 @@ pub(crate) fn parse_receipts_log_filter(
) -> Result<ReceiptsLogPruneConfig, ReceiptsLogError> {
let mut config = BTreeMap::new();
// Split out each of the filters.
let filters = value.split(',').map(str::trim);
let filters = value.split(',');
for filter in filters {
let parts: Vec<&str> = filter.split(':').collect();
if parts.len() < 2 {
@@ -450,23 +450,6 @@ mod tests {
assert_eq!(config.0.get(&addr3), Some(&PruneMode::Before(5000000)));
}
#[test]
fn test_parse_receipts_log_filter_with_spaces() {
// Verify that spaces after commas are handled correctly
let filters = "0x0000000000000000000000000000000000000001:full, 0x0000000000000000000000000000000000000002:distance:1000";
let result = parse_receipts_log_filter(filters);
assert!(result.is_ok());
let config = result.unwrap();
assert_eq!(config.0.len(), 2);
let addr1: Address = "0x0000000000000000000000000000000000000001".parse().unwrap();
let addr2: Address = "0x0000000000000000000000000000000000000002".parse().unwrap();
assert_eq!(config.0.get(&addr1), Some(&PruneMode::Full));
assert_eq!(config.0.get(&addr2), Some(&PruneMode::Distance(1000)));
}
#[test]
fn test_parse_receipts_log_filter_invalid_filter_format() {
let result = parse_receipts_log_filter("invalid_format");

View File

@@ -20,7 +20,6 @@ use reth_primitives_traits::TxTy;
use reth_rpc_convert::RpcTxReq;
use reth_rpc_eth_types::FillTransaction;
use reth_rpc_server_types::{result::internal_rpc_err, ToRpcResult};
use std::collections::HashMap;
use tracing::trace;
/// Helper trait, unifies functionality that must be supported to implement all RPC methods for
@@ -202,14 +201,6 @@ pub trait EthApi<
block_number: Option<BlockId>,
) -> RpcResult<B256>;
/// Returns values from multiple storage positions across multiple addresses.
#[method(name = "getStorageValues")]
async fn storage_values(
&self,
requests: HashMap<Address, Vec<JsonStorageKey>>,
block_number: Option<BlockId>,
) -> RpcResult<HashMap<Address, Vec<B256>>>;
/// Returns the number of transactions sent from an address at given block number.
#[method(name = "getTransactionCount")]
async fn transaction_count(
@@ -660,16 +651,6 @@ where
Ok(EthState::storage_at(self, address, index, block_number).await?)
}
/// Handler for: `eth_getStorageValues`
async fn storage_values(
&self,
requests: HashMap<Address, Vec<JsonStorageKey>>,
block_number: Option<BlockId>,
) -> RpcResult<HashMap<Address, Vec<B256>>> {
trace!(target: "rpc::eth", ?block_number, "Serving eth_getStorageValues");
Ok(EthState::storage_values(self, requests, block_number).await?)
}
/// Handler for: `eth_getTransactionCount`
async fn transaction_count(
&self,

View File

@@ -16,13 +16,11 @@ use reth_rpc_convert::RpcConvert;
use reth_rpc_eth_types::{
error::FromEvmError, EthApiError, PendingBlockEnv, RpcInvalidTransactionError,
};
use reth_rpc_server_types::constants::DEFAULT_MAX_STORAGE_VALUES_SLOTS;
use reth_storage_api::{
BlockIdReader, BlockNumReader, BlockReaderIdExt, StateProvider, StateProviderBox,
StateProviderFactory,
};
use reth_transaction_pool::TransactionPool;
use std::collections::HashMap;
/// Helper methods for `eth_` methods relating to state (accounts).
pub trait EthState: LoadState + SpawnBlocking {
@@ -85,47 +83,6 @@ pub trait EthState: LoadState + SpawnBlocking {
})
}
/// Returns values from multiple storage positions across multiple addresses.
///
/// Enforces a cap on total slot count (sum of all slot arrays) and returns an error if
/// exceeded.
fn storage_values(
&self,
requests: HashMap<Address, Vec<JsonStorageKey>>,
block_id: Option<BlockId>,
) -> impl Future<Output = Result<HashMap<Address, Vec<B256>>, Self::Error>> + Send {
async move {
let total_slots: usize = requests.values().map(|slots| slots.len()).sum();
if total_slots > DEFAULT_MAX_STORAGE_VALUES_SLOTS {
return Err(Self::Error::from_eth_err(EthApiError::InvalidParams(
format!(
"total slot count {total_slots} exceeds limit {DEFAULT_MAX_STORAGE_VALUES_SLOTS}",
),
)));
}
self.spawn_blocking_io_fut(move |this| async move {
let state = this.state_at_block_id_or_latest(block_id).await?;
let mut result = HashMap::with_capacity(requests.len());
for (address, slots) in requests {
let mut values = Vec::with_capacity(slots.len());
for slot in &slots {
let value = state
.storage(address, slot.as_b256())
.map_err(Self::Error::from_eth_err)?
.unwrap_or_default();
values.push(B256::new(value.to_be_bytes()));
}
result.insert(address, values);
}
Ok(result)
})
.await
}
}
/// Returns values stored of given account, with Merkle-proof, at given blocknumber.
fn get_proof(
&self,

View File

@@ -55,9 +55,6 @@ pub const DEFAULT_ENGINE_API_IPC_ENDPOINT: &str = "/tmp/reth_engine_api.ipc";
/// The default limit for blocks count in `eth_simulateV1`.
pub const DEFAULT_MAX_SIMULATE_BLOCKS: u64 = 256;
/// The default maximum number of total storage slots for `eth_getStorageValues`.
pub const DEFAULT_MAX_STORAGE_VALUES_SLOTS: usize = 1024;
/// The default eth historical proof window.
pub const DEFAULT_ETH_PROOF_WINDOW: u64 = 0;

View File

@@ -183,9 +183,10 @@ where
// channels used to return result of account hashing
for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
// An _unordered_ channel to receive results from a rayon job
let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
let (tx, rx) = mpsc::sync_channel(chunk.len());
let (tx, rx) = mpsc::channel();
channels.push(rx);
let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
// Spawn the hashing task onto the global rayon pool
rayon::spawn(move || {
for (address, account) in chunk {

View File

@@ -110,9 +110,10 @@ where
for chunk in &storage_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
// An _unordered_ channel to receive results from a rayon job
let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
let (tx, rx) = mpsc::sync_channel(chunk.len());
let (tx, rx) = mpsc::channel();
channels.push(rx);
let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
// Spawn the hashing task onto the global rayon pool
rayon::spawn(move || {
// Cache hashed address since PlainStorageState is sorted by address

View File

@@ -34,7 +34,7 @@ const BATCH_SIZE: usize = 100_000;
const WORKER_CHUNK_SIZE: usize = 100;
/// Type alias for a sender that transmits the result of sender recovery.
type RecoveryResultSender = mpsc::SyncSender<Result<(u64, Address), Box<SenderRecoveryStageError>>>;
type RecoveryResultSender = mpsc::Sender<Result<(u64, Address), Box<SenderRecoveryStageError>>>;
/// The sender recovery stage iterates over existing transactions,
/// recovers the transaction signer and stores them
@@ -245,7 +245,7 @@ where
.step_by(WORKER_CHUNK_SIZE)
.map(|start| {
let range = start..std::cmp::min(start + WORKER_CHUNK_SIZE as u64, tx_range.end);
let (tx, rx) = mpsc::sync_channel((range.end - range.start) as usize);
let (tx, rx) = mpsc::channel();
// Range and channel sender will be sent to rayon worker
((range, tx), rx)
})

View File

@@ -282,7 +282,7 @@ where
level = "debug",
target = "providers::state::overlay",
skip_all,
fields(%db_tip_block)
fields(db_tip_block)
)]
fn calculate_overlay(
&self,

View File

@@ -39,7 +39,7 @@ impl<T: BlockExecutionWriter> BlockExecutionWriter for &T {
}
/// Block Writer
#[auto_impl::auto_impl(&, Arc, Box)]
#[auto_impl::auto_impl(&, Box)]
pub trait BlockWriter {
/// The body this writer can write.
type Block: Block;

View File

@@ -9,7 +9,7 @@ use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_errors::provider::ProviderResult;
/// Hashing Writer
#[auto_impl(&, Arc, Box)]
#[auto_impl(&, Box)]
pub trait HashingWriter: Send {
/// Unwind and clear account hashing.
///

View File

@@ -7,7 +7,7 @@ use reth_db_models::AccountBeforeTx;
use reth_storage_errors::provider::ProviderResult;
/// History Writer
#[auto_impl(&, Arc, Box)]
#[auto_impl(&, Box)]
pub trait HistoryWriter: Send {
/// Unwind and clear account history indices.
///

View File

@@ -11,7 +11,7 @@ pub mod keys {
}
/// Client trait for reading node metadata from the database.
#[auto_impl::auto_impl(&, Arc)]
#[auto_impl::auto_impl(&)]
pub trait MetadataProvider: Send {
/// Get a metadata value by key
fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>>;

View File

@@ -3,7 +3,7 @@ use reth_prune_types::{PruneCheckpoint, PruneSegment};
use reth_storage_errors::provider::ProviderResult;
/// The trait for fetching prune checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
#[auto_impl::auto_impl(&)]
pub trait PruneCheckpointReader: Send {
/// Fetch the prune checkpoint for the given segment.
fn get_prune_checkpoint(
@@ -16,7 +16,7 @@ pub trait PruneCheckpointReader: Send {
}
/// The trait for updating prune checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
#[auto_impl::auto_impl(&)]
pub trait PruneCheckpointWriter {
/// Save prune checkpoint.
fn save_prune_checkpoint(

View File

@@ -4,7 +4,7 @@ use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_errors::provider::ProviderResult;
/// The trait for fetching stage checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
#[auto_impl::auto_impl(&)]
pub trait StageCheckpointReader: Send {
/// Fetch the checkpoint for the given stage.
fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>>;
@@ -18,7 +18,7 @@ pub trait StageCheckpointReader: Send {
}
/// The trait for updating stage checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
#[auto_impl::auto_impl(&)]
pub trait StageCheckpointWriter {
/// Save stage checkpoint.
fn save_stage_checkpoint(&self, id: StageId, checkpoint: StageCheckpoint)

View File

@@ -14,7 +14,7 @@ use reth_trie_common::HashedPostState;
use revm_database::BundleState;
/// This just receives state, or [`ExecutionOutcome`], from the provider
#[auto_impl::auto_impl(&, Arc, Box)]
#[auto_impl::auto_impl(&, Box)]
pub trait StateReader: Send {
/// Receipt type in [`ExecutionOutcome`].
type Receipt: Send + Sync;
@@ -30,7 +30,7 @@ pub trait StateReader: Send {
pub type StateProviderBox = Box<dyn StateProvider + Send + 'static>;
/// An abstraction for a type that provides state data.
#[auto_impl(&, Arc, Box)]
#[auto_impl(&, Box)]
pub trait StateProvider:
BlockHashReader
+ AccountReader
@@ -110,14 +110,14 @@ pub trait AccountInfoReader: AccountReader + BytecodeReader {}
impl<T: AccountReader + BytecodeReader> AccountInfoReader for T {}
/// Trait that provides the hashed state from various sources.
#[auto_impl(&, Arc, Box)]
#[auto_impl(&, Box)]
pub trait HashedPostStateProvider {
/// Returns the `HashedPostState` of the provided [`BundleState`].
fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState;
}
/// Trait for reading bytecode associated with a given code hash.
#[auto_impl(&, Arc, Box)]
#[auto_impl(&, Box)]
pub trait BytecodeReader {
/// Get account code by its hash
fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>>;

View File

@@ -1,7 +1,7 @@
use reth_db_api::table::Table;
/// The trait for fetching provider statistics.
#[auto_impl::auto_impl(&, Arc)]
#[auto_impl::auto_impl(&)]
pub trait StatsReader {
/// Fetch the number of entries in the corresponding [Table]. Depending on the provider, it may
/// route to different data sources other than [Table].

View File

@@ -3,7 +3,7 @@ use alloc::{
vec::Vec,
};
use alloy_primitives::{Address, BlockNumber, B256, U256};
use core::ops::RangeInclusive;
use core::ops::{RangeBounds, RangeInclusive};
use reth_primitives_traits::{StorageEntry, StorageSlotKey};
use reth_storage_errors::provider::ProviderResult;
@@ -35,7 +35,7 @@ impl From<ChangesetEntry> for StorageEntry {
}
/// Storage reader
#[auto_impl::auto_impl(&, Arc, Box)]
#[auto_impl::auto_impl(&, Box)]
pub trait StorageReader: Send {
/// Get plainstate storages for addresses and storage keys.
fn plain_state_storages(
@@ -61,7 +61,7 @@ pub trait StorageReader: Send {
/// Storage `ChangeSet` reader
#[cfg(feature = "db-api")]
#[auto_impl::auto_impl(&, Arc, Box)]
#[auto_impl::auto_impl(&, Box)]
pub trait StorageChangeSetReader: Send {
/// Iterate over storage changesets and return the storage state from before this block.
///
@@ -91,7 +91,7 @@ pub trait StorageChangeSetReader: Send {
/// [`StorageSlotKey::Hashed`] based on the current storage mode.
fn storage_changesets_range(
&self,
range: impl core::ops::RangeBounds<BlockNumber>,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<(reth_db_api::models::BlockNumberAddress, ChangesetEntry)>>;
/// Get the total count of all storage changes.

View File

@@ -40,7 +40,7 @@ pub trait StateRootProvider {
}
/// A type that can compute the storage root for a given account.
#[auto_impl::auto_impl(&, Box, Arc)]
#[auto_impl::auto_impl(&, Box)]
pub trait StorageRootProvider {
/// Returns the storage root of the `HashedStorage` for target address on top of the current
/// state.
@@ -66,7 +66,7 @@ pub trait StorageRootProvider {
}
/// A type that can generate state proof on top of a given post state.
#[auto_impl::auto_impl(&, Box, Arc)]
#[auto_impl::auto_impl(&, Box)]
pub trait StateProofProvider {
/// Get account and storage proofs of target keys in the `HashedPostState`
/// on top of the current state.
@@ -90,7 +90,7 @@ pub trait StateProofProvider {
}
/// Trie Writer
#[auto_impl::auto_impl(&, Arc, Box)]
#[auto_impl::auto_impl(&, Box)]
pub trait TrieWriter: Send {
/// Writes trie updates to the database.
///
@@ -106,7 +106,7 @@ pub trait TrieWriter: Send {
}
/// Storage Trie Writer
#[auto_impl::auto_impl(&, Arc, Box)]
#[auto_impl::auto_impl(&, Box)]
pub trait StorageTrieWriter: Send {
/// Writes storage trie updates from the given storage trie map with already sorted updates.
///

View File

@@ -29,12 +29,11 @@ dyn-clone.workspace = true
# feature `rayon`
rayon = { workspace = true, optional = true }
crossbeam-utils = { workspace = true, optional = true }
pin-project = { workspace = true, optional = true }
[dev-dependencies]
tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread", "time", "macros"] }
[features]
rayon = ["dep:rayon", "dep:crossbeam-utils", "pin-project"]
rayon = ["dep:rayon", "pin-project"]
test-utils = []

View File

@@ -1,298 +0,0 @@
use crossbeam_utils::CachePadded;
use rayon::iter::{IndexedParallelIterator, ParallelIterator};
use std::{
cell::UnsafeCell,
mem::MaybeUninit,
sync::atomic::{AtomicBool, Ordering},
};
/// Extension trait for [`IndexedParallelIterator`]
/// that streams results to a sequential consumer in index order.
pub trait ForEachOrdered: IndexedParallelIterator {
/// Executes the parallel iterator, calling `f` on each result **sequentially in index
/// order**.
///
/// Items are computed in parallel, but `f` is invoked as `f(item_0)`, `f(item_1)`, …,
/// `f(item_{n-1})` on the calling thread. The calling thread receives each item as soon
/// as it (and all preceding items) are ready.
///
/// `f` does **not** need to be [`Send`] — it runs exclusively on the calling thread.
fn for_each_ordered<F>(self, f: F)
where
Self::Item: Send,
F: FnMut(Self::Item);
}
impl<I: IndexedParallelIterator> ForEachOrdered for I {
fn for_each_ordered<F>(self, f: F)
where
Self::Item: Send,
F: FnMut(Self::Item),
{
ordered_impl(self, f);
}
}
/// A cache-line-padded slot with an atomic ready flag.
struct Slot<T> {
value: UnsafeCell<MaybeUninit<T>>,
ready: AtomicBool,
}
// SAFETY: Each slot is written by exactly one producer and read by exactly one consumer.
// The AtomicBool synchronizes access (Release on write, Acquire on read).
unsafe impl<T: Send> Send for Slot<T> {}
unsafe impl<T: Send> Sync for Slot<T> {}
struct Shared<T> {
slots: Box<[CachePadded<Slot<T>>]>,
panicked: AtomicBool,
}
impl<T> Shared<T> {
fn new(n: usize) -> Self {
Self {
// SAFETY: Zero is a valid bit pattern for Slot.
// Needs to be zero for `ready` to be false.
slots: unsafe {
Box::<[_]>::assume_init(Box::<[CachePadded<Slot<T>>]>::new_zeroed_slice(n))
},
panicked: AtomicBool::new(false),
}
}
/// # Safety
/// Index `i` must be in bounds and must only be written once.
#[inline]
unsafe fn write(&self, i: usize, val: T) {
let slot = unsafe { self.slots.get_unchecked(i) };
unsafe { (*slot.value.get()).write(val) };
slot.ready.store(true, Ordering::Release);
}
/// # Safety
/// Index `i` must be in bounds. Must only be called after `ready` is observed `true`.
#[inline]
unsafe fn take(&self, i: usize) -> T {
let slot = unsafe { self.slots.get_unchecked(i) };
let v = unsafe { (*slot.value.get()).assume_init_read() };
// Clear ready so Drop doesn't double-free this slot.
slot.ready.store(false, Ordering::Relaxed);
v
}
#[inline]
fn is_ready(&self, i: usize) -> bool {
// SAFETY: caller ensures `i < n`.
unsafe { self.slots.get_unchecked(i) }.ready.load(Ordering::Acquire)
}
}
impl<T> Drop for Shared<T> {
fn drop(&mut self) {
for slot in &mut *self.slots {
if *slot.ready.get_mut() {
unsafe { (*slot.value.get()).assume_init_drop() };
}
}
}
}
/// Executes a parallel iterator and delivers results to a sequential callback in index order.
///
/// This works by pre-allocating one cache-line-padded slot per item. Each slot holds an
/// `UnsafeCell<MaybeUninit<T>>` and an `AtomicBool` ready flag. A rayon task computes all
/// items in parallel, writing each result into its slot and setting the flag (`Release`).
/// The calling thread walks slots 0, 1, 2, … in order, spinning on the flag (`Acquire`),
/// then reading the value and passing it to `f`.
fn ordered_impl<I, F>(iter: I, mut f: F)
where
I: IndexedParallelIterator,
I::Item: Send,
F: FnMut(I::Item),
{
use std::panic::{catch_unwind, AssertUnwindSafe};
let n = iter.len();
if n == 0 {
return;
}
let shared = Shared::<I::Item>::new(n);
rayon::in_place_scope(|s| {
// Producer: compute items in parallel and write them into their slots.
s.spawn(|_| {
let res = catch_unwind(AssertUnwindSafe(|| {
iter.enumerate().for_each(|(i, item)| {
// SAFETY: `enumerate()` on an IndexedParallelIterator yields each
// index exactly once.
unsafe { shared.write(i, item) };
});
}));
if let Err(payload) = res {
shared.panicked.store(true, Ordering::Release);
std::panic::resume_unwind(payload);
}
});
// Consumer: sequential, ordered, on the calling thread.
// Exponential backoff: 1, 2, 4, …, 64 pause instructions, then OS yields.
const SPIN_SHIFT_LIMIT: u32 = 6;
for i in 0..n {
let mut backoff = 0u32;
'wait: loop {
if shared.is_ready(i) {
break 'wait;
}
if shared.panicked.load(Ordering::Relaxed) {
return;
}
// Yield to rayon's work-stealing so the producer can make progress,
// especially important when the thread pool is small.
if rayon::yield_now() == Some(rayon::Yield::Executed) {
continue 'wait;
}
if backoff < SPIN_SHIFT_LIMIT {
for _ in 0..(1u32 << backoff) {
std::hint::spin_loop();
}
backoff += 1;
} else {
// Producer is genuinely slow; fall back to OS-level yield.
std::thread::yield_now();
}
}
// SAFETY: `i < n` and we just observed the ready flag with Acquire ordering.
let value = unsafe { shared.take(i) };
f(value);
}
});
}
#[cfg(test)]
mod tests {
use super::*;
use rayon::prelude::*;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Barrier,
};
#[test]
fn preserves_order() {
let input: Vec<u64> = (0..1000).collect();
let mut output = Vec::with_capacity(input.len());
input.par_iter().map(|x| x * 2).for_each_ordered(|x| output.push(x));
let expected: Vec<u64> = (0..1000).map(|x| x * 2).collect();
assert_eq!(output, expected);
}
#[test]
fn empty_iterator() {
let input: Vec<u64> = vec![];
let mut output = Vec::new();
input.par_iter().map(|x| *x).for_each_ordered(|x| output.push(x));
assert!(output.is_empty());
}
#[test]
fn single_element() {
let mut output = Vec::new();
vec![42u64].par_iter().map(|x| *x).for_each_ordered(|x| output.push(x));
assert_eq!(output, vec![42]);
}
#[test]
fn slow_early_items_still_delivered_in_order() {
// Item 0 is deliberately delayed; all other items complete quickly.
// The consumer must still deliver items in order 0, 1, 2, … regardless.
let barrier = Barrier::new(2);
let n = 64usize;
let input: Vec<usize> = (0..n).collect();
let mut output = Vec::with_capacity(n);
input
.par_iter()
.map(|&i| {
if i == 0 {
// Wait until at least one other item has been produced.
barrier.wait();
} else if i == n - 1 {
// Signal that other items are ready.
barrier.wait();
}
i
})
.for_each_ordered(|x| output.push(x));
assert_eq!(output, input);
}
#[test]
fn drops_unconsumed_slots_on_panic() {
static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone)]
struct Tracked(#[allow(dead_code)] u64);
impl Drop for Tracked {
fn drop(&mut self) {
DROP_COUNT.fetch_add(1, Ordering::Relaxed);
}
}
DROP_COUNT.store(0, Ordering::Relaxed);
let input: Vec<u64> = (0..100).collect();
let result = std::panic::catch_unwind(|| {
input
.par_iter()
.map(|&i| {
assert!(i != 50, "intentional");
Tracked(i)
})
.for_each_ordered(|_item| {});
});
assert!(result.is_err());
// All produced Tracked values must have been dropped (either consumed or cleaned up).
// We can't assert an exact count since the panic may cut production short.
let drops = DROP_COUNT.load(Ordering::Relaxed);
assert!(drops > 0, "some items should have been dropped");
}
#[test]
fn no_double_drop() {
// Verify that consumed items are dropped exactly once (not double-freed by Drop).
static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
struct Counted(#[allow(dead_code)] u64);
impl Drop for Counted {
fn drop(&mut self) {
DROP_COUNT.fetch_add(1, Ordering::Relaxed);
}
}
DROP_COUNT.store(0, Ordering::Relaxed);
let n = 200u64;
let input: Vec<u64> = (0..n).collect();
input.par_iter().map(|&i| Counted(i)).for_each_ordered(|_item| {});
assert_eq!(DROP_COUNT.load(Ordering::Relaxed), n as usize);
}
#[test]
fn callback_is_not_send() {
// Verify that the callback does not need to be Send.
use std::rc::Rc;
let counter = Rc::new(std::cell::Cell::new(0u64));
let input: Vec<u64> = (0..100).collect();
input.par_iter().map(|&x| x).for_each_ordered(|x| {
counter.set(counter.get() + x);
});
assert_eq!(counter.get(), (0..100u64).sum::<u64>());
}
}

View File

@@ -40,12 +40,6 @@ pub mod shutdown;
#[cfg(feature = "rayon")]
pub mod pool;
/// Lock-free ordered parallel iterator extension trait.
#[cfg(feature = "rayon")]
pub mod for_each_ordered;
#[cfg(feature = "rayon")]
pub use for_each_ordered::ForEachOrdered;
#[cfg(feature = "rayon")]
pub use runtime::RayonConfig;
pub use runtime::{Runtime, RuntimeBuildError, RuntimeBuilder, RuntimeConfig, TokioConfig};

View File

@@ -816,6 +816,17 @@ impl RuntimeBuilder {
let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
TaskManager::new_parts(handle.clone());
#[cfg(feature = "rayon")]
#[allow(clippy::needless_pass_by_value)]
fn rayon_panic_handler(payload: Box<dyn std::any::Any + Send>) {
let msg = payload
.downcast_ref::<&str>()
.copied()
.or_else(|| payload.downcast_ref::<String>().map(|s| s.as_str()))
.unwrap_or("(no message)");
error!(target: "reth::tasks", %msg, "panic in worker pool thread");
}
#[cfg(feature = "rayon")]
let (
cpu_pool,
@@ -853,6 +864,7 @@ impl RuntimeBuilder {
let proof_storage_worker_pool = rayon::ThreadPoolBuilder::new()
.num_threads(proof_storage_worker_threads)
.thread_name(|i| format!("proof-strg-{i:02}"))
.panic_handler(rayon_panic_handler)
.build()?;
let proof_account_worker_threads =
@@ -860,6 +872,7 @@ impl RuntimeBuilder {
let proof_account_worker_pool = rayon::ThreadPoolBuilder::new()
.num_threads(proof_account_worker_threads)
.thread_name(|i| format!("proof-acct-{i:02}"))
.panic_handler(rayon_panic_handler)
.build()?;
debug!(

View File

@@ -501,26 +501,11 @@ where
results.pop().expect("result length is the same as the input")
}
async fn add_transactions(
&self,
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
if transactions.is_empty() {
return Vec::new()
}
let validated = self
.pool
.validator()
.validate_transactions(transactions.into_iter().map(|tx| (origin, tx)))
.await;
self.pool.add_transactions(origin, validated)
}
async fn add_transactions_with_origins(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
let transactions: Vec<_> = transactions.into_iter().collect();
if transactions.is_empty() {
return Vec::new()
}

View File

@@ -84,23 +84,9 @@ impl<T: EthPoolTransaction> TransactionPool for NoopTransactionPool<T> {
Err(PoolError::other(hash, Box::new(NoopInsertError::new(transaction))))
}
async fn add_transactions(
&self,
_origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
transactions
.into_iter()
.map(|transaction| {
let hash = *transaction.hash();
Err(PoolError::other(hash, Box::new(NoopInsertError::new(transaction))))
})
.collect()
}
async fn add_transactions_with_origins(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
transactions
.into_iter()

View File

@@ -704,10 +704,6 @@ impl PoolTransaction for MockTransaction {
type Pooled = PooledTransactionVariant;
fn consensus_ref(&self) -> Recovered<&Self::Consensus> {
unimplemented!("mock transaction does not wrap a consensus transaction")
}
fn into_consensus(self) -> Recovered<Self::Consensus> {
self.into()
}

View File

@@ -175,7 +175,9 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
&self,
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send;
) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send {
self.add_transactions_with_origins(transactions.into_iter().map(move |tx| (origin, tx)))
}
/// Adds the given _unvalidated_ transactions into the pool.
///
@@ -186,7 +188,7 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
/// Consumer: RPC
fn add_transactions_with_origins(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send;
/// Submit a consensus transaction directly to the pool
@@ -1257,9 +1259,6 @@ pub trait PoolTransaction:
self.clone().into_consensus()
}
/// Returns a reference to the consensus transaction with the recovered sender.
fn consensus_ref(&self) -> Recovered<&Self::Consensus>;
/// Define a method to convert from the `Self` type to `Consensus`
fn into_consensus(self) -> Recovered<Self::Consensus>;
@@ -1450,10 +1449,6 @@ impl PoolTransaction for EthPooledTransaction {
self.transaction().clone()
}
fn consensus_ref(&self) -> Recovered<&Self::Consensus> {
Recovered::new_unchecked(&*self.transaction, self.transaction.signer())
}
fn into_consensus(self) -> Recovered<Self::Consensus> {
self.transaction
}

View File

@@ -325,7 +325,7 @@ mod tests {
reth_provider::providers::OverlayStateProviderFactory::new(factory, changeset_cache);
let task_ctx = ProofTaskCtx::new(factory);
let runtime = reth_tasks::Runtime::test();
let proof_worker_handle = ProofWorkerHandle::new(&runtime, task_ctx, false, false);
let proof_worker_handle = ProofWorkerHandle::new(&runtime, task_ctx, false);
let parallel_result = ParallelProof::new(Default::default(), proof_worker_handle.clone())
.decoded_multiproof(targets.clone())

View File

@@ -75,7 +75,7 @@ use std::{
},
time::{Duration, Instant},
};
use tracing::{debug, debug_span, error, instrument, trace};
use tracing::{debug, debug_span, error, trace};
#[cfg(feature = "metrics")]
use crate::proof_task_metrics::{
@@ -134,18 +134,10 @@ impl ProofWorkerHandle {
/// # Parameters
/// - `runtime`: The centralized runtime used to spawn blocking worker tasks
/// - `task_ctx`: Shared context with database view and prefix sets
/// - `halve_workers`: Whether to halve the worker pool size (for small blocks)
/// - `v2_proofs_enabled`: Whether to enable V2 storage proofs
#[instrument(
name = "ProofWorkerHandle::new",
level = "debug",
target = "trie::proof_task",
skip_all
)]
pub fn new<Factory>(
runtime: &Runtime,
task_ctx: ProofTaskCtx<Factory>,
halve_workers: bool,
v2_proofs_enabled: bool,
) -> Self
where
@@ -162,17 +154,13 @@ impl ProofWorkerHandle {
let cached_storage_roots = Arc::<DashMap<_, _>>::default();
let divisor = if halve_workers { 2 } else { 1 };
let storage_worker_count =
runtime.proof_storage_worker_pool().current_num_threads() / divisor;
let account_worker_count =
runtime.proof_account_worker_pool().current_num_threads() / divisor;
let storage_worker_count = runtime.proof_storage_worker_pool().current_num_threads();
let account_worker_count = runtime.proof_account_worker_pool().current_num_threads();
debug!(
target: "trie::proof_task",
storage_worker_count,
account_worker_count,
halve_workers,
?v2_proofs_enabled,
"Spawning proof worker pools"
);
@@ -2024,7 +2012,7 @@ mod tests {
let ctx = test_ctx(factory);
let runtime = reth_tasks::Runtime::test();
let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false, false);
let proof_handle = ProofWorkerHandle::new(&runtime, ctx, false);
// Verify handle can be cloned
let _cloned_handle = proof_handle.clone();

View File

@@ -258,7 +258,7 @@ impl SparseTrie for ParallelSparseTrie {
#[cfg(feature = "std")]
// Reveal lower subtrie nodes in parallel
{
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use tracing::Span;
// Capture the current span so it can be propagated to rayon worker threads
@@ -267,23 +267,39 @@ impl SparseTrie for ParallelSparseTrie {
// Capture reference to upper subtrie nodes for boundary leaf reachability checks
let upper_nodes = &self.upper_subtrie.nodes;
// Group the nodes by lower subtrie. This must be collected into a Vec in order for
// rayon's `zip` to be happy.
let node_groups: Vec<_> = lower_nodes
// Group the nodes by lower subtrie.
let results = lower_nodes
.chunk_by(|node_a, node_b| {
SparseSubtrieType::from_path(&node_a.path) ==
SparseSubtrieType::from_path(&node_b.path)
})
.collect();
// Take the lower subtries in the same order that the nodes were grouped into, so that
// the two can be zipped together. This also must be collected into a Vec for rayon's
// `zip` to be happy.
let lower_subtries: Vec<_> = node_groups
.iter()
// Filter out chunks for unreachable subtries.
.filter_map(|nodes| {
// NOTE: chunk_by won't produce empty groups
let node = &nodes[0];
let mut nodes = nodes
.iter()
.filter(|node| {
// For boundary leaves, check reachability from upper subtrie's parent
// branch.
if node.path.len() == UPPER_TRIE_MAX_DEPTH &&
!Self::is_boundary_leaf_reachable(
upper_nodes,
&node.path,
&node.node,
)
{
trace!(
target: "trie::parallel_sparse",
path = ?node.path,
"Boundary leaf not reachable from upper subtrie, skipping",
);
false
} else {
true
}
})
.peekable();
let node = nodes.peek()?;
let idx =
SparseSubtrieType::from_path(&node.path).lower_index().unwrap_or_else(
|| panic!("upper subtrie node {node:?} found amongst lower nodes"),
@@ -303,41 +319,24 @@ impl SparseTrie for ParallelSparseTrie {
// shortest path being revealed for each subtrie. Therefore we can reveal the
// subtrie itself using this path and retain correct behavior.
self.lower_subtries[idx].reveal(&node.path);
Some((idx, self.lower_subtries[idx].take_revealed().expect("just revealed")))
Some((
idx,
self.lower_subtries[idx].take_revealed().expect("just revealed"),
nodes,
))
})
.collect();
// Zip the lower subtries and their corresponding node groups, and reveal lower subtrie
// nodes in parallel
let results: Vec<_> = lower_subtries
.collect::<Vec<_>>()
.into_par_iter()
.zip(node_groups.into_par_iter())
.map(|((subtrie_idx, mut subtrie), nodes)| {
.map(|(subtrie_idx, mut subtrie, nodes)| {
// Enter the parent span to propagate context (e.g., hashed_address for storage
// tries) to the worker thread
let _guard = parent_span.enter();
// reserve space in the HashMap ahead of time; doing it on a node-by-node basis
// can cause multiple re-allocations as the hashmap grows.
subtrie.nodes.reserve(nodes.len());
subtrie.nodes.reserve(nodes.size_hint().1.unwrap_or(0));
for node in nodes {
// For boundary leaves, check reachability from upper subtrie's parent
// branch
if node.path.len() == UPPER_TRIE_MAX_DEPTH &&
!Self::is_boundary_leaf_reachable(
upper_nodes,
&node.path,
&node.node,
)
{
trace!(
target: "trie::parallel_sparse",
path = ?node.path,
"Boundary leaf not reachable from upper subtrie, skipping",
);
continue;
}
// Reveal each node in the subtrie, returning early on any errors
let res = subtrie.reveal_node(node.path, &node.node, node.masks);
if res.is_err() {
@@ -346,7 +345,7 @@ impl SparseTrie for ParallelSparseTrie {
}
(subtrie_idx, subtrie, Ok(()))
})
.collect();
.collect::<Vec<_>>();
// Put subtries back which were processed in the rayon pool, collecting the last
// seen error in the process and returning that.
@@ -1958,6 +1957,7 @@ impl ParallelSparseTrie {
}
SparseTrieUpdatesAction::InsertUpdated(path, branch_node) => {
updates.updated_nodes.insert(path, branch_node);
updates.removed_nodes.remove(&path);
}
}
}

View File

@@ -1159,7 +1159,7 @@ where
name = "SparseStateTrie::prune",
target = "trie::sparse",
skip_all,
fields(%max_depth, %max_storage_tries)
fields(max_depth, max_storage_tries)
)]
pub fn prune(&mut self, max_depth: usize, max_storage_tries: usize) {
// Prune state and storage tries in parallel

View File

@@ -1282,6 +1282,16 @@ where
Ok(())
}
/// Clears internal computation state. Called after errors to ensure the calculator is not
/// left in a partially-computed state when reused.
fn clear_computation_state(&mut self) {
self.branch_stack.clear();
self.branch_path = Nibbles::new();
self.child_stack.clear();
self.cached_branch_stack.clear();
self.retained_proofs.clear();
}
/// Internal implementation of proof calculation. Assumes both cursors have already been reset.
/// See docs on [`Self::proof`] for expected behavior.
fn proof_inner(
@@ -1303,12 +1313,15 @@ where
// Divide targets into chunks, each chunk corresponding to a different sub-trie within the
// overall trie, and handle all proofs within that sub-trie.
for sub_trie_targets in iter_sub_trie_targets(targets) {
self.proof_subtrie(
if let Err(err) = self.proof_subtrie(
value_encoder,
&mut trie_cursor_state,
&mut hashed_cursor_current,
sub_trie_targets,
)?;
) {
self.clear_computation_state();
return Err(err);
}
}
trace!(
@@ -1378,12 +1391,15 @@ where
let sub_trie_targets =
SubTrieTargets { prefix: Nibbles::new(), targets: &EMPTY_TARGETS, retain_root: true };
self.proof_subtrie(
if let Err(err) = self.proof_subtrie(
value_encoder,
&mut trie_cursor_state,
&mut hashed_cursor_current,
sub_trie_targets,
)?;
) {
self.clear_computation_state();
return Err(err);
}
// proof_subtrie will retain the root node if retain_proof is true, regardless of if there
// are any targets.
@@ -1815,6 +1831,79 @@ mod tests {
}
}
/// Tests that `clear_computation_state` properly resets internal stacks, allowing a
/// `ProofCalculator` to be reused after a mid-computation error left stale state.
/// Before the fix, stale data in `branch_stack`, `child_stack`, and `branch_path`
/// could cause a `usize` underflow panic in `pop_branch`.
#[test]
fn test_proof_calculator_reuse_after_error() {
use alloy_primitives::U256;
reth_tracing::init_test_tracing();
let mut post_state = HashedPostState::default();
let addresses = [
B256::right_padding_from(&[0x10]),
B256::right_padding_from(&[0x20]),
B256::right_padding_from(&[0x30]),
B256::right_padding_from(&[0x40]),
];
for addr in &addresses {
let account =
Account { nonce: 1, balance: U256::from(100u64), bytecode_hash: Some(B256::ZERO) };
post_state.accounts.insert(*addr, Some(account));
}
let harness = ProofTestHarness::new(post_state);
let trie_cursor = harness.trie_cursor_factory.account_trie_cursor().unwrap();
let hashed_cursor = harness.hashed_cursor_factory.hashed_account_cursor().unwrap();
let mut proof_calculator = ProofCalculator::new(trie_cursor, hashed_cursor);
// Simulate stale state left by a mid-computation error: push fake entries onto internal
// stacks and set a non-empty branch_path.
proof_calculator.branch_stack.push(ProofTrieBranch {
ext_len: 2,
state_mask: TrieMask::new(0b1111),
masks: None,
});
proof_calculator.branch_stack.push(ProofTrieBranch {
ext_len: 0,
state_mask: TrieMask::new(0b11),
masks: None,
});
proof_calculator
.child_stack
.push(ProofTrieBranchChild::RlpNode(RlpNode::word_rlp(&B256::ZERO)));
proof_calculator.branch_path = Nibbles::from_nibbles([0x1, 0x2, 0x3]);
// clear_computation_state should reset everything so a subsequent proof() call works.
proof_calculator.clear_computation_state();
let mut value_encoder = SyncAccountValueEncoder::new(
harness.trie_cursor_factory.clone(),
harness.hashed_cursor_factory.clone(),
);
let mut sorted_addresses = addresses.to_vec();
sorted_addresses.sort();
let mut targets: Vec<ProofV2Target> =
sorted_addresses.iter().copied().map(ProofV2Target::new).collect();
let result = proof_calculator.proof(&mut value_encoder, &mut targets).unwrap();
// Compare against a fresh calculator to verify correctness.
let trie_cursor = harness.trie_cursor_factory.account_trie_cursor().unwrap();
let hashed_cursor = harness.hashed_cursor_factory.hashed_account_cursor().unwrap();
let mut fresh_calculator = ProofCalculator::new(trie_cursor, hashed_cursor);
let mut value_encoder = SyncAccountValueEncoder::new(
harness.trie_cursor_factory.clone(),
harness.hashed_cursor_factory,
);
let fresh_result = fresh_calculator.proof(&mut value_encoder, &mut targets).unwrap();
pretty_assertions::assert_eq!(fresh_result, result);
}
mod proptest_tests {
use super::*;
use alloy_primitives::{map::B256Map, U256};

View File

@@ -22,7 +22,7 @@ export default defineConfig({
},
{ text: 'GitHub', link: 'https://github.com/paradigmxyz/reth' },
{
text: 'v1.11.0',
text: 'v1.11.3',
items: [
{
text: 'Releases',

View File

@@ -83,7 +83,6 @@ async fn main() -> eyre::Result<()> {
IncomingEthRequest::GetReceipts { .. } => {}
IncomingEthRequest::GetReceipts69 { .. } => {}
IncomingEthRequest::GetReceipts70 { .. } => {}
IncomingEthRequest::GetBlockAccessLists { .. } => {}
}
}
transaction_message = transactions_rx.recv() => {