diff --git a/bin/reth-bench-compare/src/main.rs b/bin/reth-bench-compare/src/main.rs index e866afb250..234a0fb8d3 100644 --- a/bin/reth-bench-compare/src/main.rs +++ b/bin/reth-bench-compare/src/main.rs @@ -14,6 +14,8 @@ #[global_allocator] static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator(); +use alloy_primitives as _; + mod benchmark; mod cli; mod comparison; diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 5a53fa53e6..484b99e517 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -146,6 +146,24 @@ impl DatabaseArguments { } } + /// Create database arguments suitable for testing. + /// + /// Uses a small geometry (64MB max, 4MB growth) to avoid exhausting the system's + /// virtual memory map limit (`vm.max_map_count`) when many test databases are open + /// concurrently. + pub fn test() -> Self { + Self { + geometry: Geometry { + size: Some(0..(64 * MEGABYTE)), + growth_step: Some(4 * MEGABYTE as isize), + shrink_threshold: Some(0), + page_size: Some(PageSize::Set(default_page_size())), + }, + max_read_transaction_duration: Some(MaxReadTransactionDuration::Unbounded), + ..Self::new(ClientVersion::default()) + } + } + /// Sets the upper size limit of the db environment, the maximum database size in bytes. pub const fn with_geometry_max_size(mut self, max_size: Option) -> Self { if let Some(max_size) = max_size { diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index be823616ac..160dc7b08d 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -43,11 +43,8 @@ pub mod test_utils { use super::*; use crate::mdbx::DatabaseArguments; use parking_lot::RwLock; - use reth_db_api::{ - database::Database, database_metrics::DatabaseMetrics, models::ClientVersion, - }; + use reth_db_api::{database::Database, database_metrics::DatabaseMetrics}; use reth_fs_util; - use reth_libmdbx::MaxReadTransactionDuration; use std::{ fmt::Formatter, path::{Path, PathBuf}, @@ -179,12 +176,7 @@ pub mod test_utils { let path = tempdir_path(); let emsg = format!("{ERROR_DB_CREATION}: {path:?}"); - let db = init_db( - &path, - DatabaseArguments::new(ClientVersion::default()) - .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)), - ) - .expect(&emsg); + let db = init_db(&path, DatabaseArguments::test()).expect(&emsg); Arc::new(TempDatabase::new(db, path)) } @@ -194,12 +186,7 @@ pub mod test_utils { pub fn create_test_rw_db_with_path>(path: P) -> Arc> { let path = path.as_ref().to_path_buf(); let emsg = format!("{ERROR_DB_CREATION}: {path:?}"); - let db = init_db( - path.as_path(), - DatabaseArguments::new(ClientVersion::default()) - .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)), - ) - .expect(&emsg); + let db = init_db(path.as_path(), DatabaseArguments::test()).expect(&emsg); Arc::new(TempDatabase::new(db, path)) } @@ -214,20 +201,14 @@ pub mod test_utils { let datadir = datadir.as_ref().to_path_buf(); let db_path = datadir.join("db"); let emsg = format!("{ERROR_DB_CREATION}: {db_path:?}"); - let db = init_db( - &db_path, - DatabaseArguments::new(ClientVersion::default()) - .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)), - ) - .expect(&emsg); + let db = init_db(&db_path, DatabaseArguments::test()).expect(&emsg); Arc::new(TempDatabase::new(db, datadir)) } /// Create read only database for testing #[track_caller] pub fn create_test_ro_db() -> Arc> { - let args = DatabaseArguments::new(ClientVersion::default()) - .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)); + let args = DatabaseArguments::test(); let path = tempdir_path(); let emsg = format!("{ERROR_DB_CREATION}: {path:?}"); diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 802ad4099d..185872698c 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -24,6 +24,7 @@ reth-db = { workspace = true, features = ["mdbx"] } reth-db-api.workspace = true reth-prune-types.workspace = true reth-stages-types.workspace = true +reth-tasks.workspace = true reth-trie = { workspace = true, features = ["metrics"] } reth-trie-db = { workspace = true, features = ["metrics"] } reth-nippy-jar.workspace = true @@ -32,7 +33,6 @@ reth-chain-state.workspace = true reth-node-types.workspace = true reth-static-file-types = { workspace = true, features = ["std"] } reth-fs-util.workspace = true -reth-tasks.workspace = true # ethereum alloy-eips.workspace = true diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index 6f3f2295f8..3b9b57e475 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -28,6 +28,9 @@ pub use providers::{ pub mod changeset_walker; pub mod changesets_utils; +mod storage_threadpool; +use storage_threadpool::STORAGE_POOL; + #[cfg(any(test, feature = "test-utils"))] /// Common test helpers for mocking the Provider. pub mod test_utils; diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 2fd840c015..b5a36cc028 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -18,7 +18,7 @@ use crate::{ PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg, RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant, - TransactionsProvider, TransactionsProviderExt, TrieWriter, + TransactionsProvider, TransactionsProviderExt, TrieWriter, STORAGE_POOL, }; use alloy_consensus::{ transaction::{SignerRecoverable, TransactionMeta, TxHashRef}, @@ -64,7 +64,6 @@ use reth_storage_api::{ StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput, }; use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError}; -use reth_tasks::spawn_scoped_os_thread; use reth_trie::{ updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted}, HashedPostStateSorted, StoredNibbles, @@ -79,7 +78,6 @@ use std::{ fmt::Debug, ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive}, sync::Arc, - thread, time::Instant, }; use tracing::{debug, instrument, trace}; @@ -550,24 +548,37 @@ impl DatabaseProvider(start.elapsed()) + sf_result = Some( + sf_provider + .write_blocks_data(&blocks, &tx_nums, sf_ctx) + .map(|()| start.elapsed()), + ); }); // RocksDB writes #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| { - spawn_scoped_os_thread(s, "rocksdb", || { + if rocksdb_enabled { + s.spawn(|_| { let start = Instant::now(); - rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?; - Ok::<_, ProviderError>(start.elapsed()) - }) - }); + rocksdb_result = Some( + rocksdb_provider + .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx) + .map(|()| start.elapsed()), + ); + }); + } // MDBX writes let mdbx_start = Instant::now(); @@ -672,24 +683,27 @@ impl DatabaseProvider(()) + })?; - // Wait for RocksDB thread - #[cfg(all(unix, feature = "rocksdb"))] - if let Some(handle) = rocksdb_handle { - timings.rocksdb = handle.join().expect("RocksDB thread panicked")?; - } + // Collect results from spawned tasks + timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??; - timings.total = total_start.elapsed(); + #[cfg(all(unix, feature = "rocksdb"))] + if rocksdb_enabled { + timings.rocksdb = rocksdb_result.ok_or_else(|| { + ProviderError::Database(reth_db_api::DatabaseError::Other( + "RocksDB thread panicked".into(), + )) + })??; + } - self.metrics.record_save_blocks(&timings); - debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data"); + timings.total = total_start.elapsed(); - Ok(()) - }) + self.metrics.record_save_blocks(&timings); + debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data"); + + Ok(()) } /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX). diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 40f05d8bb5..c5231cc459 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -1,5 +1,8 @@ use super::metrics::{RocksDBMetrics, RocksDBOperation, ROCKSDB_TABLES}; -use crate::providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo}; +use crate::{ + providers::{compute_history_rank, needs_prev_shard_check, HistoryInfo}, + STORAGE_POOL, +}; use alloy_consensus::transaction::TxHashRef; use alloy_primitives::{ map::{AddressMap, HashMap}, @@ -24,7 +27,6 @@ use reth_storage_errors::{ db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel}, provider::{ProviderError, ProviderResult}, }; -use reth_tasks::spawn_scoped_os_thread; use rocksdb::{ BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType, DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB, @@ -36,7 +38,6 @@ use std::{ fmt, path::{Path, PathBuf}, sync::Arc, - thread, time::Instant, }; use tracing::instrument; @@ -1197,41 +1198,58 @@ impl RocksDBProvider { return Ok(()); } - thread::scope(|s| { - let handles: Vec<_> = [ - (ctx.storage_settings.transaction_hash_numbers_in_rocksdb && - ctx.prune_tx_lookup.is_none_or(|m| !m.is_full())) - .then(|| { - spawn_scoped_os_thread(s, "rocksdb-tx-hash", || { - self.write_tx_hash_numbers(blocks, tx_nums, &ctx) - }) - }), - ctx.storage_settings.account_history_in_rocksdb.then(|| { - spawn_scoped_os_thread(s, "rocksdb-account-history", || { - self.write_account_history(blocks, &ctx) - }) - }), - ctx.storage_settings.storages_history_in_rocksdb.then(|| { - spawn_scoped_os_thread(s, "rocksdb-storage-history", || { - self.write_storage_history(blocks, &ctx) - }) - }), - ] - .into_iter() - .enumerate() - .filter_map(|(i, h)| h.map(|h| (i, h))) - .collect(); + let mut r_tx_hash = None; + let mut r_account_history = None; + let mut r_storage_history = None; - for (i, handle) in handles { - handle.join().map_err(|_| { - ProviderError::Database(DatabaseError::Other(format!( - "rocksdb write thread {i} panicked" - ))) - })??; + let write_tx_hash = ctx.storage_settings.transaction_hash_numbers_in_rocksdb && + ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()); + let write_account_history = ctx.storage_settings.account_history_in_rocksdb; + let write_storage_history = ctx.storage_settings.storages_history_in_rocksdb; + + STORAGE_POOL.in_place_scope(|s| { + if write_tx_hash { + s.spawn(|_| { + r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx)); + }); } - Ok(()) - }) + if write_account_history { + s.spawn(|_| { + r_account_history = Some(self.write_account_history(blocks, &ctx)); + }); + } + + if write_storage_history { + s.spawn(|_| { + r_storage_history = Some(self.write_storage_history(blocks, &ctx)); + }); + } + }); + + if write_tx_hash { + r_tx_hash.ok_or_else(|| { + ProviderError::Database(DatabaseError::Other( + "rocksdb tx-hash write thread panicked".into(), + )) + })??; + } + if write_account_history { + r_account_history.ok_or_else(|| { + ProviderError::Database(DatabaseError::Other( + "rocksdb account-history write thread panicked".into(), + )) + })??; + } + if write_storage_history { + r_storage_history.ok_or_else(|| { + ProviderError::Database(DatabaseError::Other( + "rocksdb storage-history write thread panicked".into(), + )) + })??; + } + + Ok(()) } /// Writes transaction hash to number mappings for the given blocks. diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 6d4a5500fa..277474e506 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -6,7 +6,7 @@ use crate::{ changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker}, to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter, EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader, - TransactionVariant, TransactionsProvider, TransactionsProviderExt, + TransactionVariant, TransactionsProvider, TransactionsProviderExt, STORAGE_POOL, }; use alloy_consensus::{transaction::TransactionMeta, Header}; use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber}; @@ -47,14 +47,12 @@ use reth_storage_api::{ StorageChangeSetReader, StorageSettingsCache, }; use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError}; -use reth_tasks::spawn_scoped_os_thread; use std::{ collections::BTreeMap, fmt::Debug, ops::{Bound, Deref, Range, RangeBounds, RangeInclusive}, path::{Path, PathBuf}, sync::{atomic::AtomicU64, mpsc, Arc}, - thread, }; use tracing::{debug, info, info_span, instrument, trace, warn}; @@ -656,31 +654,28 @@ impl StaticFileProvider { Ok(()) } - /// Spawns a scoped thread that writes to a static file segment using the provided closure. + /// Writes to a static file segment using the provided closure. /// /// The closure receives a mutable reference to the segment writer. After the closure completes, /// `sync_all()` is called to flush writes to disk. - fn spawn_segment_writer<'scope, 'env, F>( - &'env self, - scope: &'scope thread::Scope<'scope, 'env>, + fn write_segment( + &self, segment: StaticFileSegment, first_block_number: BlockNumber, f: F, - ) -> thread::ScopedJoinHandle<'scope, ProviderResult<()>> + ) -> ProviderResult<()> where - F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()> + Send + 'env, + F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()>, { - spawn_scoped_os_thread(scope, segment.as_short_str(), move || { - let mut w = self.get_writer(first_block_number, segment)?; - f(&mut w)?; - w.sync_all() - }) + let mut w = self.get_writer(first_block_number, segment)?; + f(&mut w)?; + w.sync_all() } /// Writes all static file data for multiple blocks in parallel per-segment. /// - /// This spawns separate threads for each segment type and each thread calls `sync_all()` on its - /// writer when done. + /// This spawns tasks on the storage thread pool for each segment type and each task calls + /// `sync_all()` on its writer when done. #[instrument(level = "debug", target = "providers::static_file", skip_all)] pub fn write_blocks_data( &self, @@ -694,70 +689,87 @@ impl StaticFileProvider { let first_block_number = blocks[0].recovered_block().number(); - thread::scope(|s| { - let h_headers = - self.spawn_segment_writer(s, StaticFileSegment::Headers, first_block_number, |w| { - Self::write_headers(w, blocks) + let mut r_headers = None; + let mut r_txs = None; + let mut r_senders = None; + let mut r_receipts = None; + let mut r_account_changesets = None; + let mut r_storage_changesets = None; + + STORAGE_POOL.in_place_scope(|s| { + s.spawn(|_| { + r_headers = + Some(self.write_segment(StaticFileSegment::Headers, first_block_number, |w| { + Self::write_headers(w, blocks) + })); + }); + + s.spawn(|_| { + r_txs = Some(self.write_segment( + StaticFileSegment::Transactions, + first_block_number, + |w| Self::write_transactions(w, blocks, tx_nums), + )); + }); + + if ctx.write_senders { + s.spawn(|_| { + r_senders = Some(self.write_segment( + StaticFileSegment::TransactionSenders, + first_block_number, + |w| Self::write_transaction_senders(w, blocks, tx_nums), + )); }); - - let h_txs = self.spawn_segment_writer( - s, - StaticFileSegment::Transactions, - first_block_number, - |w| Self::write_transactions(w, blocks, tx_nums), - ); - - let h_senders = ctx.write_senders.then(|| { - self.spawn_segment_writer( - s, - StaticFileSegment::TransactionSenders, - first_block_number, - |w| Self::write_transaction_senders(w, blocks, tx_nums), - ) - }); - - let h_receipts = ctx.write_receipts.then(|| { - self.spawn_segment_writer(s, StaticFileSegment::Receipts, first_block_number, |w| { - Self::write_receipts(w, blocks, tx_nums, &ctx) - }) - }); - - let h_account_changesets = ctx.write_account_changesets.then(|| { - self.spawn_segment_writer( - s, - StaticFileSegment::AccountChangeSets, - first_block_number, - |w| Self::write_account_changesets(w, blocks), - ) - }); - - let h_storage_changesets = ctx.write_storage_changesets.then(|| { - self.spawn_segment_writer( - s, - StaticFileSegment::StorageChangeSets, - first_block_number, - |w| Self::write_storage_changesets(w, blocks), - ) - }); - - h_headers.join().map_err(|_| StaticFileWriterError::ThreadPanic("headers"))??; - h_txs.join().map_err(|_| StaticFileWriterError::ThreadPanic("transactions"))??; - if let Some(h) = h_senders { - h.join().map_err(|_| StaticFileWriterError::ThreadPanic("senders"))??; } - if let Some(h) = h_receipts { - h.join().map_err(|_| StaticFileWriterError::ThreadPanic("receipts"))??; + + if ctx.write_receipts { + s.spawn(|_| { + r_receipts = Some(self.write_segment( + StaticFileSegment::Receipts, + first_block_number, + |w| Self::write_receipts(w, blocks, tx_nums, &ctx), + )); + }); } - if let Some(h) = h_account_changesets { - h.join() - .map_err(|_| StaticFileWriterError::ThreadPanic("account_changesets"))??; + + if ctx.write_account_changesets { + s.spawn(|_| { + r_account_changesets = Some(self.write_segment( + StaticFileSegment::AccountChangeSets, + first_block_number, + |w| Self::write_account_changesets(w, blocks), + )); + }); } - if let Some(h) = h_storage_changesets { - h.join() - .map_err(|_| StaticFileWriterError::ThreadPanic("storage_changesets"))??; + + if ctx.write_storage_changesets { + s.spawn(|_| { + r_storage_changesets = Some(self.write_segment( + StaticFileSegment::StorageChangeSets, + first_block_number, + |w| Self::write_storage_changesets(w, blocks), + )); + }); } - Ok(()) - }) + }); + + r_headers.ok_or(StaticFileWriterError::ThreadPanic("headers"))??; + r_txs.ok_or(StaticFileWriterError::ThreadPanic("transactions"))??; + if ctx.write_senders { + r_senders.ok_or(StaticFileWriterError::ThreadPanic("senders"))??; + } + if ctx.write_receipts { + r_receipts.ok_or(StaticFileWriterError::ThreadPanic("receipts"))??; + } + if ctx.write_account_changesets { + r_account_changesets + .ok_or(StaticFileWriterError::ThreadPanic("account_changesets"))??; + } + if ctx.write_storage_changesets { + r_storage_changesets + .ok_or(StaticFileWriterError::ThreadPanic("storage_changesets"))??; + } + Ok(()) } /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be diff --git a/crates/storage/provider/src/storage_threadpool.rs b/crates/storage/provider/src/storage_threadpool.rs new file mode 100644 index 0000000000..e98c16564b --- /dev/null +++ b/crates/storage/provider/src/storage_threadpool.rs @@ -0,0 +1,23 @@ +//! Dedicated thread pool for storage I/O operations. +//! +//! This module provides a static rayon thread pool used for parallel writes to static files, +//! `RocksDB`, and other storage backends during block persistence. + +use rayon::{ThreadPool, ThreadPoolBuilder}; +use std::sync::LazyLock; + +/// Number of threads in the storage I/O thread pool. +const STORAGE_POOL_THREADS: usize = 16; + +/// Static thread pool for storage I/O operations. +/// +/// This pool is used exclusively by [`save_blocks`](crate::DatabaseProvider::save_blocks) to +/// parallelize writes to different storage backends (static files, `RocksDB`). Since this is the +/// only call site, all threads are always available when needed. +pub(crate) static STORAGE_POOL: LazyLock = LazyLock::new(|| { + ThreadPoolBuilder::new() + .num_threads(STORAGE_POOL_THREADS) + .thread_name(|idx| format!("save-blocks-{idx}")) + .build() + .expect("failed to create storage thread pool") +}); diff --git a/testing/ef-tests/src/case.rs b/testing/ef-tests/src/case.rs index 24b74fbd91..fdbe5509bc 100644 --- a/testing/ef-tests/src/case.rs +++ b/testing/ef-tests/src/case.rs @@ -1,7 +1,7 @@ //! Test case definitions use crate::result::{CaseResult, Error}; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use rayon::prelude::*; use std::{ fmt::Debug, path::{Path, PathBuf}, @@ -10,7 +10,7 @@ use std::{ /// A single test case, capable of loading a JSON description of itself and running it. /// /// See for test specs. -pub trait Case: Debug + Sync + Sized { +pub trait Case: Debug + Send + Sync + Sized + 'static { /// A description of the test. fn description(&self) -> String { "no description".to_string() @@ -22,7 +22,7 @@ pub trait Case: Debug + Sync + Sized { fn load(path: &Path) -> Result; /// Run the test. - fn run(&self) -> Result<(), Error>; + fn run(self) -> Result<(), Error>; } /// A container for multiple test cases. @@ -34,10 +34,10 @@ pub struct Cases { impl Cases { /// Run the contained test cases. - pub fn run(&self) -> Vec { + pub fn run(self) -> Vec { self.test_cases - .par_iter() - .map(|(path, case)| CaseResult::new(path, case, case.run())) + .into_par_iter() + .map(|(path, case)| CaseResult::new(&path, case.description(), case.run())) .collect() } } diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs index 1d7d040aed..c4bfbc34b4 100644 --- a/testing/ef-tests/src/cases/blockchain_test.rs +++ b/testing/ef-tests/src/cases/blockchain_test.rs @@ -5,7 +5,7 @@ use crate::{ Case, Error, Suite, }; use alloy_rlp::{Decodable, Encodable}; -use rayon::iter::ParallelIterator; +use rayon::iter::{IndexedParallelIterator, ParallelIterator}; use reth_chainspec::ChainSpec; use reth_consensus::{Consensus, HeaderValidator}; use reth_db_common::init::{insert_genesis_hashes, insert_genesis_history, insert_genesis_state}; @@ -172,7 +172,7 @@ impl Case for BlockchainTestCase { /// /// # Errors /// Returns an error if the test is flagged for skipping or encounters issues during execution. - fn run(&self) -> Result<(), Error> { + fn run(self) -> Result<(), Error> { // If the test is marked for skipping, return a Skipped error immediately. if self.skip { return Err(Error::Skipped); @@ -180,12 +180,11 @@ impl Case for BlockchainTestCase { // Iterate through test cases, filtering by the network type to exclude specific forks. self.tests - .iter() + .into_iter() .filter(|(_, case)| !Self::excluded_fork(case.network)) .par_bridge_buffered() - .try_for_each(|(name, case)| Self::run_single_case(name, case).map(|_| ()))?; - - Ok(()) + .with_min_len(64) + .try_for_each(|(name, case)| Self::run_single_case(&name, &case).map(|_| ())) } } @@ -207,7 +206,7 @@ fn run_case( case: &BlockchainTest, ) -> Result, ExecutionWitness)>, Error> { // Create a new test database and initialize a provider for the test case. - let chain_spec: Arc = Arc::new(case.network.into()); + let chain_spec = case.network.to_chain_spec(); let factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); let provider = factory.database_provider_rw().unwrap(); diff --git a/testing/ef-tests/src/models.rs b/testing/ef-tests/src/models.rs index bc9af3fab1..d2ac0a5d93 100644 --- a/testing/ef-tests/src/models.rs +++ b/testing/ef-tests/src/models.rs @@ -8,8 +8,13 @@ use alloy_primitives::{keccak256, Address, Bloom, Bytes, B256, B64, U256}; use reth_chainspec::{ChainSpec, ChainSpecBuilder, EthereumHardfork, ForkCondition}; use reth_db_api::{cursor::DbDupCursorRO, tables, transaction::DbTx}; use reth_primitives_traits::SealedHeader; +use revm::primitives::HashMap; use serde::Deserialize; -use std::{collections::BTreeMap, ops::Deref}; +use std::{ + collections::BTreeMap, + ops::Deref, + sync::{Arc, OnceLock, RwLock}, +}; /// The definition of a blockchain test. #[derive(Debug, PartialEq, Eq, Deserialize)] @@ -321,59 +326,72 @@ pub enum ForkSpec { Osaka, } -impl From for ChainSpec { - fn from(fork_spec: ForkSpec) -> Self { +impl ForkSpec { + /// Converts this EF fork spec to a Reth [`ChainSpec`]. + pub fn to_chain_spec(self) -> Arc { + static MAP: OnceLock>>> = OnceLock::new(); + let map = MAP.get_or_init(Default::default); + if let Some(r) = map.read().unwrap().get(&self) { + return r.clone(); + } + map.write() + .unwrap() + .entry(self) + .or_insert_with(|| Arc::new(self.to_chain_spec_inner())) + .clone() + } + + fn to_chain_spec_inner(self) -> ChainSpec { let spec_builder = ChainSpecBuilder::mainnet().reset(); - match fork_spec { - ForkSpec::Frontier => spec_builder.frontier_activated(), - ForkSpec::FrontierToHomesteadAt5 => spec_builder + match self { + Self::Frontier => spec_builder.frontier_activated(), + Self::FrontierToHomesteadAt5 => spec_builder .frontier_activated() .with_fork(EthereumHardfork::Homestead, ForkCondition::Block(5)), - ForkSpec::Homestead => spec_builder.homestead_activated(), - ForkSpec::HomesteadToDaoAt5 => spec_builder + Self::Homestead => spec_builder.homestead_activated(), + Self::HomesteadToDaoAt5 => spec_builder .homestead_activated() .with_fork(EthereumHardfork::Dao, ForkCondition::Block(5)), - ForkSpec::HomesteadToEIP150At5 => spec_builder + Self::HomesteadToEIP150At5 => spec_builder .homestead_activated() .with_fork(EthereumHardfork::Tangerine, ForkCondition::Block(5)), - ForkSpec::EIP150 => spec_builder.tangerine_whistle_activated(), - ForkSpec::EIP158 => spec_builder.spurious_dragon_activated(), - ForkSpec::EIP158ToByzantiumAt5 => spec_builder + Self::EIP150 => spec_builder.tangerine_whistle_activated(), + Self::EIP158 => spec_builder.spurious_dragon_activated(), + Self::EIP158ToByzantiumAt5 => spec_builder .spurious_dragon_activated() .with_fork(EthereumHardfork::Byzantium, ForkCondition::Block(5)), - ForkSpec::Byzantium => spec_builder.byzantium_activated(), - ForkSpec::ByzantiumToConstantinopleAt5 => spec_builder + Self::Byzantium => spec_builder.byzantium_activated(), + Self::ByzantiumToConstantinopleAt5 => spec_builder .byzantium_activated() .with_fork(EthereumHardfork::Constantinople, ForkCondition::Block(5)), - ForkSpec::ByzantiumToConstantinopleFixAt5 => spec_builder + Self::ByzantiumToConstantinopleFixAt5 => spec_builder .byzantium_activated() .with_fork(EthereumHardfork::Petersburg, ForkCondition::Block(5)), - ForkSpec::Constantinople => spec_builder.constantinople_activated(), - ForkSpec::ConstantinopleFix => spec_builder.petersburg_activated(), - ForkSpec::Istanbul => spec_builder.istanbul_activated(), - ForkSpec::Berlin => spec_builder.berlin_activated(), - ForkSpec::BerlinToLondonAt5 => spec_builder + Self::Constantinople => spec_builder.constantinople_activated(), + Self::ConstantinopleFix => spec_builder.petersburg_activated(), + Self::Istanbul => spec_builder.istanbul_activated(), + Self::Berlin => spec_builder.berlin_activated(), + Self::BerlinToLondonAt5 => spec_builder .berlin_activated() .with_fork(EthereumHardfork::London, ForkCondition::Block(5)), - ForkSpec::London => spec_builder.london_activated(), - ForkSpec::Merge | - ForkSpec::MergeEOF | - ForkSpec::MergeMeterInitCode | - ForkSpec::MergePush0 => spec_builder.paris_activated(), - ForkSpec::ParisToShanghaiAtTime15k => spec_builder + Self::London => spec_builder.london_activated(), + Self::Merge | Self::MergeEOF | Self::MergeMeterInitCode | Self::MergePush0 => { + spec_builder.paris_activated() + } + Self::ParisToShanghaiAtTime15k => spec_builder .paris_activated() .with_fork(EthereumHardfork::Shanghai, ForkCondition::Timestamp(15_000)), - ForkSpec::Shanghai => spec_builder.shanghai_activated(), - ForkSpec::ShanghaiToCancunAtTime15k => spec_builder + Self::Shanghai => spec_builder.shanghai_activated(), + Self::ShanghaiToCancunAtTime15k => spec_builder .shanghai_activated() .with_fork(EthereumHardfork::Cancun, ForkCondition::Timestamp(15_000)), - ForkSpec::Cancun => spec_builder.cancun_activated(), - ForkSpec::CancunToPragueAtTime15k => spec_builder + Self::Cancun => spec_builder.cancun_activated(), + Self::CancunToPragueAtTime15k => spec_builder .cancun_activated() .with_fork(EthereumHardfork::Prague, ForkCondition::Timestamp(15_000)), - ForkSpec::Prague => spec_builder.prague_activated(), - ForkSpec::Osaka => spec_builder.osaka_activated(), + Self::Prague => spec_builder.prague_activated(), + Self::Osaka => spec_builder.osaka_activated(), } .build() } diff --git a/testing/ef-tests/src/result.rs b/testing/ef-tests/src/result.rs index 481d1fe770..2d7c923997 100644 --- a/testing/ef-tests/src/result.rs +++ b/testing/ef-tests/src/result.rs @@ -1,6 +1,5 @@ //! Test results and errors -use crate::Case; use reth_db::DatabaseError; use reth_ethereum_primitives::Block; use reth_primitives_traits::RecoveredBlock; @@ -93,8 +92,8 @@ pub struct CaseResult { impl CaseResult { /// Create a new test result. - pub fn new(path: &Path, case: &impl Case, result: Result<(), Error>) -> Self { - Self { desc: case.description(), path: path.into(), result } + pub fn new(path: &Path, desc: String, result: Result<(), Error>) -> Self { + Self { desc, path: path.into(), result } } } diff --git a/testing/ef-tests/tests/tests.rs b/testing/ef-tests/tests/tests.rs index 2728246901..5a73d31cce 100644 --- a/testing/ef-tests/tests/tests.rs +++ b/testing/ef-tests/tests/tests.rs @@ -108,5 +108,9 @@ fn eest_fixtures() { .join("execution-spec-tests") .join("blockchain_tests"); + if !suite_path.exists() { + return; + } + BlockchainTests::new(suite_path).run(); }