mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
perf: use fixed-map for StaticFileSegment maps (#21001)
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
22
Cargo.lock
generated
22
Cargo.lock
generated
@@ -4091,6 +4091,27 @@ dependencies = [
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixed-map"
|
||||
version = "0.9.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86ed19add84e8cb9e8cc5f7074de0324247149ffef0b851e215fb0edc50c229b"
|
||||
dependencies = [
|
||||
"fixed-map-derive",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixed-map-derive"
|
||||
version = "0.9.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6dc7a9cb3326bafb80642c5ce99b39a2c0702d4bfa8ee8a3e773791a6cbe2407"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.113",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixedbitset"
|
||||
version = "0.5.7"
|
||||
@@ -10908,6 +10929,7 @@ dependencies = [
|
||||
"alloy-primitives",
|
||||
"clap",
|
||||
"derive_more",
|
||||
"fixed-map",
|
||||
"insta",
|
||||
"reth-nippy-jar",
|
||||
"serde",
|
||||
|
||||
@@ -555,6 +555,7 @@ dirs-next = "2.0.0"
|
||||
dyn-clone = "1.0.17"
|
||||
eyre = "0.6"
|
||||
fdlimit = "0.3.0"
|
||||
fixed-map = { version = "0.9", default-features = false }
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1"
|
||||
itertools = { version = "0.14", default-features = false }
|
||||
|
||||
@@ -29,7 +29,7 @@ impl Command {
|
||||
let static_file_provider = tool.provider_factory.static_file_provider();
|
||||
let static_files = iter_static_files(static_file_provider.directory())?;
|
||||
|
||||
if let Some(segment_static_files) = static_files.get(&segment) {
|
||||
if let Some(segment_static_files) = static_files.get(segment) {
|
||||
for (block_range, _) in segment_static_files {
|
||||
static_file_provider.delete_jar(segment, block_range.start())?;
|
||||
}
|
||||
|
||||
@@ -2,9 +2,8 @@
|
||||
use reth_network_types::{PeersConfig, SessionsConfig};
|
||||
use reth_prune_types::PruneModes;
|
||||
use reth_stages_types::ExecutionStageThresholds;
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use reth_static_file_types::{StaticFileMap, StaticFileSegment};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::{Path, PathBuf},
|
||||
time::Duration,
|
||||
};
|
||||
@@ -473,8 +472,8 @@ impl StaticFilesConfig {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Converts the blocks per file configuration into a [`HashMap`] per segment.
|
||||
pub fn as_blocks_per_file_map(&self) -> HashMap<StaticFileSegment, u64> {
|
||||
/// Converts the blocks per file configuration into a [`StaticFileMap`].
|
||||
pub fn as_blocks_per_file_map(&self) -> StaticFileMap<u64> {
|
||||
let BlocksPerFileConfig {
|
||||
headers,
|
||||
transactions,
|
||||
@@ -483,7 +482,7 @@ impl StaticFilesConfig {
|
||||
account_change_sets,
|
||||
} = self.blocks_per_file;
|
||||
|
||||
let mut map = HashMap::new();
|
||||
let mut map = StaticFileMap::default();
|
||||
// Iterating over all possible segments allows us to do an exhaustive match here,
|
||||
// to not forget to configure new segments in the future.
|
||||
for segment in StaticFileSegment::iter() {
|
||||
|
||||
@@ -483,7 +483,7 @@ where
|
||||
let static_file_provider =
|
||||
StaticFileProviderBuilder::read_write(self.data_dir().static_files())
|
||||
.with_metrics()
|
||||
.with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map())
|
||||
.with_blocks_per_file_for_segments(&static_files_config.as_blocks_per_file_map())
|
||||
.with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
|
||||
.build()?;
|
||||
|
||||
@@ -946,7 +946,7 @@ where
|
||||
error!(
|
||||
"Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
|
||||
);
|
||||
return Err(ProviderError::BestBlockNotFound)
|
||||
return Err(ProviderError::BestBlockNotFound);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ workspace = true
|
||||
alloy-primitives.workspace = true
|
||||
|
||||
clap = { workspace = true, features = ["derive"], optional = true }
|
||||
fixed-map.workspace = true
|
||||
derive_more.workspace = true
|
||||
serde = { workspace = true, features = ["alloc", "derive"] }
|
||||
strum = { workspace = true, features = ["derive"] }
|
||||
@@ -32,5 +33,6 @@ std = [
|
||||
"serde/std",
|
||||
"strum/std",
|
||||
"serde_json/std",
|
||||
"fixed-map/std",
|
||||
]
|
||||
clap = ["dep:clap"]
|
||||
|
||||
@@ -21,6 +21,9 @@ use core::ops::RangeInclusive;
|
||||
pub use event::StaticFileProducerEvent;
|
||||
pub use segment::{SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
|
||||
|
||||
/// Map keyed by [`StaticFileSegment`].
|
||||
pub type StaticFileMap<T> = alloc::boxed::Box<fixed_map::Map<StaticFileSegment, T>>;
|
||||
|
||||
/// Default static file block count.
|
||||
pub const DEFAULT_BLOCKS_PER_STATIC_FILE: u64 = 500_000;
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ use strum::{EnumIs, EnumString};
|
||||
EnumIs,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
fixed_map::Key,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
//! reth's static file database table import and access
|
||||
|
||||
use alloy_primitives::map::HashMap;
|
||||
use reth_nippy_jar::{NippyJar, NippyJarError};
|
||||
use reth_static_file_types::{
|
||||
SegmentHeader, SegmentRangeInclusive, StaticFileMap, StaticFileSegment,
|
||||
};
|
||||
use std::path::Path;
|
||||
|
||||
mod cursor;
|
||||
@@ -8,14 +11,12 @@ pub use cursor::StaticFileCursor;
|
||||
|
||||
mod mask;
|
||||
pub use mask::*;
|
||||
use reth_nippy_jar::{NippyJar, NippyJarError};
|
||||
|
||||
mod masks;
|
||||
pub use masks::*;
|
||||
use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
|
||||
|
||||
/// Alias type for a map of [`StaticFileSegment`] and sorted lists of existing static file ranges.
|
||||
type SortedStaticFiles = HashMap<StaticFileSegment, Vec<(SegmentRangeInclusive, SegmentHeader)>>;
|
||||
type SortedStaticFiles = StaticFileMap<Vec<(SegmentRangeInclusive, SegmentHeader)>>;
|
||||
|
||||
/// Given the `static_files` directory path, it returns a list over the existing `static_files`
|
||||
/// organized by [`StaticFileSegment`]. Each segment has a sorted list of block ranges and
|
||||
|
||||
@@ -35,15 +35,15 @@ use reth_node_types::NodePrimitives;
|
||||
use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
|
||||
use reth_stages_types::{PipelineTarget, StageId};
|
||||
use reth_static_file_types::{
|
||||
find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
|
||||
DEFAULT_BLOCKS_PER_STATIC_FILE,
|
||||
find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
|
||||
StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
|
||||
};
|
||||
use reth_storage_api::{
|
||||
BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StorageSettingsCache,
|
||||
};
|
||||
use reth_storage_errors::provider::{ProviderError, ProviderResult};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
collections::BTreeMap,
|
||||
fmt::Debug,
|
||||
ops::{Deref, Range, RangeBounds, RangeInclusive},
|
||||
path::{Path, PathBuf},
|
||||
@@ -99,7 +99,7 @@ impl<N> Clone for StaticFileProvider<N> {
|
||||
pub struct StaticFileProviderBuilder<P> {
|
||||
access: StaticFileAccess,
|
||||
use_metrics: bool,
|
||||
blocks_per_file: HashMap<StaticFileSegment, u64>,
|
||||
blocks_per_file: StaticFileMap<u64>,
|
||||
path: P,
|
||||
genesis_block_number: u64,
|
||||
}
|
||||
@@ -140,9 +140,11 @@ impl<P: AsRef<Path>> StaticFileProviderBuilder<P> {
|
||||
/// setting.
|
||||
pub fn with_blocks_per_file_for_segments(
|
||||
mut self,
|
||||
segments: HashMap<StaticFileSegment, u64>,
|
||||
segments: &<StaticFileMap<u64> as Deref>::Target,
|
||||
) -> Self {
|
||||
self.blocks_per_file.extend(segments);
|
||||
for (segment, &blocks_per_file) in segments {
|
||||
self.blocks_per_file.insert(segment, blocks_per_file);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@@ -194,7 +196,9 @@ impl<P: AsRef<Path>> StaticFileProviderBuilder<P> {
|
||||
provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
|
||||
}
|
||||
|
||||
provider.blocks_per_file.extend(self.blocks_per_file);
|
||||
for (segment, blocks_per_file) in *self.blocks_per_file {
|
||||
provider.blocks_per_file.insert(segment, blocks_per_file);
|
||||
}
|
||||
provider.genesis_block_number = self.genesis_block_number;
|
||||
|
||||
let provider = StaticFileProvider(Arc::new(provider));
|
||||
@@ -269,7 +273,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
notify::EventKind::Create(_) |
|
||||
notify::EventKind::Remove(_)
|
||||
) {
|
||||
continue
|
||||
continue;
|
||||
}
|
||||
|
||||
// We only trigger a re-initialization if a configuration file was
|
||||
@@ -282,7 +286,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
.extension()
|
||||
.is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
|
||||
{
|
||||
continue
|
||||
continue;
|
||||
}
|
||||
|
||||
// Ensure it's well formatted static file name
|
||||
@@ -291,7 +295,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
)
|
||||
.is_none()
|
||||
{
|
||||
continue
|
||||
continue;
|
||||
}
|
||||
|
||||
// If we can read the metadata and modified timestamp, ensure this is
|
||||
@@ -302,7 +306,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
if last_event_timestamp.is_some_and(|last_timestamp| {
|
||||
last_timestamp >= current_modified_timestamp
|
||||
}) {
|
||||
continue
|
||||
continue;
|
||||
}
|
||||
last_event_timestamp = Some(current_modified_timestamp);
|
||||
}
|
||||
@@ -311,7 +315,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
if let Err(err) = provider.initialize_index() {
|
||||
warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
|
||||
}
|
||||
break
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -337,7 +341,7 @@ pub struct StaticFileProviderInner<N> {
|
||||
/// segments and ranges.
|
||||
map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
|
||||
/// Indexes per segment.
|
||||
indexes: RwLock<HashMap<StaticFileSegment, StaticFileSegmentIndex>>,
|
||||
indexes: RwLock<StaticFileMap<StaticFileSegmentIndex>>,
|
||||
/// This is an additional index that tracks the expired height, this will track the highest
|
||||
/// block number that has been expired (missing). The first, non expired block is
|
||||
/// `expired_history_height + 1`.
|
||||
@@ -358,7 +362,7 @@ pub struct StaticFileProviderInner<N> {
|
||||
/// Access rights of the provider.
|
||||
access: StaticFileAccess,
|
||||
/// Number of blocks per file, per segment.
|
||||
blocks_per_file: HashMap<StaticFileSegment, u64>,
|
||||
blocks_per_file: StaticFileMap<u64>,
|
||||
/// Write lock for when access is [`StaticFileAccess::RW`].
|
||||
_lock_file: Option<StorageLock>,
|
||||
/// Genesis block number, default is 0;
|
||||
@@ -374,7 +378,7 @@ impl<N: NodePrimitives> StaticFileProviderInner<N> {
|
||||
None
|
||||
};
|
||||
|
||||
let mut blocks_per_file = HashMap::new();
|
||||
let mut blocks_per_file = StaticFileMap::default();
|
||||
for segment in StaticFileSegment::iter() {
|
||||
blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
|
||||
}
|
||||
@@ -414,14 +418,14 @@ impl<N: NodePrimitives> StaticFileProviderInner<N> {
|
||||
block: BlockNumber,
|
||||
) -> SegmentRangeInclusive {
|
||||
let blocks_per_file =
|
||||
self.blocks_per_file.get(&segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
|
||||
self.blocks_per_file.get(segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
|
||||
|
||||
if let Some(block_index) = block_index {
|
||||
// Find first block range that contains the requested block
|
||||
if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
|
||||
{
|
||||
// Found matching range for an existing file using block index
|
||||
return *range
|
||||
return *range;
|
||||
} else if let Some((_, range)) = block_index.last_key_value() {
|
||||
// Didn't find matching range for an existing file, derive a new range from the end
|
||||
// of the last existing file range.
|
||||
@@ -431,7 +435,7 @@ impl<N: NodePrimitives> StaticFileProviderInner<N> {
|
||||
let blocks_after_last_range = block - range.end();
|
||||
let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
|
||||
let start = range.end() + 1 + segments_to_skip * blocks_per_file;
|
||||
return SegmentRangeInclusive::new(start, start + blocks_per_file - 1)
|
||||
return SegmentRangeInclusive::new(start, start + blocks_per_file - 1);
|
||||
}
|
||||
}
|
||||
// No block index is available, derive a new range using the fixed number of blocks,
|
||||
@@ -458,10 +462,7 @@ impl<N: NodePrimitives> StaticFileProviderInner<N> {
|
||||
) -> SegmentRangeInclusive {
|
||||
self.find_fixed_range_with_block_index(
|
||||
segment,
|
||||
self.indexes
|
||||
.read()
|
||||
.get(&segment)
|
||||
.map(|index| &index.expected_block_ranges_by_max_block),
|
||||
self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block),
|
||||
block,
|
||||
)
|
||||
}
|
||||
@@ -481,11 +482,11 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
let Some(metrics) = &self.metrics else { return Ok(()) };
|
||||
|
||||
let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
|
||||
for (segment, headers) in static_files {
|
||||
for (segment, headers) in &*static_files {
|
||||
let mut entries = 0;
|
||||
let mut size = 0;
|
||||
|
||||
for (block_range, _) in &headers {
|
||||
for (block_range, _) in headers {
|
||||
let fixed_block_range = self.find_fixed_range(segment, block_range.start());
|
||||
let jar_provider = self
|
||||
.get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
|
||||
@@ -594,7 +595,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
)
|
||||
.and_then(|(parsed_segment, block_range)| {
|
||||
if parsed_segment == segment {
|
||||
return Some(block_range)
|
||||
return Some(block_range);
|
||||
}
|
||||
None
|
||||
}),
|
||||
@@ -603,7 +604,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
|
||||
// Return cached `LoadedJar` or insert it for the first time, and then, return it.
|
||||
if let Some(block_range) = block_range {
|
||||
return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
|
||||
return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
@@ -658,7 +659,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
) -> ProviderResult<Vec<SegmentHeader>> {
|
||||
// Nothing to delete if block is 0.
|
||||
if block == 0 {
|
||||
return Ok(Vec::new())
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let highest_block = self.get_highest_static_file_block(segment);
|
||||
@@ -666,12 +667,12 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
|
||||
loop {
|
||||
let Some(block_height) = self.get_lowest_range_end(segment) else {
|
||||
return Ok(deleted_headers)
|
||||
return Ok(deleted_headers);
|
||||
};
|
||||
|
||||
// Stop if we've reached the target block or the highest static file
|
||||
if block_height >= block || Some(block_height) == highest_block {
|
||||
return Ok(deleted_headers)
|
||||
return Ok(deleted_headers);
|
||||
}
|
||||
|
||||
debug!(
|
||||
@@ -765,7 +766,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
block: u64,
|
||||
) -> Option<SegmentRangeInclusive> {
|
||||
let indexes = self.indexes.read();
|
||||
let index = indexes.get(&segment)?;
|
||||
let index = indexes.get(segment)?;
|
||||
|
||||
(index.max_block >= block).then(|| {
|
||||
self.find_fixed_range_with_block_index(
|
||||
@@ -784,7 +785,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
tx: u64,
|
||||
) -> Option<SegmentRangeInclusive> {
|
||||
let indexes = self.indexes.read();
|
||||
let index = indexes.get(&segment)?;
|
||||
let index = indexes.get(segment)?;
|
||||
let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;
|
||||
|
||||
// It's more probable that the request comes from a newer tx height, so we iterate
|
||||
@@ -794,7 +795,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
|
||||
if tx > *tx_end {
|
||||
// request tx is higher than highest static file tx
|
||||
return None
|
||||
return None;
|
||||
}
|
||||
let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
|
||||
if tx_start <= tx {
|
||||
@@ -802,7 +803,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
segment,
|
||||
Some(&index.expected_block_ranges_by_max_block),
|
||||
block_range.end(),
|
||||
))
|
||||
));
|
||||
}
|
||||
}
|
||||
None
|
||||
@@ -831,7 +832,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
Some(segment_max_block) => {
|
||||
let fixed_range = self.find_fixed_range_with_block_index(
|
||||
segment,
|
||||
indexes.get(&segment).map(|index| &index.expected_block_ranges_by_max_block),
|
||||
indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
|
||||
segment_max_block,
|
||||
);
|
||||
|
||||
@@ -941,7 +942,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
}
|
||||
None => {
|
||||
debug!(target: "provider::static_file", ?segment, "Removing segment from index");
|
||||
indexes.remove(&segment);
|
||||
indexes.remove(segment);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -954,7 +955,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
let mut indexes = self.indexes.write();
|
||||
indexes.clear();
|
||||
|
||||
for (segment, headers) in iter_static_files(&self.path).map_err(ProviderError::other)? {
|
||||
for (segment, headers) in &*iter_static_files(&self.path).map_err(ProviderError::other)? {
|
||||
// Update first and last block for each segment
|
||||
//
|
||||
// It's safe to call `expect` here, because every segment has at least one header
|
||||
@@ -976,7 +977,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
|
||||
available_block_ranges_by_max_tx
|
||||
.get_or_insert_with(BTreeMap::default)
|
||||
.insert(tx_end, block_range);
|
||||
.insert(tx_end, *block_range);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -996,7 +997,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
|
||||
// initialize the expired history height to the lowest static file block
|
||||
if let Some(lowest_range) =
|
||||
indexes.get(&StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
|
||||
indexes.get(StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
|
||||
{
|
||||
// the earliest height is the lowest available block number
|
||||
self.earliest_history_height
|
||||
@@ -1057,7 +1058,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
info!(target: "reth::cli",
|
||||
"Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
|
||||
);
|
||||
return Ok(None)
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1107,18 +1108,18 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
if let Some(indices) = provider.block_body_indices(last_block)? {
|
||||
debug!(target: "reth::providers::static_file", ?segment, last_block, last_tx_num = indices.last_tx_num(), highest_tx, "Found block body indices");
|
||||
if indices.last_tx_num() <= highest_tx {
|
||||
break
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
debug!(target: "reth::providers::static_file", ?segment, last_block, "Block body indices not found, static files ahead of database");
|
||||
// If the block body indices can not be found, then it means that static
|
||||
// files is ahead of database, and the `ensure_invariants` check will fix
|
||||
// it by comparing with stage checkpoints.
|
||||
break
|
||||
break;
|
||||
}
|
||||
if last_block == 0 {
|
||||
debug!(target: "reth::providers::static_file", ?segment, "Reached block 0 in verification loop");
|
||||
break
|
||||
break;
|
||||
}
|
||||
last_block -= 1;
|
||||
|
||||
@@ -1227,7 +1228,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
// Old pruned nodes (including full node) do not store receipts as static
|
||||
// files.
|
||||
debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database");
|
||||
return false
|
||||
return false;
|
||||
}
|
||||
|
||||
if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
|
||||
@@ -1248,7 +1249,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
StaticFileSegment::AccountChangeSets => {
|
||||
if EitherWriter::account_changesets_destination(provider).is_database() {
|
||||
debug!(target: "reth::providers::static_file", ?segment, "Skipping account changesets segment: changesets stored in database");
|
||||
return false
|
||||
return false;
|
||||
}
|
||||
true
|
||||
}
|
||||
@@ -1361,7 +1362,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
?segment,
|
||||
"Setting unwind target."
|
||||
);
|
||||
return Ok(Some(highest_block))
|
||||
return Ok(Some(highest_block));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1370,7 +1371,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
.is_none_or(|highest_entry| db_last_entry > highest_entry)
|
||||
{
|
||||
debug!(target: "reth::providers::static_file", ?segment, db_last_entry, ?highest_static_file_entry, "Database has entries beyond static files, no unwind needed");
|
||||
return Ok(None)
|
||||
return Ok(None);
|
||||
}
|
||||
} else {
|
||||
debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
|
||||
@@ -1402,7 +1403,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
?segment,
|
||||
"Setting unwind target."
|
||||
);
|
||||
return Ok(Some(highest_static_file_block))
|
||||
return Ok(Some(highest_static_file_block));
|
||||
}
|
||||
|
||||
// If the checkpoint is behind, then we failed to do a database commit **but committed** to
|
||||
@@ -1477,7 +1478,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
///
|
||||
/// If there is nothing on disk for the given segment, this will return [`None`].
|
||||
pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
|
||||
self.indexes.read().get(&segment).and_then(|index| index.min_block_range)
|
||||
self.indexes.read().get(segment).and_then(|index| index.min_block_range)
|
||||
}
|
||||
|
||||
/// Gets the lowest static file's block range start if it exists for a static file segment.
|
||||
@@ -1502,7 +1503,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
///
|
||||
/// If there is nothing on disk for the given segment, this will return [`None`].
|
||||
pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
|
||||
self.indexes.read().get(&segment).map(|index| index.max_block)
|
||||
self.indexes.read().get(segment).map(|index| index.max_block)
|
||||
}
|
||||
|
||||
/// Gets the highest static file transaction.
|
||||
@@ -1511,7 +1512,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
|
||||
self.indexes
|
||||
.read()
|
||||
.get(&segment)
|
||||
.get(segment)
|
||||
.and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
|
||||
.and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
|
||||
}
|
||||
@@ -1531,12 +1532,12 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
|
||||
) -> ProviderResult<Option<T>> {
|
||||
if let Some(ranges) =
|
||||
self.indexes.read().get(&segment).map(|index| &index.expected_block_ranges_by_max_block)
|
||||
self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block)
|
||||
{
|
||||
// Iterate through all ranges in reverse order (highest to lowest)
|
||||
for range in ranges.values().rev() {
|
||||
if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
|
||||
return Ok(Some(res))
|
||||
return Ok(Some(res));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1593,14 +1594,14 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
match get_fn(&mut cursor, number)? {
|
||||
Some(res) => {
|
||||
if !predicate(&res) {
|
||||
break 'outer
|
||||
break 'outer;
|
||||
}
|
||||
result.push(res);
|
||||
break 'inner
|
||||
break 'inner;
|
||||
}
|
||||
None => {
|
||||
if retrying {
|
||||
return Ok(result)
|
||||
return Ok(result);
|
||||
}
|
||||
// There is a very small chance of hitting a deadlock if two consecutive
|
||||
// static files share the same bucket in the
|
||||
@@ -1694,7 +1695,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
if static_file_upper_bound
|
||||
.is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
|
||||
{
|
||||
return fetch_from_static_file(self)
|
||||
return fetch_from_static_file(self);
|
||||
}
|
||||
fetch_from_database()
|
||||
}
|
||||
@@ -1759,7 +1760,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
|
||||
self.indexes
|
||||
.read()
|
||||
.get(&segment)
|
||||
.get(segment)
|
||||
.and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
|
||||
.cloned()
|
||||
}
|
||||
@@ -1769,7 +1770,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
|
||||
self.indexes
|
||||
.read()
|
||||
.get(&segment)
|
||||
.get(segment)
|
||||
.map(|index| &index.expected_block_ranges_by_max_block)
|
||||
.cloned()
|
||||
}
|
||||
@@ -1846,7 +1847,7 @@ impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
|
||||
segment: StaticFileSegment,
|
||||
) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
|
||||
if self.access.is_read_only() {
|
||||
return Err(ProviderError::ReadOnlyStaticFileAccess)
|
||||
return Err(ProviderError::ReadOnlyStaticFileAccess);
|
||||
}
|
||||
|
||||
trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
|
||||
@@ -1988,7 +1989,7 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
|
||||
|
||||
// iterate through static files and sum changeset metadata via each static file header
|
||||
let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
|
||||
if let Some(changeset_segments) = static_files.get(&StaticFileSegment::AccountChangeSets) {
|
||||
if let Some(changeset_segments) = static_files.get(StaticFileSegment::AccountChangeSets) {
|
||||
for (_, header) in changeset_segments {
|
||||
if let Some(changeset_offsets) = header.changeset_offsets() {
|
||||
for offset in changeset_offsets {
|
||||
@@ -2030,7 +2031,7 @@ impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvide
|
||||
.get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
|
||||
.and_then(|(header, hash)| {
|
||||
if hash == block_hash {
|
||||
return Some(header)
|
||||
return Some(header);
|
||||
}
|
||||
None
|
||||
}))
|
||||
@@ -2140,7 +2141,7 @@ impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> Rec
|
||||
|
||||
fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
|
||||
if let Some(num) = self.transaction_id(hash)? {
|
||||
return self.receipt(num)
|
||||
return self.receipt(num);
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
@@ -3,13 +3,13 @@ use std::{collections::HashMap, time::Duration};
|
||||
use itertools::Itertools;
|
||||
use metrics::{Counter, Gauge, Histogram};
|
||||
use reth_metrics::Metrics;
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
use reth_static_file_types::{StaticFileMap, StaticFileSegment};
|
||||
use strum::{EnumIter, IntoEnumIterator};
|
||||
|
||||
/// Metrics for the static file provider.
|
||||
#[derive(Debug)]
|
||||
pub struct StaticFileProviderMetrics {
|
||||
segments: HashMap<StaticFileSegment, StaticFileSegmentMetrics>,
|
||||
segments: StaticFileMap<StaticFileSegmentMetrics>,
|
||||
segment_operations: HashMap<
|
||||
(StaticFileSegment, StaticFileProviderOperation),
|
||||
StaticFileProviderOperationMetrics,
|
||||
@@ -19,14 +19,19 @@ pub struct StaticFileProviderMetrics {
|
||||
impl Default for StaticFileProviderMetrics {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
segments: StaticFileSegment::iter()
|
||||
.map(|segment| {
|
||||
(
|
||||
segment,
|
||||
StaticFileSegmentMetrics::new_with_labels(&[("segment", segment.as_str())]),
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
segments: Box::new(
|
||||
StaticFileSegment::iter()
|
||||
.map(|segment| {
|
||||
(
|
||||
segment,
|
||||
StaticFileSegmentMetrics::new_with_labels(&[(
|
||||
"segment",
|
||||
segment.as_str(),
|
||||
)]),
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
segment_operations: StaticFileSegment::iter()
|
||||
.cartesian_product(StaticFileProviderOperation::iter())
|
||||
.map(|(segment, operation)| {
|
||||
@@ -51,10 +56,10 @@ impl StaticFileProviderMetrics {
|
||||
files: usize,
|
||||
entries: usize,
|
||||
) {
|
||||
self.segments.get(&segment).expect("segment metrics should exist").size.set(size as f64);
|
||||
self.segments.get(&segment).expect("segment metrics should exist").files.set(files as f64);
|
||||
self.segments.get(segment).expect("segment metrics should exist").size.set(size as f64);
|
||||
self.segments.get(segment).expect("segment metrics should exist").files.set(files as f64);
|
||||
self.segments
|
||||
.get(&segment)
|
||||
.get(segment)
|
||||
.expect("segment metrics should exist")
|
||||
.entries
|
||||
.set(entries as f64);
|
||||
|
||||
Reference in New Issue
Block a user