mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
11 Commits
dan/latenc
...
klkvr/debu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae9042a5fb | ||
|
|
62d99888d2 | ||
|
|
73f5d77b51 | ||
|
|
b62f71977a | ||
|
|
ad27be67be | ||
|
|
63f80907cc | ||
|
|
a57930481c | ||
|
|
bbcfe354a1 | ||
|
|
7839f3d876 | ||
|
|
e89b4611e4 | ||
|
|
2b7d4b54d4 |
@@ -21,7 +21,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
|
||||
- uses: actions/setup-python@v5
|
||||
- uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: "3.12"
|
||||
|
||||
|
||||
8
Cargo.lock
generated
8
Cargo.lock
generated
@@ -250,14 +250,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "alloy-eip7928"
|
||||
version = "0.3.3"
|
||||
version = "0.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8222b1d88f9a6d03be84b0f5e76bb60cd83991b43ad8ab6477f0e4a7809b98d"
|
||||
checksum = "407510740da514b694fecb44d8b3cebdc60d448f70cc5d24743e8ba273448a6e"
|
||||
dependencies = [
|
||||
"alloy-primitives",
|
||||
"alloy-rlp",
|
||||
"arbitrary",
|
||||
"borsh",
|
||||
"once_cell",
|
||||
"serde",
|
||||
]
|
||||
|
||||
@@ -2949,7 +2950,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de"
|
||||
dependencies = [
|
||||
"data-encoding",
|
||||
"syn 1.0.109",
|
||||
"syn 2.0.117",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -8281,6 +8282,7 @@ dependencies = [
|
||||
"reqwest 0.13.2",
|
||||
"reth-era-downloader",
|
||||
"reth-ethereum-primitives",
|
||||
"sha2",
|
||||
"snap",
|
||||
"tempfile",
|
||||
"test-case",
|
||||
|
||||
@@ -449,7 +449,7 @@ alloy-sol-types = { version = "1.5.6", default-features = false }
|
||||
|
||||
alloy-chains = { version = "0.2.33", default-features = false }
|
||||
alloy-eip2124 = { version = "0.2.0", default-features = false }
|
||||
alloy-eip7928 = { version = "0.3.0", default-features = false }
|
||||
alloy-eip7928 = { version = "0.3.4", default-features = false }
|
||||
alloy-evm = { version = "0.33.0", default-features = false }
|
||||
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
|
||||
alloy-trie = { version = "0.9.4", default-features = false }
|
||||
|
||||
@@ -14,7 +14,7 @@ use reth_db_api::{
|
||||
table::{Compress, Decompress, DupSort, Table},
|
||||
tables,
|
||||
transaction::DbTx,
|
||||
RawKey, RawTable, Receipts, TableViewer, Transactions,
|
||||
RawKey, RawTable, TableViewer,
|
||||
};
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_api::{HeaderTy, ReceiptTy, TxTy};
|
||||
@@ -264,15 +264,12 @@ impl Command {
|
||||
);
|
||||
}
|
||||
StaticFileSegment::Transactions => {
|
||||
let transaction = <<Transactions as Table>::Value>::decompress(
|
||||
content[0].as_slice(),
|
||||
)?;
|
||||
let transaction = TxTy::<N>::decompress(content[0].as_slice())?;
|
||||
println!("{}", serde_json::to_string_pretty(&transaction)?);
|
||||
}
|
||||
StaticFileSegment::Receipts => {
|
||||
let receipt = <<Receipts as Table>::Value>::decompress(
|
||||
content[0].as_slice(),
|
||||
)?;
|
||||
let receipt =
|
||||
ReceiptTy::<N>::decompress(content[0].as_slice())?;
|
||||
println!("{}", serde_json::to_string_pretty(&receipt)?);
|
||||
}
|
||||
StaticFileSegment::TransactionSenders => {
|
||||
|
||||
@@ -15,6 +15,9 @@ pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
|
||||
/// The size of proof targets chunk to spawn in one multiproof calculation.
|
||||
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 5;
|
||||
|
||||
/// Default number of cache hits before an invalid header entry is evicted and reprocessed.
|
||||
pub const DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128;
|
||||
|
||||
/// Gas threshold below which the small block chunk size is used.
|
||||
pub const SMALL_BLOCK_GAS_THRESHOLD: u64 = 20_000_000;
|
||||
|
||||
@@ -102,6 +105,11 @@ pub struct TreeConfig {
|
||||
block_buffer_limit: u32,
|
||||
/// Number of invalid headers to keep in cache.
|
||||
max_invalid_header_cache_length: u32,
|
||||
/// Number of cache hits before an invalid header entry is evicted and reprocessed.
|
||||
///
|
||||
/// Setting this to `0` effectively disables the cache because entries are evicted on the
|
||||
/// first lookup.
|
||||
invalid_header_hit_eviction_threshold: u8,
|
||||
/// Maximum number of blocks to execute sequentially in a batch.
|
||||
///
|
||||
/// This is used as a cutoff to prevent long-running sequential block execution when we receive
|
||||
@@ -206,6 +214,7 @@ impl Default for TreeConfig {
|
||||
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
|
||||
block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT,
|
||||
max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
|
||||
invalid_header_hit_eviction_threshold: DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD,
|
||||
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
|
||||
legacy_state_root: false,
|
||||
always_compare_trie_updates: false,
|
||||
@@ -248,6 +257,7 @@ impl TreeConfig {
|
||||
persistence_backpressure_threshold: u64,
|
||||
block_buffer_limit: u32,
|
||||
max_invalid_header_cache_length: u32,
|
||||
invalid_header_hit_eviction_threshold: u8,
|
||||
max_execute_block_batch_size: usize,
|
||||
legacy_state_root: bool,
|
||||
always_compare_trie_updates: bool,
|
||||
@@ -281,6 +291,7 @@ impl TreeConfig {
|
||||
persistence_backpressure_threshold,
|
||||
block_buffer_limit,
|
||||
max_invalid_header_cache_length,
|
||||
invalid_header_hit_eviction_threshold,
|
||||
max_execute_block_batch_size,
|
||||
legacy_state_root,
|
||||
always_compare_trie_updates,
|
||||
@@ -338,6 +349,14 @@ impl TreeConfig {
|
||||
self.max_invalid_header_cache_length
|
||||
}
|
||||
|
||||
/// Return the invalid header cache hit eviction threshold.
|
||||
///
|
||||
/// Setting this to `0` effectively disables the cache because entries are evicted on the
|
||||
/// first lookup.
|
||||
pub const fn invalid_header_hit_eviction_threshold(&self) -> u8 {
|
||||
self.invalid_header_hit_eviction_threshold
|
||||
}
|
||||
|
||||
/// Return the maximum execute block batch size.
|
||||
pub const fn max_execute_block_batch_size(&self) -> usize {
|
||||
self.max_execute_block_batch_size
|
||||
@@ -468,6 +487,15 @@ impl TreeConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Setter for the invalid header cache hit eviction threshold.
|
||||
pub const fn with_invalid_header_hit_eviction_threshold(
|
||||
mut self,
|
||||
invalid_header_hit_eviction_threshold: u8,
|
||||
) -> Self {
|
||||
self.invalid_header_hit_eviction_threshold = invalid_header_hit_eviction_threshold;
|
||||
self
|
||||
}
|
||||
|
||||
/// Setter for maximum execute block batch size.
|
||||
pub const fn with_max_execute_block_batch_size(
|
||||
mut self,
|
||||
|
||||
@@ -8,25 +8,28 @@ use schnellru::{ByLength, LruMap};
|
||||
use std::fmt::Debug;
|
||||
use tracing::warn;
|
||||
|
||||
/// The max hit counter for invalid headers in the cache before it is forcefully evicted.
|
||||
///
|
||||
/// In other words, if a header is referenced more than this number of times, it will be evicted to
|
||||
/// allow for reprocessing.
|
||||
const INVALID_HEADER_HIT_EVICTION_THRESHOLD: u8 = 128;
|
||||
|
||||
/// Keeps track of invalid headers.
|
||||
#[derive(Debug)]
|
||||
pub struct InvalidHeaderCache {
|
||||
/// This maps a header hash to a reference to its invalid ancestor.
|
||||
headers: LruMap<B256, HeaderEntry>,
|
||||
/// Number of cache hits before an invalid header entry is evicted and reprocessed.
|
||||
hit_eviction_threshold: u8,
|
||||
/// Metrics for the cache.
|
||||
metrics: InvalidHeaderCacheMetrics,
|
||||
}
|
||||
|
||||
impl InvalidHeaderCache {
|
||||
/// Invalid header cache constructor.
|
||||
pub fn new(max_length: u32) -> Self {
|
||||
Self { headers: LruMap::new(ByLength::new(max_length)), metrics: Default::default() }
|
||||
///
|
||||
/// Setting `hit_eviction_threshold` to `0` effectively disables the cache because entries are
|
||||
/// evicted on the first lookup.
|
||||
pub fn new(max_length: u32, hit_eviction_threshold: u8) -> Self {
|
||||
Self {
|
||||
headers: LruMap::new(ByLength::new(max_length)),
|
||||
hit_eviction_threshold,
|
||||
metrics: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_entry(&mut self, hash: B256, header: BlockWithParent) {
|
||||
@@ -41,7 +44,7 @@ impl InvalidHeaderCache {
|
||||
{
|
||||
let entry = self.headers.get(hash)?;
|
||||
entry.hit_count += 1;
|
||||
if entry.hit_count < INVALID_HEADER_HIT_EVICTION_THRESHOLD {
|
||||
if entry.hit_count < self.hit_eviction_threshold {
|
||||
return Some(entry.header)
|
||||
}
|
||||
}
|
||||
@@ -110,17 +113,28 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_hit_eviction() {
|
||||
let mut cache = InvalidHeaderCache::new(10);
|
||||
let hit_eviction_threshold = 3;
|
||||
let mut cache = InvalidHeaderCache::new(10, hit_eviction_threshold);
|
||||
let header = Header::default();
|
||||
let header = SealedHeader::seal_slow(header);
|
||||
cache.insert(header.block_with_parent());
|
||||
assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, 0);
|
||||
|
||||
for hit in 1..INVALID_HEADER_HIT_EVICTION_THRESHOLD {
|
||||
for hit in 1..hit_eviction_threshold {
|
||||
assert!(cache.get(&header.hash()).is_some());
|
||||
assert_eq!(cache.headers.get(&header.hash()).unwrap().hit_count, hit);
|
||||
}
|
||||
|
||||
assert!(cache.get(&header.hash()).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_zero_hit_eviction_threshold_effectively_disables_cache() {
|
||||
let mut cache = InvalidHeaderCache::new(10, 0);
|
||||
let header = SealedHeader::seal_slow(Header::default());
|
||||
cache.insert(header.block_with_parent());
|
||||
|
||||
assert!(cache.get(&header.hash()).is_none());
|
||||
assert_eq!(cache.headers.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,11 +151,15 @@ impl<N: NodePrimitives> EngineApiTreeState<N> {
|
||||
fn new(
|
||||
block_buffer_limit: u32,
|
||||
max_invalid_header_cache_length: u32,
|
||||
invalid_header_hit_eviction_threshold: u8,
|
||||
canonical_block: BlockNumHash,
|
||||
engine_kind: EngineApiKind,
|
||||
) -> Self {
|
||||
Self {
|
||||
invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
|
||||
invalid_headers: InvalidHeaderCache::new(
|
||||
max_invalid_header_cache_length,
|
||||
invalid_header_hit_eviction_threshold,
|
||||
),
|
||||
buffer: BlockBuffer::new(block_buffer_limit),
|
||||
tree_state: TreeState::new(canonical_block, engine_kind),
|
||||
forkchoice_state_tracker: ForkchoiceStateTracker::default(),
|
||||
@@ -436,6 +440,7 @@ where
|
||||
let state = EngineApiTreeState::new(
|
||||
config.block_buffer_limit(),
|
||||
config.max_invalid_header_cache_length(),
|
||||
config.invalid_header_hit_eviction_threshold(),
|
||||
header.num_hash(),
|
||||
kind,
|
||||
);
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::tree::{
|
||||
CacheWaitDurations, CachedStateMetrics, CachedStateMetricsSource, ExecutionCache,
|
||||
PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig, WaitForCaches,
|
||||
};
|
||||
use alloy_eip7928::BlockAccessList;
|
||||
use alloy_eip7928::bal::DecodedBal;
|
||||
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
|
||||
use alloy_primitives::B256;
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
@@ -250,7 +250,6 @@ where
|
||||
provider_builder: StateProviderBuilder<N, P>,
|
||||
multiproof_provider_factory: F,
|
||||
config: &TreeConfig,
|
||||
bal: Option<Arc<BlockAccessList>>,
|
||||
) -> IteratorPayloadHandle<Evm, I, N>
|
||||
where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
|
||||
@@ -273,13 +272,12 @@ where
|
||||
halve_workers,
|
||||
config,
|
||||
);
|
||||
let install_state_hook = bal.is_none();
|
||||
let install_state_hook = env.decoded_bal.is_none();
|
||||
let prewarm_handle = self.spawn_caching_with(
|
||||
env,
|
||||
prewarm_rx,
|
||||
provider_builder,
|
||||
Some(state_root_handle.updates_tx().clone()),
|
||||
bal,
|
||||
);
|
||||
|
||||
PayloadHandle {
|
||||
@@ -300,14 +298,13 @@ where
|
||||
env: ExecutionEnv<Evm>,
|
||||
transactions: I,
|
||||
provider_builder: StateProviderBuilder<N, P>,
|
||||
bal: Option<Arc<BlockAccessList>>,
|
||||
) -> IteratorPayloadHandle<Evm, I, N>
|
||||
where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
|
||||
{
|
||||
let (prewarm_rx, execution_rx) =
|
||||
self.spawn_tx_iterator(transactions, env.transaction_count);
|
||||
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
|
||||
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None);
|
||||
PayloadHandle {
|
||||
state_root_handle: None,
|
||||
install_state_hook: false,
|
||||
@@ -465,7 +462,7 @@ where
|
||||
level = "debug",
|
||||
target = "engine::tree::payload_processor",
|
||||
skip_all,
|
||||
fields(bal=%bal.is_some())
|
||||
fields(bal=%env.decoded_bal.is_some())
|
||||
)]
|
||||
fn spawn_caching_with<P>(
|
||||
&self,
|
||||
@@ -473,7 +470,6 @@ where
|
||||
transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
|
||||
provider_builder: StateProviderBuilder<N, P>,
|
||||
to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
|
||||
bal: Option<Arc<BlockAccessList>>,
|
||||
) -> CacheTaskHandle<N::Receipt>
|
||||
where
|
||||
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
|
||||
@@ -484,7 +480,7 @@ where
|
||||
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));
|
||||
|
||||
let executed_tx_index = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let maybe_decoded_bal = env.decoded_bal.clone();
|
||||
// configure prewarming
|
||||
let prewarm_ctx = PrewarmContext {
|
||||
env,
|
||||
@@ -507,15 +503,16 @@ where
|
||||
prewarm_ctx,
|
||||
to_sparse_trie_task,
|
||||
);
|
||||
|
||||
{
|
||||
let to_prewarm_task = to_prewarm_task.clone();
|
||||
let disable_bal_parallel_execution = self.disable_bal_parallel_execution;
|
||||
self.executor.spawn_blocking_named("prewarm", move || {
|
||||
let mode = if skip_prewarm {
|
||||
PrewarmMode::Skipped
|
||||
} else if let Some(bal) = bal.filter(|_| !disable_bal_parallel_execution) {
|
||||
PrewarmMode::BlockAccessList(bal)
|
||||
} else if let Some(decoded_bal) =
|
||||
maybe_decoded_bal.filter(|_| !disable_bal_parallel_execution)
|
||||
{
|
||||
PrewarmMode::BlockAccessList(decoded_bal)
|
||||
} else {
|
||||
PrewarmMode::Transactions(transactions)
|
||||
};
|
||||
@@ -936,6 +933,9 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
|
||||
/// Withdrawals included in the block.
|
||||
/// Used to generate prefetch targets for withdrawal addresses.
|
||||
pub withdrawals: Option<Vec<Withdrawal>>,
|
||||
/// Optional decoded BAL for the block.
|
||||
/// Used to validate and optimize execution.
|
||||
pub decoded_bal: Option<Arc<DecodedBal>>,
|
||||
}
|
||||
|
||||
impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
|
||||
@@ -953,6 +953,7 @@ where
|
||||
transaction_count: 0,
|
||||
gas_used: 0,
|
||||
withdrawals: None,
|
||||
decoded_bal: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1251,7 +1252,6 @@ mod tests {
|
||||
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
|
||||
OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
|
||||
&TreeConfig::default(),
|
||||
None, // No BAL for test
|
||||
);
|
||||
|
||||
let mut state_hook = handle.state_hook().expect("state hook is None");
|
||||
|
||||
@@ -18,7 +18,7 @@ use crate::tree::{
|
||||
StateProviderBuilder,
|
||||
};
|
||||
use alloy_consensus::transaction::TxHashRef;
|
||||
use alloy_eip7928::BlockAccessList;
|
||||
use alloy_eip7928::bal::DecodedBal;
|
||||
use alloy_eips::eip4895::Withdrawal;
|
||||
use alloy_primitives::{keccak256, StorageKey, B256};
|
||||
use crossbeam_channel::Sender as CrossbeamSender;
|
||||
@@ -48,7 +48,7 @@ pub enum PrewarmMode<Tx> {
|
||||
/// Prewarm by executing transactions from a stream, each paired with its block index.
|
||||
Transactions(Receiver<(usize, Tx)>),
|
||||
/// Prewarm by prefetching slots from a Block Access List.
|
||||
BlockAccessList(Arc<BlockAccessList>),
|
||||
BlockAccessList(Arc<DecodedBal>),
|
||||
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
|
||||
/// benefit). No workers are spawned.
|
||||
Skipped,
|
||||
@@ -331,9 +331,10 @@ where
|
||||
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
|
||||
fn run_bal_prewarm(
|
||||
&self,
|
||||
bal: Arc<BlockAccessList>,
|
||||
decoded_bal: Arc<DecodedBal>,
|
||||
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
|
||||
) {
|
||||
let bal = decoded_bal.as_bal();
|
||||
if bal.is_empty() {
|
||||
if let Some(to_sparse_trie_task) = self.to_sparse_trie_task.as_ref() {
|
||||
let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
|
||||
@@ -355,8 +356,8 @@ where
|
||||
let parent_span = Span::current();
|
||||
let prefetch_parent_span = parent_span.clone();
|
||||
let stream_parent_span = parent_span;
|
||||
let prefetch_bal = Arc::clone(&bal);
|
||||
let stream_bal = Arc::clone(&bal);
|
||||
let prefetch_bal = Arc::clone(&decoded_bal);
|
||||
let stream_bal = Arc::clone(&decoded_bal);
|
||||
let (prefetch_tx, prefetch_rx) = oneshot::channel();
|
||||
let (stream_tx, stream_rx) = oneshot::channel();
|
||||
|
||||
@@ -367,12 +368,12 @@ where
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
parent: &prefetch_parent_span,
|
||||
"bal_prefetch_storage",
|
||||
bal_accounts = prefetch_bal.len(),
|
||||
bal_accounts = prefetch_bal.as_bal().len(),
|
||||
);
|
||||
let provider_parent_span = branch_span.clone();
|
||||
let _span = branch_span.entered();
|
||||
|
||||
prefetch_bal.par_iter().for_each_init(
|
||||
prefetch_bal.as_bal().par_iter().for_each_init(
|
||||
|| {
|
||||
(
|
||||
prefetch_ctx.clone(),
|
||||
@@ -400,12 +401,12 @@ where
|
||||
target: "engine::tree::payload_processor::prewarm",
|
||||
parent: &stream_parent_span,
|
||||
"bal_hashed_state_stream",
|
||||
bal_accounts = stream_bal.len(),
|
||||
bal_accounts = stream_bal.as_bal().len(),
|
||||
);
|
||||
let provider_parent_span = branch_span.clone();
|
||||
let _span = branch_span.entered();
|
||||
|
||||
stream_bal.par_iter().for_each_init(
|
||||
stream_bal.as_bal().par_iter().for_each_init(
|
||||
|| (ctx.clone(), None::<Box<dyn AccountReader>>, provider_parent_span.clone()),
|
||||
|(ctx, provider, parent_span), account_changes| {
|
||||
ctx.send_bal_hashed_state(
|
||||
|
||||
@@ -48,7 +48,10 @@ use crate::tree::{
|
||||
PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
|
||||
};
|
||||
use alloy_consensus::transaction::{Either, TxHashRef};
|
||||
use alloy_eip7928::{bal::Bal, BlockAccessList};
|
||||
use alloy_eip7928::{
|
||||
bal::{Bal, DecodedBal},
|
||||
BlockAccessList,
|
||||
};
|
||||
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
|
||||
use alloy_evm::Evm;
|
||||
use alloy_primitives::{map::B256Set, B256};
|
||||
@@ -487,6 +490,12 @@ where
|
||||
.in_scope(|| self.evm_env_for(&input))
|
||||
.map_err(NewPayloadError::other)?;
|
||||
|
||||
// Extract the decoded BAL, if valid and available.
|
||||
let decoded_bal = ensure_ok!(input
|
||||
.try_decoded_access_list()
|
||||
.map_err(|err| { Box::<dyn std::error::Error + Send + Sync>::from(err) }))
|
||||
.map(Arc::new);
|
||||
|
||||
let env = ExecutionEnv {
|
||||
evm_env,
|
||||
hash: input.hash(),
|
||||
@@ -495,6 +504,7 @@ where
|
||||
transaction_count: input.transaction_count(),
|
||||
gas_used: input.gas_used(),
|
||||
withdrawals: input.withdrawals().map(|w| w.to_vec()),
|
||||
decoded_bal,
|
||||
};
|
||||
|
||||
// Plan the strategy used for state root computation.
|
||||
@@ -509,14 +519,6 @@ where
|
||||
// Get an iterator over the transactions in the payload
|
||||
let txs = self.tx_iterator_for(&input)?;
|
||||
|
||||
// Extract the BAL, if valid and available
|
||||
let block_access_list = ensure_ok!(input
|
||||
.block_access_list()
|
||||
.transpose()
|
||||
// Eventually gets converted to a `InsertBlockErrorKind::Other`
|
||||
.map_err(Box::<dyn std::error::Error + Send + Sync>::from))
|
||||
.map(Arc::new);
|
||||
|
||||
// Create lazy overlay from ancestors - this doesn't block, allowing execution to start
|
||||
// before the trie data is ready. The overlay will be computed on first access.
|
||||
let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());
|
||||
@@ -532,10 +534,9 @@ where
|
||||
let mut handle = ensure_ok!(self.spawn_payload_processor(
|
||||
env.clone(),
|
||||
txs,
|
||||
provider_builder,
|
||||
provider_builder.clone(),
|
||||
overlay_factory.clone(),
|
||||
strategy,
|
||||
block_access_list,
|
||||
));
|
||||
|
||||
// Create optional cache stats for detailed block logging
|
||||
@@ -763,10 +764,19 @@ where
|
||||
}
|
||||
|
||||
let (root, updates) = ensure_ok_post_block!(
|
||||
Self::compute_state_root_serial(overlay_factory.clone(), &hashed_state),
|
||||
Self::compute_state_root_serial_with_provider(
|
||||
provider_builder.clone(),
|
||||
&hashed_state
|
||||
),
|
||||
block
|
||||
);
|
||||
|
||||
self.compare_trie_updates_with_serial(
|
||||
overlay_factory.clone(),
|
||||
&hashed_state,
|
||||
updates.clone(),
|
||||
);
|
||||
|
||||
if state_root_task_failed {
|
||||
self.metrics.block_validation.state_root_task_fallback_success_total.increment(1);
|
||||
}
|
||||
@@ -1125,6 +1135,14 @@ where
|
||||
.root_with_updates()?)
|
||||
}
|
||||
|
||||
fn compute_state_root_serial_with_provider(
|
||||
provider_builder: StateProviderBuilder<N, P>,
|
||||
hashed_state: &LazyHashedPostState,
|
||||
) -> ProviderResult<(B256, TrieUpdates)> {
|
||||
let provider = provider_builder.build()?;
|
||||
provider.state_root_with_updates(hashed_state.get().clone())
|
||||
}
|
||||
|
||||
/// Awaits the state root from the background task, with an optional timeout fallback.
|
||||
///
|
||||
/// If a timeout is configured (`state_root_task_timeout`), this method first waits for the
|
||||
@@ -1439,7 +1457,6 @@ where
|
||||
provider_builder: StateProviderBuilder<N, P>,
|
||||
overlay_factory: OverlayStateProviderFactory<P>,
|
||||
strategy: StateRootStrategy,
|
||||
block_access_list: Option<Arc<BlockAccessList>>,
|
||||
) -> Result<
|
||||
PayloadHandle<
|
||||
impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
|
||||
@@ -1459,7 +1476,6 @@ where
|
||||
provider_builder,
|
||||
overlay_factory,
|
||||
&self.config,
|
||||
block_access_list,
|
||||
);
|
||||
|
||||
// record prewarming initialization duration
|
||||
@@ -1472,12 +1488,8 @@ where
|
||||
}
|
||||
StateRootStrategy::Parallel | StateRootStrategy::Synchronous => {
|
||||
let start = Instant::now();
|
||||
let handle = self.payload_processor.spawn_cache_exclusive(
|
||||
env,
|
||||
txs,
|
||||
provider_builder,
|
||||
block_access_list,
|
||||
);
|
||||
let handle =
|
||||
self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);
|
||||
|
||||
// Record prewarming initialization duration
|
||||
self.metrics
|
||||
@@ -2110,6 +2122,17 @@ impl<T: PayloadTypes> BlockOrPayload<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the decoded block access list, if present and successfully decoded.
|
||||
pub fn try_decoded_access_list(&self) -> Result<Option<DecodedBal>, alloy_rlp::Error> {
|
||||
match self {
|
||||
Self::Payload(payload) => payload
|
||||
.block_access_list()
|
||||
.map(|block_access_list| DecodedBal::from_rlp_bytes(block_access_list.clone()))
|
||||
.transpose(),
|
||||
Self::Block(_) => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of transactions in the payload or block.
|
||||
pub fn transaction_count(&self) -> usize
|
||||
where
|
||||
|
||||
@@ -184,11 +184,18 @@ impl TestHarness {
|
||||
let payload_validator = MockEngineValidator;
|
||||
|
||||
let (from_tree_tx, from_tree_rx) = unbounded_channel();
|
||||
let tree_config =
|
||||
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true);
|
||||
|
||||
let header = chain_spec.genesis_header().clone();
|
||||
let header = SealedHeader::seal_slow(header);
|
||||
let engine_api_tree_state =
|
||||
EngineApiTreeState::new(10, 10, header.num_hash(), EngineApiKind::Ethereum);
|
||||
let engine_api_tree_state = EngineApiTreeState::new(
|
||||
10,
|
||||
10,
|
||||
tree_config.invalid_header_hit_eviction_threshold(),
|
||||
header.num_hash(),
|
||||
EngineApiKind::Ethereum,
|
||||
);
|
||||
let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
|
||||
|
||||
let (to_payload_service, _payload_command_rx) = unbounded_channel();
|
||||
@@ -217,8 +224,7 @@ impl TestHarness {
|
||||
persistence_handle,
|
||||
PersistenceState { last_persisted_block: BlockNumHash::default(), rx: None },
|
||||
payload_builder,
|
||||
// always assume enough parallelism for tests
|
||||
TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
|
||||
tree_config,
|
||||
EngineApiKind::Ethereum,
|
||||
evm_config,
|
||||
changeset_cache,
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
//! and injecting them into era1 files with `Era1Writer`.
|
||||
|
||||
use crate::calculate_td_by_number;
|
||||
use alloy_consensus::BlockHeader;
|
||||
use alloy_primitives::{BlockNumber, B256, U256};
|
||||
use alloy_consensus::{BlockHeader, Sealable, TxReceipt};
|
||||
use alloy_primitives::{BlockNumber, U256};
|
||||
use eyre::{eyre, Result};
|
||||
use reth_era::{
|
||||
common::file_ops::{EraFileId, StreamWriter},
|
||||
@@ -13,7 +13,7 @@ use reth_era::{
|
||||
types::{
|
||||
execution::{
|
||||
Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
|
||||
TotalDifficulty, MAX_BLOCKS_PER_ERA1,
|
||||
HeaderRecord, TotalDifficulty, MAX_BLOCKS_PER_ERA1,
|
||||
},
|
||||
group::{BlockIndex, Era1Id},
|
||||
},
|
||||
@@ -139,17 +139,21 @@ where
|
||||
|
||||
let headers = provider.headers_range(start_block..=end_block)?;
|
||||
|
||||
// Extract first 4 bytes of last block's state root as historical identifier
|
||||
let historical_root = headers
|
||||
.last()
|
||||
.map(|header| {
|
||||
let state_root = header.state_root();
|
||||
[state_root[0], state_root[1], state_root[2], state_root[3]]
|
||||
// Pre-compute accumulator from headers to determine filename
|
||||
let mut precompute_td = total_difficulty;
|
||||
let header_records: Vec<HeaderRecord> = headers
|
||||
.iter()
|
||||
.map(|h| {
|
||||
precompute_td += h.difficulty();
|
||||
HeaderRecord { block_hash: h.hash_slow(), total_difficulty: precompute_td }
|
||||
})
|
||||
.unwrap_or([0u8; 4]);
|
||||
.collect();
|
||||
let accumulator = Accumulator::from_header_records(&header_records)
|
||||
.map_err(|e| eyre!("Failed to compute accumulator: {e}"))?;
|
||||
let file_hash: [u8; 4] = accumulator.root[..4].try_into().unwrap();
|
||||
|
||||
let era1_id = Era1Id::new(&config.network, start_block, block_count as u32)
|
||||
.with_hash(historical_root);
|
||||
let era1_id =
|
||||
Era1Id::new(&config.network, start_block, block_count as u32).with_hash(file_hash);
|
||||
|
||||
let era1_id = if config.max_blocks_per_file == MAX_BLOCKS_PER_ERA1 as u64 {
|
||||
era1_id
|
||||
@@ -166,7 +170,6 @@ where
|
||||
let mut offsets = Vec::<i64>::with_capacity(block_count);
|
||||
let mut position = VERSION_ENTRY_SIZE as i64;
|
||||
let mut blocks_written = 0;
|
||||
let mut final_header_data = Vec::new();
|
||||
|
||||
for (i, header) in headers.into_iter().enumerate() {
|
||||
let expected_block_number = start_block + i as u64;
|
||||
@@ -178,11 +181,6 @@ where
|
||||
&mut total_difficulty,
|
||||
)?;
|
||||
|
||||
// Save last block's header data for accumulator
|
||||
if expected_block_number == end_block {
|
||||
final_header_data = compressed_header.data.clone();
|
||||
}
|
||||
|
||||
let difficulty = TotalDifficulty::new(total_difficulty);
|
||||
|
||||
let header_size = compressed_header.data.len() + ENTRY_HEADER_SIZE;
|
||||
@@ -218,10 +216,12 @@ where
|
||||
}
|
||||
}
|
||||
if blocks_written > 0 {
|
||||
let accumulator_hash =
|
||||
B256::from_slice(&final_header_data[0..32.min(final_header_data.len())]);
|
||||
let accumulator = Accumulator::new(accumulator_hash);
|
||||
let block_index = BlockIndex::new(start_block, offsets);
|
||||
// Convert absolute offsets to relative (measured from block-index entry start)
|
||||
let accumulator_entry_size = (ENTRY_HEADER_SIZE + 32) as i64;
|
||||
let block_index_position = position + accumulator_entry_size;
|
||||
let relative_offsets: Vec<i64> =
|
||||
offsets.iter().map(|&abs| abs - block_index_position).collect();
|
||||
let block_index = BlockIndex::new(start_block, relative_offsets);
|
||||
|
||||
writer.write_accumulator(&accumulator)?;
|
||||
writer.write_block_index(&block_index)?;
|
||||
@@ -310,7 +310,9 @@ where
|
||||
|
||||
let compressed_header = CompressedHeader::from_header(&header)?;
|
||||
let compressed_body = CompressedBody::from_body(&body)?;
|
||||
let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts)
|
||||
let receipts_with_bloom: Vec<_> =
|
||||
receipts.iter().map(|r| TxReceipt::with_bloom_ref(r)).collect();
|
||||
let compressed_receipts = CompressedReceipts::from_encodable_list(&receipts_with_bloom)
|
||||
.map_err(|e| eyre!("Failed to compress receipts: {}", e))?;
|
||||
|
||||
Ok((compressed_header, compressed_body, compressed_receipts))
|
||||
|
||||
@@ -24,6 +24,7 @@ snap.workspace = true
|
||||
# ssz encoding and decoding
|
||||
ethereum_ssz.workspace = true
|
||||
ethereum_ssz_derive.workspace = true
|
||||
sha2.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
eyre.workspace = true
|
||||
|
||||
@@ -76,6 +76,7 @@ use crate::{
|
||||
use alloy_consensus::{Block, BlockBody, Header};
|
||||
use alloy_primitives::{B256, U256};
|
||||
use alloy_rlp::{Decodable, Encodable};
|
||||
use sha2::{Digest, Sha256};
|
||||
use snap::{read::FrameDecoder, write::FrameEncoder};
|
||||
use std::{
|
||||
io::{Read, Write},
|
||||
@@ -493,6 +494,73 @@ impl Accumulator {
|
||||
|
||||
Ok(Self { root: B256::from(root) })
|
||||
}
|
||||
|
||||
/// Compute the accumulator from a list of header records.
|
||||
///
|
||||
/// Implements `hash_tree_root(List[HeaderRecord, 8192])` per the ERA1 spec:
|
||||
/// - Each leaf is `sha256(block_hash || total_difficulty_le_bytes32)`
|
||||
/// - Leaves are padded to `MAX_BLOCKS_PER_ERA1` (8192) with zero hashes
|
||||
/// - Binary Merkle tree is computed bottom-up
|
||||
/// - Final root is `sha256(merkle_root || le_bytes32(actual_count))`
|
||||
///
|
||||
/// Returns `Err` if `records` exceeds [`MAX_BLOCKS_PER_ERA1`].
|
||||
pub fn from_header_records(records: &[HeaderRecord]) -> Result<Self, E2sError> {
|
||||
let capacity = MAX_BLOCKS_PER_ERA1;
|
||||
|
||||
if records.len() > capacity {
|
||||
return Err(E2sError::Ssz(format!(
|
||||
"Too many header records: got {}, max {}",
|
||||
records.len(),
|
||||
capacity
|
||||
)));
|
||||
}
|
||||
|
||||
// Compute leaf hash for each header record
|
||||
let mut leaves = Vec::with_capacity(capacity);
|
||||
for record in records {
|
||||
let mut data = [0u8; 64];
|
||||
data[..32].copy_from_slice(record.block_hash.as_slice());
|
||||
data[32..].copy_from_slice(&record.total_difficulty.to_le_bytes::<32>());
|
||||
leaves.push(<[u8; 32]>::from(Sha256::digest(data)));
|
||||
}
|
||||
|
||||
// Pad to capacity with zero hashes
|
||||
leaves.resize(capacity, [0u8; 32]);
|
||||
|
||||
// Binary Merkle tree bottom-up (capacity is always a power of two)
|
||||
while leaves.len() > 1 {
|
||||
let mut next_level = Vec::with_capacity(leaves.len() / 2);
|
||||
for pair in leaves.chunks_exact(2) {
|
||||
let mut data = [0u8; 64];
|
||||
data[..32].copy_from_slice(&pair[0]);
|
||||
data[32..].copy_from_slice(&pair[1]);
|
||||
next_level.push(<[u8; 32]>::from(Sha256::digest(data)));
|
||||
}
|
||||
leaves = next_level;
|
||||
}
|
||||
|
||||
let merkle_root = leaves[0];
|
||||
|
||||
// mix_in_length: sha256(merkle_root || le_bytes32(actual_length))
|
||||
let mut mix = [0u8; 64];
|
||||
mix[..32].copy_from_slice(&merkle_root);
|
||||
let length = records.len() as u64;
|
||||
mix[32..40].copy_from_slice(&length.to_le_bytes());
|
||||
// remaining bytes stay zero (uint256 LE padding)
|
||||
|
||||
Ok(Self { root: B256::from(<[u8; 32]>::from(Sha256::digest(mix))) })
|
||||
}
|
||||
}
|
||||
|
||||
/// A header record used to compute the ERA1 accumulator.
|
||||
///
|
||||
/// Per the ERA1 spec: `header-record := { block-hash: Bytes32, total-difficulty: Uint256 }`
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HeaderRecord {
|
||||
/// The block hash (keccak256 of RLP-encoded header)
|
||||
pub block_hash: B256,
|
||||
/// The cumulative total difficulty at this block
|
||||
pub total_difficulty: U256,
|
||||
}
|
||||
|
||||
/// A block tuple in an Era1 file, containing all components for a single block
|
||||
@@ -691,6 +759,44 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accumulator_from_header_records_known_vectors() {
|
||||
// Known-answer vectors computed from the SSZ spec:
|
||||
// hash_tree_root(List[HeaderRecord, 8192])
|
||||
let expected_empty: B256 =
|
||||
"4a8c3a07c8d23adc5bac61157555c3c784d53d9bc110c1370809bd23cd93777d".parse().unwrap();
|
||||
let expected_single_zero: B256 =
|
||||
"81fd641249670887a731386e756a7a1538dc781b1b0bf016889045d350812817".parse().unwrap();
|
||||
let expected_single_nonzero: B256 =
|
||||
"ada35c48d81117f4fd588554cd4c4752356336e84cb41106dea1ceb4cfac8799".parse().unwrap();
|
||||
|
||||
// Empty list
|
||||
let acc_empty = Accumulator::from_header_records(&[]).unwrap();
|
||||
assert_eq!(acc_empty.root, expected_empty);
|
||||
|
||||
// Single record with zero values
|
||||
let records = vec![HeaderRecord { block_hash: B256::ZERO, total_difficulty: U256::ZERO }];
|
||||
let acc = Accumulator::from_header_records(&records).unwrap();
|
||||
assert_eq!(acc.root, expected_single_zero);
|
||||
|
||||
// Single record with non-zero values
|
||||
let records2 = vec![HeaderRecord {
|
||||
block_hash: B256::from([1u8; 32]),
|
||||
total_difficulty: U256::from(100u64),
|
||||
}];
|
||||
let acc2 = Accumulator::from_header_records(&records2).unwrap();
|
||||
assert_eq!(acc2.root, expected_single_nonzero);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accumulator_rejects_oversized_input() {
|
||||
let records = vec![
|
||||
HeaderRecord { block_hash: B256::ZERO, total_difficulty: U256::ZERO };
|
||||
MAX_BLOCKS_PER_ERA1 + 1
|
||||
];
|
||||
assert!(Accumulator::from_header_records(&records).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_receipt_list_compression() {
|
||||
let receipts = create_test_receipts();
|
||||
|
||||
@@ -102,8 +102,8 @@ pub struct Era1Id {
|
||||
/// Number of blocks in the file
|
||||
pub block_count: u32,
|
||||
|
||||
/// Optional hash identifier for this file
|
||||
/// First 4 bytes of the last historical root in the last state in the era file
|
||||
/// Optional hash identifier for this file.
|
||||
/// First 4 bytes of the accumulator root hash.
|
||||
pub hash: Option<[u8; 4]>,
|
||||
|
||||
/// Whether to include era count in filename
|
||||
|
||||
@@ -577,6 +577,9 @@ pub struct AnnouncedTxTypesMetrics {
|
||||
|
||||
/// Histogram for tracking frequency of EIP-7702 transaction type
|
||||
pub(crate) eip7702: Histogram,
|
||||
|
||||
/// Histogram for tracking frequency of unknown/other transaction types
|
||||
pub(crate) other: Histogram,
|
||||
}
|
||||
|
||||
/// Counts the number of transactions by their type in a block or collection.
|
||||
@@ -599,6 +602,9 @@ pub struct TxTypesCounter {
|
||||
|
||||
/// Count of transactions conforming to EIP-7702 (Restricted Storage Windows).
|
||||
pub(crate) eip7702: usize,
|
||||
|
||||
/// Count of unknown/other transaction types not matching any known EIP.
|
||||
pub(crate) other: usize,
|
||||
}
|
||||
|
||||
impl TxTypesCounter {
|
||||
@@ -621,6 +627,10 @@ impl TxTypesCounter {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) const fn increase_other(&mut self) {
|
||||
self.other += 1;
|
||||
}
|
||||
}
|
||||
|
||||
impl AnnouncedTxTypesMetrics {
|
||||
@@ -632,5 +642,6 @@ impl AnnouncedTxTypesMetrics {
|
||||
self.eip1559.record(tx_types_counter.eip1559 as f64);
|
||||
self.eip4844.record(tx_types_counter.eip4844 as f64);
|
||||
self.eip7702.record(tx_types_counter.eip7702 as f64);
|
||||
self.other.record(tx_types_counter.other as f64);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -700,11 +700,11 @@ impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
|
||||
}
|
||||
};
|
||||
|
||||
if is_eth68_message &&
|
||||
let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
|
||||
let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
|
||||
{
|
||||
tx_types_counter.increase_by_tx_type(parsed_tx_type);
|
||||
if is_eth68_message && let Some((actual_ty_byte, _)) = *metadata_ref_mut {
|
||||
match TxType::try_from(actual_ty_byte) {
|
||||
Ok(parsed_tx_type) => tx_types_counter.increase_by_tx_type(parsed_tx_type),
|
||||
Err(_) => tx_types_counter.increase_other(),
|
||||
}
|
||||
}
|
||||
|
||||
let decision = self
|
||||
|
||||
@@ -4,8 +4,9 @@ use clap::{builder::Resettable, Args};
|
||||
use eyre::ensure;
|
||||
use reth_cli_util::{parse_duration_from_secs_or_ms, parsers::format_duration_as_secs_or_ms};
|
||||
use reth_engine_primitives::{
|
||||
TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
|
||||
DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS, DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS,
|
||||
TreeConfig, DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
|
||||
DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS,
|
||||
DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS,
|
||||
};
|
||||
use std::{sync::OnceLock, time::Duration};
|
||||
|
||||
@@ -25,6 +26,7 @@ pub struct DefaultEngineValues {
|
||||
persistence_threshold: u64,
|
||||
persistence_backpressure_threshold: u64,
|
||||
memory_block_buffer_target: u64,
|
||||
invalid_header_hit_eviction_threshold: u8,
|
||||
legacy_state_root_task_enabled: bool,
|
||||
state_cache_disabled: bool,
|
||||
prewarming_disabled: bool,
|
||||
@@ -83,6 +85,12 @@ impl DefaultEngineValues {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the invalid header cache hit eviction threshold
|
||||
pub const fn with_invalid_header_hit_eviction_threshold(mut self, v: u8) -> Self {
|
||||
self.invalid_header_hit_eviction_threshold = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set whether to enable legacy state root task by default
|
||||
pub const fn with_legacy_state_root_task_enabled(mut self, v: bool) -> Self {
|
||||
self.legacy_state_root_task_enabled = v;
|
||||
@@ -255,6 +263,7 @@ impl Default for DefaultEngineValues {
|
||||
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
|
||||
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
|
||||
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
|
||||
invalid_header_hit_eviction_threshold: DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD,
|
||||
legacy_state_root_task_enabled: false,
|
||||
state_cache_disabled: false,
|
||||
prewarming_disabled: false,
|
||||
@@ -309,6 +318,14 @@ pub struct EngineArgs {
|
||||
#[arg(long = "engine.memory-block-buffer-target", default_value_t = DefaultEngineValues::get_global().memory_block_buffer_target)]
|
||||
pub memory_block_buffer_target: u64,
|
||||
|
||||
/// Configure how many cache hits an invalid header can accumulate before it is evicted and
|
||||
/// reprocessed.
|
||||
///
|
||||
/// Set to `0` to effectively disable the cache because entries are evicted on the first
|
||||
/// lookup.
|
||||
#[arg(long = "engine.invalid-header-cache-hit-eviction-threshold", default_value_t = DefaultEngineValues::get_global().invalid_header_hit_eviction_threshold)]
|
||||
pub invalid_header_hit_eviction_threshold: u8,
|
||||
|
||||
/// Enable legacy state root
|
||||
#[arg(long = "engine.legacy-state-root", default_value_t = DefaultEngineValues::get_global().legacy_state_root_task_enabled)]
|
||||
pub legacy_state_root_task_enabled: bool,
|
||||
@@ -530,6 +547,7 @@ impl Default for EngineArgs {
|
||||
persistence_threshold,
|
||||
persistence_backpressure_threshold,
|
||||
memory_block_buffer_target,
|
||||
invalid_header_hit_eviction_threshold,
|
||||
legacy_state_root_task_enabled,
|
||||
state_cache_disabled,
|
||||
prewarming_disabled,
|
||||
@@ -562,6 +580,7 @@ impl Default for EngineArgs {
|
||||
persistence_threshold,
|
||||
persistence_backpressure_threshold,
|
||||
memory_block_buffer_target,
|
||||
invalid_header_hit_eviction_threshold,
|
||||
legacy_state_root_task_enabled,
|
||||
state_root_task_compare_updates,
|
||||
caching_and_prewarming_enabled: true,
|
||||
@@ -620,6 +639,7 @@ impl EngineArgs {
|
||||
.with_persistence_threshold(self.persistence_threshold)
|
||||
.with_persistence_backpressure_threshold(self.persistence_backpressure_threshold)
|
||||
.with_memory_block_buffer_target(self.memory_block_buffer_target)
|
||||
.with_invalid_header_hit_eviction_threshold(self.invalid_header_hit_eviction_threshold)
|
||||
.with_legacy_state_root(self.legacy_state_root_task_enabled)
|
||||
.without_state_cache(self.state_cache_disabled)
|
||||
.without_prewarming(self.prewarming_disabled)
|
||||
@@ -682,6 +702,7 @@ mod tests {
|
||||
persistence_threshold: 100,
|
||||
persistence_backpressure_threshold: 101,
|
||||
memory_block_buffer_target: 50,
|
||||
invalid_header_hit_eviction_threshold: 7,
|
||||
legacy_state_root_task_enabled: true,
|
||||
caching_and_prewarming_enabled: true,
|
||||
state_cache_disabled: true,
|
||||
@@ -726,6 +747,8 @@ mod tests {
|
||||
"101",
|
||||
"--engine.memory-block-buffer-target",
|
||||
"50",
|
||||
"--engine.invalid-header-cache-hit-eviction-threshold",
|
||||
"7",
|
||||
"--engine.legacy-state-root",
|
||||
"--engine.disable-state-cache",
|
||||
"--engine.disable-prewarming",
|
||||
@@ -808,6 +831,28 @@ mod tests {
|
||||
assert_eq!(args.slow_block_threshold, Some(Duration::from_millis(500)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_invalid_header_hit_eviction_threshold() {
|
||||
let args = CommandParser::<EngineArgs>::parse_from(["reth"]).args;
|
||||
assert_eq!(
|
||||
args.invalid_header_hit_eviction_threshold,
|
||||
DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD
|
||||
);
|
||||
assert_eq!(
|
||||
args.tree_config().invalid_header_hit_eviction_threshold(),
|
||||
DEFAULT_INVALID_HEADER_HIT_EVICTION_THRESHOLD
|
||||
);
|
||||
|
||||
let args = CommandParser::<EngineArgs>::parse_from([
|
||||
"reth",
|
||||
"--engine.invalid-header-cache-hit-eviction-threshold",
|
||||
"0",
|
||||
])
|
||||
.args;
|
||||
assert_eq!(args.invalid_header_hit_eviction_threshold, 0);
|
||||
assert_eq!(args.tree_config().invalid_header_hit_eviction_threshold(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_share_sparse_trie_flag() {
|
||||
let args = CommandParser::<EngineArgs>::parse_from(["reth"]).args;
|
||||
|
||||
@@ -719,9 +719,16 @@ pub struct DiscoveryArgs {
|
||||
pub disable_discv4_discovery: bool,
|
||||
|
||||
/// Enable Discv5 discovery.
|
||||
#[arg(long, conflicts_with = "disable_discovery")]
|
||||
///
|
||||
/// Discv5 is now enabled by default, so this flag is a no-op and will be removed in a future
|
||||
/// release.
|
||||
#[arg(long, conflicts_with = "disable_discovery", hide = true)]
|
||||
pub enable_discv5_discovery: bool,
|
||||
|
||||
/// Disable Discv5 discovery.
|
||||
#[arg(long, conflicts_with = "disable_discovery")]
|
||||
pub disable_discv5_discovery: bool,
|
||||
|
||||
/// Disable Nat discovery.
|
||||
#[arg(long, conflicts_with = "disable_discovery")]
|
||||
pub disable_nat: bool,
|
||||
@@ -852,21 +859,23 @@ impl DiscoveryArgs {
|
||||
.bootstrap_lookup_countdown(*discv5_bootstrap_lookup_countdown)
|
||||
}
|
||||
|
||||
/// Returns true if discv5 discovery should be configured
|
||||
/// Returns true if discv5 discovery should be configured.
|
||||
///
|
||||
/// Discv5 is enabled by default and can be disabled with `--disable-discv5-discovery`.
|
||||
const fn should_enable_discv5(&self) -> bool {
|
||||
if self.disable_discovery {
|
||||
if self.disable_discovery || self.disable_discv5_discovery {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.enable_discv5_discovery ||
|
||||
self.discv5_addr.is_some() ||
|
||||
self.discv5_addr_ipv6.is_some()
|
||||
true
|
||||
}
|
||||
|
||||
/// Set the discovery port to zero, to allow the OS to assign a random unused port when
|
||||
/// discovery binds to the socket.
|
||||
/// Set the discovery ports to zero, to allow the OS to assign random unused ports when
|
||||
/// discovery binds to the sockets.
|
||||
pub const fn with_unused_discovery_port(mut self) -> Self {
|
||||
self.port = 0;
|
||||
self.discv5_port = 0;
|
||||
self.discv5_port_ipv6 = 0;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -896,6 +905,7 @@ impl Default for DiscoveryArgs {
|
||||
disable_dns_discovery: false,
|
||||
disable_discv4_discovery: false,
|
||||
enable_discv5_discovery: false,
|
||||
disable_discv5_discovery: false,
|
||||
disable_nat: false,
|
||||
addr: DEFAULT_DISCOVERY_ADDR,
|
||||
port: DEFAULT_DISCOVERY_PORT,
|
||||
|
||||
110
crates/rpc/rpc-eth-types/src/cache/mod.rs
vendored
110
crates/rpc/rpc-eth-types/src/cache/mod.rs
vendored
@@ -344,17 +344,9 @@ where
|
||||
}
|
||||
|
||||
/// Removes transaction index entries for a reorged block.
|
||||
///
|
||||
/// Only removes entries that still point to this block, preserving mappings for transactions
|
||||
/// that were re-mined in a new canonical block.
|
||||
fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
|
||||
let block_hash = block.hash();
|
||||
for tx in block.body().transactions() {
|
||||
if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
|
||||
*mapped_hash == block_hash
|
||||
{
|
||||
self.tx_hash_index.remove(tx.tx_hash());
|
||||
}
|
||||
self.tx_hash_index.remove(tx.tx_hash());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -421,6 +413,15 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult<Provider::Header>) {
|
||||
if let Some(queued) = self.headers_cache.remove(&block_hash) {
|
||||
// send the response to queued senders
|
||||
for tx in queued {
|
||||
let _ = tx.send(res.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shrinks the queues but leaves some space for the next requests
|
||||
fn shrink_queues(&mut self) {
|
||||
let min_capacity = 2;
|
||||
@@ -597,9 +598,12 @@ where
|
||||
}
|
||||
CacheAction::RemoveReorgedChain { chain_change } => {
|
||||
for block in chain_change.blocks {
|
||||
let block_hash = block.hash();
|
||||
let header = block.clone_header();
|
||||
// Remove transaction index entries for reorged blocks
|
||||
this.remove_block_transactions(&block);
|
||||
this.on_reorg_block(block.hash(), Ok(Some(block)));
|
||||
this.on_reorg_block(block_hash, Ok(Some(block)));
|
||||
this.on_reorg_header(block_hash, Ok(header));
|
||||
}
|
||||
|
||||
for block_receipts in chain_change.receipts {
|
||||
@@ -825,3 +829,89 @@ pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
|
||||
eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use alloy_consensus::Header;
|
||||
use alloy_primitives::{Address, Signature};
|
||||
use reth_ethereum_primitives::{
|
||||
Block, BlockBody, EthPrimitives, Transaction, TransactionSigned,
|
||||
};
|
||||
use reth_primitives_traits::RecoveredBlock;
|
||||
use reth_storage_api::noop::NoopProvider;
|
||||
|
||||
fn test_service() -> EthStateCacheService<NoopProvider, Runtime> {
|
||||
let (_cache, service) = EthStateCache::<EthPrimitives>::create(
|
||||
NoopProvider::default(),
|
||||
Runtime::test(),
|
||||
4,
|
||||
4,
|
||||
4,
|
||||
1,
|
||||
16,
|
||||
);
|
||||
service
|
||||
}
|
||||
|
||||
fn test_block() -> RecoveredBlock<Block> {
|
||||
RecoveredBlock::new_unhashed(
|
||||
Block {
|
||||
header: Header { number: 1, ..Default::default() },
|
||||
body: BlockBody {
|
||||
transactions: vec![TransactionSigned::new_unhashed(
|
||||
Transaction::Legacy(Default::default()),
|
||||
Signature::test_signature(),
|
||||
)],
|
||||
..Default::default()
|
||||
},
|
||||
},
|
||||
vec![Address::ZERO],
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reorg_evicts_cached_headers() {
|
||||
let mut service = test_service();
|
||||
let block_hash = B256::repeat_byte(0x11);
|
||||
|
||||
assert!(service
|
||||
.headers_cache
|
||||
.insert(block_hash, Header { number: 42, ..Default::default() }));
|
||||
assert!(service.headers_cache.get(&block_hash).is_some());
|
||||
|
||||
service.on_reorg_header(block_hash, Ok(Header { number: 7, ..Default::default() }));
|
||||
|
||||
assert!(service.headers_cache.get(&block_hash).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reorg_forwards_header_to_queued_requests() {
|
||||
let mut service = test_service();
|
||||
let block_hash = B256::repeat_byte(0x22);
|
||||
let (response_tx, mut response_rx) = oneshot::channel();
|
||||
let header = Header { number: 7, ..Default::default() };
|
||||
|
||||
assert!(service.headers_cache.queue(block_hash, response_tx));
|
||||
|
||||
service.on_reorg_header(block_hash, Ok(header));
|
||||
|
||||
let header =
|
||||
response_rx.try_recv().expect("queued header response").expect("header result");
|
||||
|
||||
assert_eq!(header.number, 7);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reorg_removes_tx_hash_index_entries_unconditionally() {
|
||||
let mut service = test_service();
|
||||
let block = test_block();
|
||||
let tx_hash = *block.body().transactions().next().expect("test transaction").tx_hash();
|
||||
|
||||
service.tx_hash_index.insert(tx_hash, (B256::repeat_byte(0x33), 0));
|
||||
|
||||
service.remove_block_transactions(&block);
|
||||
|
||||
assert!(service.tx_hash_index.get(&tx_hash).is_none());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,8 @@
|
||||
use crate::ChangesetOffset;
|
||||
use std::{
|
||||
fs::{File, OpenOptions},
|
||||
io::{self, Read, Seek, SeekFrom, Write},
|
||||
io::{self, Write},
|
||||
os::unix::fs::FileExt,
|
||||
path::Path,
|
||||
};
|
||||
|
||||
@@ -177,16 +178,14 @@ impl ChangesetOffsetReader {
|
||||
|
||||
/// Reads a single changeset offset by block index.
|
||||
/// Returns None if index is out of bounds.
|
||||
pub fn get(&mut self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
|
||||
pub fn get(&self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
|
||||
if block_index >= self.len {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let byte_pos = block_index * Self::RECORD_SIZE as u64;
|
||||
self.file.seek(SeekFrom::Start(byte_pos))?;
|
||||
|
||||
let mut buf = [0u8; Self::RECORD_SIZE];
|
||||
self.file.read_exact(&mut buf)?;
|
||||
self.file.read_exact_at(&mut buf, byte_pos)?;
|
||||
|
||||
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
|
||||
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
|
||||
@@ -195,7 +194,7 @@ impl ChangesetOffsetReader {
|
||||
}
|
||||
|
||||
/// Reads a range of changeset offsets.
|
||||
pub fn get_range(&mut self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
|
||||
pub fn get_range(&self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
|
||||
let end = end.min(self.len);
|
||||
if start >= end {
|
||||
return Ok(Vec::new());
|
||||
@@ -203,13 +202,13 @@ impl ChangesetOffsetReader {
|
||||
|
||||
let count = (end - start) as usize;
|
||||
let byte_pos = start * Self::RECORD_SIZE as u64;
|
||||
self.file.seek(SeekFrom::Start(byte_pos))?;
|
||||
|
||||
let mut result = Vec::with_capacity(count);
|
||||
let mut buf = [0u8; Self::RECORD_SIZE];
|
||||
|
||||
for _ in 0..count {
|
||||
self.file.read_exact(&mut buf)?;
|
||||
for i in 0..count {
|
||||
let pos = byte_pos + (i as u64) * Self::RECORD_SIZE as u64;
|
||||
self.file.read_exact_at(&mut buf, pos)?;
|
||||
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
|
||||
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
|
||||
result.push(ChangesetOffset::new(offset, num_changes));
|
||||
@@ -251,7 +250,7 @@ mod tests {
|
||||
|
||||
// Read
|
||||
{
|
||||
let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
|
||||
let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
|
||||
assert_eq!(reader.len(), 3);
|
||||
|
||||
let entry = reader.get(0).unwrap().unwrap();
|
||||
@@ -284,7 +283,7 @@ mod tests {
|
||||
writer.truncate(2).unwrap();
|
||||
assert_eq!(writer.len(), 2);
|
||||
|
||||
let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
|
||||
let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
|
||||
assert_eq!(reader.len(), 2);
|
||||
assert!(reader.get(2).unwrap().is_none());
|
||||
}
|
||||
@@ -317,7 +316,7 @@ mod tests {
|
||||
assert_eq!(std::fs::metadata(&path).unwrap().len(), 16);
|
||||
|
||||
// Verify the complete record is readable
|
||||
let mut reader = ChangesetOffsetReader::new(&path, 1).unwrap();
|
||||
let reader = ChangesetOffsetReader::new(&path, 1).unwrap();
|
||||
assert_eq!(reader.len(), 1);
|
||||
let entry = reader.get(0).unwrap().unwrap();
|
||||
assert_eq!(entry.offset(), 100);
|
||||
@@ -340,7 +339,7 @@ mod tests {
|
||||
}
|
||||
|
||||
// Open with len=2, ignoring the 3rd record
|
||||
let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
|
||||
let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
|
||||
assert_eq!(reader.len(), 2);
|
||||
|
||||
// First two records should be readable
|
||||
@@ -397,7 +396,7 @@ mod tests {
|
||||
|
||||
// Verify the records are correct
|
||||
{
|
||||
let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
|
||||
let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
|
||||
assert_eq!(reader.len(), 3);
|
||||
|
||||
let entry0 = reader.get(0).unwrap().unwrap();
|
||||
|
||||
@@ -15,9 +15,9 @@ mod compression;
|
||||
mod event;
|
||||
mod segment;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
#[cfg(all(feature = "std", unix))]
|
||||
mod changeset_offsets;
|
||||
#[cfg(feature = "std")]
|
||||
#[cfg(all(feature = "std", unix))]
|
||||
pub use changeset_offsets::{ChangesetOffsetReader, ChangesetOffsetWriter};
|
||||
|
||||
use alloy_primitives::BlockNumber;
|
||||
|
||||
@@ -42,12 +42,12 @@ rustc-hash = { workspace = true, optional = true, features = ["std"] }
|
||||
sysinfo = { workspace = true, features = ["system"] }
|
||||
parking_lot = { workspace = true, optional = true }
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
libc.workspace = true
|
||||
|
||||
# arbitrary utils
|
||||
strum = { workspace = true, features = ["derive"], optional = true }
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
libc.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
# reth libs with arbitrary
|
||||
reth-primitives-traits = { workspace = true, features = ["reth-codec"] }
|
||||
|
||||
@@ -17,7 +17,7 @@ use reth_db::static_file::{
|
||||
use reth_db_api::table::{Decompress, Value};
|
||||
use reth_node_types::NodePrimitives;
|
||||
use reth_primitives_traits::{SealedHeader, SignedTransaction};
|
||||
use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader};
|
||||
use reth_static_file_types::ChangesetOffset;
|
||||
use reth_storage_api::range_size_hint;
|
||||
use reth_storage_errors::provider::{ProviderError, ProviderResult};
|
||||
use std::{
|
||||
@@ -111,15 +111,11 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let csoff_path = self.data_path().with_extension("csoff");
|
||||
if !csoff_path.exists() {
|
||||
return Ok(None);
|
||||
if let Some(reader) = self.jar.value().csoff_reader() {
|
||||
reader.get(index).map_err(ProviderError::other)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
let len = header.changeset_offsets_len();
|
||||
let mut reader =
|
||||
ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
|
||||
reader.get(index).map_err(ProviderError::other)
|
||||
}
|
||||
|
||||
/// Reads all changeset offsets from the sidecar file.
|
||||
@@ -138,15 +134,12 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
|
||||
return Ok(Some(Vec::new()));
|
||||
}
|
||||
|
||||
let csoff_path = self.data_path().with_extension("csoff");
|
||||
if !csoff_path.exists() {
|
||||
return Ok(None);
|
||||
if let Some(reader) = self.jar.value().csoff_reader() {
|
||||
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
|
||||
Ok(Some(offsets))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
let mut reader =
|
||||
ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
|
||||
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
|
||||
Ok(Some(offsets))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,9 +16,9 @@ mod metrics;
|
||||
mod writer_tests;
|
||||
|
||||
use reth_nippy_jar::NippyJar;
|
||||
use reth_static_file_types::{SegmentHeader, StaticFileSegment};
|
||||
use reth_static_file_types::{ChangesetOffsetReader, SegmentHeader, StaticFileSegment};
|
||||
use reth_storage_errors::provider::{ProviderError, ProviderResult};
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
use std::{io, ops::Deref, sync::Arc};
|
||||
|
||||
/// Alias type for each specific `NippyJar`.
|
||||
type LoadedJarRef<'a> =
|
||||
@@ -29,6 +29,7 @@ type LoadedJarRef<'a> =
|
||||
pub struct LoadedJar {
|
||||
jar: NippyJar<SegmentHeader>,
|
||||
mmap_handle: Arc<reth_nippy_jar::DataReader>,
|
||||
csoff_reader: Option<ChangesetOffsetReader>,
|
||||
}
|
||||
|
||||
impl LoadedJar {
|
||||
@@ -36,7 +37,20 @@ impl LoadedJar {
|
||||
match jar.open_data_reader() {
|
||||
Ok(data_reader) => {
|
||||
let mmap_handle = Arc::new(data_reader);
|
||||
Ok(Self { jar, mmap_handle })
|
||||
|
||||
let csoff_reader = if jar.user_header().segment().is_change_based() {
|
||||
let csoff_path = jar.data_path().with_extension("csoff");
|
||||
let len = jar.user_header().changeset_offsets_len();
|
||||
match ChangesetOffsetReader::new(&csoff_path, len) {
|
||||
Ok(reader) => Some(reader),
|
||||
Err(err) if err.kind() == io::ErrorKind::NotFound && len == 0 => None,
|
||||
Err(err) => return Err(ProviderError::other(err)),
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Self { jar, mmap_handle, csoff_reader })
|
||||
}
|
||||
Err(e) => Err(ProviderError::other(e)),
|
||||
}
|
||||
@@ -55,6 +69,11 @@ impl LoadedJar {
|
||||
fn size(&self) -> usize {
|
||||
self.mmap_handle.size() + self.mmap_handle.offsets_size()
|
||||
}
|
||||
|
||||
/// Returns a reference to the cached changeset offset reader.
|
||||
const fn csoff_reader(&self) -> Option<&ChangesetOffsetReader> {
|
||||
self.csoff_reader.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for LoadedJar {
|
||||
|
||||
@@ -400,7 +400,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
|
||||
// Step 2: Validate sidecar offsets against actual NippyJar state
|
||||
let valid_blocks = if actual_sidecar_blocks > 0 {
|
||||
let mut reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
|
||||
let reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
|
||||
.map_err(ProviderError::other)?;
|
||||
|
||||
// Find last block where offset + num_changes <= actual_nippy_rows
|
||||
@@ -896,7 +896,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
// Read offset for the block after last_block from sidecar.
|
||||
// Use committed length from header, ignoring any uncommitted records
|
||||
// that may exist in the file after a crash.
|
||||
let mut reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
|
||||
let reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
|
||||
.map_err(ProviderError::other)?;
|
||||
if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
|
||||
next_offset.offset()
|
||||
|
||||
@@ -614,7 +614,7 @@ mod tests {
|
||||
assert_eq!(get_header_block_count(&provider, 0), 5);
|
||||
|
||||
// Verify offsets are correct
|
||||
let mut reader = ChangesetOffsetReader::new(&sidecar_path, 5).unwrap();
|
||||
let reader = ChangesetOffsetReader::new(&sidecar_path, 5).unwrap();
|
||||
|
||||
let o0 = reader.get(0).unwrap().unwrap();
|
||||
assert_eq!(o0.offset(), 0);
|
||||
|
||||
@@ -87,8 +87,8 @@ Networking:
|
||||
--disable-discv4-discovery
|
||||
Disable Discv4 discovery
|
||||
|
||||
--enable-discv5-discovery
|
||||
Enable Discv5 discovery
|
||||
--disable-discv5-discovery
|
||||
Disable Discv5 discovery
|
||||
|
||||
--disable-nat
|
||||
Disable Nat discovery
|
||||
@@ -950,6 +950,13 @@ Engine:
|
||||
|
||||
[default: 0]
|
||||
|
||||
--engine.invalid-header-cache-hit-eviction-threshold <INVALID_HEADER_HIT_EVICTION_THRESHOLD>
|
||||
Configure how many cache hits an invalid header can accumulate before it is evicted and reprocessed.
|
||||
|
||||
Set to `0` to effectively disable the cache because entries are evicted on the first lookup.
|
||||
|
||||
[default: 128]
|
||||
|
||||
--engine.legacy-state-root
|
||||
Enable legacy state root
|
||||
|
||||
|
||||
@@ -27,8 +27,8 @@ Networking:
|
||||
--disable-discv4-discovery
|
||||
Disable Discv4 discovery
|
||||
|
||||
--enable-discv5-discovery
|
||||
Enable Discv5 discovery
|
||||
--disable-discv5-discovery
|
||||
Disable Discv5 discovery
|
||||
|
||||
--disable-nat
|
||||
Disable Nat discovery
|
||||
|
||||
@@ -27,8 +27,8 @@ Networking:
|
||||
--disable-discv4-discovery
|
||||
Disable Discv4 discovery
|
||||
|
||||
--enable-discv5-discovery
|
||||
Enable Discv5 discovery
|
||||
--disable-discv5-discovery
|
||||
Disable Discv5 discovery
|
||||
|
||||
--disable-nat
|
||||
Disable Nat discovery
|
||||
|
||||
@@ -175,8 +175,8 @@ Networking:
|
||||
--disable-discv4-discovery
|
||||
Disable Discv4 discovery
|
||||
|
||||
--enable-discv5-discovery
|
||||
Enable Discv5 discovery
|
||||
--disable-discv5-discovery
|
||||
Disable Discv5 discovery
|
||||
|
||||
--disable-nat
|
||||
Disable Nat discovery
|
||||
|
||||
@@ -17,8 +17,8 @@ This section provides essential information about the ports used by the system,
|
||||
|
||||
- **Port:** `9200`
|
||||
- **Protocol:** UDP
|
||||
- **Purpose:** Used for discv5 peer discovery protocol. This is a newer discovery protocol that can be enabled with `--enable-discv5-discovery`. It operates independently from the legacy discv4 discovery on port 30303.
|
||||
- **Exposure Recommendation:** This port should be exposed if discv5 discovery is enabled to allow peer discovery.
|
||||
- **Purpose:** Used for discv5 peer discovery protocol. This is enabled by default and can be disabled with `--disable-discv5-discovery`. It operates independently from the legacy discv4 discovery on port 30303.
|
||||
- **Exposure Recommendation:** This port should be exposed to allow peer discovery.
|
||||
|
||||
## Metrics Port
|
||||
|
||||
|
||||
Reference in New Issue
Block a user