Compare commits

..

21 Commits

Author SHA1 Message Date
Dan Cline
108b355506 chore: bump revm to branch with backported hashmap perf fixes 2025-12-19 13:57:01 -05:00
Alexey Shekhirin
29438631be fix: propagate keccak-cache-global feature to reth-node-core (#20524) 2025-12-19 17:11:41 +00:00
Brian Picciano
0eb4e0ce29 fix(stages): Fix two bugs related to stage checkpoints and pipeline syncs (#20521) 2025-12-19 16:09:57 +00:00
gustavo
9147f9aafe perf(trie): remove more unnecessary channels (#20489) 2025-12-19 15:34:42 +00:00
Snezhkko
13b111e058 refactor: remove dead storage multiproof path (#20485) 2025-12-19 15:11:31 +00:00
Brian Picciano
571afcc3b9 clippy 2025-12-19 16:00:18 +01:00
leniram159
25c247b14c refactor(engine): simplify fork detection in insert_block (#20441) 2025-12-19 14:49:33 +00:00
Matthias Seitz
72bea44d8c chore: remove redundant num hash (#20501) 2025-12-19 14:48:42 +00:00
alex017
63b9d5fe57 refactor(db-api): remove redundant clone and unused import in unwind (#20499)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2025-12-19 14:47:11 +00:00
Brian Picciano
58df28c74a elided lifetimes 2025-12-19 14:59:36 +01:00
Brian Picciano
5c8d0c0bb6 remove tx lifetime 2025-12-19 14:44:42 +01:00
Brian Picciano
fcbaa828e9 use an option rather than ManuallyDrop 2025-12-19 14:25:14 +01:00
Brian Picciano
b81cab5d28 WIP 2025-12-19 12:23:12 +01:00
Brian Picciano
9aefd9d144 doc fix 2025-12-19 11:09:30 +01:00
Brian Picciano
59c04b1c91 Final clippy fixes 2025-12-19 10:56:19 +01:00
Brian Picciano
43b37776f7 Got rid of extra state provider in payload building 2025-12-18 19:18:35 +01:00
Brian Picciano
866739394b Compiling 2025-12-18 19:09:20 +01:00
Brian Picciano
0946449b67 WIP 2025-12-18 18:48:50 +01:00
Brian Picciano
e06e6f3d25 InMemoryOverlayProvider 2025-12-18 17:36:31 +01:00
Brian Picciano
5673865fb7 WIP 2025-12-18 16:52:46 +01:00
Brian Picciano
60f0e968b8 reth-provider compiles 2025-12-18 16:25:05 +01:00
67 changed files with 513 additions and 661 deletions

39
Cargo.lock generated
View File

@@ -6354,8 +6354,7 @@ dependencies = [
[[package]]
name = "op-revm"
version = "14.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1475a779c73999fc803778524042319691b31f3d6699d2b560c4ed8be1db802a"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"auto_impl",
"revm",
@@ -11090,8 +11089,7 @@ dependencies = [
[[package]]
name = "revm"
version = "33.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c85ed0028f043f87b3c88d4a4cb6f0a76440085523b6a8afe5ff003cf418054"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"revm-bytecode",
"revm-context",
@@ -11109,8 +11107,7 @@ dependencies = [
[[package]]
name = "revm-bytecode"
version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2c6b5e6e8dd1e28a4a60e5f46615d4ef0809111c9e63208e55b5c7058200fb0"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"bitvec",
"phf",
@@ -11121,8 +11118,7 @@ dependencies = [
[[package]]
name = "revm-context"
version = "12.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f038f0c9c723393ac897a5df9140b21cfa98f5753a2cb7d0f28fa430c4118abf"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"bitvec",
"cfg-if",
@@ -11138,8 +11134,7 @@ dependencies = [
[[package]]
name = "revm-context-interface"
version = "13.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "431c9a14e4ef1be41ae503708fd02d974f80ef1f2b6b23b5e402e8d854d1b225"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"alloy-eip2930",
"alloy-eip7702",
@@ -11154,8 +11149,7 @@ dependencies = [
[[package]]
name = "revm-database"
version = "9.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "980d8d6bba78c5dd35b83abbb6585b0b902eb25ea4448ed7bfba6283b0337191"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"alloy-eips",
"revm-bytecode",
@@ -11168,8 +11162,7 @@ dependencies = [
[[package]]
name = "revm-database-interface"
version = "8.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cce03e3780287b07abe58faf4a7f5d8be7e81321f93ccf3343c8f7755602bae"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"auto_impl",
"either",
@@ -11181,8 +11174,7 @@ dependencies = [
[[package]]
name = "revm-handler"
version = "14.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d44f8f6dbeec3fecf9fe55f78ef0a758bdd92ea46cd4f1ca6e2a946b32c367f3"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"auto_impl",
"derive-where",
@@ -11200,8 +11192,7 @@ dependencies = [
[[package]]
name = "revm-inspector"
version = "14.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5617e49216ce1ca6c8826bcead0386bc84f49359ef67cde6d189961735659f93"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"auto_impl",
"either",
@@ -11238,8 +11229,7 @@ dependencies = [
[[package]]
name = "revm-interpreter"
version = "31.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26ec36405f7477b9dccdc6caa3be19adf5662a7a0dffa6270cdb13a090c077e5"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"revm-bytecode",
"revm-context-interface",
@@ -11251,8 +11241,7 @@ dependencies = [
[[package]]
name = "revm-precompile"
version = "31.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a62958af953cc4043e93b5be9b8497df84cc3bd612b865c49a7a7dfa26a84e2"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"ark-bls12-381",
"ark-bn254",
@@ -11276,8 +11265,7 @@ dependencies = [
[[package]]
name = "revm-primitives"
version = "21.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29e161db429d465c09ba9cbff0df49e31049fe6b549e28eb0b7bd642fcbd4412"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"alloy-primitives",
"num_enum",
@@ -11288,8 +11276,7 @@ dependencies = [
[[package]]
name = "revm-state"
version = "8.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d8be953b7e374dbdea0773cf360debed8df394ea8d82a8b240a6b5da37592fc"
source = "git+https://github.com/Rjected/revm?branch=dan%2Fbackport-hash-perf-fixes#1455c4099fa4900216ec9f83f11f0ca59cbf0466"
dependencies = [
"bitflags 2.10.0",
"revm-bytecode",

View File

@@ -786,3 +786,13 @@ ipnet = "2.11"
# alloy-evm = { git = "https://github.com/alloy-rs/evm", rev = "a69f0b45a6b0286e16072cb8399e02ce6ceca353" }
# alloy-op-evm = { git = "https://github.com/alloy-rs/evm", rev = "a69f0b45a6b0286e16072cb8399e02ce6ceca353" }
[patch.crates-io]
revm = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-bytecode = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-database = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-state = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-primitives = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-interpreter = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
revm-database-interface = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }
op-revm = { git = "https://github.com/Rjected/revm", branch = "dan/backport-hash-perf-fixes" }

View File

@@ -103,6 +103,7 @@ asm-keccak = [
"reth-node-ethereum/asm-keccak",
]
keccak-cache-global = [
"reth-node-core/keccak-cache-global",
"reth-node-ethereum/keccak-cache-global",
]
jemalloc = [

View File

@@ -5,14 +5,14 @@ use reth_errors::ProviderResult;
use reth_primitives_traits::{Account, Bytecode, NodePrimitives};
use reth_storage_api::{
AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
StateProvider, StateRootProvider, StorageRootProvider,
StateProvider, StateProviderBox, StateRootProvider, StorageRootProvider,
};
use reth_trie::{
updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
MultiProofTargets, StorageMultiProof, TrieInput,
};
use revm_database::BundleState;
use std::sync::OnceLock;
use std::{borrow::Cow, sync::OnceLock};
/// A state provider that stores references to in-memory blocks along with their state as well as a
/// reference of the historical state provider for fallback lookups.
@@ -24,15 +24,11 @@ pub struct MemoryOverlayStateProviderRef<
/// Historical state provider for state lookups that are not found in memory blocks.
pub(crate) historical: Box<dyn StateProvider + 'a>,
/// The collection of executed parent blocks. Expected order is newest to oldest.
pub(crate) in_memory: Vec<ExecutedBlock<N>>,
pub(crate) in_memory: Cow<'a, [ExecutedBlock<N>]>,
/// Lazy-loaded in-memory trie data.
pub(crate) trie_input: OnceLock<TrieInput>,
}
/// A state provider that stores references to in-memory blocks along with their state as well as
/// the historical state provider for fallback lookups.
pub type MemoryOverlayStateProvider<N> = MemoryOverlayStateProviderRef<'static, N>;
impl<'a, N: NodePrimitives> MemoryOverlayStateProviderRef<'a, N> {
/// Create new memory overlay state provider.
///
@@ -42,7 +38,7 @@ impl<'a, N: NodePrimitives> MemoryOverlayStateProviderRef<'a, N> {
/// - `historical` - a historical state provider for the latest ancestor block stored in the
/// database.
pub fn new(historical: Box<dyn StateProvider + 'a>, in_memory: Vec<ExecutedBlock<N>>) -> Self {
Self { historical, in_memory, trie_input: OnceLock::new() }
Self { historical, in_memory: Cow::Owned(in_memory), trie_input: OnceLock::new() }
}
/// Turn this state provider into a state provider
@@ -71,7 +67,7 @@ impl<'a, N: NodePrimitives> MemoryOverlayStateProviderRef<'a, N> {
impl<N: NodePrimitives> BlockHashReader for MemoryOverlayStateProviderRef<'_, N> {
fn block_hash(&self, number: BlockNumber) -> ProviderResult<Option<B256>> {
for block in &self.in_memory {
for block in self.in_memory.iter() {
if block.recovered_block().number() == number {
return Ok(Some(block.recovered_block().hash()));
}
@@ -90,7 +86,7 @@ impl<N: NodePrimitives> BlockHashReader for MemoryOverlayStateProviderRef<'_, N>
let mut in_memory_hashes = Vec::with_capacity(range.size_hint().0);
// iterate in ascending order (oldest to newest = low to high)
for block in &self.in_memory {
for block in self.in_memory.iter() {
let block_num = block.recovered_block().number();
if range.contains(&block_num) {
in_memory_hashes.push(block.recovered_block().hash());
@@ -112,7 +108,7 @@ impl<N: NodePrimitives> BlockHashReader for MemoryOverlayStateProviderRef<'_, N>
impl<N: NodePrimitives> AccountReader for MemoryOverlayStateProviderRef<'_, N> {
fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
for block in &self.in_memory {
for block in self.in_memory.iter() {
if let Some(account) = block.execution_output.account(address) {
return Ok(account);
}
@@ -216,7 +212,7 @@ impl<N: NodePrimitives> StateProvider for MemoryOverlayStateProviderRef<'_, N> {
address: Address,
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
for block in &self.in_memory {
for block in self.in_memory.iter() {
if let Some(value) = block.execution_output.storage(&address, storage_key.into()) {
return Ok(Some(value));
}
@@ -228,7 +224,7 @@ impl<N: NodePrimitives> StateProvider for MemoryOverlayStateProviderRef<'_, N> {
impl<N: NodePrimitives> BytecodeReader for MemoryOverlayStateProviderRef<'_, N> {
fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
for block in &self.in_memory {
for block in self.in_memory.iter() {
if let Some(contract) = block.execution_output.bytecode(code_hash) {
return Ok(Some(contract));
}
@@ -237,3 +233,46 @@ impl<N: NodePrimitives> BytecodeReader for MemoryOverlayStateProviderRef<'_, N>
self.historical.bytecode_by_hash(code_hash)
}
}
/// An owned state provider that stores references to in-memory blocks along with their state as
/// well as a reference of the historical state provider for fallback lookups.
#[expect(missing_debug_implementations)]
pub struct MemoryOverlayStateProvider<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
/// Historical state provider for state lookups that are not found in memory blocks.
pub(crate) historical: StateProviderBox,
/// The collection of executed parent blocks. Expected order is newest to oldest.
pub(crate) in_memory: Vec<ExecutedBlock<N>>,
/// Lazy-loaded in-memory trie data.
pub(crate) trie_input: OnceLock<TrieInput>,
}
impl<N: NodePrimitives> MemoryOverlayStateProvider<N> {
/// Create new memory overlay state provider.
///
/// ## Arguments
///
/// - `in_memory` - the collection of executed ancestor blocks in reverse.
/// - `historical` - a historical state provider for the latest ancestor block stored in the
/// database.
pub fn new(historical: StateProviderBox, in_memory: Vec<ExecutedBlock<N>>) -> Self {
Self { historical, in_memory, trie_input: OnceLock::new() }
}
/// Returns a new provider that takes the `TX` as reference
#[inline(always)]
fn as_ref(&self) -> MemoryOverlayStateProviderRef<'_, N> {
MemoryOverlayStateProviderRef {
historical: Box::new(self.historical.as_ref()),
in_memory: Cow::Borrowed(&self.in_memory),
trie_input: self.trie_input.clone(),
}
}
/// Wraps the [`Self`] in a `Box`.
pub fn boxed(self) -> StateProviderBox {
Box::new(self)
}
}
// Delegates all provider impls to [`MemoryOverlayStateProviderRef`]
reth_storage_api::macros::delegate_provider_impls!(MemoryOverlayStateProvider<N> where [N: NodePrimitives]);

View File

@@ -970,7 +970,7 @@ impl<H: BlockHeader> EthereumHardforks for ChainSpec<H> {
/// A trait for reading the current chainspec.
#[auto_impl::auto_impl(&, Arc)]
pub trait ChainSpecProvider: Debug + Send + Sync {
pub trait ChainSpecProvider: Debug + Send {
/// The chain spec type.
type ChainSpec: EthChainSpec + 'static;

View File

@@ -5,7 +5,7 @@ use pretty_assertions::Comparison;
use reth_engine_primitives::InvalidBlockHook;
use reth_evm::{execute::Executor, ConfigureEvm};
use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
use reth_provider::{BlockExecutionOutput, StateProvider, StateProviderFactory};
use reth_provider::{BlockExecutionOutput, StateProvider, StateProviderBox, StateProviderFactory};
use reth_revm::{
database::StateProviderDatabase,
db::{BundleState, State},
@@ -114,7 +114,7 @@ fn sort_bundle_state_for_comparison(bundle_state: &BundleState) -> BundleStateSo
/// Extracts execution data including codes, preimages, and hashed state from database
fn collect_execution_data(
mut db: State<StateProviderDatabase<Box<dyn StateProvider>>>,
mut db: State<StateProviderDatabase<StateProviderBox>>,
) -> eyre::Result<CollectionResult> {
let bundle_state = db.take_bundle();
let mut codes = BTreeMap::new();
@@ -530,9 +530,7 @@ mod tests {
// Create a State with StateProviderTest
let state_provider = StateProviderTest::default();
let mut state = State::builder()
.with_database(StateProviderDatabase::new(
Box::new(state_provider) as Box<dyn StateProvider>
))
.with_database(StateProviderDatabase::new(Box::new(state_provider) as StateProviderBox))
.with_bundle_update()
.build();

View File

@@ -47,7 +47,7 @@ impl BackfillSyncState {
}
/// Backfill sync mode functionality.
pub trait BackfillSync: Send + Sync {
pub trait BackfillSync: Send {
/// Performs a backfill action.
fn on_action(&mut self, action: BackfillAction);

View File

@@ -1,5 +1,4 @@
use crate::metrics::PersistenceMetrics;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
@@ -142,27 +141,23 @@ where
&self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
) -> Result<Option<BlockNumHash>, PersistenceError> {
let first_block_hash = blocks.first().map(|b| b.recovered_block.num_hash());
let last_block_hash = blocks.last().map(|b| b.recovered_block.num_hash());
debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saving range of blocks");
let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saving range of blocks");
let start_time = Instant::now();
let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
hash: block.recovered_block().hash(),
number: block.recovered_block().header().number(),
});
if last_block_hash_num.is_some() {
if last_block.is_some() {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks)?;
provider_rw.commit()?;
}
debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saved range of blocks");
debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block_hash_num)
Ok(last_block)
}
}

View File

@@ -931,48 +931,6 @@ where
Ok(())
}
/// Determines if the given block is part of a fork by checking that these
/// conditions are true:
/// * walking back from the target hash to verify that the target hash is not part of an
/// extension of the canonical chain.
/// * walking back from the current head to verify that the target hash is not already part of
/// the canonical chain.
///
/// The header is required as an arg, because we might be checking that the header is a fork
/// block before it's in the tree state and before it's in the database.
fn is_fork(&self, target: BlockWithParent) -> ProviderResult<bool> {
let target_hash = target.block.hash;
// verify that the given hash is not part of an extension of the canon chain.
let canonical_head = self.state.tree_state.canonical_head();
let mut current_hash;
let mut current_block = target;
loop {
if current_block.block.hash == canonical_head.hash {
return Ok(false)
}
// We already passed the canonical head
if current_block.block.number <= canonical_head.number {
break
}
current_hash = current_block.parent;
let Some(next_block) = self.sealed_header_by_hash(current_hash)? else { break };
current_block = next_block.block_with_parent();
}
// verify that the given hash is not already part of canonical chain stored in memory
if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
return Ok(false)
}
// verify that the given hash is not already part of persisted canonical chain
if self.provider.block_number(target_hash)?.is_some() {
return Ok(false)
}
Ok(true)
}
/// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
/// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
/// chain.
@@ -2569,14 +2527,11 @@ where
Ok(Some(_)) => {}
}
// determine whether we are on a fork chain
let is_fork = match self.is_fork(block_id) {
Err(err) => {
let block = convert_to_block(self, input)?;
return Err(InsertBlockError::new(block, err.into()).into());
}
Ok(is_fork) => is_fork,
};
// determine whether we are on a fork chain by comparing the block number with the
// canonical head. This is a simple check that is sufficient for the event emission below.
// A block is considered a fork if its number is less than or equal to the canonical head,
// as this indicates there's already a canonical block at that height.
let is_fork = block_id.block.number <= self.state.tree_state.current_canonical_head.number;
let ctx = TreeCtx::new(&mut self.state, &self.canonical_in_memory_state);

View File

@@ -3,11 +3,7 @@
use crate::tree::payload_processor::bal::bal_to_hashed_post_state;
use alloy_eip7928::BlockAccessList;
use alloy_evm::block::StateChangeSource;
use alloy_primitives::{
keccak256,
map::{B256Set, HashSet},
B256,
};
use alloy_primitives::{keccak256, map::HashSet, B256};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use dashmap::DashMap;
use derive_more::derive::Deref;
@@ -23,7 +19,6 @@ use reth_trie_parallel::{
proof::ParallelProof,
proof_task::{
AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
StorageProofInput,
},
};
use std::{collections::BTreeMap, mem, ops::DerefMut, sync::Arc, time::Instant};
@@ -236,74 +231,6 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat
hashed_state
}
/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`].
#[derive(Debug)]
enum PendingMultiproofTask {
/// A storage multiproof task input.
Storage(StorageMultiproofInput),
/// A regular multiproof task input.
Regular(MultiproofInput),
}
impl PendingMultiproofTask {
/// Returns the proof sequence number of the task.
const fn proof_sequence_number(&self) -> u64 {
match self {
Self::Storage(input) => input.proof_sequence_number,
Self::Regular(input) => input.proof_sequence_number,
}
}
/// Returns whether or not the proof targets are empty.
fn proof_targets_is_empty(&self) -> bool {
match self {
Self::Storage(input) => input.proof_targets.is_empty(),
Self::Regular(input) => input.proof_targets.is_empty(),
}
}
/// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
fn send_empty_proof(self) {
match self {
Self::Storage(input) => input.send_empty_proof(),
Self::Regular(input) => input.send_empty_proof(),
}
}
}
impl From<StorageMultiproofInput> for PendingMultiproofTask {
fn from(input: StorageMultiproofInput) -> Self {
Self::Storage(input)
}
}
impl From<MultiproofInput> for PendingMultiproofTask {
fn from(input: MultiproofInput) -> Self {
Self::Regular(input)
}
}
/// Input parameters for dispatching a dedicated storage multiproof calculation.
#[derive(Debug)]
struct StorageMultiproofInput {
hashed_state_update: HashedPostState,
hashed_address: B256,
proof_targets: B256Set,
proof_sequence_number: u64,
state_root_message_sender: CrossbeamSender<MultiProofMessage>,
multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
}
impl StorageMultiproofInput {
/// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
fn send_empty_proof(self) {
let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
sequence_number: self.proof_sequence_number,
state: self.hashed_state_update,
});
}
}
/// Input parameters for dispatching a multiproof calculation.
#[derive(Debug)]
struct MultiproofInput {
@@ -378,91 +305,18 @@ impl MultiproofManager {
}
/// Dispatches a new multiproof calculation to worker pools.
fn dispatch(&self, input: PendingMultiproofTask) {
fn dispatch(&self, input: MultiproofInput) {
// If there are no proof targets, we can just send an empty multiproof back immediately
if input.proof_targets_is_empty() {
if input.proof_targets.is_empty() {
trace!(
sequence_number = input.proof_sequence_number(),
sequence_number = input.proof_sequence_number,
"No proof targets, sending empty multiproof back immediately"
);
input.send_empty_proof();
return;
}
match input {
PendingMultiproofTask::Storage(storage_input) => {
self.dispatch_storage_proof(storage_input);
}
PendingMultiproofTask::Regular(multiproof_input) => {
self.dispatch_multiproof(multiproof_input);
}
}
}
/// Dispatches a single storage proof calculation to worker pool.
fn dispatch_storage_proof(&self, storage_multiproof_input: StorageMultiproofInput) {
let StorageMultiproofInput {
hashed_state_update,
hashed_address,
proof_targets,
proof_sequence_number,
multi_added_removed_keys,
state_root_message_sender: _,
} = storage_multiproof_input;
let storage_targets = proof_targets.len();
trace!(
target: "engine::tree::payload_processor::multiproof",
proof_sequence_number,
?proof_targets,
storage_targets,
"Dispatching storage proof to workers"
);
let start = Instant::now();
// Create prefix set from targets
let prefix_set = reth_trie::prefix_set::PrefixSetMut::from(
proof_targets.iter().map(reth_trie::Nibbles::unpack),
);
let prefix_set = prefix_set.freeze();
// Build computation input (data only)
let input = StorageProofInput::new(
hashed_address,
prefix_set,
proof_targets,
true, // with_branch_node_masks
Some(multi_added_removed_keys),
);
// Dispatch to storage worker
if let Err(e) = self.proof_worker_handle.dispatch_storage_proof(
input,
ProofResultContext::new(
self.proof_result_tx.clone(),
proof_sequence_number,
hashed_state_update,
start,
),
) {
error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch storage proof");
return;
}
self.metrics
.active_storage_workers_histogram
.record(self.proof_worker_handle.active_storage_workers() as f64);
self.metrics
.active_account_workers_histogram
.record(self.proof_worker_handle.active_account_workers() as f64);
self.metrics
.pending_storage_multiproofs_histogram
.record(self.proof_worker_handle.pending_storage_tasks() as f64);
self.metrics
.pending_account_multiproofs_histogram
.record(self.proof_worker_handle.pending_account_tasks() as f64);
self.dispatch_multiproof(input);
}
/// Signals that a multiproof calculation has finished.
@@ -809,17 +663,14 @@ impl MultiProofTask {
available_storage_workers,
MultiProofTargets::chunks,
|proof_targets| {
self.multiproof_manager.dispatch(
MultiproofInput {
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
}
.into(),
);
self.multiproof_manager.dispatch(MultiproofInput {
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
});
},
);
self.metrics.prefetch_proof_chunks_histogram.record(num_chunks as f64);
@@ -967,17 +818,14 @@ impl MultiProofTask {
);
spawned_proof_targets.extend_ref(&proof_targets);
self.multiproof_manager.dispatch(
MultiproofInput {
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
}
.into(),
);
self.multiproof_manager.dispatch(MultiproofInput {
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
});
},
);
self.metrics

View File

@@ -166,8 +166,7 @@ where
// Update storage slots with new values and calculate storage roots.
let span = tracing::Span::current();
let (tx, rx) = mpsc::channel();
state
let results: Vec<_> = state
.storages
.into_iter()
.map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
@@ -217,13 +216,7 @@ where
SparseStateTrieResult::Ok((address, storage_trie))
})
.for_each_init(
|| tx.clone(),
|tx, result| {
let _ = tx.send(result);
},
);
drop(tx);
.collect();
// Defer leaf removals until after updates/additions, so that we don't delete an intermediate
// branch node during a removal and then re-add that branch back during a later leaf addition.
@@ -235,7 +228,7 @@ where
let _enter =
tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
.entered();
for result in rx {
for result in results {
let (address, storage_trie) = result?;
trie.insert_storage_trie(address, storage_trie);

View File

@@ -435,8 +435,7 @@ where
}
// Execute the block and handle any execution errors
let (output, senders) = match self.execute_block(&state_provider, env, &input, &mut handle)
{
let (output, senders) = match self.execute_block(state_provider, env, &input, &mut handle) {
Ok(output) => output,
Err(err) => return self.handle_execution_error(input, err, &parent_block),
};
@@ -603,7 +602,7 @@ where
handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err, N::Receipt>,
) -> Result<(BlockExecutionOutput<N::Receipt>, Vec<Address>), InsertBlockErrorKind>
where
S: StateProvider,
S: StateProvider + Send,
Err: core::error::Error + Send + Sync + 'static,
V: PayloadValidator<T, Block = N::Block>,
T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,

View File

@@ -116,7 +116,7 @@ where
/// these stages that this work has already been done. Otherwise, there might be some conflict with
/// database integrity.
pub fn save_stage_checkpoints<P>(
provider: &P,
provider: P,
from: BlockNumber,
to: BlockNumber,
processed: u64,

View File

@@ -91,6 +91,7 @@ asm-keccak = [
]
keccak-cache-global = [
"alloy-primitives/keccak-cache-global",
"reth-node-core/keccak-cache-global",
]
js-tracer = [
"reth-node-builder/js-tracer",

View File

@@ -153,9 +153,9 @@ where
let PayloadConfig { parent_header, attributes } = config;
let state_provider = client.state_by_block_hash(parent_header.hash())?;
let state = StateProviderDatabase::new(&state_provider);
let state = StateProviderDatabase::new(state_provider.as_ref());
let mut db =
State::builder().with_database(cached_reads.as_db_mut(state)).with_bundle_update().build();
State::builder().with_database_ref(cached_reads.as_db(state)).with_bundle_update().build();
let mut builder = evm_config
.builder_for_next_block(
@@ -358,7 +358,8 @@ where
return Ok(BuildOutcome::Aborted { fees: total_fees, cached_reads })
}
let BlockBuilderOutcome { execution_result, block, .. } = builder.finish(&state_provider)?;
let BlockBuilderOutcome { execution_result, block, .. } =
builder.finish(state_provider.as_ref())?;
let requests = chain_spec
.is_prague_active_at_timestamp(attributes.timestamp)

View File

@@ -81,6 +81,7 @@ arbitrary = [
]
keccak-cache-global = [
"reth-node-ethereum?/keccak-cache-global",
"reth-node-core?/keccak-cache-global",
]
test-utils = [
"reth-chainspec/test-utils",

View File

@@ -13,7 +13,7 @@ use reth_evm_ethereum::EthEvmConfig;
use reth_node_api::NodePrimitives;
use reth_primitives_traits::{Block as _, RecoveredBlock};
use reth_provider::{
providers::ProviderNodeTypes, BlockWriter as _, ExecutionOutcome, LatestStateProviderRef,
providers::ProviderNodeTypes, BlockWriter as _, ExecutionOutcome, LatestStateProvider,
ProviderFactory,
};
use reth_revm::database::StateProviderDatabase;
@@ -69,7 +69,7 @@ where
// Execute the block to produce a block execution output
let mut block_execution_output = EthEvmConfig::ethereum(chain_spec)
.batch_executor(StateProviderDatabase::new(LatestStateProviderRef::new(&provider)))
.batch_executor(StateProviderDatabase::new(LatestStateProvider::new(provider)))
.execute(block)?;
block_execution_output.state.reverts.sort();
@@ -203,8 +203,8 @@ where
let provider = provider_factory.provider()?;
let evm_config = EthEvmConfig::new(chain_spec);
let executor = evm_config
.batch_executor(StateProviderDatabase::new(LatestStateProviderRef::new(&provider)));
let executor =
evm_config.batch_executor(StateProviderDatabase::new(LatestStateProvider::new(provider)));
let mut execution_outcome = executor.execute_batch(vec![&block1, &block2])?;
execution_outcome.state_mut().reverts.sort();

View File

@@ -13,9 +13,7 @@ pub type BodyDownloaderResult<B> = DownloadResult<Vec<BlockResponse<B>>>;
/// A downloader represents a distinct strategy for submitting requests to download block bodies,
/// while a [`BodiesClient`][crate::bodies::client::BodiesClient] represents a client capable of
/// fulfilling these requests.
pub trait BodyDownloader:
Send + Sync + Stream<Item = BodyDownloaderResult<Self::Block>> + Unpin
{
pub trait BodyDownloader: Send + Stream<Item = BodyDownloaderResult<Self::Block>> + Unpin {
/// The Block type this downloader supports
type Block: Block + 'static;

View File

@@ -938,9 +938,13 @@ where
///
/// A target block hash if the pipeline is inconsistent, otherwise `None`.
pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
// We skip the era stage if it's not enabled
let era_enabled = self.era_import_source().is_some();
let mut all_stages =
StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
// Get the expected first stage based on config.
let first_stage =
if self.era_import_source().is_some() { StageId::Era } else { StageId::Headers };
let first_stage = all_stages.next().expect("there must be at least one stage");
// If no target was provided, check if the stages are congruent - check if the
// checkpoint of the last stage matches the checkpoint of the first.
@@ -950,20 +954,28 @@ where
.unwrap_or_default()
.block_number;
// Skip the first stage as we've already retrieved it and comparing all other checkpoints
// against it.
for stage_id in StageId::ALL.iter().skip(1) {
// Compare all other stages against the first
for stage_id in all_stages {
let stage_checkpoint = self
.blockchain_db()
.get_stage_checkpoint(*stage_id)?
.get_stage_checkpoint(stage_id)?
.unwrap_or_default()
.block_number;
// If the checkpoint of any stage is less than the checkpoint of the first stage,
// retrieve and return the block hash of the latest header and use it as the target.
debug!(
target: "consensus::engine",
first_stage_id = %first_stage,
first_stage_checkpoint,
stage_id = %stage_id,
stage_checkpoint = stage_checkpoint,
"Checking stage against first stage",
);
if stage_checkpoint < first_stage_checkpoint {
debug!(
target: "consensus::engine",
first_stage_id = %first_stage,
first_stage_checkpoint,
inconsistent_stage_id = %stage_id,
inconsistent_stage_checkpoint = stage_checkpoint,

View File

@@ -80,7 +80,7 @@ tokio.workspace = true
# Features for vergen to generate correct env vars
jemalloc = ["reth-cli-util/jemalloc"]
asm-keccak = ["alloy-primitives/asm-keccak"]
# Feature to enable opentelemetry export
keccak-cache-global = ["alloy-primitives/keccak-cache-global"]
otlp = ["reth-tracing/otlp"]
min-error-logs = ["tracing/release_max_level_error"]

View File

@@ -96,6 +96,7 @@ asm-keccak = [
]
keccak-cache-global = [
"alloy-primitives/keccak-cache-global",
"reth-node-core/keccak-cache-global",
"reth-optimism-node/keccak-cache-global",
]
js-tracer = [

View File

@@ -76,6 +76,7 @@ arbitrary = [
]
keccak-cache-global = [
"reth-optimism-node?/keccak-cache-global",
"reth-node-core?/keccak-cache-global",
]
test-utils = [
"reth-chainspec/test-utils",

View File

@@ -88,7 +88,8 @@ impl<Client, Tx> OpTransactionValidator<Client, Tx> {
impl<Client, Tx> OpTransactionValidator<Client, Tx>
where
Client: ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt,
Client:
ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt + Sync,
Tx: EthPoolTransaction + OpPooledTx,
{
/// Create a new [`OpTransactionValidator`].
@@ -177,7 +178,7 @@ where
&self,
origin: TransactionOrigin,
transaction: Tx,
state: &mut Option<Box<dyn AccountInfoReader>>,
state: &mut Option<Box<dyn AccountInfoReader + Send>>,
) -> TransactionValidationOutcome<Tx> {
if transaction.is_eip4844() {
return TransactionValidationOutcome::Invalid(
@@ -289,7 +290,8 @@ where
impl<Client, Tx> TransactionValidator for OpTransactionValidator<Client, Tx>
where
Client: ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt,
Client:
ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt + Sync,
Tx: EthPoolTransaction + OpPooledTx,
{
type Transaction = Tx;

View File

@@ -9,7 +9,7 @@ use revm::{bytecode::Bytecode, state::AccountInfo, Database, DatabaseRef};
/// A helper trait responsible for providing state necessary for EVM execution.
///
/// This serves as the data layer for [`Database`].
pub trait EvmStateProvider: Send + Sync {
pub trait EvmStateProvider {
/// Get basic account information.
///
/// Returns [`None`] if the account doesn't exist.

View File

@@ -33,7 +33,7 @@ use reth_rpc_eth_types::{
simulate::{self, EthSimulateError},
EthApiError, StateCacheDb,
};
use reth_storage_api::{BlockIdReader, ProviderTx, StateProvider};
use reth_storage_api::{BlockIdReader, ProviderTx, StateProviderBox};
use revm::{
context::Block,
context_interface::{result::ResultAndState, Transaction},
@@ -491,11 +491,11 @@ pub trait Call:
) -> impl Future<Output = Result<R, Self::Error>> + Send
where
R: Send + 'static,
F: FnOnce(Self, &dyn StateProvider) -> Result<R, Self::Error> + Send + 'static,
F: FnOnce(Self, StateProviderBox) -> Result<R, Self::Error> + Send + 'static,
{
self.spawn_blocking_io_fut(move |this| async move {
let state = this.state_at_block_id(at).await?;
f(this, &state)
f(this, state)
})
}

View File

@@ -235,7 +235,7 @@ pub trait LoadPendingBlock:
.provider()
.history_by_block_hash(parent.hash())
.map_err(Self::Error::from_eth_err)?;
let state = StateProviderDatabase::new(&state_provider);
let state = StateProviderDatabase::new(state_provider);
let mut db = State::builder().with_database(state).with_bundle_update().build();
let mut builder = self

View File

@@ -238,7 +238,7 @@ pub struct UnwindOutput {
///
/// Stages receive [`DBProvider`](reth_provider::DBProvider).
#[auto_impl::auto_impl(Box)]
pub trait Stage<Provider>: Send + Sync {
pub trait Stage<Provider>: Send {
/// Get the ID of the stage.
///
/// Stage IDs must be unique.

View File

@@ -195,7 +195,7 @@ where
}
era::save_stage_checkpoints(
&provider,
provider,
input.checkpoint().block_number,
height,
height,

View File

@@ -5,7 +5,7 @@ use reth_consensus::ConsensusError;
use reth_primitives_traits::{GotExpected, SealedHeader};
use reth_provider::{
ChainStateBlockReader, DBProvider, HeaderProvider, ProviderError, PruneCheckpointReader,
PruneCheckpointWriter, StageCheckpointReader, TrieWriter,
PruneCheckpointWriter, StageCheckpointReader, StageCheckpointWriter, TrieWriter,
};
use reth_prune_types::{
PruneCheckpoint, PruneMode, PruneSegment, MERKLE_CHANGESETS_RETENTION_BLOCKS,
@@ -300,6 +300,7 @@ where
+ DBProvider
+ HeaderProvider
+ ChainStateBlockReader
+ StageCheckpointWriter
+ PruneCheckpointReader
+ PruneCheckpointWriter,
{
@@ -404,6 +405,28 @@ where
computed_range.start = computed_range.end;
}
// If we've unwound so far that there are no longer enough trie changesets available then
// simply clear them and the checkpoints, so that on next pipeline startup they will be
// regenerated.
debug!(
target: "sync::stages::merkle_changesets",
?computed_range,
retention_blocks=?self.retention_blocks,
"Checking if computed range is over retention threshold",
);
if computed_range.end - computed_range.start < self.retention_blocks {
debug!(
target: "sync::stages::merkle_changesets",
?computed_range,
retention_blocks=?self.retention_blocks,
"Clearing checkpoints completely",
);
provider.clear_trie_changesets()?;
provider
.save_stage_checkpoint(StageId::MerkleChangeSets, StageCheckpoint::default())?;
return Ok(UnwindOutput { checkpoint: StageCheckpoint::default() })
}
// `computed_range.end` is exclusive
let checkpoint = StageCheckpoint::new(computed_range.end.saturating_sub(1));

View File

@@ -1,4 +1,5 @@
use std::{
cell::Cell,
fmt,
ops::{Bound, RangeBounds},
};
@@ -369,3 +370,50 @@ impl<T: DupSort, CURSOR: DbDupCursorRO<T>> Iterator for DupWalker<'_, T, CURSOR>
self.cursor.next_dup().transpose()
}
}
/// A wrapper around a cursor that returns it to a cell on drop.
///
/// This allows cursors to be reused across multiple operations,
/// reducing the overhead of repeatedly creating new cursors.
pub struct ReusableCursor<'cell, T: Table, C: DbCursorRO<T>> {
cursor: Option<C>,
cell: &'cell Cell<Option<C>>,
_phantom: std::marker::PhantomData<T>,
}
impl<T, C> fmt::Debug for ReusableCursor<'_, T, C>
where
T: Table,
C: DbCursorRO<T> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReusableCursor").field("cursor", &self.cursor).finish()
}
}
impl<'cell, T: Table, C: DbCursorRO<T>> ReusableCursor<'cell, T, C> {
/// Creates a new `ReusableCursor` from a cursor and a cell to return it to.
pub const fn new(cursor: C, cell: &'cell Cell<Option<C>>) -> Self {
Self { cursor: Some(cursor), cell, _phantom: std::marker::PhantomData }
}
}
impl<T: Table, C: DbCursorRO<T>> Drop for ReusableCursor<'_, T, C> {
fn drop(&mut self) {
self.cell.set(self.cursor.take());
}
}
impl<T: Table, C: DbCursorRO<T>> std::ops::Deref for ReusableCursor<'_, T, C> {
type Target = C;
fn deref(&self) -> &Self::Target {
self.cursor.as_ref().expect("cursor always exists")
}
}
impl<T: Table, C: DbCursorRO<T>> std::ops::DerefMut for ReusableCursor<'_, T, C> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.cursor.as_mut().expect("cursor always exists")
}
}

View File

@@ -12,7 +12,7 @@ pub type CursorTy<TX, T> = <TX as DbTx>::Cursor<T>;
pub type CursorMutTy<TX, T> = <TX as DbTxMut>::CursorMut<T>;
/// Read only transaction
pub trait DbTx: Debug + Send + Sync {
pub trait DbTx: Debug + Send {
/// Cursor type for this read-only transaction
type Cursor<T: Table>: DbCursorRO<T> + Send + Sync;
/// `DupCursor` type for this read-only transaction
@@ -43,7 +43,7 @@ pub trait DbTx: Debug + Send + Sync {
}
/// Read write transaction that allows writing to database
pub trait DbTxMut: Send + Sync {
pub trait DbTxMut: Send {
/// Read-Write Cursor type
type CursorMut<T: Table>: DbCursorRW<T> + DbCursorRO<T> + Send + Sync;
/// Read-Write `DupCursor` type

View File

@@ -30,7 +30,7 @@ pub trait DbTxUnwindExt: DbTxMut {
let mut deleted = 0;
while let Some(Ok((entry_key, _))) = reverse_walker.next() {
if selector(entry_key.clone()) <= key {
if selector(entry_key) <= key {
break
}
reverse_walker.delete_current()?;

View File

@@ -12936,125 +12936,6 @@ int mdbx_txn_renew(MDBX_txn *txn) {
return LOG_IFERR(rc);
}
int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest) {
if (unlikely(!dest))
return LOG_IFERR(MDBX_EINVAL);
*dest = nullptr;
int rc = check_txn(src, MDBX_TXN_BLOCKED);
if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc);
if (unlikely((src->flags & MDBX_TXN_RDONLY) == 0))
return LOG_IFERR(MDBX_EINVAL);
MDBX_env *const env = src->env;
rc = check_env(env, true);
if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc);
if (unlikely((env->flags & MDBX_NOSTICKYTHREADS) == 0))
return LOG_IFERR(MDBX_TXN_OVERLAPPING);
MDBX_txn *txn = nullptr;
const intptr_t bitmap_bytes =
#if MDBX_ENABLE_DBI_SPARSE
ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(txn->dbi_sparse[0])) / CHAR_BIT;
#else
0;
#endif /* MDBX_ENABLE_DBI_SPARSE */
STATIC_ASSERT(sizeof(txn->tw) > sizeof(txn->to));
const size_t base = sizeof(MDBX_txn) - sizeof(txn->tw) + sizeof(txn->to);
const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(txn->dbi_seqs[0]) +
env->max_dbi * (sizeof(txn->dbs[0]) + sizeof(txn->cursors[0]) + sizeof(txn->dbi_state[0]));
txn = osal_malloc(size);
if (unlikely(txn == nullptr))
return LOG_IFERR(MDBX_ENOMEM);
#if MDBX_DEBUG
memset(txn, 0xCD, size);
VALGRIND_MAKE_MEM_UNDEFINED(txn, size);
#endif /* MDBX_DEBUG */
MDBX_ANALYSIS_ASSUME(size > base);
memset(txn, 0, (MDBX_GOOFY_MSVC_STATIC_ANALYZER && base > size) ? size : base);
txn->dbs = ptr_disp(txn, base);
txn->cursors = ptr_disp(txn->dbs, env->max_dbi * sizeof(txn->dbs[0]));
#if MDBX_DEBUG
txn->cursors[FREE_DBI] = nullptr; /* avoid SIGSEGV in an assertion later */
#endif
txn->dbi_state = ptr_disp(txn, size - env->max_dbi * sizeof(txn->dbi_state[0]));
txn->dbi_seqs = ptr_disp(txn->cursors, env->max_dbi * sizeof(txn->cursors[0]));
#if MDBX_ENABLE_DBI_SPARSE
txn->dbi_sparse = ptr_disp(txn->dbi_state, -bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
txn->env = env;
txn->flags = src->flags & ~txn_state_flags;
txn->parent = nullptr;
txn->nested = nullptr;
txn->txnid = src->txnid;
txn->front_txnid = src->front_txnid;
txn->geo = src->geo;
txn->canary = src->canary;
txn->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
if (unlikely(src->n_dbi > env->max_dbi)) {
rc = MDBX_CORRUPTED;
goto bailout;
}
txn->n_dbi = src->n_dbi;
memset(txn->cursors, 0, env->max_dbi * sizeof(txn->cursors[0]));
memset(txn->dbi_state, 0, env->max_dbi * sizeof(txn->dbi_state[0]));
memset(txn->dbi_seqs, 0, env->max_dbi * sizeof(txn->dbi_seqs[0]));
#if MDBX_ENABLE_DBI_SPARSE
if (bitmap_bytes)
memset(txn->dbi_sparse, 0, bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
memcpy(txn->dbs, src->dbs, txn->n_dbi * sizeof(txn->dbs[0]));
memcpy(txn->dbi_state, src->dbi_state, txn->n_dbi * sizeof(txn->dbi_state[0]));
memcpy(txn->dbi_seqs, src->dbi_seqs, txn->n_dbi * sizeof(txn->dbi_seqs[0]));
#if MDBX_ENABLE_DBI_SPARSE
if (bitmap_bytes)
memcpy(txn->dbi_sparse, src->dbi_sparse, bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
txn->to.reader = nullptr;
if (env->lck_mmap.lck) {
bsr_t brs = mvcc_bind_slot(env);
if (unlikely(brs.err != MDBX_SUCCESS)) {
rc = brs.err;
goto bailout;
}
txn->to.reader = brs.rslot;
safe64_reset(&txn->to.reader->txnid, true);
if (src->to.reader) {
atomic_store32(&txn->to.reader->snapshot_pages_used,
atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed), mo_Relaxed);
atomic_store64(&txn->to.reader->snapshot_pages_retired,
atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed), mo_Relaxed);
} else {
atomic_store32(&txn->to.reader->snapshot_pages_used, src->geo.first_unallocated, mo_Relaxed);
atomic_store64(&txn->to.reader->snapshot_pages_retired, 0, mo_Relaxed);
}
safe64_write(&txn->to.reader->txnid, src->txnid);
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
}
txn->signature = txn_signature;
txn->userctx = nullptr;
*dest = txn;
DEBUG("clone txn %" PRIaTXN "r %p from %p on env %p", txn->txnid, (void *)txn, (void *)src, (void *)env);
return MDBX_SUCCESS;
bailout:
osal_free(txn);
return LOG_IFERR(rc);
}
int mdbx_txn_set_userctx(MDBX_txn *txn, void *ctx) {
int rc = check_txn(txn, MDBX_TXN_FINISHED);
if (unlikely(rc != MDBX_SUCCESS))

View File

@@ -3882,35 +3882,6 @@ MDBX_NOTHROW_PURE_FUNCTION LIBMDBX_API void *mdbx_env_get_userctx(const MDBX_env
LIBMDBX_API int mdbx_txn_begin_ex(MDBX_env *env, MDBX_txn *parent, MDBX_txn_flags_t flags, MDBX_txn **txn,
void *context);
/** \brief Clone a read-only transaction snapshot.
* \ingroup c_transactions
*
* Creates a new read-only transaction that uses the same MVCC snapshot as
* the \p src transaction. This allows parallel read operations across threads
* without re-opening a read transaction and re-fetching state.
*
* \note This function requires \ref MDBX_NOSTICKYTHREADS (aka MDBX_NOTLS)
* to be enabled for the environment. Otherwise it will return
* \ref MDBX_TXN_OVERLAPPING.
*
* \note The \p src transaction must be an active read-only transaction.
*
* \note The \p src transaction and the cloned transaction must not be used
* concurrently from multiple threads. Each transaction and its cursors must
* be confined to a single thread at a time.
*
* \param [in] src A read-only transaction handle returned by
* \ref mdbx_txn_begin_ex() or \ref mdbx_txn_begin().
* \param [out] dest Address where the cloned \ref MDBX_txn handle will be
* stored. Must not be NULL.
*
* \returns A non-zero error value on failure and 0 on success.
* \retval MDBX_EINVAL Invalid arguments or \p src is not read-only.
* \retval MDBX_TXN_OVERLAPPING \ref MDBX_NOSTICKYTHREADS is not enabled.
* \retval MDBX_READERS_FULL Reader lock table is full.
* \retval MDBX_ENOMEM Out of memory. */
LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest);
/** \brief Create a transaction for use with the environment.
* \ingroup c_transactions
*

View File

@@ -483,20 +483,6 @@ impl Transaction<RW> {
}
impl Transaction<RO> {
/// Clones this read-only transaction, preserving the same MVCC snapshot.
///
/// This requires the environment to be opened with `MDBX_NOSTICKYTHREADS` (aka `MDBX_NOTLS`).
/// The cloned transaction must not be used concurrently with this transaction from multiple
/// threads.
pub fn clone_snapshot(&self) -> Result<Self> {
let cloned = self.txn_execute(|txn| {
let mut cloned: *mut ffi::MDBX_txn = ptr::null_mut();
mdbx_result(unsafe { ffi::mdbx_txn_clone(txn, &mut cloned) }).map(|_| cloned)
})??;
Ok(Self::new_from_ptr(self.env().clone(), cloned))
}
/// Closes the database handle.
///
/// # Safety

View File

@@ -373,35 +373,3 @@ fn test_stat_dupsort() {
assert_eq!(stat.entries(), 8);
}
}
#[test]
fn test_txn_clone_snapshot() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
txn.put(db.dbi(), b"k", b"v1", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
}
let ro = env.begin_ro_txn().unwrap();
let clone = ro.clone_snapshot().unwrap();
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
txn.put(db.dbi(), b"k", b"v2", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
}
let db = ro.open_db(None).unwrap();
assert_eq!(ro.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
let db = clone.open_db(None).unwrap();
assert_eq!(clone.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
let ro2 = env.begin_ro_txn().unwrap();
let db = ro2.open_db(None).unwrap();
assert_eq!(ro2.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v2"));
}

View File

@@ -26,7 +26,7 @@ use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
StorageChangeSetReader, TryIntoHistoricalStateProvider,
StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::updates::TrieUpdatesSorted;
@@ -596,9 +596,9 @@ impl<N: ProviderNodeTypes> ConsistentProvider<N> {
pub(crate) fn into_state_provider_at_block_hash(
self,
block_hash: BlockHash,
) -> ProviderResult<Box<dyn StateProvider>> {
) -> ProviderResult<StateProviderBox> {
let Self { storage_provider, head_block, .. } = self;
let into_history_at_block_hash = |block_hash| -> ProviderResult<Box<dyn StateProvider>> {
let into_history_at_block_hash = |block_hash| -> ProviderResult<StateProviderBox> {
let block_number = storage_provider
.block_number(block_hash)?
.ok_or(ProviderError::BlockHashNotFound(block_hash))?;

View File

@@ -284,7 +284,7 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
}
}
impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
for DatabaseProvider<TX, N>
{
type ChainSpec = N::ChainSpec;
@@ -3253,7 +3253,7 @@ impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
}
}
impl<TX: Send + Sync, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
fn cached_storage_settings(&self) -> StorageSettings {
*self.storage_settings.read()
}

View File

@@ -0,0 +1,110 @@
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRO, ReusableCursor},
tables::{AccountsHistory, PlainStorageState, StorageChangeSets, StoragesHistory},
transaction::DbTx,
DatabaseError,
};
use std::cell::Cell;
/// Container for reusable database cursors.
///
/// Holds optional cached cursors for frequently accessed tables. When a cursor is requested,
/// it returns a cached cursor if available, otherwise creates a new one. The cursor is
/// automatically returned to the cache when dropped via the `ReusableCursor` wrapper.
///
/// This reduces cursor allocation overhead for state providers that perform many sequential
/// database operations.
pub(crate) struct ReusableStateCursors<TX: DbTx> {
storage_changesets: Cell<Option<TX::DupCursor<StorageChangeSets>>>,
plain_storage_state: Cell<Option<TX::DupCursor<PlainStorageState>>>,
accounts_history: Cell<Option<TX::Cursor<AccountsHistory>>>,
storages_history: Cell<Option<TX::Cursor<StoragesHistory>>>,
}
impl<TX: DbTx> std::fmt::Debug for ReusableStateCursors<TX> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReusableStateCursors").finish_non_exhaustive()
}
}
impl<TX: DbTx> ReusableStateCursors<TX> {
/// Creates a new `ReusableStateCursors` with empty cursor cells.
pub(crate) const fn new() -> Self {
Self {
storage_changesets: Cell::new(None),
plain_storage_state: Cell::new(None),
accounts_history: Cell::new(None),
storages_history: Cell::new(None),
}
}
/// Gets a reusable cursor for the `StorageChangeSets` table.
///
/// If a cursor is cached, it will be reused. Otherwise, a new cursor is created.
pub(crate) fn storage_changesets(
&self,
tx: &TX,
) -> Result<
ReusableCursor<'_, StorageChangeSets, TX::DupCursor<StorageChangeSets>>,
DatabaseError,
>
where
TX::DupCursor<StorageChangeSets>: DbDupCursorRO<StorageChangeSets>,
{
let cursor =
self.storage_changesets.take().map(Ok).unwrap_or_else(|| tx.cursor_dup_read())?;
Ok(ReusableCursor::new(cursor, &self.storage_changesets))
}
/// Gets a reusable cursor for the `PlainStorageState` table.
///
/// If a cursor is cached, it will be reused. Otherwise, a new cursor is created.
pub(crate) fn plain_storage_state(
&self,
tx: &TX,
) -> Result<
ReusableCursor<'_, PlainStorageState, TX::DupCursor<PlainStorageState>>,
DatabaseError,
>
where
TX::DupCursor<PlainStorageState>: DbDupCursorRO<PlainStorageState>,
{
let cursor =
self.plain_storage_state.take().map(Ok).unwrap_or_else(|| tx.cursor_dup_read())?;
Ok(ReusableCursor::new(cursor, &self.plain_storage_state))
}
/// Gets a reusable cursor for the `AccountsHistory` table.
///
/// If a cursor is cached, it will be reused. Otherwise, a new cursor is created.
pub(crate) fn accounts_history(
&self,
tx: &TX,
) -> Result<ReusableCursor<'_, AccountsHistory, TX::Cursor<AccountsHistory>>, DatabaseError>
where
TX::Cursor<AccountsHistory>: DbCursorRO<AccountsHistory>,
{
let cursor = self.accounts_history.take().map(Ok).unwrap_or_else(|| tx.cursor_read())?;
Ok(ReusableCursor::new(cursor, &self.accounts_history))
}
/// Gets a reusable cursor for the `StoragesHistory` table.
///
/// If a cursor is cached, it will be reused. Otherwise, a new cursor is created.
pub(crate) fn storages_history(
&self,
tx: &TX,
) -> Result<ReusableCursor<'_, StoragesHistory, TX::Cursor<StoragesHistory>>, DatabaseError>
where
TX::Cursor<StoragesHistory>: DbCursorRO<StoragesHistory>,
{
let cursor = self.storages_history.take().map(Ok).unwrap_or_else(|| tx.cursor_read())?;
Ok(ReusableCursor::new(cursor, &self.storages_history))
}
}
impl<TX: DbTx> Default for ReusableStateCursors<TX> {
fn default() -> Self {
Self::new()
}
}

View File

@@ -1,5 +1,5 @@
use crate::{
providers::state::macros::delegate_provider_impls, AccountReader, BlockHashReader,
providers::state::cursor_reuse::ReusableStateCursors, AccountReader, BlockHashReader,
ChangeSetReader, HashedPostStateProvider, ProviderError, StateProvider, StateRootProvider,
};
use alloy_eips::merge::EPOCH_SLOTS;
@@ -44,13 +44,15 @@ use std::fmt::Debug;
/// - [`tables::AccountChangeSets`]
/// - [`tables::StorageChangeSets`]
#[derive(Debug)]
pub struct HistoricalStateProviderRef<'b, Provider> {
pub struct HistoricalStateProviderRef<'b, Provider: DBProvider> {
/// Database provider
provider: &'b Provider,
/// Block number is main index for the history state of accounts and storages.
block_number: BlockNumber,
/// Lowest blocks at which different parts of the state are available.
lowest_available_blocks: LowestAvailableBlocks,
/// Reusable cursors for state lookups
cursors: ReusableStateCursors<Provider::Tx>,
}
#[derive(Debug, Eq, PartialEq)]
@@ -64,7 +66,12 @@ pub enum HistoryInfo {
impl<'b, Provider: DBProvider + BlockNumReader> HistoricalStateProviderRef<'b, Provider> {
/// Create new `StateProvider` for historical block number
pub fn new(provider: &'b Provider, block_number: BlockNumber) -> Self {
Self { provider, block_number, lowest_available_blocks: Default::default() }
Self {
provider,
block_number,
lowest_available_blocks: Default::default(),
cursors: ReusableStateCursors::new(),
}
}
/// Create new `StateProvider` for historical block number and lowest block numbers at which
@@ -74,7 +81,12 @@ impl<'b, Provider: DBProvider + BlockNumReader> HistoricalStateProviderRef<'b, P
block_number: BlockNumber,
lowest_available_blocks: LowestAvailableBlocks,
) -> Self {
Self { provider, block_number, lowest_available_blocks }
Self {
provider,
block_number,
lowest_available_blocks,
cursors: ReusableStateCursors::new(),
}
}
/// Lookup an account in the `AccountsHistory` table
@@ -85,7 +97,9 @@ impl<'b, Provider: DBProvider + BlockNumReader> HistoricalStateProviderRef<'b, P
// history key to search IntegerList of block number changesets.
let history_key = ShardedKey::new(address, self.block_number);
self.history_info::<tables::AccountsHistory, _>(
let mut cursor = self.cursors.accounts_history(self.tx())?;
self.history_info::<tables::AccountsHistory, _, _>(
&mut *cursor,
history_key,
|key| key.key == address,
self.lowest_available_blocks.account_history_block_number,
@@ -104,7 +118,9 @@ impl<'b, Provider: DBProvider + BlockNumReader> HistoricalStateProviderRef<'b, P
// history key to search IntegerList of block number changesets.
let history_key = StorageShardedKey::new(address, storage_key, self.block_number);
self.history_info::<tables::StoragesHistory, _>(
let mut cursor = self.cursors.storages_history(self.tx())?;
self.history_info::<tables::StoragesHistory, _, _>(
&mut *cursor,
history_key,
|key| key.address == address && key.sharded_key.key == storage_key,
self.lowest_available_blocks.storage_history_block_number,
@@ -155,17 +171,17 @@ impl<'b, Provider: DBProvider + BlockNumReader> HistoricalStateProviderRef<'b, P
Ok(HashedStorage::from_reverts(self.tx(), address, self.block_number)?)
}
fn history_info<T, K>(
fn history_info<T, K, C>(
&self,
cursor: &mut C,
key: K,
key_filter: impl Fn(&K) -> bool,
lowest_available_block_number: Option<BlockNumber>,
) -> ProviderResult<HistoryInfo>
where
T: Table<Key = K, Value = BlockNumberList>,
C: DbCursorRO<T>,
{
let mut cursor = self.tx().cursor_read::<T>()?;
// Lookup the history chunk in the history index. If they key does not appear in the
// index, the first chunk for the next key will be returned so we filter out chunks that
// have a different key.
@@ -397,7 +413,7 @@ impl<Provider: DBProvider + BlockNumReader> StateProofProvider
}
}
impl<Provider: Sync> HashedPostStateProvider for HistoricalStateProviderRef<'_, Provider> {
impl<Provider: DBProvider> HashedPostStateProvider for HistoricalStateProviderRef<'_, Provider> {
fn hashed_post_state(&self, bundle_state: &revm_database::BundleState) -> HashedPostState {
HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
}
@@ -414,25 +430,28 @@ impl<Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader>
) -> ProviderResult<Option<StorageValue>> {
match self.storage_history_lookup(address, storage_key)? {
HistoryInfo::NotYetWritten => Ok(None),
HistoryInfo::InChangeset(changeset_block_number) => Ok(Some(
self.tx()
.cursor_dup_read::<tables::StorageChangeSets>()?
.seek_by_key_subkey((changeset_block_number, address).into(), storage_key)?
HistoryInfo::InChangeset(changeset_block_number) => {
let mut cursor = self.cursors.storage_changesets(self.tx())?;
Ok(Some(
cursor
.seek_by_key_subkey((changeset_block_number, address).into(), storage_key)?
.filter(|entry| entry.key == storage_key)
.ok_or_else(|| ProviderError::StorageChangesetNotFound {
block_number: changeset_block_number,
address,
storage_key: Box::new(storage_key),
})?
.value,
))
}
HistoryInfo::InPlainState | HistoryInfo::MaybeInPlainState => {
let mut cursor = self.cursors.plain_storage_state(self.tx())?;
Ok(cursor
.seek_by_key_subkey(address, storage_key)?
.filter(|entry| entry.key == storage_key)
.ok_or_else(|| ProviderError::StorageChangesetNotFound {
block_number: changeset_block_number,
address,
storage_key: Box::new(storage_key),
})?
.value,
)),
HistoryInfo::InPlainState | HistoryInfo::MaybeInPlainState => Ok(self
.tx()
.cursor_dup_read::<tables::PlainStorageState>()?
.seek_by_key_subkey(address, storage_key)?
.filter(|entry| entry.key == storage_key)
.map(|entry| entry.value)
.or(Some(StorageValue::ZERO))),
.map(|entry| entry.value)
.or(Some(StorageValue::ZERO)))
}
}
}
}
@@ -494,7 +513,7 @@ impl<Provider: DBProvider + BlockNumReader> HistoricalStateProvider<Provider> {
}
// Delegates all provider impls to [HistoricalStateProviderRef]
delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader]);
reth_storage_api::macros::delegate_provider_impls!(HistoricalStateProvider<Provider> where [Provider: DBProvider + BlockNumReader + BlockHashReader + ChangeSetReader]);
/// Lowest blocks at which different parts of the state are available.
/// They may be [Some] if pruning is enabled.

View File

@@ -1,5 +1,5 @@
use crate::{
providers::state::macros::delegate_provider_impls, AccountReader, BlockHashReader,
providers::state::cursor_reuse::ReusableStateCursors, AccountReader, BlockHashReader,
HashedPostStateProvider, StateProvider, StateRootProvider,
};
use alloy_primitives::{Address, BlockNumber, Bytes, StorageKey, StorageValue, B256};
@@ -23,16 +23,20 @@ use reth_trie_db::{
///
/// Wraps a [`DBProvider`] to get access to database.
#[derive(Debug)]
pub struct LatestStateProviderRef<'b, Provider>(&'b Provider);
pub struct LatestStateProviderRef<'b, Provider: DBProvider> {
provider: &'b Provider,
/// Reusable cursors for state lookups
cursors: ReusableStateCursors<Provider::Tx>,
}
impl<'b, Provider: DBProvider> LatestStateProviderRef<'b, Provider> {
/// Create new state provider
pub const fn new(provider: &'b Provider) -> Self {
Self(provider)
Self { provider, cursors: ReusableStateCursors::new() }
}
fn tx(&self) -> &Provider::Tx {
self.0.tx_ref()
self.provider.tx_ref()
}
}
@@ -43,10 +47,12 @@ impl<Provider: DBProvider> AccountReader for LatestStateProviderRef<'_, Provider
}
}
impl<Provider: BlockHashReader> BlockHashReader for LatestStateProviderRef<'_, Provider> {
impl<Provider: DBProvider + BlockHashReader> BlockHashReader
for LatestStateProviderRef<'_, Provider>
{
/// Get block hash by number.
fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
self.0.block_hash(number)
self.provider.block_hash(number)
}
fn canonical_hashes_range(
@@ -54,11 +60,11 @@ impl<Provider: BlockHashReader> BlockHashReader for LatestStateProviderRef<'_, P
start: BlockNumber,
end: BlockNumber,
) -> ProviderResult<Vec<B256>> {
self.0.canonical_hashes_range(start, end)
self.provider.canonical_hashes_range(start, end)
}
}
impl<Provider: DBProvider + Sync> StateRootProvider for LatestStateProviderRef<'_, Provider> {
impl<Provider: DBProvider> StateRootProvider for LatestStateProviderRef<'_, Provider> {
fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
StateRoot::overlay_root(self.tx(), &hashed_state.into_sorted())
.map_err(|err| ProviderError::Database(err.into()))
@@ -89,7 +95,7 @@ impl<Provider: DBProvider + Sync> StateRootProvider for LatestStateProviderRef<'
}
}
impl<Provider: DBProvider + Sync> StorageRootProvider for LatestStateProviderRef<'_, Provider> {
impl<Provider: DBProvider> StorageRootProvider for LatestStateProviderRef<'_, Provider> {
fn storage_root(
&self,
address: Address,
@@ -120,7 +126,7 @@ impl<Provider: DBProvider + Sync> StorageRootProvider for LatestStateProviderRef
}
}
impl<Provider: DBProvider + Sync> StateProofProvider for LatestStateProviderRef<'_, Provider> {
impl<Provider: DBProvider> StateProofProvider for LatestStateProviderRef<'_, Provider> {
fn proof(
&self,
input: TrieInput,
@@ -147,7 +153,7 @@ impl<Provider: DBProvider + Sync> StateProofProvider for LatestStateProviderRef<
}
}
impl<Provider: DBProvider + Sync> HashedPostStateProvider for LatestStateProviderRef<'_, Provider> {
impl<Provider: DBProvider> HashedPostStateProvider for LatestStateProviderRef<'_, Provider> {
fn hashed_post_state(&self, bundle_state: &revm_database::BundleState) -> HashedPostState {
HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle_state.state())
}
@@ -162,7 +168,7 @@ impl<Provider: DBProvider + BlockHashReader> StateProvider
account: Address,
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
let mut cursor = self.tx().cursor_dup_read::<tables::PlainStorageState>()?;
let mut cursor = self.cursors.plain_storage_state(self.tx())?;
if let Some(entry) = cursor.seek_by_key_subkey(account, storage_key)? &&
entry.key == storage_key
{
@@ -199,7 +205,7 @@ impl<Provider: DBProvider> LatestStateProvider<Provider> {
}
// Delegates all provider impls to [LatestStateProviderRef]
delegate_provider_impls!(LatestStateProvider<Provider> where [Provider: DBProvider + BlockHashReader ]);
reth_storage_api::macros::delegate_provider_impls!(LatestStateProvider<Provider> where [Provider: DBProvider + BlockHashReader ]);
#[cfg(test)]
mod tests {

View File

@@ -1,5 +1,5 @@
//! [`StateProvider`](crate::StateProvider) implementations
pub(crate) mod cursor_reuse;
pub(crate) mod historical;
pub(crate) mod latest;
pub(crate) mod macros;
pub(crate) mod overlay;

View File

@@ -54,7 +54,6 @@ pub trait BlockReader:
+ TransactionsProvider
+ ReceiptProvider
+ Send
+ Sync
{
/// The block type this provider reads.
type Block: reth_primitives_traits::Block<
@@ -149,7 +148,7 @@ pub trait BlockReader:
fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>>;
}
impl<T: BlockReader> BlockReader for Arc<T> {
impl<T: BlockReader + Send + Sync> BlockReader for Arc<T> {
type Block = T::Block;
fn find_block_by_hash(
@@ -210,7 +209,7 @@ impl<T: BlockReader> BlockReader for Arc<T> {
}
}
impl<T: BlockReader> BlockReader for &T {
impl<T: BlockReader + Send + Sync> BlockReader for &T {
type Block = T::Block;
fn find_block_by_hash(
@@ -382,7 +381,7 @@ pub trait BlockReaderIdExt: BlockReader + ReceiptProviderIdExt {
}
/// Functionality to read the last known chain blocks from the database.
pub trait ChainStateBlockReader: Send + Sync {
pub trait ChainStateBlockReader: Send {
/// Returns the last finalized block number.
///
/// If no finalized block has been written yet, this returns `None`.
@@ -394,7 +393,7 @@ pub trait ChainStateBlockReader: Send + Sync {
}
/// Functionality to write the last known chain blocks to the database.
pub trait ChainStateBlockWriter: Send + Sync {
pub trait ChainStateBlockWriter: Send {
/// Saves the given finalized block number in the DB.
fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()>;

View File

@@ -4,8 +4,8 @@ use alloy_primitives::{BlockNumber, B256};
use reth_storage_errors::provider::ProviderResult;
/// Client trait for fetching block hashes by number.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BlockHashReader: Send + Sync {
#[auto_impl::auto_impl(&, Box, Arc)]
pub trait BlockHashReader {
/// Get the hash of the block with the given number. Returns `None` if no block with this number
/// exists.
fn block_hash(&self, number: BlockNumber) -> ProviderResult<Option<B256>>;

View File

@@ -9,7 +9,7 @@ use reth_storage_errors::provider::{ProviderError, ProviderResult};
///
/// This trait also supports fetching block hashes and block numbers from a [`BlockHashOrNumber`].
#[auto_impl::auto_impl(&, Arc)]
pub trait BlockNumReader: BlockHashReader + Send + Sync {
pub trait BlockNumReader: BlockHashReader + Send {
/// Returns the current info for the chain.
fn chain_info(&self) -> ProviderResult<ChainInfo>;

View File

@@ -6,7 +6,7 @@ use reth_storage_errors::provider::ProviderResult;
/// Client trait for fetching block body indices related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait BlockBodyIndicesProvider: Send + Sync {
pub trait BlockBodyIndicesProvider: Send {
/// Returns the block body indices with matching number from database.
///
/// Returns `None` if block is not found.

View File

@@ -9,7 +9,7 @@ use reth_trie_common::HashedPostStateSorted;
/// `BlockExecution` Writer
pub trait BlockExecutionWriter:
NodePrimitivesProvider<Primitives: NodePrimitives<Block = Self::Block>> + BlockWriter + Send + Sync
NodePrimitivesProvider<Primitives: NodePrimitives<Block = Self::Block>> + BlockWriter
{
/// Take all of the blocks above the provided number and their execution result
///
@@ -39,8 +39,8 @@ impl<T: BlockExecutionWriter> BlockExecutionWriter for &T {
}
/// Block Writer
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BlockWriter: Send + Sync {
#[auto_impl::auto_impl(&, Box)]
pub trait BlockWriter {
/// The body this writer can write.
type Block: Block;
/// The receipt type for [`ExecutionOutcome`].

View File

@@ -8,8 +8,8 @@ use reth_primitives_traits::{Account, StorageEntry};
use reth_storage_errors::provider::ProviderResult;
/// Hashing Writer
#[auto_impl(&, Arc, Box)]
pub trait HashingWriter: Send + Sync {
#[auto_impl(&, Box)]
pub trait HashingWriter: Send {
/// Unwind and clear account hashing.
///
/// # Returns

View File

@@ -10,7 +10,7 @@ pub type ProviderHeader<P> = <P as HeaderProvider>::Header;
/// Client trait for fetching `Header` related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait HeaderProvider: Send + Sync {
pub trait HeaderProvider: Send {
/// The header type this provider supports.
type Header: BlockHeader;

View File

@@ -3,7 +3,7 @@ use reth_primitives_traits::{BlockHeader, SealedHeader};
use reth_storage_errors::provider::ProviderResult;
/// Provider for getting the local tip header for sync gap calculation.
pub trait HeaderSyncGapProvider: Send + Sync {
pub trait HeaderSyncGapProvider: Send {
/// The header type.
type Header: BlockHeader;

View File

@@ -7,8 +7,8 @@ use reth_primitives_traits::StorageEntry;
use reth_storage_errors::provider::ProviderResult;
/// History Writer
#[auto_impl(&, Arc, Box)]
pub trait HistoryWriter: Send + Sync {
#[auto_impl(&, Box)]
pub trait HistoryWriter: Send {
/// Unwind and clear account history indices.
///
/// Returns number of changesets walked.

View File

@@ -103,3 +103,5 @@ pub use reth_db_api::models::StorageSettings;
mod full;
pub use full::*;
pub mod macros;

View File

@@ -1,9 +1,10 @@
//! Helper macros for implementing traits for various [`StateProvider`](crate::StateProvider)
//! Helper macros for implementing traits for various `StateProvider`
//! implementations
/// A macro that delegates trait implementations to the `as_ref` function of the type.
///
/// Used to implement provider traits.
#[macro_export]
macro_rules! delegate_impls_to_as_ref {
(for $target:ty => $($trait:ident $(where [$($generics:tt)*])? { $(fn $func:ident$(<$($generic_arg:ident: $generic_arg_ty:path),*>)?(&self, $($arg:ident: $argty:ty),*) -> $ret:path;)* })* ) => {
@@ -19,45 +20,46 @@ macro_rules! delegate_impls_to_as_ref {
};
}
pub(crate) use delegate_impls_to_as_ref;
pub use delegate_impls_to_as_ref;
/// Delegates the provider trait implementations to the `as_ref` function of the type:
///
/// [`AccountReader`](crate::AccountReader)
/// [`BlockHashReader`](crate::BlockHashReader)
/// [`StateProvider`](crate::StateProvider)
#[macro_export]
macro_rules! delegate_provider_impls {
($target:ty $(where [$($generics:tt)*])?) => {
$crate::providers::state::macros::delegate_impls_to_as_ref!(
$crate::macros::delegate_impls_to_as_ref!(
for $target =>
AccountReader $(where [$($generics)*])? {
fn basic_account(&self, address: &alloy_primitives::Address) -> reth_storage_errors::provider::ProviderResult<Option<reth_primitives_traits::Account>>;
fn basic_account(&self, address: &alloy_primitives::Address) -> reth_storage_api::errors::provider::ProviderResult<Option<reth_primitives_traits::Account>>;
}
BlockHashReader $(where [$($generics)*])? {
fn block_hash(&self, number: u64) -> reth_storage_errors::provider::ProviderResult<Option<alloy_primitives::B256>>;
fn canonical_hashes_range(&self, start: alloy_primitives::BlockNumber, end: alloy_primitives::BlockNumber) -> reth_storage_errors::provider::ProviderResult<Vec<alloy_primitives::B256>>;
fn block_hash(&self, number: u64) -> reth_storage_api::errors::provider::ProviderResult<Option<alloy_primitives::B256>>;
fn canonical_hashes_range(&self, start: alloy_primitives::BlockNumber, end: alloy_primitives::BlockNumber) -> reth_storage_api::errors::provider::ProviderResult<Vec<alloy_primitives::B256>>;
}
StateProvider $(where [$($generics)*])? {
fn storage(&self, account: alloy_primitives::Address, storage_key: alloy_primitives::StorageKey) -> reth_storage_errors::provider::ProviderResult<Option<alloy_primitives::StorageValue>>;
fn storage(&self, account: alloy_primitives::Address, storage_key: alloy_primitives::StorageKey) -> reth_storage_api::errors::provider::ProviderResult<Option<alloy_primitives::StorageValue>>;
}
BytecodeReader $(where [$($generics)*])? {
fn bytecode_by_hash(&self, code_hash: &alloy_primitives::B256) -> reth_storage_errors::provider::ProviderResult<Option<reth_primitives_traits::Bytecode>>;
fn bytecode_by_hash(&self, code_hash: &alloy_primitives::B256) -> reth_storage_api::errors::provider::ProviderResult<Option<reth_primitives_traits::Bytecode>>;
}
StateRootProvider $(where [$($generics)*])? {
fn state_root(&self, state: reth_trie::HashedPostState) -> reth_storage_errors::provider::ProviderResult<alloy_primitives::B256>;
fn state_root_from_nodes(&self, input: reth_trie::TrieInput) -> reth_storage_errors::provider::ProviderResult<alloy_primitives::B256>;
fn state_root_with_updates(&self, state: reth_trie::HashedPostState) -> reth_storage_errors::provider::ProviderResult<(alloy_primitives::B256, reth_trie::updates::TrieUpdates)>;
fn state_root_from_nodes_with_updates(&self, input: reth_trie::TrieInput) -> reth_storage_errors::provider::ProviderResult<(alloy_primitives::B256, reth_trie::updates::TrieUpdates)>;
fn state_root(&self, state: reth_trie::HashedPostState) -> reth_storage_api::errors::provider::ProviderResult<alloy_primitives::B256>;
fn state_root_from_nodes(&self, input: reth_trie::TrieInput) -> reth_storage_api::errors::provider::ProviderResult<alloy_primitives::B256>;
fn state_root_with_updates(&self, state: reth_trie::HashedPostState) -> reth_storage_api::errors::provider::ProviderResult<(alloy_primitives::B256, reth_trie::updates::TrieUpdates)>;
fn state_root_from_nodes_with_updates(&self, input: reth_trie::TrieInput) -> reth_storage_api::errors::provider::ProviderResult<(alloy_primitives::B256, reth_trie::updates::TrieUpdates)>;
}
StorageRootProvider $(where [$($generics)*])? {
fn storage_root(&self, address: alloy_primitives::Address, storage: reth_trie::HashedStorage) -> reth_storage_errors::provider::ProviderResult<alloy_primitives::B256>;
fn storage_proof(&self, address: alloy_primitives::Address, slot: alloy_primitives::B256, storage: reth_trie::HashedStorage) -> reth_storage_errors::provider::ProviderResult<reth_trie::StorageProof>;
fn storage_multiproof(&self, address: alloy_primitives::Address, slots: &[alloy_primitives::B256], storage: reth_trie::HashedStorage) -> reth_storage_errors::provider::ProviderResult<reth_trie::StorageMultiProof>;
fn storage_root(&self, address: alloy_primitives::Address, storage: reth_trie::HashedStorage) -> reth_storage_api::errors::provider::ProviderResult<alloy_primitives::B256>;
fn storage_proof(&self, address: alloy_primitives::Address, slot: alloy_primitives::B256, storage: reth_trie::HashedStorage) -> reth_storage_api::errors::provider::ProviderResult<reth_trie::StorageProof>;
fn storage_multiproof(&self, address: alloy_primitives::Address, slots: &[alloy_primitives::B256], storage: reth_trie::HashedStorage) -> reth_storage_api::errors::provider::ProviderResult<reth_trie::StorageMultiProof>;
}
StateProofProvider $(where [$($generics)*])? {
fn proof(&self, input: reth_trie::TrieInput, address: alloy_primitives::Address, slots: &[alloy_primitives::B256]) -> reth_storage_errors::provider::ProviderResult<reth_trie::AccountProof>;
fn multiproof(&self, input: reth_trie::TrieInput, targets: reth_trie::MultiProofTargets) -> reth_storage_errors::provider::ProviderResult<reth_trie::MultiProof>;
fn witness(&self, input: reth_trie::TrieInput, target: reth_trie::HashedPostState) -> reth_storage_errors::provider::ProviderResult<Vec<alloy_primitives::Bytes>>;
fn proof(&self, input: reth_trie::TrieInput, address: alloy_primitives::Address, slots: &[alloy_primitives::B256]) -> reth_storage_api::errors::provider::ProviderResult<reth_trie::AccountProof>;
fn multiproof(&self, input: reth_trie::TrieInput, targets: reth_trie::MultiProofTargets) -> reth_storage_api::errors::provider::ProviderResult<reth_trie::MultiProof>;
fn witness(&self, input: reth_trie::TrieInput, target: reth_trie::HashedPostState) -> reth_storage_api::errors::provider::ProviderResult<Vec<alloy_primitives::Bytes>>;
}
HashedPostStateProvider $(where [$($generics)*])? {
fn hashed_post_state(&self, bundle_state: &revm_database::BundleState) -> reth_trie::HashedPostState;
@@ -66,4 +68,4 @@ macro_rules! delegate_provider_impls {
}
}
pub(crate) use delegate_provider_impls;
pub use delegate_provider_impls;

View File

@@ -10,8 +10,8 @@ pub mod keys {
}
/// Client trait for reading node metadata from the database.
#[auto_impl::auto_impl(&, Arc)]
pub trait MetadataProvider: Send + Sync {
#[auto_impl::auto_impl(&)]
pub trait MetadataProvider: Send {
/// Get a metadata value by key
fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>>;
@@ -24,7 +24,7 @@ pub trait MetadataProvider: Send + Sync {
}
/// Client trait for writing node metadata to the database.
pub trait MetadataWriter: Send + Sync {
pub trait MetadataWriter: Send {
/// Write a metadata value
fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()>;
@@ -41,7 +41,7 @@ pub trait MetadataWriter: Send + Sync {
}
/// Trait for caching storage settings on a provider factory.
pub trait StorageSettingsCache: Send + Sync {
pub trait StorageSettingsCache: Send {
/// Gets the cached storage settings.
fn cached_storage_settings(&self) -> StorageSettings;

View File

@@ -3,8 +3,8 @@ use reth_prune_types::{PruneCheckpoint, PruneSegment};
use reth_storage_errors::provider::ProviderResult;
/// The trait for fetching prune checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait PruneCheckpointReader: Send + Sync {
#[auto_impl::auto_impl(&)]
pub trait PruneCheckpointReader: Send {
/// Fetch the prune checkpoint for the given segment.
fn get_prune_checkpoint(
&self,
@@ -16,8 +16,8 @@ pub trait PruneCheckpointReader: Send + Sync {
}
/// The trait for updating prune checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait PruneCheckpointWriter: Send + Sync {
#[auto_impl::auto_impl(&)]
pub trait PruneCheckpointWriter {
/// Save prune checkpoint.
fn save_prune_checkpoint(
&self,

View File

@@ -11,7 +11,7 @@ pub type ProviderReceipt<P> = <P as ReceiptProvider>::Receipt;
/// Client trait for fetching receipt data.
#[auto_impl::auto_impl(&, Arc)]
pub trait ReceiptProvider: Send + Sync {
pub trait ReceiptProvider {
/// The receipt type.
type Receipt: Receipt;

View File

@@ -4,8 +4,8 @@ use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_errors::provider::ProviderResult;
/// The trait for fetching stage checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait StageCheckpointReader: Send + Sync {
#[auto_impl::auto_impl(&)]
pub trait StageCheckpointReader: Send {
/// Fetch the checkpoint for the given stage.
fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>>;
@@ -18,8 +18,8 @@ pub trait StageCheckpointReader: Send + Sync {
}
/// The trait for updating stage checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait StageCheckpointWriter: Send + Sync {
#[auto_impl::auto_impl(&)]
pub trait StageCheckpointWriter {
/// Save stage checkpoint.
fn save_stage_checkpoint(&self, id: StageId, checkpoint: StageCheckpoint)
-> ProviderResult<()>;

View File

@@ -14,8 +14,8 @@ use reth_trie_common::HashedPostState;
use revm_database::BundleState;
/// This just receives state, or [`ExecutionOutcome`], from the provider
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait StateReader: Send + Sync {
#[auto_impl::auto_impl(&, Box)]
pub trait StateReader: Send {
/// Receipt type in [`ExecutionOutcome`].
type Receipt: Send + Sync;
@@ -27,10 +27,10 @@ pub trait StateReader: Send + Sync {
}
/// Type alias of boxed [`StateProvider`].
pub type StateProviderBox = Box<dyn StateProvider>;
pub type StateProviderBox = Box<dyn StateProvider + Send + 'static>;
/// An abstraction for a type that provides state data.
#[auto_impl(&, Arc, Box)]
#[auto_impl(&, Box)]
pub trait StateProvider:
BlockHashReader
+ AccountReader
@@ -39,8 +39,6 @@ pub trait StateProvider:
+ StorageRootProvider
+ StateProofProvider
+ HashedPostStateProvider
+ Send
+ Sync
{
/// Get storage of given account.
fn storage(
@@ -97,15 +95,15 @@ pub trait AccountInfoReader: AccountReader + BytecodeReader {}
impl<T: AccountReader + BytecodeReader> AccountInfoReader for T {}
/// Trait that provides the hashed state from various sources.
#[auto_impl(&, Arc, Box)]
pub trait HashedPostStateProvider: Send + Sync {
#[auto_impl(&, Box)]
pub trait HashedPostStateProvider {
/// Returns the `HashedPostState` of the provided [`BundleState`].
fn hashed_post_state(&self, bundle_state: &BundleState) -> HashedPostState;
}
/// Trait for reading bytecode associated with a given code hash.
#[auto_impl(&, Arc, Box)]
pub trait BytecodeReader: Send + Sync {
#[auto_impl(&, Box)]
pub trait BytecodeReader {
/// Get account code by its hash
fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>>;
}
@@ -142,8 +140,8 @@ pub trait TryIntoHistoricalStateProvider {
/// This affects tracing, or replaying blocks, which will need to be executed on top of the state of
/// the parent block. For example, in order to trace block `n`, the state after block `n - 1` needs
/// to be used, since block `n` was executed on its parent block's state.
#[auto_impl(&, Arc, Box)]
pub trait StateProviderFactory: BlockIdReader + Send + Sync {
#[auto_impl(&, Box, Arc)]
pub trait StateProviderFactory: BlockIdReader + Send {
/// Storage provider for latest block.
fn latest(&self) -> ProviderResult<StateProviderBox>;

View File

@@ -1,8 +1,8 @@
use reth_db_api::table::Table;
/// The trait for fetching provider statistics.
#[auto_impl::auto_impl(&, Arc)]
pub trait StatsReader: Send + Sync {
#[auto_impl::auto_impl(&)]
pub trait StatsReader {
/// Fetch the number of entries in the corresponding [Table]. Depending on the provider, it may
/// route to different data sources other than [Table].
fn count_entries<T: Table>(&self) -> reth_storage_errors::provider::ProviderResult<usize>;

View File

@@ -8,8 +8,8 @@ use reth_primitives_traits::StorageEntry;
use reth_storage_errors::provider::ProviderResult;
/// Storage reader
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait StorageReader: Send + Sync {
#[auto_impl::auto_impl(&, Box)]
pub trait StorageReader: Send {
/// Get plainstate storages for addresses and storage keys.
fn plain_state_storages(
&self,
@@ -34,8 +34,8 @@ pub trait StorageReader: Send + Sync {
/// Storage `ChangeSet` reader
#[cfg(feature = "db-api")]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait StorageChangeSetReader: Send + Sync {
#[auto_impl::auto_impl(&, Box)]
pub trait StorageChangeSetReader: Send {
/// Iterate over storage changesets and return the storage state from before this block.
fn storage_changeset(
&self,

View File

@@ -22,7 +22,7 @@ pub enum TransactionVariant {
/// Client trait for fetching transactions related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait TransactionsProvider: BlockNumReader + Send + Sync {
pub trait TransactionsProvider: BlockNumReader + Send {
/// The transaction type this provider reads.
type Transaction: Send + Sync + SignedTransaction;

View File

@@ -9,7 +9,7 @@ use reth_trie_common::{
/// A type that can compute the state root of a given post state.
#[auto_impl::auto_impl(&, Box, Arc)]
pub trait StateRootProvider: Send + Sync {
pub trait StateRootProvider {
/// Returns the state root of the `BundleState` on top of the current state.
///
/// # Note
@@ -40,8 +40,8 @@ pub trait StateRootProvider: Send + Sync {
}
/// A type that can compute the storage root for a given account.
#[auto_impl::auto_impl(&, Box, Arc)]
pub trait StorageRootProvider: Send + Sync {
#[auto_impl::auto_impl(&, Box)]
pub trait StorageRootProvider {
/// Returns the storage root of the `HashedStorage` for target address on top of the current
/// state.
fn storage_root(&self, address: Address, hashed_storage: HashedStorage)
@@ -66,8 +66,8 @@ pub trait StorageRootProvider: Send + Sync {
}
/// A type that can generate state proof on top of a given post state.
#[auto_impl::auto_impl(&, Box, Arc)]
pub trait StateProofProvider: Send + Sync {
#[auto_impl::auto_impl(&, Box)]
pub trait StateProofProvider {
/// Get account and storage proofs of target keys in the `HashedPostState`
/// on top of the current state.
fn proof(
@@ -90,8 +90,8 @@ pub trait StateProofProvider: Send + Sync {
}
/// Trie Reader
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait TrieReader: Send + Sync {
#[auto_impl::auto_impl(&, Box)]
pub trait TrieReader: Send {
/// Returns the [`TrieUpdatesSorted`] for reverting the trie database to its state prior to the
/// given block and onwards having been processed.
fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted>;
@@ -104,8 +104,8 @@ pub trait TrieReader: Send + Sync {
}
/// Trie Writer
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait TrieWriter: Send + Sync {
#[auto_impl::auto_impl(&, Box)]
pub trait TrieWriter: Send {
/// Writes trie updates to the database.
///
/// Returns the number of entries modified.
@@ -146,8 +146,8 @@ pub trait TrieWriter: Send + Sync {
}
/// Storage Trie Writer
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait StorageTrieWriter: Send + Sync {
#[auto_impl::auto_impl(&, Box)]
pub trait StorageTrieWriter: Send {
/// Writes storage trie updates from the given storage trie map with already sorted updates.
///
/// Expects the storage trie updates to already be sorted by the hashed address key.

View File

@@ -207,7 +207,7 @@ where
&self,
origin: TransactionOrigin,
transaction: Tx,
state: &mut Option<Box<dyn AccountInfoReader>>,
state: &mut Option<Box<dyn AccountInfoReader + Send>>,
) -> TransactionValidationOutcome<Tx> {
self.validate_one_with_provider(origin, transaction, state)
}
@@ -219,7 +219,7 @@ where
&self,
origin: TransactionOrigin,
transaction: Tx,
maybe_state: &mut Option<Box<dyn AccountInfoReader>>,
maybe_state: &mut Option<Box<dyn AccountInfoReader + Send>>,
) -> TransactionValidationOutcome<Tx> {
match self.validate_one_no_state(origin, transaction) {
Ok(transaction) => {

View File

@@ -276,13 +276,12 @@ where
{
use rayon::iter::{ParallelBridge, ParallelIterator};
let (tx, rx) = std::sync::mpsc::channel();
let retain_updates = self.retain_updates;
// Process all storage trie revealings in parallel, having first removed the
// `reveal_nodes` tracking and `SparseTrie`s for each account from their HashMaps.
// These will be returned after processing.
storages
let results: Vec<_> = storages
.into_iter()
.map(|(account, storage_subtree)| {
let revealed_nodes = self.storage.take_or_create_revealed_paths(&account);
@@ -301,14 +300,12 @@ where
(account, revealed_nodes, trie, result)
})
.for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
drop(tx);
.collect();
// Return `revealed_nodes` and `SparseTrie` for each account, incrementing metrics and
// returning the last error seen if any.
let mut any_err = Ok(());
for (account, revealed_nodes, trie, result) in rx {
for (account, revealed_nodes, trie, result) in results {
self.storage.revealed_paths.insert(account, revealed_nodes);
self.storage.tries.insert(account, trie);
if let Ok(_metric_values) = result {

View File

@@ -27,8 +27,8 @@ impl<T, H> ProofTrieNodeProviderFactory<T, H> {
impl<T, H> TrieNodeProviderFactory for ProofTrieNodeProviderFactory<T, H>
where
T: TrieCursorFactory + Clone + Send + Sync,
H: HashedCursorFactory + Clone + Send + Sync,
T: TrieCursorFactory + Clone,
H: HashedCursorFactory + Clone,
{
type AccountNodeProvider = ProofBlindedAccountProvider<T, H>;
type StorageNodeProvider = ProofBlindedStorageProvider<T, H>;

View File

@@ -95,8 +95,8 @@ impl<T, H> TrieWitness<T, H> {
impl<T, H> TrieWitness<T, H>
where
T: TrieCursorFactory + Clone + Send + Sync,
H: HashedCursorFactory + Clone + Send + Sync,
T: TrieCursorFactory + Clone,
H: HashedCursorFactory + Clone,
{
/// Compute the state transition witness for the trie. Gather all required nodes
/// to apply `state` on top of the current trie state.

View File

@@ -65,7 +65,7 @@ where
impl<Provider, T> ImportService<Provider, T>
where
Provider: BlockNumReader + Clone + 'static,
Provider: BlockNumReader + Sync + Clone + 'static,
T: PayloadTypes,
{
/// Create a new block import service
@@ -198,7 +198,7 @@ where
impl<Provider, T> Future for ImportService<Provider, T>
where
Provider: BlockNumReader + BlockHashReader + Clone + 'static + Unpin,
Provider: BlockNumReader + BlockHashReader + Sync + Clone + 'static + Unpin,
T: PayloadTypes,
{
type Output = Result<(), Box<dyn std::error::Error>>;