mirror of https://github.com/paradigmxyz/reth.git

perf: use separate pool for save_blocks (#21764)
@@ -14,6 +14,8 @@
#[global_allocator]
static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator();

use alloy_primitives as _;

mod benchmark;
mod cli;
mod comparison;

@@ -146,6 +146,24 @@ impl DatabaseArguments {
        }
    }

    /// Create database arguments suitable for testing.
    ///
    /// Uses a small geometry (64MB max, 4MB growth) to avoid exhausting the system's
    /// virtual memory map limit (`vm.max_map_count`) when many test databases are open
    /// concurrently.
    pub fn test() -> Self {
        Self {
            geometry: Geometry {
                size: Some(0..(64 * MEGABYTE)),
                growth_step: Some(4 * MEGABYTE as isize),
                shrink_threshold: Some(0),
                page_size: Some(PageSize::Set(default_page_size())),
            },
            max_read_transaction_duration: Some(MaxReadTransactionDuration::Unbounded),
            ..Self::new(ClientVersion::default())
        }
    }

    /// Sets the upper size limit of the db environment, the maximum database size in bytes.
    pub const fn with_geometry_max_size(mut self, max_size: Option<usize>) -> Self {
        if let Some(max_size) = max_size {

@@ -43,11 +43,8 @@ pub mod test_utils {
    use super::*;
    use crate::mdbx::DatabaseArguments;
    use parking_lot::RwLock;
    use reth_db_api::{
        database::Database, database_metrics::DatabaseMetrics, models::ClientVersion,
    };
    use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
    use reth_fs_util;
    use reth_libmdbx::MaxReadTransactionDuration;
    use std::{
        fmt::Formatter,
        path::{Path, PathBuf},

@@ -179,12 +176,7 @@ pub mod test_utils {
        let path = tempdir_path();
        let emsg = format!("{ERROR_DB_CREATION}: {path:?}");

        let db = init_db(
            &path,
            DatabaseArguments::new(ClientVersion::default())
                .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
        )
        .expect(&emsg);
        let db = init_db(&path, DatabaseArguments::test()).expect(&emsg);

        Arc::new(TempDatabase::new(db, path))
    }

@@ -194,12 +186,7 @@ pub mod test_utils {
    pub fn create_test_rw_db_with_path<P: AsRef<Path>>(path: P) -> Arc<TempDatabase<DatabaseEnv>> {
        let path = path.as_ref().to_path_buf();
        let emsg = format!("{ERROR_DB_CREATION}: {path:?}");
        let db = init_db(
            path.as_path(),
            DatabaseArguments::new(ClientVersion::default())
                .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
        )
        .expect(&emsg);
        let db = init_db(path.as_path(), DatabaseArguments::test()).expect(&emsg);
        Arc::new(TempDatabase::new(db, path))
    }

@@ -214,20 +201,14 @@ pub mod test_utils {
        let datadir = datadir.as_ref().to_path_buf();
        let db_path = datadir.join("db");
        let emsg = format!("{ERROR_DB_CREATION}: {db_path:?}");
        let db = init_db(
            &db_path,
            DatabaseArguments::new(ClientVersion::default())
                .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
        )
        .expect(&emsg);
        let db = init_db(&db_path, DatabaseArguments::test()).expect(&emsg);
        Arc::new(TempDatabase::new(db, datadir))
    }

    /// Create read only database for testing
    #[track_caller]
    pub fn create_test_ro_db() -> Arc<TempDatabase<DatabaseEnv>> {
        let args = DatabaseArguments::new(ClientVersion::default())
            .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded));
        let args = DatabaseArguments::test();

        let path = tempdir_path();
        let emsg = format!("{ERROR_DB_CREATION}: {path:?}");

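A side note on the limit these test arguments work around: `vm.max_map_count` is a Linux sysctl that caps how many memory mappings each process may hold (commonly 65530 by default), and every open MDBX environment holds mappings sized by its geometry. The sketch below is a minimal, Linux-only way to inspect that headroom; it uses only standard procfs paths and no reth APIs.

    use std::fs;

    fn main() {
        // Linux-only: compare the per-process mapping limit with the number
        // of mappings the current process already holds. Both paths are
        // standard procfs locations.
        let limit: usize = fs::read_to_string("/proc/sys/vm/max_map_count")
            .map(|s| s.trim().parse().unwrap_or(0))
            .unwrap_or(0);
        let used = fs::read_to_string("/proc/self/maps")
            .map(|s| s.lines().count())
            .unwrap_or(0);
        println!("memory maps: {used} used / {limit} allowed");
    }
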
@@ -24,6 +24,7 @@ reth-db = { workspace = true, features = ["mdbx"] }
reth-db-api.workspace = true
reth-prune-types.workspace = true
reth-stages-types.workspace = true
reth-tasks.workspace = true
reth-trie = { workspace = true, features = ["metrics"] }
reth-trie-db = { workspace = true, features = ["metrics"] }
reth-nippy-jar.workspace = true
@@ -32,7 +33,6 @@ reth-chain-state.workspace = true
reth-node-types.workspace = true
reth-static-file-types = { workspace = true, features = ["std"] }
reth-fs-util.workspace = true
reth-tasks.workspace = true

# ethereum
alloy-eips.workspace = true

@@ -28,6 +28,9 @@ pub use providers::{
pub mod changeset_walker;
pub mod changesets_utils;

mod storage_threadpool;
use storage_threadpool::STORAGE_POOL;

#[cfg(any(test, feature = "test-utils"))]
/// Common test helpers for mocking the Provider.
pub mod test_utils;

@@ -18,7 +18,7 @@ use crate::{
    PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
    RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
    StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
    TransactionsProvider, TransactionsProviderExt, TrieWriter,
    TransactionsProvider, TransactionsProviderExt, TrieWriter, STORAGE_POOL,
};
use alloy_consensus::{
    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
@@ -64,7 +64,6 @@ use reth_storage_api::{
    StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
};
use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
use reth_tasks::spawn_scoped_os_thread;
use reth_trie::{
    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
    HashedPostStateSorted, StoredNibbles,
@@ -79,7 +78,6 @@ use std::{
    fmt::Debug,
    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
    sync::Arc,
    thread,
    time::Instant,
};
use tracing::{debug, instrument, trace};
@@ -550,24 +548,37 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
        let rocksdb_provider = self.rocksdb_provider.clone();
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_enabled = rocksdb_ctx.storage_settings.any_in_rocksdb();

        thread::scope(|s| {
        let mut sf_result = None;
        #[cfg(all(unix, feature = "rocksdb"))]
        let mut rocksdb_result = None;

        // Write to all backends in parallel.
        STORAGE_POOL.in_place_scope(|s| {
            // SF writes
            let sf_handle = spawn_scoped_os_thread(s, "static-files", || {
            s.spawn(|_| {
                let start = Instant::now();
                sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx)?;
                Ok::<_, ProviderError>(start.elapsed())
                sf_result = Some(
                    sf_provider
                        .write_blocks_data(&blocks, &tx_nums, sf_ctx)
                        .map(|()| start.elapsed()),
                );
            });

            // RocksDB writes
            #[cfg(all(unix, feature = "rocksdb"))]
            let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
                spawn_scoped_os_thread(s, "rocksdb", || {
            if rocksdb_enabled {
                s.spawn(|_| {
                    let start = Instant::now();
                    rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
                    Ok::<_, ProviderError>(start.elapsed())
                })
            });
                    rocksdb_result = Some(
                        rocksdb_provider
                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)
                            .map(|()| start.elapsed()),
                    );
                });
            }

            // MDBX writes
            let mdbx_start = Instant::now();
@@ -672,24 +683,27 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,

            timings.mdbx = mdbx_start.elapsed();

            // Wait for SF thread
            timings.sf = sf_handle
                .join()
                .map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;
            Ok::<_, ProviderError>(())
        })?;

        // Wait for RocksDB thread
        #[cfg(all(unix, feature = "rocksdb"))]
        if let Some(handle) = rocksdb_handle {
            timings.rocksdb = handle.join().expect("RocksDB thread panicked")?;
        }
        // Collect results from spawned tasks
        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;

        timings.total = total_start.elapsed();
        #[cfg(all(unix, feature = "rocksdb"))]
        if rocksdb_enabled {
            timings.rocksdb = rocksdb_result.ok_or_else(|| {
                ProviderError::Database(reth_db_api::DatabaseError::Other(
                    "RocksDB thread panicked".into(),
                ))
            })??;
        }

        self.metrics.record_save_blocks(&timings);
        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
        timings.total = total_start.elapsed();

        Ok(())
    })
        self.metrics.record_save_blocks(&timings);
        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }

    /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).

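The hunk above replaces per-call OS threads (`spawn_scoped_os_thread` + `join`) with tasks on the shared rayon pool, each task depositing its outcome into an `Option<Result<..>>` slot. Below is a minimal, self-contained sketch of that collection pattern; the `u64`/`String` result types are stand-ins for reth's timings and `ProviderError`, while the rayon calls (`in_place_scope`, `spawn`) are the real API.

    use rayon::ThreadPoolBuilder;

    fn main() {
        // Illustrative pool; reth's STORAGE_POOL uses 16 named threads.
        let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();

        // Each task writes its outcome into its own Option slot. Disjoint
        // mutable captures keep the borrow checker happy across spawns.
        let mut sf_result: Option<Result<u64, String>> = None;
        let mut rocksdb_result: Option<Result<u64, String>> = None;

        pool.in_place_scope(|s| {
            s.spawn(|_| sf_result = Some(Ok(42)));
            s.spawn(|_| rocksdb_result = Some(Ok(7)));
            // The scope body itself stays on the calling thread, which is
            // where save_blocks keeps doing the MDBX writes meanwhile.
        });

        // All tasks have finished once in_place_scope returns. A slot left
        // at None means the task never completed; rayon also re-propagates
        // task panics from the scope call, so this check is a defensive
        // fallback rather than the primary panic path.
        let sf = sf_result.ok_or("static file task missing").unwrap().unwrap();
        let rocks = rocksdb_result.ok_or("rocksdb task missing").unwrap().unwrap();
        assert_eq!(sf + rocks, 49);
    }
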
@@ -1,5 +1,8 @@
use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES};
use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo};
use crate::{
    providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo},
    STORAGE_POOL,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_primitives::{
    map::{AddressMap, HashMap},
@@ -24,7 +27,6 @@ use reth_storage_errors::{
    db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
    provider::{ProviderError, ProviderResult},
};
use reth_tasks::spawn_scoped_os_thread;
use rocksdb::{
    BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
    DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
@@ -36,7 +38,6 @@ use std::{
    fmt,
    path::{Path, PathBuf},
    sync::Arc,
    thread,
    time::Instant,
};
use tracing::instrument;
@@ -1197,41 +1198,58 @@ impl RocksDBProvider {
            return Ok(());
        }

        thread::scope(|s| {
            let handles: Vec<_> = [
                (ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
                    ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()))
                .then(|| {
                    spawn_scoped_os_thread(s, "rocksdb-tx-hash", || {
                        self.write_tx_hash_numbers(blocks, tx_nums, &ctx)
                    })
                }),
                ctx.storage_settings.account_history_in_rocksdb.then(|| {
                    spawn_scoped_os_thread(s, "rocksdb-account-history", || {
                        self.write_account_history(blocks, &ctx)
                    })
                }),
                ctx.storage_settings.storages_history_in_rocksdb.then(|| {
                    spawn_scoped_os_thread(s, "rocksdb-storage-history", || {
                        self.write_storage_history(blocks, &ctx)
                    })
                }),
            ]
            .into_iter()
            .enumerate()
            .filter_map(|(i, h)| h.map(|h| (i, h)))
            .collect();
        let mut r_tx_hash = None;
        let mut r_account_history = None;
        let mut r_storage_history = None;

            for (i, handle) in handles {
                handle.join().map_err(|_| {
                    ProviderError::Database(DatabaseError::Other(format!(
                        "rocksdb write thread {i} panicked"
                    )))
                })??;
        let write_tx_hash = ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
            ctx.prune_tx_lookup.is_none_or(|m| !m.is_full());
        let write_account_history = ctx.storage_settings.account_history_in_rocksdb;
        let write_storage_history = ctx.storage_settings.storages_history_in_rocksdb;

        STORAGE_POOL.in_place_scope(|s| {
            if write_tx_hash {
                s.spawn(|_| {
                    r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
                });
            }

            Ok(())
        })
            if write_account_history {
                s.spawn(|_| {
                    r_account_history = Some(self.write_account_history(blocks, &ctx));
                });
            }

            if write_storage_history {
                s.spawn(|_| {
                    r_storage_history = Some(self.write_storage_history(blocks, &ctx));
                });
            }
        });

        if write_tx_hash {
            r_tx_hash.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb tx-hash write thread panicked".into(),
                ))
            })??;
        }
        if write_account_history {
            r_account_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb account-history write thread panicked".into(),
                ))
            })??;
        }
        if write_storage_history {
            r_storage_history.ok_or_else(|| {
                ProviderError::Database(DatabaseError::Other(
                    "rocksdb storage-history write thread panicked".into(),
                ))
            })??;
        }

        Ok(())
    }

    /// Writes transaction hash to number mappings for the given blocks.

@@ -6,7 +6,7 @@ use crate::{
    changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker},
    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
    EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
    TransactionVariant, TransactionsProvider, TransactionsProviderExt,
    TransactionVariant, TransactionsProvider, TransactionsProviderExt, STORAGE_POOL,
};
use alloy_consensus::{transaction::TransactionMeta, Header};
use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
@@ -47,14 +47,12 @@ use reth_storage_api::{
    StorageChangeSetReader, StorageSettingsCache,
};
use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
use reth_tasks::spawn_scoped_os_thread;
use std::{
    collections::BTreeMap,
    fmt::Debug,
    ops::{Bound, Deref, Range, RangeBounds, RangeInclusive},
    path::{Path, PathBuf},
    sync::{atomic::AtomicU64, mpsc, Arc},
    thread,
};
use tracing::{debug, info, info_span, instrument, trace, warn};

@@ -656,31 +654,28 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
        Ok(())
    }

    /// Spawns a scoped thread that writes to a static file segment using the provided closure.
    /// Writes to a static file segment using the provided closure.
    ///
    /// The closure receives a mutable reference to the segment writer. After the closure completes,
    /// `sync_all()` is called to flush writes to disk.
    fn spawn_segment_writer<'scope, 'env, F>(
        &'env self,
        scope: &'scope thread::Scope<'scope, 'env>,
    fn write_segment<F>(
        &self,
        segment: StaticFileSegment,
        first_block_number: BlockNumber,
        f: F,
    ) -> thread::ScopedJoinHandle<'scope, ProviderResult<()>>
    ) -> ProviderResult<()>
    where
        F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()> + Send + 'env,
        F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()>,
    {
        spawn_scoped_os_thread(scope, segment.as_short_str(), move || {
            let mut w = self.get_writer(first_block_number, segment)?;
            f(&mut w)?;
            w.sync_all()
        })
        let mut w = self.get_writer(first_block_number, segment)?;
        f(&mut w)?;
        w.sync_all()
    }

    /// Writes all static file data for multiple blocks in parallel per-segment.
    ///
    /// This spawns separate threads for each segment type and each thread calls `sync_all()` on its
    /// writer when done.
    /// This spawns tasks on the storage thread pool for each segment type and each task calls
    /// `sync_all()` on its writer when done.
    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
    pub fn write_blocks_data(
        &self,
@@ -694,70 +689,87 @@ impl<N: NodePrimitives> StaticFileProvider<N> {

        let first_block_number = blocks[0].recovered_block().number();

        thread::scope(|s| {
            let h_headers =
                self.spawn_segment_writer(s, StaticFileSegment::Headers, first_block_number, |w| {
                    Self::write_headers(w, blocks)
        let mut r_headers = None;
        let mut r_txs = None;
        let mut r_senders = None;
        let mut r_receipts = None;
        let mut r_account_changesets = None;
        let mut r_storage_changesets = None;

        STORAGE_POOL.in_place_scope(|s| {
            s.spawn(|_| {
                r_headers =
                    Some(self.write_segment(StaticFileSegment::Headers, first_block_number, |w| {
                        Self::write_headers(w, blocks)
                    }));
            });

            s.spawn(|_| {
                r_txs = Some(self.write_segment(
                    StaticFileSegment::Transactions,
                    first_block_number,
                    |w| Self::write_transactions(w, blocks, tx_nums),
                ));
            });

            if ctx.write_senders {
                s.spawn(|_| {
                    r_senders = Some(self.write_segment(
                        StaticFileSegment::TransactionSenders,
                        first_block_number,
                        |w| Self::write_transaction_senders(w, blocks, tx_nums),
                    ));
                });

            let h_txs = self.spawn_segment_writer(
                s,
                StaticFileSegment::Transactions,
                first_block_number,
                |w| Self::write_transactions(w, blocks, tx_nums),
            );

            let h_senders = ctx.write_senders.then(|| {
                self.spawn_segment_writer(
                    s,
                    StaticFileSegment::TransactionSenders,
                    first_block_number,
                    |w| Self::write_transaction_senders(w, blocks, tx_nums),
                )
            });

            let h_receipts = ctx.write_receipts.then(|| {
                self.spawn_segment_writer(s, StaticFileSegment::Receipts, first_block_number, |w| {
                    Self::write_receipts(w, blocks, tx_nums, &ctx)
                })
            });

            let h_account_changesets = ctx.write_account_changesets.then(|| {
                self.spawn_segment_writer(
                    s,
                    StaticFileSegment::AccountChangeSets,
                    first_block_number,
                    |w| Self::write_account_changesets(w, blocks),
                )
            });

            let h_storage_changesets = ctx.write_storage_changesets.then(|| {
                self.spawn_segment_writer(
                    s,
                    StaticFileSegment::StorageChangeSets,
                    first_block_number,
                    |w| Self::write_storage_changesets(w, blocks),
                )
            });

            h_headers.join().map_err(|_| StaticFileWriterError::ThreadPanic("headers"))??;
            h_txs.join().map_err(|_| StaticFileWriterError::ThreadPanic("transactions"))??;
            if let Some(h) = h_senders {
                h.join().map_err(|_| StaticFileWriterError::ThreadPanic("senders"))??;
            }
            if let Some(h) = h_receipts {
                h.join().map_err(|_| StaticFileWriterError::ThreadPanic("receipts"))??;

            if ctx.write_receipts {
                s.spawn(|_| {
                    r_receipts = Some(self.write_segment(
                        StaticFileSegment::Receipts,
                        first_block_number,
                        |w| Self::write_receipts(w, blocks, tx_nums, &ctx),
                    ));
                });
            }
            if let Some(h) = h_account_changesets {
                h.join()
                    .map_err(|_| StaticFileWriterError::ThreadPanic("account_changesets"))??;

            if ctx.write_account_changesets {
                s.spawn(|_| {
                    r_account_changesets = Some(self.write_segment(
                        StaticFileSegment::AccountChangeSets,
                        first_block_number,
                        |w| Self::write_account_changesets(w, blocks),
                    ));
                });
            }
            if let Some(h) = h_storage_changesets {
                h.join()
                    .map_err(|_| StaticFileWriterError::ThreadPanic("storage_changesets"))??;

            if ctx.write_storage_changesets {
                s.spawn(|_| {
                    r_storage_changesets = Some(self.write_segment(
                        StaticFileSegment::StorageChangeSets,
                        first_block_number,
                        |w| Self::write_storage_changesets(w, blocks),
                    ));
                });
            }
            Ok(())
        })
        });

        r_headers.ok_or(StaticFileWriterError::ThreadPanic("headers"))??;
        r_txs.ok_or(StaticFileWriterError::ThreadPanic("transactions"))??;
        if ctx.write_senders {
            r_senders.ok_or(StaticFileWriterError::ThreadPanic("senders"))??;
        }
        if ctx.write_receipts {
            r_receipts.ok_or(StaticFileWriterError::ThreadPanic("receipts"))??;
        }
        if ctx.write_account_changesets {
            r_account_changesets
                .ok_or(StaticFileWriterError::ThreadPanic("account_changesets"))??;
        }
        if ctx.write_storage_changesets {
            r_storage_changesets
                .ok_or(StaticFileWriterError::ThreadPanic("storage_changesets"))??;
        }
        Ok(())
    }

    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be

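Worth noting in the `write_segment` signature above: once the closure is no longer handed to a scoped thread, the `'scope`/`'env` lifetimes and the `Send` bound on `F` can be dropped, since the closure simply runs to completion on whichever pool task calls it. A minimal sketch of that shape, with a toy writer type standing in for reth's `StaticFileProviderRWRefMut`:

    // Toy writer; the error type and methods are stand-ins, not reth's.
    struct Writer;

    impl Writer {
        fn append(&mut self, _bytes: &[u8]) -> Result<(), String> {
            Ok(())
        }
        fn sync_all(&mut self) -> Result<(), String> {
            Ok(())
        }
    }

    // No Send bound and no scope lifetimes: the closure runs inline and the
    // writer is flushed after it returns, mirroring write_segment's contract.
    fn write_segment<F>(mut w: Writer, f: F) -> Result<(), String>
    where
        F: FnOnce(&mut Writer) -> Result<(), String>,
    {
        f(&mut w)?;
        w.sync_all()
    }

    fn main() {
        write_segment(Writer, |w| w.append(b"header bytes")).unwrap();
    }
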
crates/storage/provider/src/storage_threadpool.rs (new file, +23)
@@ -0,0 +1,23 @@
//! Dedicated thread pool for storage I/O operations.
//!
//! This module provides a static rayon thread pool used for parallel writes to static files,
//! `RocksDB`, and other storage backends during block persistence.

use rayon::{ThreadPool, ThreadPoolBuilder};
use std::sync::LazyLock;

/// Number of threads in the storage I/O thread pool.
const STORAGE_POOL_THREADS: usize = 16;

/// Static thread pool for storage I/O operations.
///
/// This pool is used exclusively by [`save_blocks`](crate::DatabaseProvider::save_blocks) to
/// parallelize writes to different storage backends (static files, `RocksDB`). Since this is the
/// only call site, all threads are always available when needed.
pub(crate) static STORAGE_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    ThreadPoolBuilder::new()
        .num_threads(STORAGE_POOL_THREADS)
        .thread_name(|idx| format!("save-blocks-{idx}"))
        .build()
        .expect("failed to create storage thread pool")
});

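One property of `in_place_scope` that this design leans on: the scope body runs on the calling thread rather than migrating into the pool, so `save_blocks` keeps its MDBX transaction work where it started while only the spawned backend writes land on the `save-blocks-*` workers. A small demonstration under those assumptions (pool size and names here are illustrative):

    use rayon::ThreadPoolBuilder;
    use std::thread;

    fn main() {
        let pool = ThreadPoolBuilder::new()
            .num_threads(2)
            .thread_name(|idx| format!("demo-pool-{idx}"))
            .build()
            .unwrap();

        let caller = thread::current().id();
        pool.in_place_scope(|s| {
            // The scope body runs right here on the calling thread, so work
            // like save_blocks' MDBX writes never migrates into the pool.
            assert_eq!(thread::current().id(), caller);
            // Spawned tasks run on the pool's named worker threads.
            s.spawn(|_| {
                println!("task on {:?}", thread::current().name());
            });
        });
        // in_place_scope returns only after all spawned tasks finish.
    }
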
@@ -1,7 +1,7 @@
//! Test case definitions

use crate::result::{CaseResult, Error};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use rayon::prelude::*;
use std::{
    fmt::Debug,
    path::{Path, PathBuf},
@@ -10,7 +10,7 @@ use std::{
/// A single test case, capable of loading a JSON description of itself and running it.
///
/// See <https://ethereum-tests.readthedocs.io/> for test specs.
pub trait Case: Debug + Sync + Sized {
pub trait Case: Debug + Send + Sync + Sized + 'static {
    /// A description of the test.
    fn description(&self) -> String {
        "no description".to_string()
@@ -22,7 +22,7 @@ pub trait Case: Debug + Sync + Sized {
    fn load(path: &Path) -> Result<Self, Error>;

    /// Run the test.
    fn run(&self) -> Result<(), Error>;
    fn run(self) -> Result<(), Error>;
}

/// A container for multiple test cases.
@@ -34,10 +34,10 @@ pub struct Cases<T> {

impl<T: Case> Cases<T> {
    /// Run the contained test cases.
    pub fn run(&self) -> Vec<CaseResult> {
    pub fn run(self) -> Vec<CaseResult> {
        self.test_cases
            .par_iter()
            .map(|(path, case)| CaseResult::new(path, case, case.run()))
            .into_par_iter()
            .map(|(path, case)| CaseResult::new(&path, case.description(), case.run()))
            .collect()
    }
}

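Switching `Cases::run` to take `self` lets it consume `test_cases` and move each owned case into a rayon task, which is exactly what forces the new `Send + 'static` bounds on `Case`. A toy sketch of the same owned parallel map, with a `(String, u32)` tuple standing in for the suite's real case type:

    use rayon::prelude::*;

    fn main() {
        let cases: Vec<(String, u32)> = (0..8).map(|i| (format!("case-{i}"), i)).collect();

        // into_par_iter() consumes the Vec and hands each owned element to
        // a task, so the element type must be Send; no &self borrow
        // survives past the call.
        let results: Vec<(String, bool)> = cases
            .into_par_iter()
            .map(|(name, case)| (name, case % 2 == 0))
            .collect();

        assert_eq!(results.len(), 8);
    }
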
@@ -5,7 +5,7 @@ use crate::{
    Case, Error, Suite,
};
use alloy_rlp::{Decodable, Encodable};
use rayon::iter::ParallelIterator;
use rayon::iter::{IndexedParallelIterator, ParallelIterator};
use reth_chainspec::ChainSpec;
use reth_consensus::{Consensus, HeaderValidator};
use reth_db_common::init::{insert_genesis_hashes, insert_genesis_history, insert_genesis_state};
@@ -172,7 +172,7 @@ impl Case for BlockchainTestCase {
    ///
    /// # Errors
    /// Returns an error if the test is flagged for skipping or encounters issues during execution.
    fn run(&self) -> Result<(), Error> {
    fn run(self) -> Result<(), Error> {
        // If the test is marked for skipping, return a Skipped error immediately.
        if self.skip {
            return Err(Error::Skipped);
@@ -180,12 +180,11 @@ impl Case for BlockchainTestCase {

        // Iterate through test cases, filtering by the network type to exclude specific forks.
        self.tests
            .iter()
            .into_iter()
            .filter(|(_, case)| !Self::excluded_fork(case.network))
            .par_bridge_buffered()
            .try_for_each(|(name, case)| Self::run_single_case(name, case).map(|_| ()))?;

        Ok(())
            .with_min_len(64)
            .try_for_each(|(name, case)| Self::run_single_case(&name, &case).map(|_| ()))
    }
}

@@ -207,7 +206,7 @@ fn run_case(
    case: &BlockchainTest,
) -> Result<Vec<(RecoveredBlock<Block>, ExecutionWitness)>, Error> {
    // Create a new test database and initialize a provider for the test case.
    let chain_spec: Arc<ChainSpec> = Arc::new(case.network.into());
    let chain_spec = case.network.to_chain_spec();
    let factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
    let provider = factory.database_provider_rw().unwrap();

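The `.with_min_len(64)` above is rayon's standard knob for bounding split granularity on an indexed parallel iterator: each task is guaranteed at least 64 items, which amortizes per-task overhead across many cheap test cases (how the suite assembles the indexed iterator from `self.tests` is not fully visible in this hunk). A self-contained sketch of the operator itself, with illustrative numbers:

    use rayon::prelude::*;

    fn main() {
        // with_min_len(64) bounds how finely rayon may split the input:
        // every task receives at least 64 items, so scheduling overhead is
        // amortized across many cheap work items.
        let doubled: u64 = (0..10_000u32)
            .into_par_iter()
            .with_min_len(64)
            .map(|x| u64::from(x) * 2)
            .sum();
        assert_eq!(doubled, 9_999 * 10_000);
    }
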
@@ -8,8 +8,13 @@ use alloy_primitives::{keccak256, Address, Bloom, Bytes, B256, B64, U256};
use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, ForkCondition};
use reth_db_api::{cursor::DbDupCursorRO, tables, transaction::DbTx};
use reth_primitives_traits::SealedHeader;
use revm::primitives::HashMap;
use serde::Deserialize;
use std::{collections::BTreeMap, ops::Deref};
use std::{
    collections::BTreeMap,
    ops::Deref,
    sync::{Arc, OnceLock, RwLock},
};

/// The definition of a blockchain test.
#[derive(Debug, PartialEq, Eq, Deserialize)]
@@ -321,59 +326,72 @@ pub enum ForkSpec {
    Osaka,
}

impl From<ForkSpec> for ChainSpec {
    fn from(fork_spec: ForkSpec) -> Self {
impl ForkSpec {
    /// Converts this EF fork spec to a Reth [`ChainSpec`].
    pub fn to_chain_spec(self) -> Arc<ChainSpec> {
        static MAP: OnceLock<RwLock<HashMap<ForkSpec, Arc<ChainSpec>>>> = OnceLock::new();
        let map = MAP.get_or_init(Default::default);
        if let Some(r) = map.read().unwrap().get(&self) {
            return r.clone();
        }
        map.write()
            .unwrap()
            .entry(self)
            .or_insert_with(|| Arc::new(self.to_chain_spec_inner()))
            .clone()
    }

    fn to_chain_spec_inner(self) -> ChainSpec {
        let spec_builder = ChainSpecBuilder::mainnet().reset();

        match fork_spec {
            ForkSpec::Frontier => spec_builder.frontier_activated(),
            ForkSpec::FrontierToHomesteadAt5 => spec_builder
        match self {
            Self::Frontier => spec_builder.frontier_activated(),
            Self::FrontierToHomesteadAt5 => spec_builder
                .frontier_activated()
                .with_fork(EthereumHardfork::Homestead, ForkCondition::Block(5)),
            ForkSpec::Homestead => spec_builder.homestead_activated(),
            ForkSpec::HomesteadToDaoAt5 => spec_builder
            Self::Homestead => spec_builder.homestead_activated(),
            Self::HomesteadToDaoAt5 => spec_builder
                .homestead_activated()
                .with_fork(EthereumHardfork::Dao, ForkCondition::Block(5)),
            ForkSpec::HomesteadToEIP150At5 => spec_builder
            Self::HomesteadToEIP150At5 => spec_builder
                .homestead_activated()
                .with_fork(EthereumHardfork::Tangerine, ForkCondition::Block(5)),
            ForkSpec::EIP150 => spec_builder.tangerine_whistle_activated(),
            ForkSpec::EIP158 => spec_builder.spurious_dragon_activated(),
            ForkSpec::EIP158ToByzantiumAt5 => spec_builder
            Self::EIP150 => spec_builder.tangerine_whistle_activated(),
            Self::EIP158 => spec_builder.spurious_dragon_activated(),
            Self::EIP158ToByzantiumAt5 => spec_builder
                .spurious_dragon_activated()
                .with_fork(EthereumHardfork::Byzantium, ForkCondition::Block(5)),
            ForkSpec::Byzantium => spec_builder.byzantium_activated(),
            ForkSpec::ByzantiumToConstantinopleAt5 => spec_builder
            Self::Byzantium => spec_builder.byzantium_activated(),
            Self::ByzantiumToConstantinopleAt5 => spec_builder
                .byzantium_activated()
                .with_fork(EthereumHardfork::Constantinople, ForkCondition::Block(5)),
            ForkSpec::ByzantiumToConstantinopleFixAt5 => spec_builder
            Self::ByzantiumToConstantinopleFixAt5 => spec_builder
                .byzantium_activated()
                .with_fork(EthereumHardfork::Petersburg, ForkCondition::Block(5)),
            ForkSpec::Constantinople => spec_builder.constantinople_activated(),
            ForkSpec::ConstantinopleFix => spec_builder.petersburg_activated(),
            ForkSpec::Istanbul => spec_builder.istanbul_activated(),
            ForkSpec::Berlin => spec_builder.berlin_activated(),
            ForkSpec::BerlinToLondonAt5 => spec_builder
            Self::Constantinople => spec_builder.constantinople_activated(),
            Self::ConstantinopleFix => spec_builder.petersburg_activated(),
            Self::Istanbul => spec_builder.istanbul_activated(),
            Self::Berlin => spec_builder.berlin_activated(),
            Self::BerlinToLondonAt5 => spec_builder
                .berlin_activated()
                .with_fork(EthereumHardfork::London, ForkCondition::Block(5)),
            ForkSpec::London => spec_builder.london_activated(),
            ForkSpec::Merge |
            ForkSpec::MergeEOF |
            ForkSpec::MergeMeterInitCode |
            ForkSpec::MergePush0 => spec_builder.paris_activated(),
            ForkSpec::ParisToShanghaiAtTime15k => spec_builder
            Self::London => spec_builder.london_activated(),
            Self::Merge | Self::MergeEOF | Self::MergeMeterInitCode | Self::MergePush0 => {
                spec_builder.paris_activated()
            }
            Self::ParisToShanghaiAtTime15k => spec_builder
                .paris_activated()
                .with_fork(EthereumHardfork::Shanghai, ForkCondition::Timestamp(15_000)),
            ForkSpec::Shanghai => spec_builder.shanghai_activated(),
            ForkSpec::ShanghaiToCancunAtTime15k => spec_builder
            Self::Shanghai => spec_builder.shanghai_activated(),
            Self::ShanghaiToCancunAtTime15k => spec_builder
                .shanghai_activated()
                .with_fork(EthereumHardfork::Cancun, ForkCondition::Timestamp(15_000)),
            ForkSpec::Cancun => spec_builder.cancun_activated(),
            ForkSpec::CancunToPragueAtTime15k => spec_builder
            Self::Cancun => spec_builder.cancun_activated(),
            Self::CancunToPragueAtTime15k => spec_builder
                .cancun_activated()
                .with_fork(EthereumHardfork::Prague, ForkCondition::Timestamp(15_000)),
            ForkSpec::Prague => spec_builder.prague_activated(),
            ForkSpec::Osaka => spec_builder.osaka_activated(),
            Self::Prague => spec_builder.prague_activated(),
            Self::Osaka => spec_builder.osaka_activated(),
        }
        .build()
    }

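The new `to_chain_spec` memoizes one `Arc<ChainSpec>` per fork variant behind an `OnceLock<RwLock<HashMap>>`: the read lock serves the hot path, and on a miss `entry(..).or_insert_with(..)` under the write lock makes a racing second writer reuse the first writer's value instead of rebuilding it. A minimal sketch of the same pattern with a stand-in key and value:

    use std::collections::HashMap;
    use std::sync::{Arc, OnceLock, RwLock};

    fn cached_square(n: u32) -> Arc<u64> {
        static MAP: OnceLock<RwLock<HashMap<u32, Arc<u64>>>> = OnceLock::new();
        let map = MAP.get_or_init(Default::default);

        // Fast path: shared read lock; the guard drops before the write
        // lock below is taken, so there is no lock-order deadlock.
        if let Some(v) = map.read().unwrap().get(&n) {
            return v.clone();
        }

        // Slow path: or_insert_with builds the value at most once per key
        // even if two threads miss concurrently, since the second writer
        // finds the entry already filled.
        map.write()
            .unwrap()
            .entry(n)
            .or_insert_with(|| Arc::new(u64::from(n) * u64::from(n)))
            .clone()
    }

    fn main() {
        assert_eq!(*cached_square(12), 144);
        assert_eq!(*cached_square(12), 144); // second call hits the read path
    }
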
@@ -1,6 +1,5 @@
//! Test results and errors

use crate::Case;
use reth_db::DatabaseError;
use reth_ethereum_primitives::Block;
use reth_primitives_traits::RecoveredBlock;
@@ -93,8 +92,8 @@ pub struct CaseResult {

impl CaseResult {
    /// Create a new test result.
    pub fn new(path: &Path, case: &impl Case, result: Result<(), Error>) -> Self {
        Self { desc: case.description(), path: path.into(), result }
    pub fn new(path: &Path, desc: String, result: Result<(), Error>) -> Self {
        Self { desc, path: path.into(), result }
    }
}

@@ -108,5 +108,9 @@ fn eest_fixtures() {
        .join("execution-spec-tests")
        .join("blockchain_tests");

    if !suite_path.exists() {
        return;
    }

    BlockchainTests::new(suite_path).run();
}