Compare commits

...

11 Commits

Author SHA1 Message Date
klkvr
ae9042a5fb wip 2026-04-23 21:25:48 +04:00
Emma Jamieson-Hoare
62d99888d2 fix(db): move unix deps section after strum in Cargo.toml (#23697)
Co-authored-by: Amp <amp@ampcode.com>
2026-04-23 14:08:15 +00:00
dependabot[bot]
73f5d77b51 chore(deps): bump actions/setup-python from 5 to 6 (#23689)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-23 11:42:47 +00:00
figtracer
b62f71977a fix(era): align ERA1 export with spec (#23693) 2026-04-23 11:09:13 +00:00
JOJO
ad27be67be fix(net): track unknown tx types in announcement metrics (#23688)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
2026-04-23 10:44:22 +00:00
Veronica Hayes
63f80907cc fix(cli): use TxTy and ReceiptTy for static-file db get (#23692) 2026-04-23 10:29:00 +00:00
Soubhik Singha Mahapatra
a57930481c chore: add DecodedBal in ExecutionEnv (#23675) 2026-04-23 09:48:50 +00:00
Matthias Seitz
bbcfe354a1 fix(rpc): clean up eth state cache reorg entries (#23683) 2026-04-23 07:03:57 +00:00
Arsenii Kulikov
7839f3d876 perf: avoid reopening .csoff on every changeset lookup (#23687) 2026-04-22 23:24:35 +00:00
Matthias Seitz
e89b4611e4 fix(engine): configure invalid header cache hit eviction (#23670) 2026-04-22 20:20:41 +00:00
Emma Jamieson-Hoare
2b7d4b54d4 feat(p2p): Discv5 is enabled by default (#23686) 2026-04-22 17:49:52 +00:00
32 changed files with 531 additions and 172 deletions

View File

@@ -21,7 +21,7 @@ jobs:
steps:
- uses: actions/checkout@v6
- uses: actions/setup-python@v5
- uses: actions/setup-python@v6
with:
python-version: "3.12"

8
Cargo.lock generated
View File

@@ -250,14 +250,15 @@ dependencies = [
[[package]]
name = "alloy-eip7928"
version = "0.3.3"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8222b1d88f9a6d03be84b0f5e76bb60cd83991b43ad8ab6477f0e4a7809b98d"
checksum = "407510740da514b694fecb44d8b3cebdc60d448f70cc5d24743e8ba273448a6e"
dependencies = [
"alloy-primitives",
"alloy-rlp",
"arbitrary",
"borsh",
"once_cell",
"serde",
]
@@ -2949,7 +2950,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de"
dependencies = [
"data-encoding",
"syn 1.0.109",
"syn 2.0.117",
]
[[package]]
@@ -8281,6 +8282,7 @@ dependencies = [
"reqwest 0.13.2",
"reth-era-downloader",
"reth-ethereum-primitives",
"sha2",
"snap",
"tempfile",
"test-case",

View File

@@ -449,7 +449,7 @@ alloy-sol-types = { version = "1.5.6", default-features = false }
alloy-chains = { version = "0.2.33", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-eip7928 = { version = "0.3.4", default-features = false }
alloy-evm = { version = "0.33.0", default-features = false }
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
alloy-trie = { version = "0.9.4", default-features = false }

View File

@@ -14,7 +14,7 @@ use reth_db_api::{
table::{Compress, Decompress, DupSort, Table},
tables,
transaction::DbTx,
RawKey, RawTable, Receipts, TableViewer, Transactions,
RawKey, RawTable, TableViewer,
};
use reth_db_common::DbTool;
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
@@ -264,15 +264,12 @@ impl Command {
);
}
StaticFileSegment::Transactions => {
let transaction = <<Transactions as Table>::Value>::decompress(
content[0].as_slice(),
)?;
let transaction = TxTy::<N>::decompress(content[0].as_slice())?;
println!("{}", serde_json::to_string_pretty(&transaction)?);
}
StaticFileSegment::Receipts => {
let receipt = <<Receipts as Table>::Value>::decompress(
content[0].as_slice(),
)?;
let receipt =
ReceiptTy::<N>::decompress(content[0].as_slice())?;
println!("{}", serde_json::to_string_pretty(&receipt)?);
}
StaticFileSegment::TransactionSenders => {

View File

@@ -15,6 +15,9 @@ pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 5;
/// Default number of cache hits before an invalid header entry is evicted and reprocessed.
pub const DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128;
/// Gas threshold below which the small block chunk size is used.
pub const SMALL_BLOCK_GAS_THRESHOLD: u64 = 20_000_000;
@@ -102,6 +105,11 @@ pub struct TreeConfig {
block_buffer_limit: u32,
/// Number of invalid headers to keep in cache.
max_invalid_header_cache_length: u32,
/// Number of cache hits before an invalid header entry is evicted and reprocessed.
///
/// Setting this to `0` effectively disables the cache because entries are evicted on the
/// first lookup.
invalid_header_hit_eviction_threshold: u8,
/// Maximum number of blocks to execute sequentially in a batch.
///
/// This is used as a cutoff to prevent long-running sequential block execution when we receive
@@ -206,6 +214,7 @@ impl Default for TreeConfig {
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT,
max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
invalid_header_hit_eviction_threshold: DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD,
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
legacy_state_root: false,
always_compare_trie_updates: false,
@@ -248,6 +257,7 @@ impl TreeConfig {
persistence_backpressure_threshold: u64,
block_buffer_limit: u32,
max_invalid_header_cache_length: u32,
invalid_header_hit_eviction_threshold: u8,
max_execute_block_batch_size: usize,
legacy_state_root: bool,
always_compare_trie_updates: bool,
@@ -281,6 +291,7 @@ impl TreeConfig {
persistence_backpressure_threshold,
block_buffer_limit,
max_invalid_header_cache_length,
invalid_header_hit_eviction_threshold,
max_execute_block_batch_size,
legacy_state_root,
always_compare_trie_updates,
@@ -338,6 +349,14 @@ impl TreeConfig {
self.max_invalid_header_cache_length
}
/// Return the invalid header cache hit eviction threshold.
///
/// Setting this to `0` effectively disables the cache because entries are evicted on the
/// first lookup.
pub const fn invalid_header_hit_eviction_threshold(&self) -> u8 {
self.invalid_header_hit_eviction_threshold
}
/// Return the maximum execute block batch size.
pub const fn max_execute_block_batch_size(&self) -> usize {
self.max_execute_block_batch_size
@@ -468,6 +487,15 @@ impl TreeConfig {
self
}
/// Setter for the invalid header cache hit eviction threshold.
pub const fn with_invalid_header_hit_eviction_threshold(
mut self,
invalid_header_hit_eviction_threshold: u8,
) -> Self {
self.invalid_header_hit_eviction_threshold = invalid_header_hit_eviction_threshold;
self
}
/// Setter for maximum execute block batch size.
pub const fn with_max_execute_block_batch_size(
mut self,

View File

@@ -8,25 +8,28 @@ use schnellru::{ByLength, LruMap};
use std::fmt::Debug;
use tracing::warn;
/// The max hit counter for invalid headers in the cache before it is forcefully evicted.
///
/// In other words, if a header is referenced more than this number of times, it will be evicted to
/// allow for reprocessing.
const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128;
/// Keeps track of invalid headers.
#[derive(Debug)]
pub struct InvalidHeaderCache {
/// This maps a header hash to a reference to its invalid ancestor.
headers: LruMap<B256, HeaderEntry>,
/// Number of cache hits before an invalid header entry is evicted and reprocessed.
hit_eviction_threshold: u8,
/// Metrics for the cache.
metrics: InvalidHeaderCacheMetrics,
}
impl InvalidHeaderCache {
/// Invalid header cache constructor.
pub fn new(max_length: u32) -> Self {
Self { headers: LruMap::new(ByLength::new(max_length)), metrics: Default::default() }
///
/// Setting `hit_eviction_threshold` to `0` effectively disables the cache because entries are
/// evicted on the first lookup.
pub fn new(max_length: u32, hit_eviction_threshold: u8) -> Self {
Self {
headers: LruMap::new(ByLength::new(max_length)),
hit_eviction_threshold,
metrics: Default::default(),
}
}
fn insert_entry(&mut self, hash: B256, header: BlockWithParent) {
@@ -41,7 +44,7 @@ impl InvalidHeaderCache {
{
let entry = self.headers.get(hash)?;
entry.hit_count += 1;
if entry.hit_count < INVALID_HEADER_HIT_EVICTION_THRESHOLD {
if entry.hit_count < self.hit_eviction_threshold {
return Some(entry.header)
}
}
@@ -110,17 +113,28 @@ mod tests {
#[test]
fn test_hit_eviction() {
let mut cache = InvalidHeaderCache::new(10);
let hit_eviction_threshold = 3;
let mut cache = InvalidHeaderCache::new(10, hit_eviction_threshold);
let header = Header::default();
let header = SealedHeader::seal_slow(header);
cache.insert(header.block_with_parent());
assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, 0);
for hit in 1..INVALID_HEADER_HIT_EVICTION_THRESHOLD {
for hit in 1..hit_eviction_threshold {
assert!(cache.get(&header.hash()).is_some());
assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, hit);
}
assert!(cache.get(&header.hash()).is_none());
}
#[test]
fn test_zero_hit_eviction_threshold_effectively_disables_cache() {
let mut cache = InvalidHeaderCache::new(10, 0);
let header = SealedHeader::seal_slow(Header::default());
cache.insert(header.block_with_parent());
assert!(cache.get(&header.hash()).is_none());
assert_eq!(cache.headers.len(), 0);
}
}

View File

@@ -151,11 +151,15 @@ impl<N: NodePrimitives> EngineApiTreeState<N> {
fn new(
block_buffer_limit: u32,
max_invalid_header_cache_length: u32,
invalid_header_hit_eviction_threshold: u8,
canonical_block: BlockNumHash,
engine_kind: EngineApiKind,
) -> Self {
Self {
invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
invalid_headers: InvalidHeaderCache::new(
max_invalid_header_cache_length,
invalid_header_hit_eviction_threshold,
),
buffer: BlockBuffer::new(block_buffer_limit),
tree_state: TreeState::new(canonical_block, engine_kind),
forkchoice_state_tracker: ForkchoiceStateTracker::default(),
@@ -436,6 +440,7 @@ where
let state = EngineApiTreeState::new(
config.block_buffer_limit(),
config.max_invalid_header_cache_length(),
config.invalid_header_hit_eviction_threshold(),
header.num_hash(),
kind,
);

View File

@@ -7,7 +7,7 @@ use crate::tree::{
CacheWaitDurations, CachedStateMetrics, CachedStateMetricsSource, ExecutionCache,
PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig, WaitForCaches,
};
use alloy_eip7928::BlockAccessList;
use alloy_eip7928::bal::DecodedBal;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
@@ -250,7 +250,6 @@ where
provider_builder: StateProviderBuilder<N, P>,
multiproof_provider_factory: F,
config: &TreeConfig,
bal: Option<Arc<BlockAccessList>>,
) -> IteratorPayloadHandle<Evm, I, N>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
@@ -273,13 +272,12 @@ where
halve_workers,
config,
);
let install_state_hook = bal.is_none();
let install_state_hook = env.decoded_bal.is_none();
let prewarm_handle = self.spawn_caching_with(
env,
prewarm_rx,
provider_builder,
Some(state_root_handle.updates_tx().clone()),
bal,
);
PayloadHandle {
@@ -300,14 +298,13 @@ where
env: ExecutionEnv<Evm>,
transactions: I,
provider_builder: StateProviderBuilder<N, P>,
bal: Option<Arc<BlockAccessList>>,
) -> IteratorPayloadHandle<Evm, I, N>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None);
PayloadHandle {
state_root_handle: None,
install_state_hook: false,
@@ -465,7 +462,7 @@ where
level = "debug",
target = "engine::tree::payload_processor",
skip_all,
fields(bal=%bal.is_some())
fields(bal=%env.decoded_bal.is_some())
)]
fn spawn_caching_with<P>(
&self,
@@ -473,7 +470,6 @@ where
transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
provider_builder: StateProviderBuilder<N, P>,
to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
bal: Option<Arc<BlockAccessList>>,
) -> CacheTaskHandle<N::Receipt>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
@@ -484,7 +480,7 @@ where
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
let executed_tx_index = Arc::new(AtomicUsize::new(0));
let maybe_decoded_bal = env.decoded_bal.clone();
// configure prewarming
let prewarm_ctx = PrewarmContext {
env,
@@ -507,15 +503,16 @@ where
prewarm_ctx,
to_sparse_trie_task,
);
{
let to_prewarm_task = to_prewarm_task.clone();
let disable_bal_parallel_execution = self.disable_bal_parallel_execution;
self.executor.spawn_blocking_named("prewarm", move || {
let mode = if skip_prewarm {
PrewarmMode::Skipped
} else if let Some(bal) = bal.filter(|_| !disable_bal_parallel_execution) {
PrewarmMode::BlockAccessList(bal)
} else if let Some(decoded_bal) =
maybe_decoded_bal.filter(|_| !disable_bal_parallel_execution)
{
PrewarmMode::BlockAccessList(decoded_bal)
} else {
PrewarmMode::Transactions(transactions)
};
@@ -936,6 +933,9 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
/// Withdrawals included in the block.
/// Used to generate prefetch targets for withdrawal addresses.
pub withdrawals: Option<Vec<Withdrawal>>,
/// Optional decoded BAL for the block.
/// Used to validate and optimize execution.
pub decoded_bal: Option<Arc<DecodedBal>>,
}
impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
@@ -953,6 +953,7 @@ where
transaction_count: 0,
gas_used: 0,
withdrawals: None,
decoded_bal: None,
}
}
}
@@ -1251,7 +1252,6 @@ mod tests {
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
&TreeConfig::default(),
None, // No BAL for test
);
let mut state_hook = handle.state_hook().expect("state hook is None");

View File

@@ -18,7 +18,7 @@ use crate::tree::{
StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eip7928::bal::DecodedBal;
use alloy_eips::eip4895::Withdrawal;
use alloy_primitives::{keccak256, StorageKey, B256};
use crossbeam_channel::Sender as CrossbeamSender;
@@ -48,7 +48,7 @@ pub enum PrewarmMode<Tx> {
/// Prewarm by executing transactions from a stream, each paired with its block index.
Transactions(Receiver<(usize, Tx)>),
/// Prewarm by prefetching slots from a Block Access List.
BlockAccessList(Arc<BlockAccessList>),
BlockAccessList(Arc<DecodedBal>),
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
/// benefit). No workers are spawned.
Skipped,
@@ -331,9 +331,10 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn run_bal_prewarm(
&self,
bal: Arc<BlockAccessList>,
decoded_bal: Arc<DecodedBal>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
) {
let bal = decoded_bal.as_bal();
if bal.is_empty() {
if let Some(to_sparse_trie_task) = self.to_sparse_trie_task.as_ref() {
let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
@@ -355,8 +356,8 @@ where
let parent_span = Span::current();
let prefetch_parent_span = parent_span.clone();
let stream_parent_span = parent_span;
let prefetch_bal = Arc::clone(&bal);
let stream_bal = Arc::clone(&bal);
let prefetch_bal = Arc::clone(&decoded_bal);
let stream_bal = Arc::clone(&decoded_bal);
let (prefetch_tx, prefetch_rx) = oneshot::channel();
let (stream_tx, stream_rx) = oneshot::channel();
@@ -367,12 +368,12 @@ where
target: "engine::tree::payload_processor::prewarm",
parent: &prefetch_parent_span,
"bal_prefetch_storage",
bal_accounts = prefetch_bal.len(),
bal_accounts = prefetch_bal.as_bal().len(),
);
let provider_parent_span = branch_span.clone();
let _span = branch_span.entered();
prefetch_bal.par_iter().for_each_init(
prefetch_bal.as_bal().par_iter().for_each_init(
|| {
(
prefetch_ctx.clone(),
@@ -400,12 +401,12 @@ where
target: "engine::tree::payload_processor::prewarm",
parent: &stream_parent_span,
"bal_hashed_state_stream",
bal_accounts = stream_bal.len(),
bal_accounts = stream_bal.as_bal().len(),
);
let provider_parent_span = branch_span.clone();
let _span = branch_span.entered();
stream_bal.par_iter().for_each_init(
stream_bal.as_bal().par_iter().for_each_init(
|| (ctx.clone(), None::<Box<dyn AccountReader>>, provider_parent_span.clone()),
|(ctx, provider, parent_span), account_changes| {
ctx.send_bal_hashed_state(

View File

@@ -48,7 +48,10 @@ use crate::tree::{
PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::{bal::Bal, BlockAccessList};
use alloy_eip7928::{
bal::{Bal, DecodedBal},
BlockAccessList,
};
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
use alloy_evm::Evm;
use alloy_primitives::{map::B256Set, B256};
@@ -487,6 +490,12 @@ where
.in_scope(|| self.evm_env_for(&input))
.map_err(NewPayloadError::other)?;
// Extract the decoded BAL, if valid and available.
let decoded_bal = ensure_ok!(input
.try_decoded_access_list()
.map_err(|err| { Box::<dyn std::error::Error + Send + Sync>::from(err) }))
.map(Arc::new);
let env = ExecutionEnv {
evm_env,
hash: input.hash(),
@@ -495,6 +504,7 @@ where
transaction_count: input.transaction_count(),
gas_used: input.gas_used(),
withdrawals: input.withdrawals().map(|w| w.to_vec()),
decoded_bal,
};
// Plan the strategy used for state root computation.
@@ -509,14 +519,6 @@ where
// Get an iterator over the transactions in the payload
let txs = self.tx_iterator_for(&input)?;
// Extract the BAL, if valid and available
let block_access_list = ensure_ok!(input
.block_access_list()
.transpose()
// Eventually gets converted to a `InsertBlockErrorKind::Other`
.map_err(Box::<dyn std::error::Error + Send + Sync>::from))
.map(Arc::new);
// Create lazy overlay from ancestors - this doesn't block, allowing execution to start
// before the trie data is ready. The overlay will be computed on first access.
let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());
@@ -532,10 +534,9 @@ where
let mut handle = ensure_ok!(self.spawn_payload_processor(
env.clone(),
txs,
provider_builder,
provider_builder.clone(),
overlay_factory.clone(),
strategy,
block_access_list,
));
// Create optional cache stats for detailed block logging
@@ -763,10 +764,19 @@ where
}
let (root, updates) = ensure_ok_post_block!(
Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
Self::compute_state_root_serial_with_provider(
provider_builder.clone(),
&hashed_state
),
block
);
self.compare_trie_updates_with_serial(
overlay_factory.clone(),
&hashed_state,
updates.clone(),
);
if state_root_task_failed {
self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
}
@@ -1125,6 +1135,14 @@ where
.root_with_updates()?)
}
fn compute_state_root_serial_with_provider(
provider_builder: StateProviderBuilder<N, P>,
hashed_state: &LazyHashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
let provider = provider_builder.build()?;
provider.state_root_with_updates(hashed_state.get().clone())
}
/// Awaits the state root from the background task, with an optional timeout fallback.
///
/// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
@@ -1439,7 +1457,6 @@ where
provider_builder: StateProviderBuilder<N, P>,
overlay_factory: OverlayStateProviderFactory<P>,
strategy: StateRootStrategy,
block_access_list: Option<Arc<BlockAccessList>>,
) -> Result<
PayloadHandle<
impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
@@ -1459,7 +1476,6 @@ where
provider_builder,
overlay_factory,
&self.config,
block_access_list,
);
// record prewarming initialization duration
@@ -1472,12 +1488,8 @@ where
}
StateRootStrategy::Parallel | StateRootStrategy::Synchronous => {
let start = Instant::now();
let handle = self.payload_processor.spawn_cache_exclusive(
env,
txs,
provider_builder,
block_access_list,
);
let handle =
self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);
// Record prewarming initialization duration
self.metrics
@@ -2110,6 +2122,17 @@ impl<T: PayloadTypes> BlockOrPayload<T> {
}
}
/// Returns the decoded block access list, if present and successfully decoded.
pub fn try_decoded_access_list(&self) -> Result<Option<DecodedBal>, alloy_rlp::Error> {
match self {
Self::Payload(payload) => payload
.block_access_list()
.map(|block_access_list| DecodedBal::from_rlp_bytes(block_access_list.clone()))
.transpose(),
Self::Block(_) => Ok(None),
}
}
/// Returns the number of transactions in the payload or block.
pub fn transaction_count(&self) -> usize
where

View File

@@ -184,11 +184,18 @@ impl TestHarness {
let payload_validator = MockEngineValidator;
let (from_tree_tx, from_tree_rx) = unbounded_channel();
let tree_config =
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true);
let header = chain_spec.genesis_header().clone();
let header = SealedHeader::seal_slow(header);
let engine_api_tree_state =
EngineApiTreeState::new(10, 10, header.num_hash(), EngineApiKind::Ethereum);
let engine_api_tree_state = EngineApiTreeState::new(
10,
10,
tree_config.invalid_header_hit_eviction_threshold(),
header.num_hash(),
EngineApiKind::Ethereum,
);
let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
let (to_payload_service, _payload_command_rx) = unbounded_channel();
@@ -217,8 +224,7 @@ impl TestHarness {
persistence_handle,
PersistenceState { last_persisted_block: BlockNumHash::default(), rx: None },
payload_builder,
// always assume enough parallelism for tests
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
tree_config,
EngineApiKind::Ethereum,
evm_config,
changeset_cache,

View File

@@ -2,8 +2,8 @@
//! and injecting them into era1 files with `Era1Writer`.
use crate::calculate_td_by_number;
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockNumber, B256, U256};
use alloy_consensus::{BlockHeader, Sealable, TxReceipt};
use alloy_primitives::{BlockNumber, U256};
use eyre::{eyre, Result};
use reth_era::{
common::file_ops::{EraFileId, StreamWriter},
@@ -13,7 +13,7 @@ use reth_era::{
types::{
execution::{
Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
TotalDifficulty, MAX_BLOCKS_PER_ERA1,
HeaderRecord, TotalDifficulty, MAX_BLOCKS_PER_ERA1,
},
group::{BlockIndex, Era1Id},
},
@@ -139,17 +139,21 @@ where
let headers = provider.headers_range(start_block..=end_block)?;
// Extract first 4 bytes of last block's state root as historical identifier
let historical_root = headers
.last()
.map(|header| {
let state_root = header.state_root();
[state_root[0], state_root[1], state_root[2], state_root[3]]
// Pre-compute accumulator from headers to determine filename
let mut precompute_td = total_difficulty;
let header_records: Vec<HeaderRecord> = headers
.iter()
.map(|h| {
precompute_td += h.difficulty();
HeaderRecord { block_hash: h.hash_slow(), total_difficulty: precompute_td }
})
.unwrap_or([0u8; 4]);
.collect();
let accumulator = Accumulator::from_header_records(&header_records)
.map_err(|e| eyre!("Failed to compute accumulator: {e}"))?;
let file_hash: [u8; 4] = accumulator.root[..4].try_into().unwrap();
let era1_id = Era1Id::new(&config.network, start_block, block_count as u32)
.with_hash(historical_root);
let era1_id =
Era1Id::new(&config.network, start_block, block_count as u32).with_hash(file_hash);
let era1_id = if config.max_blocks_per_file == MAX_BLOCKS_PER_ERA1 as u64 {
era1_id
@@ -166,7 +170,6 @@ where
let mut offsets = Vec::<i64>::with_capacity(block_count);
let mut position = VERSION_ENTRY_SIZE as i64;
let mut blocks_written = 0;
let mut final_header_data = Vec::new();
for (i, header) in headers.into_iter().enumerate() {
let expected_block_number = start_block + i as u64;
@@ -178,11 +181,6 @@ where
&mut total_difficulty,
)?;
// Save last block's header data for accumulator
if expected_block_number == end_block {
final_header_data = compressed_header.data.clone();
}
let difficulty = TotalDifficulty::new(total_difficulty);
let header_size = compressed_header.data.len() + ENTRY_HEADER_SIZE;
@@ -218,10 +216,12 @@ where
}
}
if blocks_written > 0 {
let accumulator_hash =
B256::from_slice(&final_header_data[0..32.min(final_header_data.len())]);
let accumulator = Accumulator::new(accumulator_hash);
let block_index = BlockIndex::new(start_block, offsets);
// Convert absolute offsets to relative (measured from block-index entry start)
let accumulator_entry_size = (ENTRY_HEADER_SIZE + 32) as i64;
let block_index_position = position + accumulator_entry_size;
let relative_offsets: Vec<i64> =
offsets.iter().map(|&abs| abs - block_index_position).collect();
let block_index = BlockIndex::new(start_block, relative_offsets);
writer.write_accumulator(&accumulator)?;
writer.write_block_index(&block_index)?;
@@ -310,7 +310,9 @@ where
let compressed_header = CompressedHeader::from_header(&header)?;
let compressed_body = CompressedBody::from_body(&body)?;
let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts)
let receipts_with_bloom: Vec<_> =
receipts.iter().map(|r| TxReceipt::with_bloom_ref(r)).collect();
let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts_with_bloom)
.map_err(|e| eyre!("Failed to compress receipts: {}", e))?;
Ok((compressed_header, compressed_body, compressed_receipts))

View File

@@ -24,6 +24,7 @@ snap.workspace = true
# ssz encoding and decoding
ethereum_ssz.workspace = true
ethereum_ssz_derive.workspace = true
sha2.workspace = true
[dev-dependencies]
eyre.workspace = true

View File

@@ -76,6 +76,7 @@ use crate::{
use alloy_consensus::{Block, BlockBody, Header};
use alloy_primitives::{B256, U256};
use alloy_rlp::{Decodable, Encodable};
use sha2::{Digest, Sha256};
use snap::{read::FrameDecoder, write::FrameEncoder};
use std::{
io::{Read, Write},
@@ -493,6 +494,73 @@ impl Accumulator {
Ok(Self { root: B256::from(root) })
}
/// Compute the accumulator from a list of header records.
///
/// Implements `hash_tree_root(List[HeaderRecord, 8192])` per the ERA1 spec:
/// - Each leaf is `sha256(block_hash || total_difficulty_le_bytes32)`
/// - Leaves are padded to `MAX_BLOCKS_PER_ERA1` (8192) with zero hashes
/// - Binary Merkle tree is computed bottom-up
/// - Final root is `sha256(merkle_root || le_bytes32(actual_count))`
///
/// Returns `Err` if `records` exceeds [`MAX_BLOCKS_PER_ERA1`].
pub fn from_header_records(records: &[HeaderRecord]) -> Result<Self, E2sError> {
let capacity = MAX_BLOCKS_PER_ERA1;
if records.len() > capacity {
return Err(E2sError::Ssz(format!(
"Too many header records: got {}, max {}",
records.len(),
capacity
)));
}
// Compute leaf hash for each header record
let mut leaves = Vec::with_capacity(capacity);
for record in records {
let mut data = [0u8; 64];
data[..32].copy_from_slice(record.block_hash.as_slice());
data[32..].copy_from_slice(&record.total_difficulty.to_le_bytes::<32>());
leaves.push(<[u8; 32]>::from(Sha256::digest(data)));
}
// Pad to capacity with zero hashes
leaves.resize(capacity, [0u8; 32]);
// Binary Merkle tree bottom-up (capacity is always a power of two)
while leaves.len() > 1 {
let mut next_level = Vec::with_capacity(leaves.len() / 2);
for pair in leaves.chunks_exact(2) {
let mut data = [0u8; 64];
data[..32].copy_from_slice(&pair[0]);
data[32..].copy_from_slice(&pair[1]);
next_level.push(<[u8; 32]>::from(Sha256::digest(data)));
}
leaves = next_level;
}
let merkle_root = leaves[0];
// mix_in_length: sha256(merkle_root || le_bytes32(actual_length))
let mut mix = [0u8; 64];
mix[..32].copy_from_slice(&merkle_root);
let length = records.len() as u64;
mix[32..40].copy_from_slice(&length.to_le_bytes());
// remaining bytes stay zero (uint256 LE padding)
Ok(Self { root: B256::from(<[u8; 32]>::from(Sha256::digest(mix))) })
}
}
/// A header record used to compute the ERA1 accumulator.
///
/// Per the ERA1 spec: `header-record := { block-hash: Bytes32, total-difficulty: Uint256 }`
#[derive(Debug, Clone)]
pub struct HeaderRecord {
/// The block hash (keccak256 of RLP-encoded header)
pub block_hash: B256,
/// The cumulative total difficulty at this block
pub total_difficulty: U256,
}
/// A block tuple in an Era1 file, containing all components for a single block
@@ -691,6 +759,44 @@ mod tests {
}
}
#[test]
fn test_accumulator_from_header_records_known_vectors() {
// Known-answer vectors computed from the SSZ spec:
// hash_tree_root(List[HeaderRecord, 8192])
let expected_empty: B256 =
"4a8c3a07c8d23adc5bac61157555c3c784d53d9bc110c1370809bd23cd93777d".parse().unwrap();
let expected_single_zero: B256 =
"81fd641249670887a731386e756a7a1538dc781b1b0bf016889045d350812817".parse().unwrap();
let expected_single_nonzero: B256 =
"ada35c48d81117f4fd588554cd4c4752356336e84cb41106dea1ceb4cfac8799".parse().unwrap();
// Empty list
let acc_empty = Accumulator::from_header_records(&[]).unwrap();
assert_eq!(acc_empty.root, expected_empty);
// Single record with zero values
let records = vec![HeaderRecord { block_hash: B256::ZERO, total_difficulty: U256::ZERO }];
let acc = Accumulator::from_header_records(&records).unwrap();
assert_eq!(acc.root, expected_single_zero);
// Single record with non-zero values
let records2 = vec![HeaderRecord {
block_hash: B256::from([1u8; 32]),
total_difficulty: U256::from(100u64),
}];
let acc2 = Accumulator::from_header_records(&records2).unwrap();
assert_eq!(acc2.root, expected_single_nonzero);
}
#[test]
fn test_accumulator_rejects_oversized_input() {
let records = vec![
HeaderRecord { block_hash: B256::ZERO, total_difficulty: U256::ZERO };
MAX_BLOCKS_PER_ERA1 + 1
];
assert!(Accumulator::from_header_records(&records).is_err());
}
#[test]
fn test_receipt_list_compression() {
let receipts = create_test_receipts();

View File

@@ -102,8 +102,8 @@ pub struct Era1Id {
/// Number of blocks in the file
pub block_count: u32,
/// Optional hash identifier for this file
/// First 4 bytes of the last historical root in the last state in the era file
/// Optional hash identifier for this file.
/// First 4 bytes of the accumulator root hash.
pub hash: Option<[u8; 4]>,
/// Whether to include era count in filename

View File

@@ -577,6 +577,9 @@ pub struct AnnouncedTxTypesMetrics {
/// Histogram for tracking frequency of EIP-7702 transaction type
pub(crate) eip7702: Histogram,
/// Histogram for tracking frequency of unknown/other transaction types
pub(crate) other: Histogram,
}
/// Counts the number of transactions by their type in a block or collection.
@@ -599,6 +602,9 @@ pub struct TxTypesCounter {
/// Count of transactions conforming to EIP-7702 (Restricted Storage Windows).
pub(crate) eip7702: usize,
/// Count of unknown/other transaction types not matching any known EIP.
pub(crate) other: usize,
}
impl TxTypesCounter {
@@ -621,6 +627,10 @@ impl TxTypesCounter {
}
}
}
pub(crate) const fn increase_other(&mut self) {
self.other += 1;
}
}
impl AnnouncedTxTypesMetrics {
@@ -632,5 +642,6 @@ impl AnnouncedTxTypesMetrics {
self.eip1559.record(tx_types_counter.eip1559 as f64);
self.eip4844.record(tx_types_counter.eip4844 as f64);
self.eip7702.record(tx_types_counter.eip7702 as f64);
self.other.record(tx_types_counter.other as f64);
}
}

View File

@@ -700,11 +700,11 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
}
};
if is_eth68_message &&
let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
{
tx_types_counter.increase_by_tx_type(parsed_tx_type);
if is_eth68_message && let Some((actual_ty_byte, _)) = *metadata_ref_mut {
match TxType::try_from(actual_ty_byte) {
Ok(parsed_tx_type) => tx_types_counter.increase_by_tx_type(parsed_tx_type),
Err(_) => tx_types_counter.increase_other(),
}
}
let decision = self

View File

@@ -4,8 +4,9 @@ use clap::{builder::Resettable, Args};
use eyre::ensure;
use reth_cli_util::{parse_duration_from_secs_or_ms, parsers::format_duration_as_secs_or_ms};
use reth_engine_primitives::{
TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS, DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS,
TreeConfig, DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS,
DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS,
};
use std::{sync::OnceLock, time::Duration};
@@ -25,6 +26,7 @@ pub struct DefaultEngineValues {
persistence_threshold: u64,
persistence_backpressure_threshold: u64,
memory_block_buffer_target: u64,
invalid_header_hit_eviction_threshold: u8,
legacy_state_root_task_enabled: bool,
state_cache_disabled: bool,
prewarming_disabled: bool,
@@ -83,6 +85,12 @@ impl DefaultEngineValues {
self
}
/// Set the invalid header cache hit eviction threshold
pub const fn with_invalid_header_hit_eviction_threshold(mut self, v: u8) -> Self {
self.invalid_header_hit_eviction_threshold = v;
self
}
/// Set whether to enable legacy state root task by default
pub const fn with_legacy_state_root_task_enabled(mut self, v: bool) -> Self {
self.legacy_state_root_task_enabled = v;
@@ -255,6 +263,7 @@ impl Default for DefaultEngineValues {
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
invalid_header_hit_eviction_threshold: DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD,
legacy_state_root_task_enabled: false,
state_cache_disabled: false,
prewarming_disabled: false,
@@ -309,6 +318,14 @@ pub struct EngineArgs {
#[arg(long = "engine.memory-block-buffer-target", default_value_t = DefaultEngineValues::get_global().memory_block_buffer_target)]
pub memory_block_buffer_target: u64,
/// Configure how many cache hits an invalid header can accumulate before it is evicted and
/// reprocessed.
///
/// Set to `0` to effectively disable the cache because entries are evicted on the first
/// lookup.
#[arg(long = "engine.invalid-header-cache-hit-eviction-threshold", default_value_t = DefaultEngineValues::get_global().invalid_header_hit_eviction_threshold)]
pub invalid_header_hit_eviction_threshold: u8,
/// Enable legacy state root
#[arg(long = "engine.legacy-state-root", default_value_t = DefaultEngineValues::get_global().legacy_state_root_task_enabled)]
pub legacy_state_root_task_enabled: bool,
@@ -530,6 +547,7 @@ impl Default for EngineArgs {
persistence_threshold,
persistence_backpressure_threshold,
memory_block_buffer_target,
invalid_header_hit_eviction_threshold,
legacy_state_root_task_enabled,
state_cache_disabled,
prewarming_disabled,
@@ -562,6 +580,7 @@ impl Default for EngineArgs {
persistence_threshold,
persistence_backpressure_threshold,
memory_block_buffer_target,
invalid_header_hit_eviction_threshold,
legacy_state_root_task_enabled,
state_root_task_compare_updates,
caching_and_prewarming_enabled: true,
@@ -620,6 +639,7 @@ impl EngineArgs {
.with_persistence_threshold(self.persistence_threshold)
.with_persistence_backpressure_threshold(self.persistence_backpressure_threshold)
.with_memory_block_buffer_target(self.memory_block_buffer_target)
.with_invalid_header_hit_eviction_threshold(self.invalid_header_hit_eviction_threshold)
.with_legacy_state_root(self.legacy_state_root_task_enabled)
.without_state_cache(self.state_cache_disabled)
.without_prewarming(self.prewarming_disabled)
@@ -682,6 +702,7 @@ mod tests {
persistence_threshold: 100,
persistence_backpressure_threshold: 101,
memory_block_buffer_target: 50,
invalid_header_hit_eviction_threshold: 7,
legacy_state_root_task_enabled: true,
caching_and_prewarming_enabled: true,
state_cache_disabled: true,
@@ -726,6 +747,8 @@ mod tests {
"101",
"--engine.memory-block-buffer-target",
"50",
"--engine.invalid-header-cache-hit-eviction-threshold",
"7",
"--engine.legacy-state-root",
"--engine.disable-state-cache",
"--engine.disable-prewarming",
@@ -808,6 +831,28 @@ mod tests {
assert_eq!(args.slow_block_threshold, Some(Duration::from_millis(500)));
}
#[test]
fn test_parse_invalid_header_hit_eviction_threshold() {
let args = CommandParser::<EngineArgs>::parse_from(["reth"]).args;
assert_eq!(
args.invalid_header_hit_eviction_threshold,
DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD
);
assert_eq!(
args.tree_config().invalid_header_hit_eviction_threshold(),
DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD
);
let args = CommandParser::<EngineArgs>::parse_from([
"reth",
"--engine.invalid-header-cache-hit-eviction-threshold",
"0",
])
.args;
assert_eq!(args.invalid_header_hit_eviction_threshold, 0);
assert_eq!(args.tree_config().invalid_header_hit_eviction_threshold(), 0);
}
#[test]
fn test_parse_share_sparse_trie_flag() {
let args = CommandParser::<EngineArgs>::parse_from(["reth"]).args;

View File

@@ -719,9 +719,16 @@ pub struct DiscoveryArgs {
pub disable_discv4_discovery: bool,
/// Enable Discv5 discovery.
#[arg(long, conflicts_with = "disable_discovery")]
///
/// Discv5 is now enabled by default, so this flag is a no-op and will be removed in a future
/// release.
#[arg(long, conflicts_with = "disable_discovery", hide = true)]
pub enable_discv5_discovery: bool,
/// Disable Discv5 discovery.
#[arg(long, conflicts_with = "disable_discovery")]
pub disable_discv5_discovery: bool,
/// Disable Nat discovery.
#[arg(long, conflicts_with = "disable_discovery")]
pub disable_nat: bool,
@@ -852,21 +859,23 @@ impl DiscoveryArgs {
.bootstrap_lookup_countdown(*discv5_bootstrap_lookup_countdown)
}
/// Returns true if discv5 discovery should be configured
/// Returns true if discv5 discovery should be configured.
///
/// Discv5 is enabled by default and can be disabled with `--disable-discv5-discovery`.
const fn should_enable_discv5(&self) -> bool {
if self.disable_discovery {
if self.disable_discovery || self.disable_discv5_discovery {
return false;
}
self.enable_discv5_discovery ||
self.discv5_addr.is_some() ||
self.discv5_addr_ipv6.is_some()
true
}
/// Set the discovery port to zero, to allow the OS to assign a random unused port when
/// discovery binds to the socket.
/// Set the discovery ports to zero, to allow the OS to assign random unused ports when
/// discovery binds to the sockets.
pub const fn with_unused_discovery_port(mut self) -> Self {
self.port = 0;
self.discv5_port = 0;
self.discv5_port_ipv6 = 0;
self
}
@@ -896,6 +905,7 @@ impl Default for DiscoveryArgs {
disable_dns_discovery: false,
disable_discv4_discovery: false,
enable_discv5_discovery: false,
disable_discv5_discovery: false,
disable_nat: false,
addr: DEFAULT_DISCOVERY_ADDR,
port: DEFAULT_DISCOVERY_PORT,

View File

@@ -344,17 +344,9 @@ where
}
/// Removes transaction index entries for a reorged block.
///
/// Only removes entries that still point to this block, preserving mappings for transactions
/// that were re-mined in a new canonical block.
fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for tx in block.body().transactions() {
if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
*mapped_hash == block_hash
{
self.tx_hash_index.remove(tx.tx_hash());
}
self.tx_hash_index.remove(tx.tx_hash());
}
}
@@ -421,6 +413,15 @@ where
}
}
fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult<Provider::Header>) {
if let Some(queued) = self.headers_cache.remove(&block_hash) {
// send the response to queued senders
for tx in queued {
let _ = tx.send(res.clone());
}
}
}
/// Shrinks the queues but leaves some space for the next requests
fn shrink_queues(&mut self) {
let min_capacity = 2;
@@ -597,9 +598,12 @@ where
}
CacheAction::RemoveReorgedChain { chain_change } => {
for block in chain_change.blocks {
let block_hash = block.hash();
let header = block.clone_header();
// Remove transaction index entries for reorged blocks
this.remove_block_transactions(&block);
this.on_reorg_block(block.hash(), Ok(Some(block)));
this.on_reorg_block(block_hash, Ok(Some(block)));
this.on_reorg_header(block_hash, Ok(header));
}
for block_receipts in chain_change.receipts {
@@ -825,3 +829,89 @@ pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::Header;
use alloy_primitives::{Address, Signature};
use reth_ethereum_primitives::{
Block, BlockBody, EthPrimitives, Transaction, TransactionSigned,
};
use reth_primitives_traits::RecoveredBlock;
use reth_storage_api::noop::NoopProvider;
fn test_service() -> EthStateCacheService<NoopProvider, Runtime> {
let (_cache, service) = EthStateCache::<EthPrimitives>::create(
NoopProvider::default(),
Runtime::test(),
4,
4,
4,
1,
16,
);
service
}
fn test_block() -> RecoveredBlock<Block> {
RecoveredBlock::new_unhashed(
Block {
header: Header { number: 1, ..Default::default() },
body: BlockBody {
transactions: vec![TransactionSigned::new_unhashed(
Transaction::Legacy(Default::default()),
Signature::test_signature(),
)],
..Default::default()
},
},
vec![Address::ZERO],
)
}
#[test]
fn reorg_evicts_cached_headers() {
let mut service = test_service();
let block_hash = B256::repeat_byte(0x11);
assert!(service
.headers_cache
.insert(block_hash, Header { number: 42, ..Default::default() }));
assert!(service.headers_cache.get(&block_hash).is_some());
service.on_reorg_header(block_hash, Ok(Header { number: 7, ..Default::default() }));
assert!(service.headers_cache.get(&block_hash).is_none());
}
#[test]
fn reorg_forwards_header_to_queued_requests() {
let mut service = test_service();
let block_hash = B256::repeat_byte(0x22);
let (response_tx, mut response_rx) = oneshot::channel();
let header = Header { number: 7, ..Default::default() };
assert!(service.headers_cache.queue(block_hash, response_tx));
service.on_reorg_header(block_hash, Ok(header));
let header =
response_rx.try_recv().expect("queued header response").expect("header result");
assert_eq!(header.number, 7);
}
#[test]
fn reorg_removes_tx_hash_index_entries_unconditionally() {
let mut service = test_service();
let block = test_block();
let tx_hash = *block.body().transactions().next().expect("test transaction").tx_hash();
service.tx_hash_index.insert(tx_hash, (B256::repeat_byte(0x33), 0));
service.remove_block_transactions(&block);
assert!(service.tx_hash_index.get(&tx_hash).is_none());
}
}

View File

@@ -6,7 +6,8 @@
use crate::ChangesetOffset;
use std::{
fs::{File, OpenOptions},
io::{self, Read, Seek, SeekFrom, Write},
io::{self, Write},
os::unix::fs::FileExt,
path::Path,
};
@@ -177,16 +178,14 @@ impl ChangesetOffsetReader {
/// Reads a single changeset offset by block index.
/// Returns None if index is out of bounds.
pub fn get(&mut self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
pub fn get(&self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
if block_index >= self.len {
return Ok(None);
}
let byte_pos = block_index * Self::RECORD_SIZE as u64;
self.file.seek(SeekFrom::Start(byte_pos))?;
let mut buf = [0u8; Self::RECORD_SIZE];
self.file.read_exact(&mut buf)?;
self.file.read_exact_at(&mut buf, byte_pos)?;
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
@@ -195,7 +194,7 @@ impl ChangesetOffsetReader {
}
/// Reads a range of changeset offsets.
pub fn get_range(&mut self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
pub fn get_range(&self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
let end = end.min(self.len);
if start >= end {
return Ok(Vec::new());
@@ -203,13 +202,13 @@ impl ChangesetOffsetReader {
let count = (end - start) as usize;
let byte_pos = start * Self::RECORD_SIZE as u64;
self.file.seek(SeekFrom::Start(byte_pos))?;
let mut result = Vec::with_capacity(count);
let mut buf = [0u8; Self::RECORD_SIZE];
for _ in 0..count {
self.file.read_exact(&mut buf)?;
for i in 0..count {
let pos = byte_pos + (i as u64) * Self::RECORD_SIZE as u64;
self.file.read_exact_at(&mut buf, pos)?;
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
result.push(ChangesetOffset::new(offset, num_changes));
@@ -251,7 +250,7 @@ mod tests {
// Read
{
let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
assert_eq!(reader.len(), 3);
let entry = reader.get(0).unwrap().unwrap();
@@ -284,7 +283,7 @@ mod tests {
writer.truncate(2).unwrap();
assert_eq!(writer.len(), 2);
let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
assert_eq!(reader.len(), 2);
assert!(reader.get(2).unwrap().is_none());
}
@@ -317,7 +316,7 @@ mod tests {
assert_eq!(std::fs::metadata(&path).unwrap().len(), 16);
// Verify the complete record is readable
let mut reader = ChangesetOffsetReader::new(&path, 1).unwrap();
let reader = ChangesetOffsetReader::new(&path, 1).unwrap();
assert_eq!(reader.len(), 1);
let entry = reader.get(0).unwrap().unwrap();
assert_eq!(entry.offset(), 100);
@@ -340,7 +339,7 @@ mod tests {
}
// Open with len=2, ignoring the 3rd record
let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
assert_eq!(reader.len(), 2);
// First two records should be readable
@@ -397,7 +396,7 @@ mod tests {
// Verify the records are correct
{
let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
assert_eq!(reader.len(), 3);
let entry0 = reader.get(0).unwrap().unwrap();

View File

@@ -15,9 +15,9 @@ mod compression;
mod event;
mod segment;
#[cfg(feature = "std")]
#[cfg(all(feature = "std", unix))]
mod changeset_offsets;
#[cfg(feature = "std")]
#[cfg(all(feature = "std", unix))]
pub use changeset_offsets::{ChangesetOffsetReader, ChangesetOffsetWriter};
use alloy_primitives::BlockNumber;

View File

@@ -42,12 +42,12 @@ rustc-hash = { workspace = true, optional = true, features = ["std"] }
sysinfo = { workspace = true, features = ["system"] }
parking_lot = { workspace = true, optional = true }
[target.'cfg(unix)'.dependencies]
libc.workspace = true
# arbitrary utils
strum = { workspace = true, features = ["derive"], optional = true }
[target.'cfg(unix)'.dependencies]
libc.workspace = true
[dev-dependencies]
# reth libs with arbitrary
reth-primitives-traits = { workspace = true, features = ["reth-codec"] }

View File

@@ -17,7 +17,7 @@ use reth_db::static_file::{
use reth_db_api::table::{Decompress, Value};
use reth_node_types::NodePrimitives;
use reth_primitives_traits::{SealedHeader, SignedTransaction};
use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader};
use reth_static_file_types::ChangesetOffset;
use reth_storage_api::range_size_hint;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
@@ -111,15 +111,11 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
return Ok(None);
};
let csoff_path = self.data_path().with_extension("csoff");
if !csoff_path.exists() {
return Ok(None);
if let Some(reader) = self.jar.value().csoff_reader() {
reader.get(index).map_err(ProviderError::other)
} else {
Ok(None)
}
let len = header.changeset_offsets_len();
let mut reader =
ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
reader.get(index).map_err(ProviderError::other)
}
/// Reads all changeset offsets from the sidecar file.
@@ -138,15 +134,12 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
return Ok(Some(Vec::new()));
}
let csoff_path = self.data_path().with_extension("csoff");
if !csoff_path.exists() {
return Ok(None);
if let Some(reader) = self.jar.value().csoff_reader() {
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
Ok(Some(offsets))
} else {
Ok(None)
}
let mut reader =
ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
Ok(Some(offsets))
}
}

View File

@@ -16,9 +16,9 @@ mod metrics;
mod writer_tests;
use reth_nippy_jar::NippyJar;
use reth_static_file_types::{SegmentHeader, StaticFileSegment};
use reth_static_file_types::{ChangesetOffsetReader, SegmentHeader, StaticFileSegment};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{ops::Deref, sync::Arc};
use std::{io, ops::Deref, sync::Arc};
/// Alias type for each specific `NippyJar`.
type LoadedJarRef<'a> =
@@ -29,6 +29,7 @@ type LoadedJarRef<'a> =
pub struct LoadedJar {
jar: NippyJar<SegmentHeader>,
mmap_handle: Arc<reth_nippy_jar::DataReader>,
csoff_reader: Option<ChangesetOffsetReader>,
}
impl LoadedJar {
@@ -36,7 +37,20 @@ impl LoadedJar {
match jar.open_data_reader() {
Ok(data_reader) => {
let mmap_handle = Arc::new(data_reader);
Ok(Self { jar, mmap_handle })
let csoff_reader = if jar.user_header().segment().is_change_based() {
let csoff_path = jar.data_path().with_extension("csoff");
let len = jar.user_header().changeset_offsets_len();
match ChangesetOffsetReader::new(&csoff_path, len) {
Ok(reader) => Some(reader),
Err(err) if err.kind() == io::ErrorKind::NotFound && len == 0 => None,
Err(err) => return Err(ProviderError::other(err)),
}
} else {
None
};
Ok(Self { jar, mmap_handle, csoff_reader })
}
Err(e) => Err(ProviderError::other(e)),
}
@@ -55,6 +69,11 @@ impl LoadedJar {
fn size(&self) -> usize {
self.mmap_handle.size() + self.mmap_handle.offsets_size()
}
/// Returns a reference to the cached changeset offset reader.
const fn csoff_reader(&self) -> Option<&ChangesetOffsetReader> {
self.csoff_reader.as_ref()
}
}
impl Deref for LoadedJar {

View File

@@ -400,7 +400,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
// Step 2: Validate sidecar offsets against actual NippyJar state
let valid_blocks = if actual_sidecar_blocks > 0 {
let mut reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
let reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
.map_err(ProviderError::other)?;
// Find last block where offset + num_changes <= actual_nippy_rows
@@ -896,7 +896,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
// Read offset for the block after last_block from sidecar.
// Use committed length from header, ignoring any uncommitted records
// that may exist in the file after a crash.
let mut reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
let reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
.map_err(ProviderError::other)?;
if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
next_offset.offset()

View File

@@ -614,7 +614,7 @@ mod tests {
assert_eq!(get_header_block_count(&provider, 0), 5);
// Verify offsets are correct
let mut reader = ChangesetOffsetReader::new(&sidecar_path, 5).unwrap();
let reader = ChangesetOffsetReader::new(&sidecar_path, 5).unwrap();
let o0 = reader.get(0).unwrap().unwrap();
assert_eq!(o0.offset(), 0);

View File

@@ -87,8 +87,8 @@ Networking:
--disable-discv4-discovery
Disable Discv4 discovery
--enable-discv5-discovery
Enable Discv5 discovery
--disable-discv5-discovery
Disable Discv5 discovery
--disable-nat
Disable Nat discovery
@@ -950,6 +950,13 @@ Engine:
[default: 0]
--engine.invalid-header-cache-hit-eviction-threshold <INVALID_HEADER_HIT_EVICTION_THRESHOLD>
Configure how many cache hits an invalid header can accumulate before it is evicted and reprocessed.
Set to `0` to effectively disable the cache because entries are evicted on the first lookup.
[default: 128]
--engine.legacy-state-root
Enable legacy state root

View File

@@ -27,8 +27,8 @@ Networking:
--disable-discv4-discovery
Disable Discv4 discovery
--enable-discv5-discovery
Enable Discv5 discovery
--disable-discv5-discovery
Disable Discv5 discovery
--disable-nat
Disable Nat discovery

View File

@@ -27,8 +27,8 @@ Networking:
--disable-discv4-discovery
Disable Discv4 discovery
--enable-discv5-discovery
Enable Discv5 discovery
--disable-discv5-discovery
Disable Discv5 discovery
--disable-nat
Disable Nat discovery

View File

@@ -175,8 +175,8 @@ Networking:
--disable-discv4-discovery
Disable Discv4 discovery
--enable-discv5-discovery
Enable Discv5 discovery
--disable-discv5-discovery
Disable Discv5 discovery
--disable-nat
Disable Nat discovery

View File

@@ -17,8 +17,8 @@ This section provides essential information about the ports used by the system,
- **Port:** `9200`
- **Protocol:** UDP
- **Purpose:** Used for discv5 peer discovery protocol. This is a newer discovery protocol that can be enabled with `--enable-discv5-discovery`. It operates independently from the legacy discv4 discovery on port 30303.
- **Exposure Recommendation:** This port should be exposed if discv5 discovery is enabled to allow peer discovery.
- **Purpose:** Used for discv5 peer discovery protocol. This is enabled by default and can be disabled with `--disable-discv5-discovery`. It operates independently from the legacy discv4 discovery on port 30303.
- **Exposure Recommendation:** This port should be exposed to allow peer discovery.
## Metrics Port