Compare commits

..

74 Commits

Author SHA1 Message Date
Matthias Seitz
1f93ee97bb chore: touchups 2025-12-16 12:17:16 +01:00
Matthias Seitz
3adac16571 Merge branch 'main' into eth70 2025-12-16 11:31:13 +01:00
Matthias Seitz
bbd51862d4 chore: rm flaky bench (#20413) 2025-12-16 09:35:38 +00:00
Arsenii Kulikov
08a16a5bde perf: recover transactions in parallel during network import (#20385) 2025-12-16 09:33:24 +00:00
Snezhkko
f2c39db7a2 chore(rpc): fix misleading link and comment (#20367) 2025-12-16 09:32:25 +00:00
oooLowNeoNooo
ae9e84d6e3 fix(discv4): correct ping_interval default value in docs (#20396) 2025-12-16 09:29:45 +00:00
theo
c51da593d1 feat(net/p2p): support fixed external addresses with DNS resolution (#20411) 2025-12-16 09:28:31 +00:00
Matthias Seitz
0e08f9f56c perf: remove unnecessary channels from parallel trie operations (#20406) 2025-12-16 09:15:27 +00:00
sashass1315
7eef092110 docs(exex): sync hello-world notifications loop with code (#20403) 2025-12-16 08:39:45 +00:00
YK
40e8241bf5 feat(storage): use RocksDBBatch in EitherWriter and related modules (#20377) 2025-12-16 03:57:41 +00:00
dependabot[bot]
dd9ff731e4 chore(deps): bump peter-evans/create-pull-request from 7 to 8 (#20402)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-16 00:11:22 +00:00
dependabot[bot]
83f9d1837f chore(deps): bump actions/download-artifact from 4 to 7 (#20401)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-16 00:11:00 +00:00
dependabot[bot]
68911e617b chore(deps): bump actions/upload-artifact from 5 to 6 (#20400)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-16 00:10:40 +00:00
0xcharry
36ba6db029 chore: remove redundant .as_str() calls after to_string() (#20404) 2025-12-16 00:10:03 +00:00
Matthias Seitz
fec4432d82 perf: defer transaction pool notifications until after lock release (#20405) 2025-12-15 23:06:34 +00:00
Matthias Seitz
179da26305 perf: use RwLock for transaction pool listeners (#20398)
Co-authored-by: Arsenii Kulikov <klkvrr@gmail.com>
2025-12-15 21:47:59 +00:00
Matthias Seitz
b5e7a694d2 chore: update metric once (#20371) 2025-12-15 20:38:24 +00:00
Maxim Evtush
9489667814 fix: post-state generator to include deletions in proptest (#20276) 2025-12-15 16:43:02 +00:00
gustavo
004877ba59 refactor(cli): cleanup repair-trie metrics (#20226) 2025-12-15 16:41:48 +00:00
Brian Picciano
a9e36923e1 feat(trie): Proof Rewrite: Use cached branch nodes (#20075)
Co-authored-by: YK <chiayongkang@hotmail.com>
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
2025-12-15 15:27:04 +00:00
DaniPopes
74a3816611 ci: reduce feature powerset depth (#20379) 2025-12-15 14:42:14 +00:00
Alexey Shekhirin
5576d4547f revert: feat(engine): run sync state root if not enough parallelism (#20127) (#20378) 2025-12-15 14:05:54 +00:00
DaniPopes
21216e2f24 perf: use indexed parallel iterators for tx recovery (#20342) 2025-12-15 13:40:03 +00:00
YK
42c1e1afe1 feat(storage): add account history constructors to EitherWriter/EitherReader (#20366) 2025-12-15 12:45:07 +00:00
MoNyAvA
5f7e87fa2a docs: add blob sub-pool to tx pool docs (#20375) 2025-12-15 12:27:54 +00:00
Matthias Seitz
1b417dacc4 chore: sanity check for u64::Max (#20373) 2025-12-15 11:33:50 +00:00
Niven
bb952be5b5 feat(flashblocks): support eth_getBlockTransactionCount for flashblocks (#20291)
Co-authored-by: lucas <66681646+limyeechern@users.noreply.github.com>
Co-authored-by: lucas.lim <lucas.lim@okg.com>
2025-12-15 11:29:23 +00:00
Federico Magnani
f927eec880 chore: export FlashBlockDecoder (#20370) 2025-12-15 11:00:46 +00:00
Tomass
9c61f5568c fix(rpc-testing-util): use buffer_unordered in trace_block_opcode_gas_unordered (#20369) 2025-12-15 10:38:40 +00:00
ligt
662c0486a1 feat(storage): add rocksdb provider into database provider (#20253) 2025-12-15 10:15:57 +00:00
Matthias Seitz
997848c2a1 fix(txpool): remove stale senderinfo (#20368) 2025-12-15 10:00:25 +00:00
Olexandr88
155bdecf3b docs(repo): add Ethereum-specific crates section (#20363) 2025-12-15 09:56:40 +00:00
Karl
bafe9943fd let eth/70 payload deal with manual rlp 2025-12-14 12:51:47 +08:00
Karl
ac21a606a0 remove 2025-12-14 12:21:57 +08:00
Karl
8f940c2d69 remove unnessary change and remove statusMessage for eth/70 2025-12-14 12:15:47 +08:00
Karl
1af8a03ab6 Delegating constraints to the method level 2025-12-12 11:06:38 +08:00
Karl
3ab7774c5e restore unnessary session changes 2025-12-11 22:44:13 +08:00
Karl
912ce282dd cargo fmt 2025-12-11 22:26:11 +08:00
Karl
6a4e5eb409 remove unnessary import 2025-12-11 22:24:11 +08:00
Karl
fd3b778f04 restore unnessary comment change 2025-12-11 22:13:10 +08:00
Karl
5db0e519ca restore max 2025-12-11 22:11:33 +08:00
Karl
fc8931c30e remove unnessary macro 2025-12-11 22:10:06 +08:00
Karl
72bfdfe6ca remove unnessary macro 2025-12-11 22:06:02 +08:00
Karl
b6d0e04a16 cargo fmt 2025-12-11 21:27:22 +08:00
Karl
35d214f616 fix clippy 2025-12-11 21:25:33 +08:00
Karl
ed093cd5a0 clean up 2025-12-11 20:46:59 +08:00
Karl
04d22f6469 share eth/69 status with eth/70 2025-12-11 20:40:35 +08:00
Karl
26db968153 remove blockRangeUpdate from eth/70 2025-12-11 20:18:03 +08:00
Karl
d2420598b0 apply mattese suggestions 2025-12-11 19:50:27 +08:00
Karl
0b4d697ef3 add manual encode for Receipts70 2025-12-11 16:30:11 +08:00
Karl
f257508554 remove unnessary last_range_update 2025-12-11 15:22:03 +08:00
Karl
c6628ae3a4 fix test error 2025-12-11 11:41:04 +08:00
Karl
3f2f5baaca fix cargo doc 2025-12-11 11:30:55 +08:00
Karl
d44a500750 remove eth/70 block_range 2025-12-11 11:07:32 +08:00
Karl
b7b3f327fc address mattse suggestions 2025-12-11 10:58:59 +08:00
Karl
2c70f5157f remove block-range 2025-12-10 22:20:12 +08:00
Karl
6d364b1379 Reuse Receipts69 bloom helper for Receipts70 2025-12-10 15:59:47 +08:00
Karl
3b67d78e24 Add server-side support for eth/70 receipts pagination 2025-12-10 14:24:03 +08:00
Karl
b5ccdb89ac remove unused import 2025-12-09 22:45:44 +08:00
Karl
3e4e764b5a fix cargo doc 2025-12-09 22:30:37 +08:00
Karl
39c725e66c add eth/70 GetReceipts/Receipts message types and session wiring 2025-12-09 21:21:58 +08:00
Karl
19bdc02e57 chore: rerun CI 2025-12-07 12:04:20 +08:00
Karl
c68bd5036c simplify maybe_request_block_range and add comments 2025-12-06 23:02:14 +08:00
Karl
f49b9542f5 Throttle range requests using interval ticks 2025-12-06 22:36:29 +08:00
Karl
04824d304b cargo fmt 2025-12-06 22:21:10 +08:00
Karl
0d38ba9fc5 optimize instant related code and remove test for now 2025-12-06 22:19:20 +08:00
Karl
63ac93a431 instantiate based on message variant 2025-12-06 21:58:35 +08:00
Karl
90492aa3cf keep eth/69 once enable full support 2025-12-06 21:47:34 +08:00
Karl
dd7a10ba4a cargo fmt 2025-12-05 18:19:55 +08:00
Karl
14453ccca5 add more comments to undertsand precisely 2025-12-05 18:09:56 +08:00
Karl
13e9c00ae9 touchups 2025-12-05 17:57:08 +08:00
Karl
3b129f5a34 fix tests 2025-12-05 17:51:34 +08:00
Karl
70480acd66 add comments 2025-12-05 17:46:11 +08:00
Karl
7856d256e6 Add eth/70 protocol support and block range messaging 2025-12-05 15:53:03 +08:00
138 changed files with 2625 additions and 1050 deletions

View File

@@ -67,7 +67,7 @@ jobs:
chmod +x hive
- name: Upload hive assets
uses: actions/upload-artifact@v5
uses: actions/upload-artifact@v6
with:
name: hive_assets
path: ./hive_assets
@@ -187,13 +187,13 @@ jobs:
fetch-depth: 0
- name: Download hive assets
uses: actions/download-artifact@v6
uses: actions/download-artifact@v7
with:
name: hive_assets
path: /tmp
- name: Download reth image
uses: actions/download-artifact@v6
uses: actions/download-artifact@v7
with:
name: artifacts
path: /tmp

View File

@@ -41,7 +41,7 @@ jobs:
fetch-depth: 0
- name: Download reth image
uses: actions/download-artifact@v6
uses: actions/download-artifact@v7
with:
name: artifacts
path: /tmp

View File

@@ -39,7 +39,7 @@ jobs:
fetch-depth: 0
- name: Download reth image
uses: actions/download-artifact@v6
uses: actions/download-artifact@v7
with:
name: artifacts
path: /tmp

View File

@@ -245,12 +245,8 @@ jobs:
# Checks that selected crates can compile with power set of features
features:
name: features (${{ matrix.partition }}/${{ matrix.total_partitions }})
name: features
runs-on: depot-ubuntu-latest
strategy:
matrix:
partition: [1, 2]
total_partitions: [2]
timeout-minutes: 30
steps:
- uses: actions/checkout@v6
@@ -268,7 +264,7 @@ jobs:
--package reth-primitives-traits \
--package reth-primitives \
--feature-powerset \
--partition ${{ matrix.partition }}/${{ matrix.total_partitions }}
--depth 2
env:
RUSTFLAGS: -D warnings

View File

@@ -50,7 +50,7 @@ jobs:
- name: Upload reth image
id: upload
uses: actions/upload-artifact@v5
uses: actions/upload-artifact@v6
with:
name: artifacts
path: ./artifacts

View File

@@ -144,14 +144,14 @@ jobs:
- name: Upload artifact
if: ${{ github.event.inputs.dry_run != 'true' }}
uses: actions/upload-artifact@v5
uses: actions/upload-artifact@v6
with:
name: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz
path: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz
- name: Upload signature
if: ${{ github.event.inputs.dry_run != 'true' }}
uses: actions/upload-artifact@v5
uses: actions/upload-artifact@v6
with:
name: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz.asc
path: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz.asc
@@ -173,7 +173,7 @@ jobs:
with:
fetch-depth: 0
- name: Download artifacts
uses: actions/download-artifact@v6
uses: actions/download-artifact@v7
- name: Generate full changelog
id: changelog
run: |

View File

@@ -42,7 +42,7 @@ jobs:
echo "Binaries SHA256 on ${{ matrix.machine }}: $(cat checksum.sha256)"
- name: Upload the hash
uses: actions/upload-artifact@v5
uses: actions/upload-artifact@v6
with:
name: checksum-${{ matrix.machine }}
path: |
@@ -55,12 +55,12 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Download artifacts from machine-1
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
name: checksum-machine-1
path: machine-1/
- name: Download artifacts from machine-2
uses: actions/download-artifact@v4
uses: actions/download-artifact@v7
with:
name: checksum-machine-2
path: machine-2/

View File

@@ -27,7 +27,7 @@ jobs:
./fetch_superchain_config.sh
- name: Create Pull Request
uses: peter-evans/create-pull-request@v7
uses: peter-evans/create-pull-request@v8
with:
commit-message: "chore: update superchain config"
title: "chore: update superchain config"

3
Cargo.lock generated
View File

@@ -8639,6 +8639,7 @@ dependencies = [
"derive_more",
"futures-util",
"metrics",
"rayon",
"reth-ethereum-forks",
"reth-ethereum-primitives",
"reth-execution-errors",
@@ -8944,6 +8945,7 @@ dependencies = [
"pin-project",
"rand 0.8.5",
"rand 0.9.2",
"rayon",
"reth-chainspec",
"reth-consensus",
"reth-discv4",
@@ -10894,6 +10896,7 @@ dependencies = [
"pretty_assertions",
"proptest",
"proptest-arbitrary-interop",
"rand 0.9.2",
"reth-ethereum-primitives",
"reth-execution-errors",
"reth-metrics",

View File

@@ -521,5 +521,3 @@ pr:
make update-book-cli && \
cargo docs --document-private-items && \
make test
check-features:

View File

@@ -23,7 +23,7 @@ use reth_node_core::{
dirs::{ChainPath, DataDirPath},
};
use reth_provider::{
providers::{BlockchainProvider, NodeTypesForProvider, StaticFileProvider},
providers::{BlockchainProvider, NodeTypesForProvider, RocksDBProvider, StaticFileProvider},
ProviderFactory, StaticFileProviderFactory,
};
use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget};
@@ -75,10 +75,12 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
let db_path = data_dir.db();
let sf_path = data_dir.static_files();
let rocksdb_path = data_dir.rocksdb();
if access.is_read_write() {
reth_fs_util::create_dir_all(&db_path)?;
reth_fs_util::create_dir_all(&sf_path)?;
reth_fs_util::create_dir_all(&rocksdb_path)?;
}
let config_path = self.config.clone().unwrap_or_else(|| data_dir.config());
@@ -108,8 +110,13 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
StaticFileProvider::read_only(sf_path, false)?,
),
};
// TransactionDB only support read-write mode
let rocksdb_provider = RocksDBProvider::builder(data_dir.rocksdb())
.with_database_log_level(self.db.log_level)
.build()?;
let provider_factory = self.create_provider_factory(&config, db, sfp, access)?;
let provider_factory =
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access)?;
if access.is_read_write() {
debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
init_genesis_with_settings(&provider_factory, self.static_files.to_settings())?;
@@ -128,6 +135,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
config: &Config,
db: Arc<DatabaseEnv>,
static_file_provider: StaticFileProvider<N::Primitives>,
rocksdb_provider: RocksDBProvider,
access: AccessRights,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>>
where
@@ -138,6 +146,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
db,
self.chain.clone(),
static_file_provider,
rocksdb_provider,
)?
.with_prune_modes(prune_modes.clone());

View File

@@ -2,6 +2,7 @@ use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
use clap::{Parser, Subcommand};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_runner::CliContext;
use reth_db::version::{get_db_version, DatabaseVersionError, DB_VERSION};
use reth_db_common::DbTool;
use std::{
@@ -79,7 +80,10 @@ macro_rules! db_exec {
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
/// Execute `db` command
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(self) -> eyre::Result<()> {
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
self,
ctx: CliContext,
) -> eyre::Result<()> {
let data_dir = self.env.datadir.clone().resolve_datadir(self.env.chain.chain());
let db_path = data_dir.db();
let static_files_path = data_dir.static_files();
@@ -158,7 +162,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
let access_rights =
if command.dry_run { AccessRights::RO } else { AccessRights::RW };
db_exec!(self.env, tool, N, access_rights, {
command.execute(&tool)?;
command.execute(&tool, ctx.task_executor.clone())?;
});
}
Subcommands::StaticFileHeader(command) => {

View File

@@ -18,6 +18,7 @@ use reth_node_metrics::{
};
use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
use reth_stages::StageId;
use reth_tasks::TaskExecutor;
use reth_trie::{
verify::{Output, Verifier},
Nibbles,
@@ -48,52 +49,37 @@ pub struct Command {
impl Command {
/// Execute `db repair-trie` command
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
pub fn execute<N: ProviderNodeTypes>(
self,
tool: &DbTool<N>,
task_executor: TaskExecutor,
) -> eyre::Result<()> {
// Set up metrics server if requested
let _metrics_handle = if let Some(listen_addr) = self.metrics {
// Spawn an OS thread with a single-threaded tokio runtime for the metrics server
let chain_name = tool.provider_factory.chain_spec().chain().to_string();
let executor = task_executor.clone();
let handle = std::thread::Builder::new().name("metrics-server".to_string()).spawn(
move || {
// Create a single-threaded tokio runtime
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create tokio runtime for metrics server");
let handle = task_executor.spawn_critical("metrics server", async move {
let config = MetricServerConfig::new(
listen_addr,
VersionInfo {
version: version_metadata().cargo_pkg_version.as_ref(),
build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
cargo_features: version_metadata().vergen_cargo_features.as_ref(),
git_sha: version_metadata().vergen_git_sha.as_ref(),
target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
build_profile: version_metadata().build_profile_name.as_ref(),
},
ChainSpecInfo { name: chain_name },
executor,
Hooks::builder().build(),
);
let handle = runtime.handle().clone();
runtime.block_on(async move {
let task_manager = reth_tasks::TaskManager::new(handle.clone());
let task_executor = task_manager.executor();
let config = MetricServerConfig::new(
listen_addr,
VersionInfo {
version: version_metadata().cargo_pkg_version.as_ref(),
build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
cargo_features: version_metadata().vergen_cargo_features.as_ref(),
git_sha: version_metadata().vergen_git_sha.as_ref(),
target_triple: version_metadata()
.vergen_cargo_target_triple
.as_ref(),
build_profile: version_metadata().build_profile_name.as_ref(),
},
ChainSpecInfo { name: chain_name },
task_executor,
Hooks::builder().build(),
);
// Spawn the metrics server
if let Err(e) = MetricServer::new(config).serve().await {
tracing::error!("Metrics server error: {}", e);
}
// Block forever to keep the runtime alive
std::future::pending::<()>().await
});
},
)?;
// Spawn the metrics server
if let Err(e) = MetricServer::new(config).serve().await {
tracing::error!("Metrics server error: {}", e);
}
});
Some(handle)
} else {

View File

@@ -189,7 +189,7 @@ impl<C: ChainSpecParser> DownloadArgs<C> {
let net = NetworkConfigBuilder::<N::NetworkPrimitives>::new(p2p_secret_key)
.peer_config(config.peers_config_with_basic_nodes_from_file(None))
.external_ip_resolver(self.network.nat)
.external_ip_resolver(self.network.nat.clone())
.network_id(self.network.network_id)
.boot_nodes(boot_nodes.clone())
.apply(|builder| {

View File

@@ -9,7 +9,7 @@ use reth_evm::ConfigureEvm;
use reth_node_builder::NodeTypesWithDB;
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{
providers::{ProviderNodeTypes, StaticFileProvider},
providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
DatabaseProviderFactory, ProviderFactory,
};
use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
@@ -42,6 +42,7 @@ where
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
)?,
to,
from,

View File

@@ -6,7 +6,7 @@ use reth_db_api::{database::Database, table::TableImporter, tables};
use reth_db_common::DbTool;
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{
providers::{ProviderNodeTypes, StaticFileProvider},
providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
DatabaseProviderFactory, ProviderFactory,
};
use reth_stages::{stages::AccountHashingStage, Stage, StageCheckpoint, UnwindInput};
@@ -39,6 +39,7 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Arc<Dat
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
)?,
to,
from,

View File

@@ -5,7 +5,7 @@ use reth_db_api::{database::Database, table::TableImporter, tables};
use reth_db_common::DbTool;
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{
providers::{ProviderNodeTypes, StaticFileProvider},
providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
DatabaseProviderFactory, ProviderFactory,
};
use reth_stages::{stages::StorageHashingStage, Stage, StageCheckpoint, UnwindInput};
@@ -29,6 +29,7 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Arc<Dat
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
)?,
to,
from,

View File

@@ -12,7 +12,7 @@ use reth_evm::ConfigureEvm;
use reth_exex::ExExManagerHandle;
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{
providers::{ProviderNodeTypes, StaticFileProvider},
providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
DatabaseProviderFactory, ProviderFactory,
};
use reth_stages::{
@@ -62,6 +62,7 @@ where
Arc::new(output_db),
db_tool.chain(),
StaticFileProvider::read_write(output_datadir.static_files())?,
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
)?,
to,
from,

View File

@@ -97,6 +97,57 @@ impl CliRunner {
command_res
}
/// Executes a command in a blocking context with access to `CliContext`.
///
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
pub fn run_blocking_command_until_exit<F, E>(
self,
command: impl FnOnce(CliContext) -> F + Send + 'static,
) -> Result<(), E>
where
F: Future<Output = Result<(), E>> + Send + 'static,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
AsyncCliRunner::new(self.tokio_runtime);
// Spawn the command on the blocking thread pool
let handle = tokio_runtime.handle().clone();
let command_handle =
tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
// Wait for the command to complete or ctrl-c
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
run_until_ctrl_c(
async move { command_handle.await.expect("Failed to join blocking task") },
),
));
if command_res.is_err() {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
}
// Shutdown the runtime on a separate thread
let (tx, rx) = mpsc::channel();
std::thread::Builder::new()
.name("tokio-runtime-shutdown".to_string())
.spawn(move || {
drop(tokio_runtime);
let _ = tx.send(());
})
.unwrap();
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
});
command_res
}
/// Executes a regular future until completion or until external signal received.
pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
where

View File

@@ -279,20 +279,28 @@ pub fn validate_against_parent_hash_number<H: BlockHeader>(
header: &H,
parent: &SealedHeader<H>,
) -> Result<(), ConsensusError> {
// Parent number is consistent.
if parent.number() + 1 != header.number() {
return Err(ConsensusError::ParentBlockNumberMismatch {
parent_block_number: parent.number(),
block_number: header.number(),
})
}
if parent.hash() != header.parent_hash() {
return Err(ConsensusError::ParentHashMismatch(
GotExpected { got: header.parent_hash(), expected: parent.hash() }.into(),
))
}
let Some(parent_number) = parent.number().checked_add(1) else {
// parent block already reached the maximum
return Err(ConsensusError::ParentBlockNumberMismatch {
parent_block_number: parent.number(),
block_number: u64::MAX,
})
};
// Parent number is consistent.
if parent_number != header.number() {
return Err(ConsensusError::ParentBlockNumberMismatch {
parent_block_number: parent.number(),
block_number: header.number(),
})
}
Ok(())
}

View File

@@ -110,6 +110,7 @@ pub async fn setup_engine_with_chain_import(
// Create database path and static files path
let db_path = datadir.join("db");
let static_files_path = datadir.join("static_files");
let rocksdb_dir_path = datadir.join("rocksdb");
// Initialize the database using init_db (same as CLI import command)
// Use the same database arguments as the node will use
@@ -125,6 +126,7 @@ pub async fn setup_engine_with_chain_import(
db.clone(),
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())?,
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)?;
// Initialize genesis if needed
@@ -311,6 +313,7 @@ mod tests {
std::fs::create_dir_all(&datadir).unwrap();
let db_path = datadir.join("db");
let static_files_path = datadir.join("static_files");
let rocksdb_dir_path = datadir.join("rocksdb");
// Import the chain
{
@@ -324,6 +327,9 @@ mod tests {
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
.build()
.unwrap(),
)
.expect("failed to create provider factory");
@@ -385,6 +391,9 @@ mod tests {
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
.unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
.build()
.unwrap(),
)
.expect("failed to create provider factory");
@@ -472,11 +481,15 @@ mod tests {
// Create static files path
let static_files_path = datadir.join("static_files");
// Create rocksdb path
let rocksdb_dir_path = datadir.join("rocksdb");
// Create a provider factory
let provider_factory: ProviderFactory<MockNodeTypesWithDB> = ProviderFactory::new(
db.clone(),
chain_spec.clone(),
reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)
.expect("failed to create provider factory");

View File

@@ -101,7 +101,7 @@ pub struct TreeConfig {
state_provider_metrics: bool,
/// Cross-block cache size in bytes.
cross_block_cache_size: u64,
/// Whether the host has enough parallelism to run state root in parallel.
/// Whether the host has enough parallelism to run state root task.
has_enough_parallelism: bool,
/// Whether multiproof task should chunk proof targets.
multiproof_chunking_enabled: bool,
@@ -403,17 +403,12 @@ impl TreeConfig {
self
}
/// Setter for whether or not the host has enough parallelism to run state root in parallel.
/// Setter for has enough parallelism.
pub const fn with_has_enough_parallelism(mut self, has_enough_parallelism: bool) -> Self {
self.has_enough_parallelism = has_enough_parallelism;
self
}
/// Whether or not the host has enough parallelism to run state root in parallel.
pub const fn has_enough_parallelism(&self) -> bool {
self.has_enough_parallelism
}
/// Setter for state provider metrics.
pub const fn with_state_provider_metrics(mut self, state_provider_metrics: bool) -> Self {
self.state_provider_metrics = state_provider_metrics;

View File

@@ -22,7 +22,8 @@ use reth_trie_common::HashedPostState;
use serde::{de::DeserializeOwned, Serialize};
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator};
#[cfg(feature = "std")]
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
pub use reth_payload_primitives::ExecutionPayload;
mod error;

View File

@@ -16,7 +16,7 @@ reth-chain-state.workspace = true
reth-chainspec = { workspace = true, optional = true }
reth-consensus.workspace = true
reth-db.workspace = true
reth-engine-primitives.workspace = true
reth-engine-primitives = { workspace = true, features = ["std"] }
reth-errors.workspace = true
reth-execution-types.workspace = true
reth-evm = { workspace = true, features = ["metrics"] }

View File

@@ -230,12 +230,12 @@ fn bench_state_root(c: &mut Criterion) {
let mut handle = payload_processor.spawn(
Default::default(),
(
core::iter::empty::<
Vec::<
Result<
Recovered<TransactionSigned>,
core::convert::Infallible,
>,
>(),
>::new(),
std::convert::identity,
),
StateProviderBuilder::new(provider.clone(), genesis_hash, None),

View File

@@ -21,7 +21,7 @@ use executor::WorkloadExecutor;
use multiproof::{SparseTrieUpdate, *};
use parking_lot::RwLock;
use prewarm::PrewarmMetrics;
use rayon::iter::{ParallelBridge, ParallelIterator};
use rayon::prelude::*;
use reth_engine_primitives::ExecutableTxIterator;
use reth_evm::{
execute::{ExecutableTxFor, WithTxEnv},
@@ -318,36 +318,32 @@ where
usize,
) {
let (transactions, convert) = transactions.into();
let transactions = transactions.into_iter();
// Get the transaction count for prewarming task
// Use upper bound if available (more accurate), otherwise use lower bound
let (lower, upper) = transactions.size_hint();
let transaction_count_hint = upper.unwrap_or(lower);
let transactions = transactions.into_par_iter();
let transaction_count_hint = transactions.len();
// Spawn a task that iterates through all transactions in parallel and sends them to the
// main task.
let (tx, rx) = mpsc::channel();
let (ooo_tx, ooo_rx) = mpsc::channel();
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
self.executor.spawn_blocking(move || {
transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
let _ = sender.send((idx, tx));
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
});
});
// Spawn a task that processes out-of-order transactions from the task above and sends them
// to prewarming and execution tasks.
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();
// to the execution task in order.
self.executor.spawn_blocking(move || {
let mut next_for_execution = 0;
let mut queue = BTreeMap::new();
while let Ok((idx, tx)) = rx.recv() {
// only send Ok(_) variants to prewarming task
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
while let Ok((idx, tx)) = ooo_rx.recv() {
if next_for_execution == idx {
let _ = execute_tx.send(tx);
next_for_execution += 1;
@@ -1057,19 +1053,16 @@ mod tests {
let provider_factory = BlockchainProvider::new(factory).unwrap();
let mut handle =
payload_processor.spawn(
Default::default(),
(
core::iter::empty::<
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
>(),
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);
let mut handle = payload_processor.spawn(
Default::default(),
(
Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
std::convert::identity,
),
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory),
&TreeConfig::default(),
);
let mut state_hook = handle.state_hook();

View File

@@ -15,6 +15,7 @@ use alloy_eip7928::BlockAccessList;
use alloy_eips::{eip1898::BlockWithParent, NumHash};
use alloy_evm::Evm;
use alloy_primitives::B256;
use rayon::prelude::*;
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock};
use reth_consensus::{ConsensusError, FullConsensus};
use reth_engine_primitives::{
@@ -221,7 +222,7 @@ where
.map_err(NewPayloadError::other)?
.into();
let iter = Either::Left(iter.into_iter().map(Either::Left));
let iter = Either::Left(iter.into_par_iter().map(Either::Left));
let convert = move |tx| {
let Either::Left(tx) = tx else { unreachable!() };
convert(tx).map(Either::Left).map_err(Either::Left)
@@ -231,8 +232,9 @@ where
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
}
BlockOrPayload::Block(block) => {
let iter =
Either::Right(block.body().clone_transactions().into_iter().map(Either::Right));
let iter = Either::Right(
block.body().clone_transactions().into_par_iter().map(Either::Right),
);
let convert = move |tx: Either<_, N::SignedTx>| {
let Either::Right(tx) = tx else { unreachable!() };
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
@@ -874,7 +876,7 @@ where
/// Note: Use state root task only if prefix sets are empty, otherwise proof generation is
/// too expensive because it requires walking all paths in every proof.
const fn plan_state_root_computation(&self) -> StateRootStrategy {
if self.config.state_root_fallback() || !self.config.has_enough_parallelism() {
if self.config.state_root_fallback() {
StateRootStrategy::Synchronous
} else if self.config.use_state_root_task() {
StateRootStrategy::StateRootTask

View File

@@ -16,7 +16,7 @@ reth-primitives-traits.workspace = true
reth-errors.workspace = true
reth-chainspec.workspace = true
reth-fs-util.workspace = true
reth-engine-primitives.workspace = true
reth-engine-primitives = { workspace = true, features = ["std"] }
reth-engine-tree.workspace = true
reth-evm.workspace = true
reth-revm.workspace = true

View File

@@ -154,7 +154,9 @@ where
Commands::ImportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
Commands::ExportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
Commands::Db(command) => {
runner.run_blocking_command_until_exit(|ctx| command.execute::<N>(ctx))
}
Commands::Download(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
Commands::Stage(command) => {
runner.run_command_until_exit(|ctx| command.execute::<N, _>(ctx, components))

View File

@@ -19,32 +19,37 @@ extern crate alloc;
use alloc::{borrow::Cow, sync::Arc};
use alloy_consensus::Header;
use alloy_eips::Decodable2718;
pub use alloy_evm::EthEvm;
use alloy_evm::{
eth::{EthBlockExecutionCtx, EthBlockExecutorFactory},
EthEvmFactory, FromRecoveredTx, FromTxWithEncoded,
};
use alloy_primitives::{Bytes, U256};
use alloy_rpc_types_engine::ExecutionData;
use core::{convert::Infallible, fmt::Debug};
use reth_chainspec::{ChainSpec, EthChainSpec, EthereumHardforks, MAINNET};
use reth_chainspec::{ChainSpec, EthChainSpec, MAINNET};
use reth_ethereum_primitives::{Block, EthPrimitives, TransactionSigned};
use reth_evm::{
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEngineEvm, ConfigureEvm,
EvmEnv, EvmEnvFor, EvmFactory, ExecutableTxIterator, ExecutionCtxFor, NextBlockEnvAttributes,
TransactionEnv,
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEvm, EvmEnv, EvmFactory,
NextBlockEnvAttributes, TransactionEnv,
};
use reth_primitives_traits::{
constants::MAX_TX_GAS_LIMIT_OSAKA, SealedBlock, SealedHeader, SignedTransaction, TxTy,
};
use reth_storage_errors::any::AnyError;
use revm::{
context::{BlockEnv, CfgEnv},
context_interface::block::BlobExcessGasAndPrice,
primitives::hardfork::SpecId,
use reth_primitives_traits::{SealedBlock, SealedHeader};
use revm::{context::BlockEnv, primitives::hardfork::SpecId};
#[cfg(feature = "std")]
use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator};
#[allow(unused_imports)]
use {
alloy_eips::Decodable2718,
alloy_primitives::{Bytes, U256},
alloy_rpc_types_engine::ExecutionData,
reth_chainspec::EthereumHardforks,
reth_evm::{EvmEnvFor, ExecutionCtxFor},
reth_primitives_traits::{constants::MAX_TX_GAS_LIMIT_OSAKA, SignedTransaction, TxTy},
reth_storage_errors::any::AnyError,
revm::context::CfgEnv,
revm::context_interface::block::BlobExcessGasAndPrice,
};
pub use alloy_evm::EthEvm;
mod config;
use alloy_evm::eth::spec::EthExecutorSpec;
pub use config::{revm_spec, revm_spec_by_timestamp_and_block_number};
@@ -206,6 +211,7 @@ where
}
}
#[cfg(feature = "std")]
impl<ChainSpec, EvmF> ConfigureEngineEvm<ExecutionData> for EthEvmConfig<ChainSpec, EvmF>
where
ChainSpec: EthExecutorSpec + EthChainSpec<Header = Header> + Hardforks + 'static,
@@ -286,7 +292,7 @@ where
&self,
payload: &ExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
let txs = payload.payload.transactions().clone().into_iter();
let txs = payload.payload.transactions().clone();
let convert = |tx: Bytes| {
let tx =
TxTy::<Self::Primitives>::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?;

View File

@@ -24,7 +24,7 @@ reth-provider.workspace = true
reth-transaction-pool.workspace = true
reth-network.workspace = true
reth-evm.workspace = true
reth-evm-ethereum.workspace = true
reth-evm-ethereum = { workspace = true, features = ["std"] }
reth-rpc.workspace = true
reth-rpc-api.workspace = true
reth-rpc-eth-api.workspace = true
@@ -35,7 +35,7 @@ reth-chainspec.workspace = true
reth-revm = { workspace = true, features = ["std"] }
reth-rpc-eth-types.workspace = true
reth-engine-local.workspace = true
reth-engine-primitives.workspace = true
reth-engine-primitives = { workspace = true, features = ["std"] }
reth-payload-primitives.workspace = true
# ethereum

View File

@@ -118,13 +118,14 @@ impl EthereumNode {
/// use reth_chainspec::ChainSpecBuilder;
/// use reth_db::open_db_read_only;
/// use reth_node_ethereum::EthereumNode;
/// use reth_provider::providers::StaticFileProvider;
/// use reth_provider::providers::{RocksDBProvider, StaticFileProvider};
/// use std::sync::Arc;
///
/// let factory = EthereumNode::provider_factory_builder()
/// .db(Arc::new(open_db_read_only("db", Default::default()).unwrap()))
/// .chainspec(ChainSpecBuilder::mainnet().build().into())
/// .static_file(StaticFileProvider::read_only("db/static_files", false).unwrap())
/// .rocksdb_provider(RocksDBProvider::builder("db/rocksdb").build().unwrap())
/// .build_provider_factory();
/// ```
pub fn provider_factory_builder() -> ProviderFactoryBuilder<Self> {

View File

@@ -28,6 +28,7 @@ async fn testing_rpc_build_block_works() -> eyre::Result<()> {
datadir: MaybePlatformPath::<DataDirPath>::from_str(tempdir.path().to_str().unwrap())
.expect("valid datadir"),
static_files_path: Some(tempdir.path().join("static")),
rocksdb_path: Some(tempdir.path().join("rocksdb")),
};
let config = NodeConfig::test().with_datadir_args(datadir_args).with_rpc(rpc_args);
let db = create_test_rw_db();

View File

@@ -24,7 +24,7 @@ reth-payload-builder-primitives.workspace = true
reth-payload-primitives.workspace = true
reth-basic-payload-builder.workspace = true
reth-evm.workspace = true
reth-evm-ethereum.workspace = true
reth-evm-ethereum = { workspace = true, features = ["std"] }
reth-errors.workspace = true
reth-chainspec.workspace = true
reth-payload-validator.workspace = true

View File

@@ -32,6 +32,7 @@ auto_impl.workspace = true
derive_more.workspace = true
futures-util.workspace = true
metrics = { workspace = true, optional = true }
rayon = { workspace = true, optional = true }
[dev-dependencies]
reth-ethereum-primitives.workspace = true
@@ -40,6 +41,7 @@ reth-ethereum-forks.workspace = true
[features]
default = ["std"]
std = [
"dep:rayon",
"reth-primitives-traits/std",
"alloy-eips/std",
"alloy-primitives/std",

View File

@@ -1,4 +1,5 @@
use crate::{execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor};
use rayon::prelude::*;
/// [`ConfigureEvm`] extension providing methods for executing payloads.
pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
@@ -21,7 +22,7 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
/// used to convert them to an executable transaction. This tuple is used in the engine to
/// parallelize heavy work like decoding or recovery.
pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static {
pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'static {
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
///
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
@@ -32,8 +33,10 @@ pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static
/// Errors that may occur while recovering or decoding transactions.
type Error: core::error::Error + Send + Sync + 'static;
/// Iterator over [`ExecutableTxTuple::Tx`]
type Iter: Iterator<Item = Self::RawTx> + Send + 'static;
/// Iterator over [`ExecutableTxTuple::Tx`].
type IntoIter: IntoParallelIterator<Item = Self::RawTx, Iter: IndexedParallelIterator>
+ Send
+ 'static;
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
/// and will be parallelized in the engine.
@@ -45,14 +48,14 @@ where
RawTx: Send + Sync + 'static,
Tx: Clone + Send + Sync + 'static,
Err: core::error::Error + Send + Sync + 'static,
I: Iterator<Item = RawTx> + Send + 'static,
I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator> + Send + 'static,
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
{
type RawTx = RawTx;
type Tx = Tx;
type Error = Err;
type Iter = I;
type IntoIter = I;
type Convert = F;
}

View File

@@ -44,8 +44,10 @@ pub mod execute;
mod aliases;
pub use aliases::*;
#[cfg(feature = "std")]
mod engine;
pub use engine::{ConfigureEngineEvm, ExecutableTxIterator};
#[cfg(feature = "std")]
pub use engine::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
#[cfg(feature = "metrics")]
pub mod metrics;

View File

@@ -20,7 +20,9 @@ use futures_util::FutureExt;
use reth_chainspec::{ChainSpec, MAINNET};
use reth_consensus::test_utils::TestConsensus;
use reth_db::{
test_utils::{create_test_rw_db, create_test_static_files_dir, TempDatabase},
test_utils::{
create_test_rocksdb_dir, create_test_rw_db, create_test_static_files_dir, TempDatabase,
},
DatabaseEnv,
};
use reth_db_common::init::init_genesis;
@@ -50,7 +52,7 @@ use reth_node_ethereum::{
use reth_payload_builder::noop::NoopPayloadBuilderService;
use reth_primitives_traits::{Block as _, RecoveredBlock};
use reth_provider::{
providers::{BlockchainProvider, StaticFileProvider},
providers::{BlockchainProvider, RocksDBProvider, StaticFileProvider},
BlockReader, EthStorage, ProviderFactory,
};
use reth_tasks::TaskManager;
@@ -239,11 +241,13 @@ pub async fn test_exex_context_with_chain_spec(
let consensus = Arc::new(TestConsensus::default());
let (static_dir, _) = create_test_static_files_dir();
let (rocksdb_dir, _) = create_test_rocksdb_dir();
let db = create_test_rw_db();
let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
db,
chain_spec.clone(),
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBProvider::builder(rocksdb_dir.keep()).build().unwrap(),
)?;
let genesis_hash = init_genesis(&provider_factory)?;

View File

@@ -24,7 +24,7 @@ pub struct Discv4Config {
/// The number of allowed consecutive failures for `FindNode` requests. Default: 5.
pub max_find_node_failures: u8,
/// The interval to use when checking for expired nodes that need to be re-pinged. Default:
/// 10min.
/// 10 seconds.
pub ping_interval: Duration,
/// The duration of we consider a ping timed out.
pub ping_expiration: Duration,
@@ -93,7 +93,7 @@ impl Discv4Config {
/// Returns the corresponding [`ResolveNatInterval`], if a [`NatResolver`] and an interval was
/// configured
pub fn resolve_external_ip_interval(&self) -> Option<ResolveNatInterval> {
let resolver = self.external_ip_resolver?;
let resolver = self.external_ip_resolver.clone()?;
let interval = self.resolve_external_ip_interval?;
Some(ResolveNatInterval::interval_at(resolver, tokio::time::Instant::now(), interval))
}
@@ -275,10 +275,7 @@ impl Discv4ConfigBuilder {
}
/// Configures if and how the external IP of the node should be resolved.
pub const fn external_ip_resolver(
&mut self,
external_ip_resolver: Option<NatResolver>,
) -> &mut Self {
pub fn external_ip_resolver(&mut self, external_ip_resolver: Option<NatResolver>) -> &mut Self {
self.config.external_ip_resolver = external_ip_resolver;
self
}

View File

@@ -625,10 +625,13 @@ impl Discv4Service {
self.lookup_interval = tokio::time::interval(duration);
}
/// Sets the external Ip to the configured external IP if [`NatResolver::ExternalIp`].
/// Sets the external Ip to the configured external IP if [`NatResolver::ExternalIp`] or
/// [`NatResolver::ExternalAddr`]. In the case of [`NatResolver::ExternalAddr`], it will return
/// the first IP address found for the domain associated with the discv4 UDP port.
fn resolve_external_ip(&mut self) {
if let Some(r) = &self.resolve_external_ip_interval &&
let Some(external_ip) = r.resolver().as_external_ip()
let Some(external_ip) =
r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
{
self.set_external_ip_addr(external_ip);
}

View File

@@ -169,7 +169,7 @@ impl NewPooledTransactionHashes {
matches!(version, EthVersion::Eth67 | EthVersion::Eth66)
}
Self::Eth68(_) => {
matches!(version, EthVersion::Eth68 | EthVersion::Eth69)
matches!(version, EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70)
}
}
}

View File

@@ -100,6 +100,16 @@ impl Capability {
Self::eth(EthVersion::Eth68)
}
/// Returns the [`EthVersion::Eth69`] capability.
pub const fn eth_69() -> Self {
Self::eth(EthVersion::Eth69)
}
/// Returns the [`EthVersion::Eth70`] capability.
pub const fn eth_70() -> Self {
Self::eth(EthVersion::Eth70)
}
/// Whether this is eth v66 protocol.
#[inline]
pub fn is_eth_v66(&self) -> bool {
@@ -118,10 +128,26 @@ impl Capability {
self.name == "eth" && self.version == 68
}
/// Whether this is eth v69.
#[inline]
pub fn is_eth_v69(&self) -> bool {
self.name == "eth" && self.version == 69
}
/// Whether this is eth v70.
#[inline]
pub fn is_eth_v70(&self) -> bool {
self.name == "eth" && self.version == 70
}
/// Whether this is any eth version.
#[inline]
pub fn is_eth(&self) -> bool {
self.is_eth_v66() || self.is_eth_v67() || self.is_eth_v68()
self.is_eth_v66() ||
self.is_eth_v67() ||
self.is_eth_v68() ||
self.is_eth_v69() ||
self.is_eth_v70()
}
}
@@ -141,7 +167,7 @@ impl From<EthVersion> for Capability {
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> arbitrary::Arbitrary<'a> for Capability {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
let version = u.int_in_range(66..=69)?; // Valid eth protocol versions are 66-69
let version = u.int_in_range(66..=70)?; // Valid eth protocol versions are 66-70
// Only generate valid eth protocol name for now since it's the only supported protocol
Ok(Self::new_static("eth", version))
}
@@ -155,6 +181,8 @@ pub struct Capabilities {
eth_66: bool,
eth_67: bool,
eth_68: bool,
eth_69: bool,
eth_70: bool,
}
impl Capabilities {
@@ -164,6 +192,8 @@ impl Capabilities {
eth_66: value.iter().any(Capability::is_eth_v66),
eth_67: value.iter().any(Capability::is_eth_v67),
eth_68: value.iter().any(Capability::is_eth_v68),
eth_69: value.iter().any(Capability::is_eth_v69),
eth_70: value.iter().any(Capability::is_eth_v70),
inner: value,
}
}
@@ -182,7 +212,7 @@ impl Capabilities {
/// Whether the peer supports `eth` sub-protocol.
#[inline]
pub const fn supports_eth(&self) -> bool {
self.eth_68 || self.eth_67 || self.eth_66
self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66
}
/// Whether this peer supports eth v66 protocol.
@@ -202,6 +232,18 @@ impl Capabilities {
pub const fn supports_eth_v68(&self) -> bool {
self.eth_68
}
/// Whether this peer supports eth v69 protocol.
#[inline]
pub const fn supports_eth_v69(&self) -> bool {
self.eth_69
}
/// Whether this peer supports eth v70 protocol.
#[inline]
pub const fn supports_eth_v70(&self) -> bool {
self.eth_70
}
}
impl From<Vec<Capability>> for Capabilities {
@@ -224,6 +266,8 @@ impl Decodable for Capabilities {
eth_66: inner.iter().any(Capability::is_eth_v66),
eth_67: inner.iter().any(Capability::is_eth_v67),
eth_68: inner.iter().any(Capability::is_eth_v68),
eth_69: inner.iter().any(Capability::is_eth_v69),
eth_70: inner.iter().any(Capability::is_eth_v70),
inner,
})
}

View File

@@ -1,4 +1,4 @@
//! Implements Ethereum wire protocol for versions 66, 67, and 68.
//! Implements Ethereum wire protocol for versions 66 through 70.
//! Defines structs/enums for messages, request-response pairs, and broadcasts.
//! Handles compatibility with [`EthVersion`].
//!
@@ -8,13 +8,13 @@
use super::{
broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
GetNodeData, GetPooledTransactions, GetReceipts, NewPooledTransactionHashes66,
GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
Transactions,
};
use crate::{
status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
RawCapabilityMessage, Receipts69, SharedTransactions,
RawCapabilityMessage, Receipts69, Receipts70, SharedTransactions,
};
use alloc::{boxed::Box, string::String, sync::Arc};
use alloy_primitives::{
@@ -111,13 +111,29 @@ impl<N: NetworkPrimitives> ProtocolMessage<N> {
}
EthMessage::NodeData(RequestPair::decode(buf)?)
}
EthMessageID::GetReceipts => EthMessage::GetReceipts(RequestPair::decode(buf)?),
EthMessageID::Receipts => {
if version < EthVersion::Eth69 {
EthMessage::Receipts(RequestPair::decode(buf)?)
EthMessageID::GetReceipts => {
if version >= EthVersion::Eth70 {
EthMessage::GetReceipts70(RequestPair::decode(buf)?)
} else {
// with eth69, receipts no longer include the bloom
EthMessage::Receipts69(RequestPair::decode(buf)?)
EthMessage::GetReceipts(RequestPair::decode(buf)?)
}
}
EthMessageID::Receipts => {
match version {
v if v >= EthVersion::Eth70 => {
// eth/70 continues to omit bloom filters and adds the
// `lastBlockIncomplete` flag, encoded as
// `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`.
EthMessage::Receipts70(RequestPair::decode(buf)?)
}
EthVersion::Eth69 => {
// with eth69, receipts no longer include the bloom
EthMessage::Receipts69(RequestPair::decode(buf)?)
}
_ => {
// before eth69 we need to decode the bloom as well
EthMessage::Receipts(RequestPair::decode(buf)?)
}
}
}
EthMessageID::BlockRangeUpdate => {
@@ -205,6 +221,9 @@ impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMes
///
/// The `eth/69` announces the historical block range served by the node. Removes total difficulty
/// information. And removes the Bloom field from receipts transferred over the protocol.
///
/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts.
/// requests/responses.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
@@ -259,6 +278,12 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
NodeData(RequestPair<NodeData>),
/// Represents a `GetReceipts` request-response pair.
GetReceipts(RequestPair<GetReceipts>),
/// Represents a `GetReceipts` request for eth/70.
///
/// Note: Unlike earlier protocol versions, the eth/70 encoding for
/// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps
/// a [`RequestPair`], but with a custom inline encoding.
GetReceipts70(RequestPair<GetReceipts70>),
/// Represents a Receipts request-response pair.
#[cfg_attr(
feature = "serde",
@@ -271,6 +296,16 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
)]
Receipts69(RequestPair<Receipts69<N::Receipt>>),
/// Represents a Receipts request-response pair for eth/70.
#[cfg_attr(
feature = "serde",
serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
)]
///
/// Note: The eth/70 encoding for `Receipts` in EIP-7975 inlines the
/// request id. The type still wraps a [`RequestPair`], but with a custom
/// inline encoding.
Receipts70(RequestPair<Receipts70<N::Receipt>>),
/// Represents a `BlockRangeUpdate` message broadcast to the network.
#[cfg_attr(
feature = "serde",
@@ -300,8 +335,8 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::PooledTransactions(_) => EthMessageID::PooledTransactions,
Self::GetNodeData(_) => EthMessageID::GetNodeData,
Self::NodeData(_) => EthMessageID::NodeData,
Self::GetReceipts(_) => EthMessageID::GetReceipts,
Self::Receipts(_) | Self::Receipts69(_) => EthMessageID::Receipts,
Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts,
Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts,
Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
Self::Other(msg) => EthMessageID::Other(msg.id as u8),
}
@@ -314,6 +349,7 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::GetBlockBodies(_) |
Self::GetBlockHeaders(_) |
Self::GetReceipts(_) |
Self::GetReceipts70(_) |
Self::GetPooledTransactions(_) |
Self::GetNodeData(_)
)
@@ -326,11 +362,40 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::PooledTransactions(_) |
Self::Receipts(_) |
Self::Receipts69(_) |
Self::Receipts70(_) |
Self::BlockHeaders(_) |
Self::BlockBodies(_) |
Self::NodeData(_)
)
}
/// Converts the message types where applicable.
///
/// This handles up/downcasting where appropriate, for example for different receipt request
/// types.
pub fn map_versioned(mut self, version: EthVersion) -> Self {
// For eth/70 peers we send `GetReceipts` using the new eth/70
// encoding with `firstBlockReceiptIndex = 0`, while keeping the
// user-facing `PeerRequest` API unchanged.
if version >= EthVersion::Eth70 {
return match self {
EthMessage::GetReceipts(pair) => {
let RequestPair { request_id, message } = pair;
let req = RequestPair {
request_id,
message: GetReceipts70 {
first_block_receipt_index: 0,
block_hashes: message.0,
},
};
EthMessage::GetReceipts70(req)
}
other => other,
}
}
self
}
}
impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
@@ -351,8 +416,10 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
Self::GetNodeData(request) => request.encode(out),
Self::NodeData(data) => data.encode(out),
Self::GetReceipts(request) => request.encode(out),
Self::GetReceipts70(request) => request.encode(out),
Self::Receipts(receipts) => receipts.encode(out),
Self::Receipts69(receipt69) => receipt69.encode(out),
Self::Receipts70(receipt70) => receipt70.encode(out),
Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
Self::Other(unknown) => out.put_slice(&unknown.payload),
}
@@ -374,8 +441,10 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
Self::GetNodeData(request) => request.length(),
Self::NodeData(data) => data.length(),
Self::GetReceipts(request) => request.length(),
Self::GetReceipts70(request) => request.length(),
Self::Receipts(receipts) => receipts.length(),
Self::Receipts69(receipt69) => receipt69.length(),
Self::Receipts70(receipt70) => receipt70.length(),
Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
Self::Other(unknown) => unknown.length(),
}

View File

@@ -17,6 +17,42 @@ pub struct GetReceipts(
pub Vec<B256>,
);
/// Eth/70 `GetReceipts` request payload that supports partial receipt queries.
///
/// When used with eth/70, the request id is carried by the surrounding
/// [`crate::message::RequestPair`], and the on-wire shape is the flattened list
/// `firstBlockReceiptIndex, [blockhash₁, ...]`.
///
/// See also [eip-7975](https://eips.ethereum.org/EIPS/eip-7975)
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct GetReceipts70 {
/// Index into the receipts of the first requested block hash.
pub first_block_receipt_index: u64,
/// The block hashes to request receipts for.
pub block_hashes: Vec<B256>,
}
impl alloy_rlp::Encodable for GetReceipts70 {
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
self.first_block_receipt_index.encode(out);
self.block_hashes.encode(out);
}
fn length(&self) -> usize {
self.first_block_receipt_index.length() + self.block_hashes.length()
}
}
impl alloy_rlp::Decodable for GetReceipts70 {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let first_block_receipt_index = u64::decode(buf)?;
let block_hashes = Vec::<B256>::decode(buf)?;
Ok(Self { first_block_receipt_index, block_hashes })
}
}
/// The response to [`GetReceipts`], containing receipt lists that correspond to each block
/// requested.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
@@ -58,7 +94,13 @@ pub struct Receipts69<T = Receipt>(pub Vec<Vec<T>>);
impl<T: TxReceipt> Receipts69<T> {
/// Encodes all receipts with the bloom filter.
///
/// Note: This is an expensive operation that recalculates the bloom for each receipt.
/// Eth/69 omits bloom filters on the wire, while some internal callers
/// (and legacy APIs) still operate on [`Receipts`] with
/// [`ReceiptWithBloom`]. This helper reconstructs the bloom locally from
/// each receipt's logs so the older API can be used on top of eth/69 data.
///
/// Note: This is an expensive operation that recalculates the bloom for
/// every receipt.
pub fn into_with_bloom(self) -> Receipts<T> {
Receipts(
self.0
@@ -75,6 +117,68 @@ impl<T: TxReceipt> From<Receipts69<T>> for Receipts<T> {
}
}
/// Eth/70 `Receipts` response payload.
///
/// This is used in conjunction with [`crate::message::RequestPair`] to encode the full wire
/// message `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct Receipts70<T = Receipt> {
/// Whether the receipts list for the last block is incomplete.
pub last_block_incomplete: bool,
/// Receipts grouped by block.
pub receipts: Vec<Vec<T>>,
}
impl<T> alloy_rlp::Encodable for Receipts70<T>
where
T: alloy_rlp::Encodable,
{
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
self.last_block_incomplete.encode(out);
self.receipts.encode(out);
}
fn length(&self) -> usize {
self.last_block_incomplete.length() + self.receipts.length()
}
}
impl<T> alloy_rlp::Decodable for Receipts70<T>
where
T: alloy_rlp::Decodable,
{
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let last_block_incomplete = bool::decode(buf)?;
let receipts = Vec::<Vec<T>>::decode(buf)?;
Ok(Self { last_block_incomplete, receipts })
}
}
impl<T: TxReceipt> Receipts70<T> {
/// Encodes all receipts with the bloom filter.
///
/// Just like eth/69, eth/70 does not transmit bloom filters over the wire.
/// When higher layers still expect the older bloom-bearing [`Receipts`]
/// type, this helper converts the eth/70 payload into that shape by
/// recomputing the bloom locally from the contained receipts.
///
/// Note: This is an expensive operation that recalculates the bloom for
/// every receipt.
pub fn into_with_bloom(self) -> Receipts<T> {
// Reuse the eth/69 helper, since both variants carry the same
// receipt list shape (only eth/70 adds request metadata).
Receipts69(self.receipts).into_with_bloom()
}
}
impl<T: TxReceipt> From<Receipts70<T>> for Receipts<T> {
fn from(receipts: Receipts70<T>) -> Self {
receipts.into_with_bloom()
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -225,4 +329,70 @@ mod tests {
let encoded = alloy_rlp::encode(&request);
assert_eq!(encoded, data);
}
#[test]
fn encode_get_receipts70_inline_shape() {
let req = RequestPair {
request_id: 1111,
message: GetReceipts70 {
first_block_receipt_index: 0,
block_hashes: vec![
hex!("00000000000000000000000000000000000000000000000000000000deadc0de").into(),
hex!("00000000000000000000000000000000000000000000000000000000feedbeef").into(),
],
},
};
let mut out = vec![];
req.encode(&mut out);
let mut buf = out.as_slice();
let header = alloy_rlp::Header::decode(&mut buf).unwrap();
let payload_start = buf.len();
let request_id = u64::decode(&mut buf).unwrap();
let first_block_receipt_index = u64::decode(&mut buf).unwrap();
let block_hashes = Vec::<B256>::decode(&mut buf).unwrap();
assert!(buf.is_empty(), "buffer not fully consumed");
assert_eq!(request_id, 1111);
assert_eq!(first_block_receipt_index, 0);
assert_eq!(block_hashes.len(), 2);
// ensure payload length matches header
assert_eq!(payload_start - buf.len(), header.payload_length);
let mut buf = out.as_slice();
let decoded = RequestPair::<GetReceipts70>::decode(&mut buf).unwrap();
assert!(buf.is_empty(), "buffer not fully consumed on decode");
assert_eq!(decoded, req);
}
#[test]
fn encode_receipts70_inline_shape() {
let payload: Receipts70<Receipt> =
Receipts70 { last_block_incomplete: true, receipts: vec![vec![Receipt::default()]] };
let resp = RequestPair { request_id: 7, message: payload };
let mut out = vec![];
resp.encode(&mut out);
let mut buf = out.as_slice();
let header = alloy_rlp::Header::decode(&mut buf).unwrap();
let payload_start = buf.len();
let request_id = u64::decode(&mut buf).unwrap();
let last_block_incomplete = bool::decode(&mut buf).unwrap();
let receipts = Vec::<Vec<Receipt>>::decode(&mut buf).unwrap();
assert!(buf.is_empty(), "buffer not fully consumed");
assert_eq!(payload_start - buf.len(), header.payload_length);
assert_eq!(request_id, 7);
assert!(last_block_incomplete);
assert_eq!(receipts.len(), 1);
assert_eq!(receipts[0].len(), 1);
let mut buf = out.as_slice();
let decoded = RequestPair::<Receipts70>::decode(&mut buf).unwrap();
assert!(buf.is_empty(), "buffer not fully consumed on decode");
assert_eq!(decoded, resp);
}
}

View File

@@ -13,7 +13,7 @@ use reth_codecs_derive::add_arbitrary_tests;
/// unsupported fields are stripped out.
#[derive(Clone, Debug, PartialEq, Eq, Copy)]
pub struct UnifiedStatus {
/// The eth protocol version (e.g. eth/66 to eth/69).
/// The eth protocol version (e.g. eth/66 to eth/70).
pub version: EthVersion,
/// The chain ID identifying the peers network.
pub chain: Chain,
@@ -157,7 +157,7 @@ impl StatusBuilder {
self.status
}
/// Sets the eth protocol version (e.g., eth/66, eth/69).
/// Sets the eth protocol version (e.g., eth/66, eth/70).
pub const fn version(mut self, version: EthVersion) -> Self {
self.status.version = version;
self
@@ -378,8 +378,8 @@ impl Debug for StatusEth69 {
}
}
/// `StatusMessage` can store either the Legacy version (with TD) or the
/// eth/69 version (omits TD).
/// `StatusMessage` can store either the Legacy version (with TD), or the eth/69+/eth/70 version
/// (omits TD, includes block range).
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StatusMessage {
@@ -546,6 +546,24 @@ mod tests {
assert_eq!(unified_status, roundtripped_unified_status);
}
#[test]
fn roundtrip_eth70() {
let unified_status = UnifiedStatus::builder()
.version(EthVersion::Eth70)
.chain(Chain::mainnet())
.genesis(MAINNET_GENESIS_HASH)
.forkid(ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 })
.blockhash(b256!("0xfeb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d"))
.total_difficulty(None)
.earliest_block(Some(1))
.latest_block(Some(2))
.build();
let status_message = unified_status.into_message();
let roundtripped_unified_status = UnifiedStatus::from_message(status_message);
assert_eq!(unified_status, roundtripped_unified_status);
}
#[test]
fn encode_eth69_status_message() {
let expected = hex!("f8544501a0d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3c684b715077d8083ed14f2840112a880a0feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d");

View File

@@ -27,6 +27,8 @@ pub enum EthVersion {
Eth68 = 68,
/// The `eth` protocol version 69.
Eth69 = 69,
/// The `eth` protocol version 70.
Eth70 = 70,
}
impl EthVersion {
@@ -55,6 +57,11 @@ impl EthVersion {
pub const fn is_eth69(&self) -> bool {
matches!(self, Self::Eth69)
}
/// Returns true if the version is eth/70
pub const fn is_eth70(&self) -> bool {
matches!(self, Self::Eth70)
}
}
/// RLP encodes `EthVersion` as a single byte (66-69).
@@ -96,6 +103,7 @@ impl TryFrom<&str> for EthVersion {
"67" => Ok(Self::Eth67),
"68" => Ok(Self::Eth68),
"69" => Ok(Self::Eth69),
"70" => Ok(Self::Eth70),
_ => Err(ParseVersionError(s.to_string())),
}
}
@@ -120,6 +128,7 @@ impl TryFrom<u8> for EthVersion {
67 => Ok(Self::Eth67),
68 => Ok(Self::Eth68),
69 => Ok(Self::Eth69),
70 => Ok(Self::Eth70),
_ => Err(ParseVersionError(u.to_string())),
}
}
@@ -149,6 +158,7 @@ impl From<EthVersion> for &'static str {
EthVersion::Eth67 => "67",
EthVersion::Eth68 => "68",
EthVersion::Eth69 => "69",
EthVersion::Eth70 => "70",
}
}
}
@@ -195,7 +205,7 @@ impl Decodable for ProtocolVersion {
#[cfg(test)]
mod tests {
use super::{EthVersion, ParseVersionError};
use super::EthVersion;
use alloy_rlp::{Decodable, Encodable, Error as RlpError};
use bytes::BytesMut;
@@ -205,7 +215,7 @@ mod tests {
assert_eq!(EthVersion::Eth67, EthVersion::try_from("67").unwrap());
assert_eq!(EthVersion::Eth68, EthVersion::try_from("68").unwrap());
assert_eq!(EthVersion::Eth69, EthVersion::try_from("69").unwrap());
assert_eq!(Err(ParseVersionError("70".to_string())), EthVersion::try_from("70"));
assert_eq!(EthVersion::Eth70, EthVersion::try_from("70").unwrap());
}
#[test]
@@ -214,12 +224,18 @@ mod tests {
assert_eq!(EthVersion::Eth67, "67".parse().unwrap());
assert_eq!(EthVersion::Eth68, "68".parse().unwrap());
assert_eq!(EthVersion::Eth69, "69".parse().unwrap());
assert_eq!(Err(ParseVersionError("70".to_string())), "70".parse::<EthVersion>());
assert_eq!(EthVersion::Eth70, "70".parse().unwrap());
}
#[test]
fn test_eth_version_rlp_encode() {
let versions = [EthVersion::Eth66, EthVersion::Eth67, EthVersion::Eth68, EthVersion::Eth69];
let versions = [
EthVersion::Eth66,
EthVersion::Eth67,
EthVersion::Eth68,
EthVersion::Eth69,
EthVersion::Eth70,
];
for version in versions {
let mut encoded = BytesMut::new();
@@ -236,7 +252,7 @@ mod tests {
(67_u8, Ok(EthVersion::Eth67)),
(68_u8, Ok(EthVersion::Eth68)),
(69_u8, Ok(EthVersion::Eth69)),
(70_u8, Err(RlpError::Custom("invalid eth version"))),
(70_u8, Ok(EthVersion::Eth70)),
(65_u8, Err(RlpError::Custom("invalid eth version"))),
];

View File

@@ -418,6 +418,8 @@ mod tests {
Capability::new_static("eth", 66),
Capability::new_static("eth", 67),
Capability::new_static("eth", 68),
Capability::new_static("eth", 69),
Capability::new_static("eth", 70),
]
.into();
@@ -425,6 +427,8 @@ mod tests {
assert!(capabilities.supports_eth_v66());
assert!(capabilities.supports_eth_v67());
assert!(capabilities.supports_eth_v68());
assert!(capabilities.supports_eth_v69());
assert!(capabilities.supports_eth_v70());
}
#[test]

View File

@@ -260,10 +260,11 @@ mod tests {
assert_eq!(hello_encoded.len(), hello.length());
}
//TODO: add test for eth70 here once we have fully support it
#[test]
fn test_default_protocols_include_eth69() {
// ensure that the default protocol list includes Eth69 as the latest version
fn test_default_protocols_still_include_eth69() {
// ensure that older eth/69 remains advertised for compatibility
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
let id = pk2id(&secret_key.public_key(SECP256K1));
let hello = HelloMessageWithProtocols::builder(id).build();

View File

@@ -19,7 +19,7 @@ pub use net_if::{NetInterfaceError, DEFAULT_NET_IF_NAME};
use std::{
fmt,
future::{poll_fn, Future},
net::{AddrParseError, IpAddr},
net::{AddrParseError, IpAddr, ToSocketAddrs},
pin::Pin,
str::FromStr,
task::{Context, Poll},
@@ -38,7 +38,7 @@ const EXTERNAL_IP_APIS: &[&str] =
&["https://ipinfo.io/ip", "https://icanhazip.com", "https://ifconfig.me"];
/// All builtin resolvers.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Hash)]
#[derive(Debug, Clone, Eq, PartialEq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(SerializeDisplay, DeserializeFromStr))]
pub enum NatResolver {
/// Resolve with any available resolver.
@@ -50,6 +50,14 @@ pub enum NatResolver {
PublicIp,
/// Use the given [`IpAddr`]
ExternalIp(IpAddr),
/// Use the given domain name as the external address to expose to peers.
/// This is behaving essentially the same as [`NatResolver::ExternalIp`], but supports domain
/// names. Domain names are resolved to IP addresses using the OS's resolver. The first IP
/// address found is used.
/// This may be useful in docker bridge networks where containers are usually queried by DNS
/// instead of direct IP addresses.
/// Note: the domain shouldn't include a port number. Only the IP address is resolved.
ExternalAddr(String),
/// Resolve external IP via the network interface.
NetIf,
/// Resolve nothing
@@ -62,10 +70,17 @@ impl NatResolver {
external_addr_with(self).await
}
/// Returns the external ip, if it is [`NatResolver::ExternalIp`]
pub const fn as_external_ip(self) -> Option<IpAddr> {
/// Returns the fixed ip, if it is [`NatResolver::ExternalIp`] or [`NatResolver::ExternalAddr`].
///
/// In the case of [`NatResolver::ExternalAddr`], it will return the first IP address found for
/// the domain.
pub fn as_external_ip(self, port: u16) -> Option<IpAddr> {
match self {
Self::ExternalIp(ip) => Some(ip),
Self::ExternalAddr(domain) => format!("{domain}:{port}")
.to_socket_addrs()
.ok()
.and_then(|mut addrs| addrs.next().map(|addr| addr.ip())),
_ => None,
}
}
@@ -78,6 +93,7 @@ impl fmt::Display for NatResolver {
Self::Upnp => f.write_str("upnp"),
Self::PublicIp => f.write_str("publicip"),
Self::ExternalIp(ip) => write!(f, "extip:{ip}"),
Self::ExternalAddr(domain) => write!(f, "extaddr:{domain}"),
Self::NetIf => f.write_str("netif"),
Self::None => f.write_str("none"),
}
@@ -106,12 +122,15 @@ impl FromStr for NatResolver {
"publicip" | "public-ip" => Self::PublicIp,
"netif" => Self::NetIf,
s => {
let Some(ip) = s.strip_prefix("extip:") else {
if let Some(ip) = s.strip_prefix("extip:") {
Self::ExternalIp(ip.parse()?)
} else if let Some(domain) = s.strip_prefix("extaddr:") {
Self::ExternalAddr(domain.to_string())
} else {
return Err(ParseNatResolverError::UnknownVariant(format!(
"Unknown Nat Resolver: {s}"
)))
};
Self::ExternalIp(ip.parse()?)
)));
}
}
};
Ok(r)
@@ -180,7 +199,7 @@ impl ResolveNatInterval {
/// `None` if the attempt was unsuccessful.
pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Option<IpAddr>> {
if self.interval.poll_tick(cx).is_ready() {
self.future = Some(Box::pin(self.resolver.external_addr()));
self.future = Some(Box::pin(self.resolver.clone().external_addr()));
}
if let Some(mut fut) = self.future.take() {
@@ -212,6 +231,9 @@ pub async fn external_addr_with(resolver: NatResolver) -> Option<IpAddr> {
);
})
.ok(),
NatResolver::ExternalAddr(domain) => {
domain.to_socket_addrs().ok().and_then(|mut addrs| addrs.next().map(|addr| addr.ip()))
}
NatResolver::None => None,
}
}
@@ -245,7 +267,7 @@ async fn resolve_external_ip_url(url: &str) -> Option<IpAddr> {
#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;
use std::net::{Ipv4Addr, Ipv6Addr};
#[tokio::test]
#[ignore]
@@ -267,6 +289,18 @@ mod tests {
dbg!(ip);
}
#[test]
fn as_external_ip_test() {
let resolver = NatResolver::ExternalAddr("localhost".to_string());
let ip = resolver.as_external_ip(30303).expect("localhost should be resolvable");
if ip.is_ipv4() {
assert_eq!(ip, IpAddr::V4(Ipv4Addr::LOCALHOST));
} else {
assert_eq!(ip, IpAddr::V6(Ipv6Addr::LOCALHOST));
}
}
#[test]
fn test_from_str() {
assert_eq!(NatResolver::Any, "any".parse().unwrap());
@@ -275,6 +309,6 @@ mod tests {
let ip = NatResolver::ExternalIp(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
let s = "extip:0.0.0.0";
assert_eq!(ip, s.parse().unwrap());
assert_eq!(ip.to_string().as_str(), s);
assert_eq!(ip.to_string(), s);
}
}

View File

@@ -3,8 +3,8 @@
use reth_eth_wire_types::{
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetPooledTransactions, GetReceipts, NetworkPrimitives, NodeData, PooledTransactions, Receipts,
Receipts69, UnifiedStatus,
GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData,
PooledTransactions, Receipts, Receipts69, Receipts70, UnifiedStatus,
};
use reth_ethereum_forks::ForkId;
use reth_network_p2p::error::{RequestError, RequestResult};
@@ -238,6 +238,15 @@ pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The channel to send the response for receipts.
response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
},
/// Requests receipts from the peer using eth/70 (supports `firstBlockReceiptIndex`).
///
/// The response should be sent through the channel.
GetReceipts70 {
/// The request for receipts.
request: GetReceipts70,
/// The channel to send the response for receipts.
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
},
}
// === impl PeerRequest ===
@@ -257,6 +266,7 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
Self::GetNodeData { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
};
}
@@ -281,6 +291,9 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
Self::GetReceipts { request, .. } | Self::GetReceipts69 { request, .. } => {
EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
}
Self::GetReceipts70 { request, .. } => {
EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() })
}
}
}

View File

@@ -66,6 +66,7 @@ tracing.workspace = true
rustc-hash.workspace = true
thiserror.workspace = true
parking_lot.workspace = true
rayon.workspace = true
rand.workspace = true
rand_08.workspace = true
secp256k1 = { workspace = true, features = ["global-context", "std", "recovery"] }

View File

@@ -433,7 +433,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
pub fn external_ip_resolver(mut self, resolver: NatResolver) -> Self {
self.discovery_v4_builder
.get_or_insert_with(Discv4Config::builder)
.external_ip_resolver(Some(resolver));
.external_ip_resolver(Some(resolver.clone()));
self.nat = Some(resolver);
self
}
@@ -484,7 +484,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
}
// Disable nat
pub const fn disable_nat(mut self) -> Self {
pub fn disable_nat(mut self) -> Self {
self.nat = None;
self
}
@@ -579,7 +579,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
}
/// Sets the NAT resolver for external IP.
pub const fn add_nat(mut self, nat: Option<NatResolver>) -> Self {
pub fn add_nat(mut self, nat: Option<NatResolver>) -> Self {
self.nat = nat;
self
}

View File

@@ -10,7 +10,8 @@ use alloy_rlp::Encodable;
use futures::StreamExt;
use reth_eth_wire::{
BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69,
GetReceipts, GetReceipts70, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
Receipts69, Receipts70,
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::error::RequestResult;
@@ -217,6 +218,66 @@ where
let _ = response.send(Ok(Receipts69(receipts)));
}
fn on_receipts70_request(
&self,
_peer_id: PeerId,
request: GetReceipts70,
response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
) {
self.metrics.eth_receipts_requests_received_total.increment(1);
let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
let mut receipts = Vec::new();
let mut total_bytes = 0usize;
let mut last_block_incomplete = false;
for (idx, hash) in block_hashes.into_iter().enumerate() {
if idx >= MAX_RECEIPTS_SERVE {
break
}
let Some(mut block_receipts) =
self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
else {
break
};
if idx == 0 && first_block_receipt_index > 0 {
let skip = first_block_receipt_index as usize;
if skip >= block_receipts.len() {
block_receipts.clear();
} else {
block_receipts.drain(0..skip);
}
}
let block_size = block_receipts.length();
if total_bytes + block_size <= SOFT_RESPONSE_LIMIT {
total_bytes += block_size;
receipts.push(block_receipts);
continue;
}
let mut partial_block = Vec::new();
for receipt in block_receipts {
let receipt_size = receipt.length();
if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT {
break;
}
total_bytes += receipt_size;
partial_block.push(receipt);
}
receipts.push(partial_block);
last_block_incomplete = true;
break;
}
let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
}
#[inline]
fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
where
@@ -285,6 +346,9 @@ where
IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
this.on_receipts69_request(peer_id, request, response)
}
IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
this.on_receipts70_request(peer_id, request, response)
}
}
},
);
@@ -359,4 +423,15 @@ pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The channel sender for the response containing Receipts69.
response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
},
/// Request Receipts from the peer using eth/70.
///
/// The response should be sent through the channel.
GetReceipts70 {
/// The ID of the peer to request receipts from.
peer_id: PeerId,
/// The specific receipts requested including the `firstBlockReceiptIndex`.
request: GetReceipts70,
/// The channel sender for the response containing Receipts70.
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
},
}

View File

@@ -532,6 +532,13 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
response,
})
}
PeerRequest::GetReceipts70 { request, response } => {
self.delegate_eth_request(IncomingEthRequest::GetReceipts70 {
peer_id,
request,
response,
})
}
PeerRequest::GetPooledTransactions { request, response } => {
self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
peer_id,

View File

@@ -3,7 +3,7 @@
//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use crate::types::Receipts69;
use crate::types::{Receipts69, Receipts70};
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
use alloy_primitives::{Bytes, B256};
use futures::FutureExt;
@@ -116,6 +116,11 @@ pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The receiver channel for the response to a receipts request.
response: oneshot::Receiver<RequestResult<Receipts69<N::Receipt>>>,
},
/// Represents a response to a request for receipts using eth/70.
Receipts70 {
/// The receiver channel for the response to a receipts request.
response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
},
}
// === impl PeerResponse ===
@@ -151,6 +156,10 @@ impl<N: NetworkPrimitives> PeerResponse<N> {
Self::Receipts69 { response } => {
poll_request!(response, Receipts69, cx)
}
Self::Receipts70 { response } => match ready!(response.poll_unpin(cx)) {
Ok(res) => PeerResponseResult::Receipts70(res),
Err(err) => PeerResponseResult::Receipts70(Err(err.into())),
},
};
Poll::Ready(res)
}
@@ -171,6 +180,8 @@ pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
/// Represents a result containing receipts or an error for eth/69.
Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
/// Represents a result containing receipts or an error for eth/70.
Receipts70(RequestResult<Receipts70<N::Receipt>>),
}
// === impl PeerResponseResult ===
@@ -208,6 +219,13 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
Self::Receipts69(resp) => {
to_message!(resp, Receipts69, id)
}
Self::Receipts70(resp) => match resp {
Ok(res) => {
let request = RequestPair { request_id: id, message: res };
Ok(EthMessage::Receipts70(request))
}
Err(err) => Err(err),
},
}
}
@@ -220,6 +238,7 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
Self::NodeData(res) => res.as_ref().err(),
Self::Receipts(res) => res.as_ref().err(),
Self::Receipts69(res) => res.as_ref().err(),
Self::Receipts70(res) => res.as_ref().err(),
}
}

View File

@@ -237,7 +237,9 @@ impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
discv4.node_record()
} else if let Some(discv5) = self.inner.discv5.as_ref() {
// for disv5 we must check if we have an external ip configured
if let Some(external) = self.inner.nat.and_then(|nat| nat.as_external_ip()) {
if let Some(external) =
self.inner.nat.clone().and_then(|nat| nat.as_external_ip(discv5.local_port()))
{
NodeRecord::new((external, discv5.local_port()).into(), *self.peer_id())
} else {
// use the node record that discv5 tracks or use localhost
@@ -252,9 +254,11 @@ impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
// also use the tcp port
.with_tcp_port(self.inner.listener_address.lock().port())
} else {
let external_ip = self.inner.nat.and_then(|nat| nat.as_external_ip());
let mut socket_addr = *self.inner.listener_address.lock();
let external_ip =
self.inner.nat.clone().and_then(|nat| nat.as_external_ip(socket_addr.port()));
if let Some(ip) = external_ip {
// if able to resolve external ip, use it instead and also set the local address
socket_addr.set_ip(ip)

View File

@@ -25,10 +25,10 @@ use futures::{stream::Fuse, SinkExt, StreamExt};
use metrics::Gauge;
use reth_eth_wire::{
errors::{EthHandshakeError, EthStreamError},
message::{EthBroadcastMessage, MessageError, RequestPair},
message::{EthBroadcastMessage, MessageError},
Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives, NewBlockPayload,
};
use reth_eth_wire_types::RawCapabilityMessage;
use reth_eth_wire_types::{message::RequestPair, RawCapabilityMessage};
use reth_metrics::common::mpsc::MeteredPollSender;
use reth_network_api::PeerRequest;
use reth_network_p2p::error::RequestError;
@@ -270,12 +270,18 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
on_request!(req, Receipts, GetReceipts)
}
}
EthMessage::GetReceipts70(req) => {
on_request!(req, Receipts70, GetReceipts70)
}
EthMessage::Receipts(resp) => {
on_response!(resp, GetReceipts)
}
EthMessage::Receipts69(resp) => {
on_response!(resp, GetReceipts69)
}
EthMessage::Receipts70(resp) => {
on_response!(resp, GetReceipts70)
}
EthMessage::BlockRangeUpdate(msg) => {
// Validate that earliest <= latest according to the spec
if msg.earliest > msg.latest {
@@ -311,9 +317,9 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
/// Handle an internal peer request that will be sent to the remote.
fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
let request_id = self.next_id();
trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
let msg = request.create_request_message(request_id);
let mut msg = request.create_request_message(request_id).map_versioned(self.conn.version());
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest {
request: RequestState::Waiting(request),
@@ -368,6 +374,7 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult<N>) {
match resp.try_into_message(id) {
Ok(msg) => {
let msg: EthMessage<N> = msg;
self.queued_outgoing.push_back(msg.into());
}
Err(err) => {

View File

@@ -1,6 +1,7 @@
//! Transactions management for the p2p network.
use alloy_consensus::transaction::TxHashRef;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
/// Aggregation on configurable parameters for [`TransactionsManager`].
pub mod config;
@@ -1368,51 +1369,49 @@ where
// tracks the quality of the given transactions
let mut has_bad_transactions = false;
// 2. filter out transactions that are invalid or already pending import pre-size to avoid
// reallocations
let mut new_txs = Vec::with_capacity(transactions.len());
for tx in transactions {
match self.transactions_by_peers.entry(*tx.tx_hash()) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
entry.get_mut().insert(peer_id);
}
Entry::Vacant(entry) => {
if self.bad_imports.contains(tx.tx_hash()) {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%tx.tx_hash(),
client_version=%peer.client_version,
"received a known bad transaction from peer"
);
has_bad_transactions = true;
} else {
// this is a new transaction that should be imported into the pool
// recover transaction
let tx = match tx.try_into_recovered() {
Ok(tx) => tx,
Err(badtx) => {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%badtx.tx_hash(),
client_version=%peer.client_version,
"failed ecrecovery for transaction"
);
has_bad_transactions = true;
continue
}
};
let pool_transaction = Pool::Transaction::from_pooled(tx);
new_txs.push(pool_transaction);
entry.insert(HashSet::from([peer_id]));
}
}
// Remove known and invalid transactions
transactions.retain(|tx| {
if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
entry.get_mut().insert(peer_id);
return false
}
if self.bad_imports.contains(tx.tx_hash()) {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%tx.tx_hash(),
client_version=%peer.client_version,
"received a known bad transaction from peer"
);
has_bad_transactions = true;
return false;
}
true
});
let txs_len = transactions.len();
let new_txs = transactions
.into_par_iter()
.filter_map(|tx| match tx.try_into_recovered() {
Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
Err(badtx) => {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%badtx.tx_hash(),
client_version=%peer.client_version,
"failed ecrecovery for transaction"
);
None
}
})
.collect::<Vec<_>>();
has_bad_transactions |= new_txs.len() != txs_len;
// Record the transactions as seen by the peer
for tx in &new_txs {
self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
}
new_txs.shrink_to_fit();
// 3. import new transactions as a batch to minimize lock contention on the underlying
// pool
@@ -1925,7 +1924,9 @@ impl PooledTransactionsHashesBuilder {
fn new(version: EthVersion) -> Self {
match version {
EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => {
Self::Eth68(Default::default())
}
}
}

View File

@@ -65,7 +65,7 @@ use reth_node_metrics::{
version::VersionInfo,
};
use reth_provider::{
providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
StageCheckpointReader, StaticFileProviderBuilder, StaticFileProviderFactory,
};
@@ -485,9 +485,19 @@ where
.with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map())
.build()?;
let factory =
ProviderFactory::new(self.right().clone(), self.chain_spec(), static_file_provider)?
.with_prune_modes(self.prune_modes());
// Initialize RocksDB provider with metrics and statistics enabled
let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
.with_metrics()
.with_statistics()
.build()?;
let factory = ProviderFactory::new(
self.right().clone(),
self.chain_spec(),
static_file_provider,
rocksdb_provider,
)?
.with_prune_modes(self.prune_modes());
// Check for consistency between database and static files. If it fails, it unwinds to
// the first block that's consistent between database and static files.

View File

@@ -27,6 +27,10 @@ pub struct DatadirArgs {
verbatim_doc_comment
)]
pub static_files_path: Option<PathBuf>,
/// The absolute path to store `RocksDB` database in.
#[arg(long = "datadir.rocksdb", value_name = "PATH", verbatim_doc_comment)]
pub rocksdb_path: Option<PathBuf>,
}
impl DatadirArgs {

View File

@@ -337,7 +337,7 @@ impl NetworkArgs {
// Configure basic network stack
NetworkConfigBuilder::<N>::new(secret_key)
.external_ip_resolver(self.nat)
.external_ip_resolver(self.nat.clone())
.sessions_config(
SessionsConfig::default().with_upscaled_event_buffer(peers_config.max_peers()),
)
@@ -399,7 +399,7 @@ impl NetworkArgs {
}
/// Configures the [`NatResolver`]
pub const fn with_nat_resolver(mut self, nat: NatResolver) -> Self {
pub fn with_nat_resolver(mut self, nat: NatResolver) -> Self {
self.nat = nat;
self
}
@@ -782,10 +782,11 @@ mod tests {
let tests = vec![0, 10];
for retries in tests {
let retries_str = retries.to_string();
let args = CommandParser::<NetworkArgs>::parse_from([
"reth",
"--dns-retries",
retries.to_string().as_str(),
retries_str.as_str(),
])
.args;

View File

@@ -301,6 +301,18 @@ impl<D> ChainPath<D> {
}
}
/// Returns the path to the `RocksDB` database directory for this chain.
///
/// `<DIR>/<CHAIN_ID>/rocksdb`
pub fn rocksdb(&self) -> PathBuf {
let datadir_args = &self.2;
if let Some(rocksdb_path) = &datadir_args.rocksdb_path {
rocksdb_path.clone()
} else {
self.data_dir().join("rocksdb")
}
}
/// Returns the path to the reth p2p secret key for this chain.
///
/// `<DIR>/<CHAIN_ID>/discovery-secret`

View File

@@ -98,7 +98,9 @@ where
runner.run_blocking_until_ctrl_c(command.execute::<OpNode>())
}
Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute::<OpNode>()),
Commands::Db(command) => {
runner.run_blocking_command_until_exit(|ctx| command.execute::<OpNode>(ctx))
}
Commands::Stage(command) => {
runner.run_command_until_exit(|ctx| command.execute::<OpNode, _>(ctx, components))
}

View File

@@ -13,32 +13,38 @@ extern crate alloc;
use alloc::sync::Arc;
use alloy_consensus::{BlockHeader, Header};
use alloy_eips::Decodable2718;
use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded};
use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv};
use alloy_primitives::{Bytes, U256};
use core::fmt::Debug;
use op_alloy_consensus::EIP1559ParamError;
use op_alloy_rpc_types_engine::OpExecutionData;
use op_revm::{OpSpecId, OpTransaction};
use reth_chainspec::EthChainSpec;
use reth_evm::{
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEngineEvm, ConfigureEvm,
EvmEnv, EvmEnvFor, ExecutableTxIterator, ExecutionCtxFor, TransactionEnv,
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEvm, EvmEnv, TransactionEnv,
};
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_forks::OpHardforks;
use reth_optimism_primitives::{DepositReceipt, OpPrimitives};
use reth_primitives_traits::{
NodePrimitives, SealedBlock, SealedHeader, SignedTransaction, TxTy, WithEncoded,
};
use reth_storage_errors::any::AnyError;
use revm::{
context::{BlockEnv, CfgEnv, TxEnv},
context_interface::block::BlobExcessGasAndPrice,
primitives::hardfork::SpecId,
use reth_primitives_traits::{NodePrimitives, SealedBlock, SealedHeader, SignedTransaction};
use revm::context::{BlockEnv, TxEnv};
#[allow(unused_imports)]
use {
alloy_eips::Decodable2718,
alloy_primitives::{Bytes, U256},
op_alloy_rpc_types_engine::OpExecutionData,
reth_evm::{EvmEnvFor, ExecutionCtxFor},
reth_primitives_traits::{TxTy, WithEncoded},
reth_storage_errors::any::AnyError,
revm::{
context::CfgEnv, context_interface::block::BlobExcessGasAndPrice,
primitives::hardfork::SpecId,
},
};
#[cfg(feature = "std")]
use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator};
mod config;
pub use config::{revm_spec, revm_spec_by_timestamp_after_bedrock, OpNextBlockEnvAttributes};
mod execute;
@@ -200,6 +206,7 @@ where
}
}
#[cfg(feature = "std")]
impl<ChainSpec, N, R> ConfigureEngineEvm<OpExecutionData> for OpEvmConfig<ChainSpec, N, R>
where
ChainSpec: EthChainSpec<Header = Header> + OpHardforks,
@@ -265,7 +272,7 @@ where
&self,
payload: &OpExecutionData,
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
let transactions = payload.payload.transactions().clone().into_iter();
let transactions = payload.payload.transactions().clone();
let convert = |encoded: Bytes| {
let tx = TxTy::<Self::Primitives>::decode_2718_exact(encoded.as_ref())
.map_err(AnyError::new)?;

View File

@@ -34,7 +34,7 @@ mod cache;
mod test_utils;
mod ws;
pub use ws::{WsConnect, WsFlashBlockStream};
pub use ws::{FlashBlockDecoder, WsConnect, WsFlashBlockStream};
/// Receiver of the most recent [`PendingFlashBlock`] built out of [`FlashBlock`]s.
///

View File

@@ -1,6 +1,6 @@
pub use stream::{WsConnect, WsFlashBlockStream};
mod decoding;
pub(crate) use decoding::FlashBlockDecoder;
pub use decoding::FlashBlockDecoder;
mod stream;

View File

@@ -34,7 +34,7 @@ reth-rpc-api.workspace = true
# op-reth
reth-optimism-payload-builder.workspace = true
reth-optimism-evm = { workspace = true, features = ["rpc"] }
reth-optimism-evm = { workspace = true, features = ["std", "rpc"] }
reth-optimism-rpc.workspace = true
reth-optimism-storage.workspace = true
reth-optimism-txpool.workspace = true

View File

@@ -218,13 +218,14 @@ impl OpNode {
/// use reth_db::open_db_read_only;
/// use reth_optimism_chainspec::OpChainSpecBuilder;
/// use reth_optimism_node::OpNode;
/// use reth_provider::providers::StaticFileProvider;
/// use reth_provider::providers::{RocksDBProvider, StaticFileProvider};
/// use std::sync::Arc;
///
/// let factory = OpNode::provider_factory_builder()
/// .db(Arc::new(open_db_read_only("db", Default::default()).unwrap()))
/// .chainspec(OpChainSpecBuilder::base_mainnet().build().into())
/// .static_file(StaticFileProvider::read_only("db/static_files", false).unwrap())
/// .rocksdb_provider(RocksDBProvider::builder("db/rocksdb").build().unwrap())
/// .build_provider_factory();
/// ```
pub fn provider_factory_builder() -> ProviderFactoryBuilder<Self> {

View File

@@ -2342,7 +2342,7 @@ mod tests {
$(
let val: RethRpcModule = $s.parse().unwrap();
assert_eq!(val, $v);
assert_eq!(val.to_string().as_str(), $s);
assert_eq!(val.to_string(), $s);
)*
};
}

View File

@@ -463,7 +463,7 @@ where
/// Returns the most recent version of the payload that is available in the corresponding
/// payload build process at the time of receiving this call.
///
/// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
/// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_getpayloadv4>
///
/// Note:
/// > Provider software MAY stop the corresponding build process after serving this call.
@@ -933,7 +933,7 @@ where
Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
}
/// Handler for `engine_forkchoiceUpdatedV2`
/// Handler for `engine_forkchoiceUpdatedV3`
///
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
async fn fork_choice_updated_v3(

View File

@@ -74,7 +74,11 @@ pub trait EthBlocks: LoadBlock<RpcConvert: RpcConvert<Primitives = Self::Primiti
block_id: BlockId,
) -> impl Future<Output = Result<Option<usize>, Self::Error>> + Send {
async move {
// If no pending block from provider, build the pending block locally.
if block_id.is_pending() {
if let Some(pending) = self.local_pending_block().await? {
return Ok(Some(pending.block.body().transaction_count()));
}
// Pending block can be fetched directly without need for caching
return Ok(self
.provider()

View File

@@ -71,7 +71,7 @@ pub trait TraceApiExt {
/// Returns a new stream that yields the traces the opcodes for the given blocks.
///
/// See also [`StreamExt::buffered`].
/// See also [`StreamExt::buffer_unordered`].
fn trace_block_opcode_gas_unordered<I, B>(
&self,
params: I,
@@ -301,7 +301,7 @@ impl<T: TraceApiClient<TransactionRequest> + Sync> TraceApiExt for T {
Err(err) => Err((err, block)),
}
}))
.buffered(n);
.buffer_unordered(n);
TraceBlockOpcodeGasStream { stream: Box::pin(stream) }
}

View File

@@ -324,11 +324,13 @@ impl<Provider, S: Stage<Provider> + ?Sized> StageExt<Provider> for S {}
#[cfg(test)]
mod tests {
use reth_chainspec::MAINNET;
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
use reth_db::test_utils::{
create_test_rocksdb_dir, create_test_rw_db, create_test_static_files_dir,
};
use reth_db_api::{models::StoredBlockBodyIndices, tables, transaction::DbTxMut};
use reth_provider::{
test_utils::MockNodeTypesWithDB, ProviderFactory, StaticFileProviderBuilder,
StaticFileProviderFactory, StaticFileSegment,
providers::RocksDBProvider, test_utils::MockNodeTypesWithDB, ProviderFactory,
StaticFileProviderBuilder, StaticFileProviderFactory, StaticFileSegment,
};
use reth_stages_types::StageCheckpoint;
use reth_testing_utils::generators::{self, random_signed_tx};
@@ -346,6 +348,7 @@ mod tests {
.with_blocks_per_file(1)
.build()
.unwrap(),
RocksDBProvider::builder(create_test_rocksdb_dir().0.keep()).build().unwrap(),
)
.unwrap();

View File

@@ -1,7 +1,10 @@
use alloy_primitives::{keccak256, Address, BlockNumber, TxHash, TxNumber, B256};
use reth_chainspec::MAINNET;
use reth_db::{
test_utils::{create_test_rw_db, create_test_rw_db_with_path, create_test_static_files_dir},
test_utils::{
create_test_rocksdb_dir, create_test_rw_db, create_test_rw_db_with_path,
create_test_static_files_dir,
},
DatabaseEnv,
};
use reth_db_api::{
@@ -17,7 +20,9 @@ use reth_db_api::{
use reth_ethereum_primitives::{Block, EthPrimitives, Receipt};
use reth_primitives_traits::{Account, SealedBlock, SealedHeader, StorageEntry};
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
providers::{
RocksDBProvider, StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter,
},
test_utils::MockNodeTypesWithDB,
HistoryWriter, ProviderError, ProviderFactory, StaticFileProviderFactory, StatsReader,
};
@@ -38,12 +43,14 @@ impl Default for TestStageDB {
/// Create a new instance of [`TestStageDB`]
fn default() -> Self {
let (static_dir, static_dir_path) = create_test_static_files_dir();
let (_, rocksdb_dir_path) = create_test_rocksdb_dir();
Self {
temp_static_files_dir: static_dir,
factory: ProviderFactory::new(
create_test_rw_db(),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)
.expect("failed to create test provider factory"),
}
@@ -53,6 +60,7 @@ impl Default for TestStageDB {
impl TestStageDB {
pub fn new(path: &Path) -> Self {
let (static_dir, static_dir_path) = create_test_static_files_dir();
let (_, rocksdb_dir_path) = create_test_rocksdb_dir();
Self {
temp_static_files_dir: static_dir,
@@ -60,6 +68,7 @@ impl TestStageDB {
create_test_rw_db_with_path(path),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)
.expect("failed to create test provider factory"),
}

View File

@@ -711,7 +711,7 @@ mod tests {
};
use reth_provider::{
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
ProviderFactory,
ProviderFactory, RocksDBProviderFactory,
};
use std::{collections::BTreeMap, sync::Arc};
@@ -756,6 +756,7 @@ mod tests {
fn fail_init_inconsistent_db() {
let factory = create_test_provider_factory_with_chain_spec(SEPOLIA.clone());
let static_file_provider = factory.static_file_provider();
let rocksdb_provider = factory.rocksdb_provider();
init_genesis(&factory).unwrap();
// Try to init db with a different genesis block
@@ -764,6 +765,7 @@ mod tests {
factory.into_db(),
MAINNET.clone(),
static_file_provider,
rocksdb_provider,
)
.unwrap(),
);

View File

@@ -159,6 +159,14 @@ pub mod test_utils {
(temp_dir, path)
}
/// Create `rocksdb` path for testing
#[track_caller]
pub fn create_test_rocksdb_dir() -> (TempDir, PathBuf) {
let temp_dir = TempDir::with_prefix("reth-test-rocksdb-").expect(ERROR_TEMPDIR);
let path = temp_dir.path().to_path_buf();
(temp_dir, path)
}
/// Get a temporary directory path to use for the database
pub fn tempdir_path() -> PathBuf {
let builder = tempfile::Builder::new().prefix("reth-test-").rand_bytes(8).tempdir();

View File

@@ -4,7 +4,7 @@
use std::{marker::PhantomData, ops::Range};
#[cfg(all(unix, feature = "rocksdb"))]
use crate::providers::rocksdb::RocksDBWriteMode;
use crate::providers::rocksdb::RocksDBBatch;
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
@@ -37,11 +37,11 @@ type EitherWriterTy<'a, P, T> = EitherWriter<
>;
// Helper types so constructors stay exported even when RocksDB feature is off.
// RocksDBWriteMode encapsulates the choice between Transaction and Batch writes.
// Historical data tables use a write-only RocksDB batch (no read-your-writes needed).
#[cfg(all(unix, feature = "rocksdb"))]
type RocksWriteModeArg<'a> = crate::providers::rocksdb::RocksDBWriteMode<'a>;
type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksWriteModeArg<'a> = ();
type RocksBatchArg<'a> = ();
#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
@@ -55,12 +55,9 @@ pub enum EitherWriter<'a, CURSOR, N> {
Database(CURSOR),
/// Write to static file
StaticFile(StaticFileProviderRWRefMut<'a, N>),
/// Write to `RocksDB` (transaction or batch - internal detail).
///
/// Uses [`RocksDBWriteMode`] to encapsulate the choice between full transaction
/// semantics and high-throughput batch writes.
/// Write to `RocksDB` using a write-only batch (historical tables).
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(RocksDBWriteMode<'a>),
RocksDB(RocksDBBatch<'a>),
}
impl<'a> EitherWriter<'a, (), ()> {
@@ -131,11 +128,9 @@ impl<'a> EitherWriter<'a, (), ()> {
}
/// Creates a new [`EitherWriter`] for storages history based on storage settings.
///
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
pub fn new_storages_history<P>(
provider: &P,
_rocksdb_mode: RocksWriteModeArg<'a>,
_rocksdb_batch: RocksBatchArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
@@ -143,37 +138,16 @@ impl<'a> EitherWriter<'a, (), ()> {
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storages_history_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
}
/// Creates a new [`EitherWriter`] for accounts history based on storage settings.
///
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
pub fn new_accounts_history<P>(
provider: &P,
_rocksdb_mode: RocksWriteModeArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().account_history_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
}
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
}
/// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
///
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
pub fn new_transaction_hash_numbers<P>(
provider: &P,
_rocksdb_mode: RocksWriteModeArg<'a>,
_rocksdb_batch: RocksBatchArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
@@ -181,13 +155,30 @@ impl<'a> EitherWriter<'a, (), ()> {
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
Ok(EitherWriter::Database(
provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
))
}
/// Creates a new [`EitherWriter`] for account history based on storage settings.
pub fn new_accounts_history<P>(
provider: &P,
_rocksdb_batch: RocksBatchArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().account_history_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
}
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
@@ -217,24 +208,6 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
/// Commits `RocksDB` writes if this is a `RocksDB` writer.
///
/// For [`Self::Database`] and [`Self::StaticFile`], this is a no-op as they use
/// different commit patterns (MDBX transaction commit, static file sync).
///
/// # Commit Order
///
/// Call this AFTER the outer MDBX transaction commits successfully. This ensures
/// that if `RocksDB` commit fails, the primary data (MDBX) is still intact and
/// the `RocksDB` data (which is derived) can be rebuilt.
#[cfg(all(unix, feature = "rocksdb"))]
pub fn commit(self) -> ProviderResult<()> {
match self {
Self::Database(_) | Self::StaticFile(_) => Ok(()),
Self::RocksDB(mode) => mode.commit(),
}
}
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
@@ -391,6 +364,26 @@ impl<'a> EitherReader<'a, (), ()> {
PhantomData,
))
}
/// Creates a new [`EitherReader`] for account history based on storage settings.
pub fn new_accounts_history<P>(
provider: &P,
_rocksdb_tx: RocksTxRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().account_history_in_rocksdb {
return Ok(EitherReader::RocksDB(_rocksdb_tx));
}
Ok(EitherReader::Database(
provider.tx_ref().cursor_read::<tables::AccountsHistory>()?,
PhantomData,
))
}
}
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>

View File

@@ -1,14 +1,15 @@
use crate::{
providers::{
ConsistentProvider, ProviderNodeTypes, StaticFileProvider, StaticFileProviderRWRefMut,
ConsistentProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider,
StaticFileProviderRWRefMut,
},
AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
BlockSource, CanonChainTracker, CanonStateNotifications, CanonStateSubscriptions,
ChainSpecProvider, ChainStateBlockReader, ChangeSetReader, DatabaseProviderFactory,
HashedPostStateProvider, HeaderProvider, ProviderError, ProviderFactory, PruneCheckpointReader,
ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StateProviderBox,
StateProviderFactory, StateReader, StaticFileProviderFactory, TransactionVariant,
TransactionsProvider, TrieReader,
ReceiptProvider, ReceiptProviderIdExt, RocksDBProviderFactory, StageCheckpointReader,
StateProviderBox, StateProviderFactory, StateReader, StaticFileProviderFactory,
TransactionVariant, TransactionsProvider, TrieReader,
};
use alloy_consensus::transaction::TransactionMeta;
use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag};
@@ -176,6 +177,12 @@ impl<N: ProviderNodeTypes> StaticFileProviderFactory for BlockchainProvider<N> {
}
}
impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.database.rocksdb_provider()
}
}
impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {
type Header = HeaderTy<N>;

View File

@@ -4,7 +4,7 @@
//! up to the intended build target.
use crate::{
providers::{NodeTypesForProvider, StaticFileProvider},
providers::{NodeTypesForProvider, RocksDBProvider, StaticFileProvider},
ProviderFactory,
};
use reth_db::{
@@ -104,11 +104,12 @@ impl<N> ProviderFactoryBuilder<N> {
where
N: NodeTypesForProvider,
{
let ReadOnlyConfig { db_dir, db_args, static_files_dir, watch_static_files } =
let ReadOnlyConfig { db_dir, db_args, static_files_dir, rocksdb_dir, watch_static_files } =
config.into();
self.db(Arc::new(open_db_read_only(db_dir, db_args)?))
.chainspec(chainspec)
.static_file(StaticFileProvider::read_only(static_files_dir, watch_static_files)?)
.rocksdb_provider(RocksDBProvider::builder(&rocksdb_dir).build()?)
.build_provider_factory()
.map_err(Into::into)
}
@@ -120,7 +121,7 @@ impl<N> Default for ProviderFactoryBuilder<N> {
}
}
/// Settings for how to open the database and static files.
/// Settings for how to open the database, static files, and `RocksDB`.
///
/// The default derivation from a path assumes the path is the datadir:
/// [`ReadOnlyConfig::from_datadir`]
@@ -132,6 +133,8 @@ pub struct ReadOnlyConfig {
pub db_args: DatabaseArguments,
/// The path to the static file dir
pub static_files_dir: PathBuf,
/// The path to the `RocksDB` directory
pub rocksdb_dir: PathBuf,
/// Whether the static files should be watched for changes.
pub watch_static_files: bool,
}
@@ -144,6 +147,7 @@ impl ReadOnlyConfig {
/// ```text
/// -`datadir`
/// |__db
/// |__rocksdb
/// |__static_files
/// ```
///
@@ -151,7 +155,13 @@ impl ReadOnlyConfig {
/// [`StaticFileProvider::read_only`]
pub fn from_datadir(datadir: impl AsRef<Path>) -> Self {
let datadir = datadir.as_ref();
Self::from_dirs(datadir.join("db"), datadir.join("static_files"))
Self {
db_dir: datadir.join("db"),
db_args: Default::default(),
static_files_dir: datadir.join("static_files"),
rocksdb_dir: datadir.join("rocksdb"),
watch_static_files: true,
}
}
/// Disables long-lived read transaction safety guarantees.
@@ -169,7 +179,8 @@ impl ReadOnlyConfig {
///
/// ```text
/// - db
/// -static_files
/// - rocksdb
/// - static_files
/// ```
///
/// By default this watches the static file directory for changes, see also
@@ -180,13 +191,10 @@ impl ReadOnlyConfig {
/// If the path does not exist
pub fn from_db_dir(db_dir: impl AsRef<Path>) -> Self {
let db_dir = db_dir.as_ref();
let static_files_dir = std::fs::canonicalize(db_dir)
.unwrap()
.parent()
.unwrap()
.to_path_buf()
.join("static_files");
Self::from_dirs(db_dir, static_files_dir)
let datadir = std::fs::canonicalize(db_dir).unwrap().parent().unwrap().to_path_buf();
let static_files_dir = datadir.join("static_files");
let rocksdb_dir = datadir.join("rocksdb");
Self::from_dirs(db_dir, static_files_dir, rocksdb_dir)
}
/// Creates the config for the given paths.
@@ -194,11 +202,16 @@ impl ReadOnlyConfig {
///
/// By default this watches the static file directory for changes, see also
/// [`StaticFileProvider::read_only`]
pub fn from_dirs(db_dir: impl AsRef<Path>, static_files_dir: impl AsRef<Path>) -> Self {
pub fn from_dirs(
db_dir: impl AsRef<Path>,
static_files_dir: impl AsRef<Path>,
rocksdb_dir: impl AsRef<Path>,
) -> Self {
Self {
static_files_dir: static_files_dir.as_ref().into(),
db_dir: db_dir.as_ref().into(),
db_args: Default::default(),
static_files_dir: static_files_dir.as_ref().into(),
rocksdb_dir: rocksdb_dir.as_ref().into(),
watch_static_files: true,
}
}
@@ -317,7 +330,37 @@ impl<N, Val1, Val2, Val3> TypesAnd3<N, Val1, Val2, Val3> {
}
}
impl<N, DB> TypesAnd3<N, DB, Arc<N::ChainSpec>, StaticFileProvider<N::Primitives>>
impl<N, DB, C> TypesAnd3<N, DB, Arc<C>, StaticFileProvider<N::Primitives>>
where
N: NodeTypes,
{
/// Configures the `RocksDB` provider.
pub fn rocksdb_provider(
self,
rocksdb_provider: RocksDBProvider,
) -> TypesAnd4<N, DB, Arc<C>, StaticFileProvider<N::Primitives>, RocksDBProvider> {
TypesAnd4::new(self.val_1, self.val_2, self.val_3, rocksdb_provider)
}
}
/// This is staging type that contains the configured types and _four_ values.
#[derive(Debug)]
pub struct TypesAnd4<N, Val1, Val2, Val3, Val4> {
_types: PhantomData<N>,
val_1: Val1,
val_2: Val2,
val_3: Val3,
val_4: Val4,
}
impl<N, Val1, Val2, Val3, Val4> TypesAnd4<N, Val1, Val2, Val3, Val4> {
/// Creates a new instance with the given types and four values.
pub fn new(val_1: Val1, val_2: Val2, val_3: Val3, val_4: Val4) -> Self {
Self { _types: Default::default(), val_1, val_2, val_3, val_4 }
}
}
impl<N, DB> TypesAnd4<N, DB, Arc<N::ChainSpec>, StaticFileProvider<N::Primitives>, RocksDBProvider>
where
N: NodeTypesForProvider,
DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
@@ -326,7 +369,7 @@ where
pub fn build_provider_factory(
self,
) -> ProviderResult<ProviderFactory<NodeTypesWithDBAdapter<N, DB>>> {
let Self { _types, val_1, val_2, val_3 } = self;
ProviderFactory::new(val_1, val_2, val_3)
let Self { _types, val_1, val_2, val_3, val_4 } = self;
ProviderFactory::new(val_1, val_2, val_3, val_4)
}
}

View File

@@ -1,15 +1,15 @@
use crate::{
providers::{
state::latest::LatestStateProvider, NodeTypesForProvider, StaticFileProvider,
StaticFileProviderRWRefMut,
state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
StaticFileProvider, StaticFileProviderRWRefMut,
},
to_range,
traits::{BlockSource, ReceiptProvider},
BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
EitherWriterDestination, HashedPostStateProvider, HeaderProvider, HeaderSyncGapProvider,
MetadataProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader,
StateProviderBox, StaticFileProviderFactory, StaticFileWriter, TransactionVariant,
TransactionsProvider,
MetadataProvider, ProviderError, PruneCheckpointReader, RocksDBProviderFactory,
StageCheckpointReader, StateProviderBox, StaticFileProviderFactory, StaticFileWriter,
TransactionVariant, TransactionsProvider,
};
use alloy_consensus::transaction::TransactionMeta;
use alloy_eips::BlockHashOrNumber;
@@ -72,6 +72,8 @@ pub struct ProviderFactory<N: NodeTypesWithDB> {
storage: Arc<N::Storage>,
/// Storage configuration settings for this node
storage_settings: Arc<RwLock<StorageSettings>>,
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
}
impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>> {
@@ -87,6 +89,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
db: N::DB,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider<N::Primitives>,
rocksdb_provider: RocksDBProvider,
) -> ProviderResult<Self> {
// Load storage settings from database at init time. Creates a temporary provider
// to read persisted settings, falling back to legacy defaults if none exist.
@@ -100,6 +103,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
Default::default(),
Default::default(),
Arc::new(RwLock::new(legacy_settings)),
rocksdb_provider.clone(),
)
.storage_settings()?
.unwrap_or(legacy_settings);
@@ -111,6 +115,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
prune_modes: PruneModes::default(),
storage: Default::default(),
storage_settings: Arc::new(RwLock::new(storage_settings)),
rocksdb_provider,
})
}
}
@@ -144,6 +149,12 @@ impl<N: NodeTypesWithDB> StorageSettingsCache for ProviderFactory<N> {
}
}
impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.rocksdb_provider.clone()
}
}
impl<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {
/// Create new database provider by passing a path. [`ProviderFactory`] will own the database
/// instance.
@@ -152,11 +163,13 @@ impl<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {
chain_spec: Arc<N::ChainSpec>,
args: DatabaseArguments,
static_file_provider: StaticFileProvider<N::Primitives>,
rocksdb_provider: RocksDBProvider,
) -> RethResult<Self> {
Self::new(
Arc::new(init_db(path, args).map_err(RethError::msg)?),
chain_spec,
static_file_provider,
rocksdb_provider,
)
.map_err(RethError::Provider)
}
@@ -178,6 +191,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
self.prune_modes.clone(),
self.storage.clone(),
self.storage_settings.clone(),
self.rocksdb_provider.clone(),
))
}
@@ -194,6 +208,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
self.prune_modes.clone(),
self.storage.clone(),
self.storage_settings.clone(),
self.rocksdb_provider.clone(),
)))
}
@@ -595,8 +610,15 @@ where
N: NodeTypesWithDB<DB: fmt::Debug, ChainSpec: fmt::Debug, Storage: fmt::Debug>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self { db, chain_spec, static_file_provider, prune_modes, storage, storage_settings } =
self;
let Self {
db,
chain_spec,
static_file_provider,
prune_modes,
storage,
storage_settings,
rocksdb_provider,
} = self;
f.debug_struct("ProviderFactory")
.field("db", &db)
.field("chain_spec", &chain_spec)
@@ -604,6 +626,7 @@ where
.field("prune_modes", &prune_modes)
.field("storage", &storage)
.field("storage_settings", &*storage_settings.read())
.field("rocksdb_provider", &rocksdb_provider)
.finish()
}
}
@@ -617,6 +640,7 @@ impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
prune_modes: self.prune_modes.clone(),
storage: self.storage.clone(),
storage_settings: self.storage_settings.clone(),
rocksdb_provider: self.rocksdb_provider.clone(),
}
}
}
@@ -635,7 +659,7 @@ mod tests {
use reth_chainspec::ChainSpecBuilder;
use reth_db::{
mdbx::DatabaseArguments,
test_utils::{create_test_static_files_dir, ERROR_TEMPDIR},
test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
};
use reth_db_api::tables;
use reth_primitives_traits::SignerRecoverable;
@@ -674,11 +698,13 @@ mod tests {
fn provider_factory_with_database_path() {
let chain_spec = ChainSpecBuilder::mainnet().build();
let (_static_dir, static_dir_path) = create_test_static_files_dir();
let (_, rocksdb_path) = create_test_rocksdb_dir();
let factory = ProviderFactory::<MockNodeTypesWithDB<DatabaseEnv>>::new_with_database_path(
tempfile::TempDir::new().expect(ERROR_TEMPDIR).keep(),
Arc::new(chain_spec),
DatabaseArguments::new(Default::default()),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
)
.unwrap();
let provider = factory.provider().unwrap();

View File

@@ -4,6 +4,7 @@ use crate::{
},
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::RocksDBProvider,
static_file::StaticFileWriter,
NodeTypesForProvider, StaticFileProvider,
},
@@ -16,10 +17,10 @@ use crate::{
DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader,
StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader, StorageReader,
StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
TrieReader, TrieWriter,
PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, RocksDBProviderFactory,
StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
TransactionsProviderExt, TrieReader, TrieWriter,
};
use alloy_consensus::{
transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
@@ -164,6 +165,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
storage: Arc<N::Storage>,
/// Storage configuration settings for this node
storage_settings: Arc<RwLock<StorageSettings>>,
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
}
@@ -251,6 +254,13 @@ impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
}
}
impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
/// Returns the `RocksDB` provider.
fn rocksdb_provider(&self) -> RocksDBProvider {
self.rocksdb_provider.clone()
}
}
impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
for DatabaseProvider<TX, N>
{
@@ -270,6 +280,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
prune_modes: PruneModes,
storage: Arc<N::Storage>,
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
) -> Self {
Self {
tx,
@@ -278,6 +289,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
prune_modes,
storage,
storage_settings,
rocksdb_provider,
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
}
}
@@ -523,6 +535,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
prune_modes: PruneModes,
storage: Arc<N::Storage>,
storage_settings: Arc<RwLock<StorageSettings>>,
rocksdb_provider: RocksDBProvider,
) -> Self {
Self {
tx,
@@ -531,6 +544,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
prune_modes,
storage,
storage_settings,
rocksdb_provider,
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
}
}

View File

@@ -31,10 +31,11 @@ pub use consistent::ConsistentProvider;
// RocksDB currently only supported on Unix platforms
// Windows support is planned for future releases
#[cfg(all(unix, feature = "rocksdb"))]
#[cfg_attr(all(unix, feature = "rocksdb"), path = "rocksdb/mod.rs")]
#[cfg_attr(not(all(unix, feature = "rocksdb")), path = "rocksdb_stub.rs")]
pub(crate) mod rocksdb;
#[cfg(all(unix, feature = "rocksdb"))]
pub use rocksdb::{RocksDBBuilder, RocksDBProvider, RocksDBWriteMode, RocksTx};
pub use rocksdb::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy
/// [`ProviderNodeTypes`].

View File

@@ -2,7 +2,4 @@
mod metrics;
mod provider;
pub use provider::{RocksDBBuilder, RocksDBProvider, RocksDBWriteMode, RocksTx};
#[cfg(test)]
pub(crate) use provider::RocksDBBatch;
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};

View File

@@ -18,7 +18,6 @@ use std::{
sync::Arc,
time::Instant,
};
use tracing::warn;
/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;
@@ -35,6 +34,11 @@ const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;
/// Default bloom filter bits per key (~1% false positive rate).
const DEFAULT_BLOOM_FILTER_BITS: f64 = 10.0;
/// Default buffer capacity for compression in batches.
/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
/// reducing the first few reallocations without over-allocating.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;
/// Builder for [`RocksDBProvider`].
pub struct RocksDBBuilder {
path: PathBuf,
@@ -152,8 +156,10 @@ impl RocksDBBuilder {
}
/// Sets the log level from `DatabaseArgs` configuration.
pub const fn with_database_log_level(mut self, log_level: LogLevel) -> Self {
self.log_level = convert_log_level(log_level);
pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
if let Some(level) = log_level {
self.log_level = convert_log_level(level);
}
self
}
@@ -260,23 +266,15 @@ impl RocksDBProvider {
RocksTx { inner, provider: self }
}
/// Creates a new batch for manual commit.
/// Creates a new batch for atomic writes.
///
/// Use [`Self::write_batch`] for closure-based atomic writes.
/// Use this method when the batch needs to be held by [`EitherWriter`](crate::EitherWriter).
///
/// # Example
///
/// ```ignore
/// let batch = provider.batch();
/// batch.put::<SomeTable>(key, &value)?;
/// batch.commit()?;
/// ```
/// Use this method when the batch needs to be held by [`crate::EitherWriter`].
pub fn batch(&self) -> RocksDBBatch<'_> {
RocksDBBatch {
provider: self,
inner: WriteBatchWithTransaction::<true>::default(),
buf: Vec::new(),
buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
}
}
@@ -375,31 +373,21 @@ impl RocksDBProvider {
where
F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
{
// Note: Using "Batch" as table name for batch operations across multiple tables
self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
let mut batch_handle = RocksDBBatch {
provider: this,
inner: WriteBatchWithTransaction::<true>::default(),
buf: Vec::new(),
};
let mut batch_handle = this.batch();
f(&mut batch_handle)?;
// Take ownership of inner to prevent Drop from logging a warning
let batch = std::mem::take(&mut batch_handle.inner);
this.0.db.write(batch).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
batch_handle.commit()
})
}
}
/// Handle for building a batch of operations atomically.
///
/// Uses `WriteBatchWithTransaction<true>` for compatibility with `TransactionDB`.
/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
/// where you don't need to read back uncommitted data within the same operation
/// (e.g., history index writes).
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
provider: &'a RocksDBProvider,
inner: WriteBatchWithTransaction<true>,
@@ -447,14 +435,8 @@ impl<'a> RocksDBBatch<'a> {
/// Commits the batch to the database.
///
/// This consumes the batch and writes all operations atomically to `RocksDB`.
///
/// # Errors
///
/// Returns an error if the write fails.
pub fn commit(mut self) -> ProviderResult<()> {
// Take ownership of inner to prevent Drop from logging a warning
let batch = std::mem::take(&mut self.inner);
self.provider.0.db.write(batch).map_err(|e| {
pub fn commit(self) -> ProviderResult<()> {
self.provider.0.db.write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
@@ -462,7 +444,7 @@ impl<'a> RocksDBBatch<'a> {
})
}
/// Returns the number of operations in this batch.
/// Returns the number of write operations (puts + deletes) queued in this batch.
pub fn len(&self) -> usize {
self.inner.len()
}
@@ -473,18 +455,6 @@ impl<'a> RocksDBBatch<'a> {
}
}
impl Drop for RocksDBBatch<'_> {
fn drop(&mut self) {
if !self.inner.is_empty() {
warn!(
target: "reth::storage",
batch_len = %self.inner.len(),
"RocksDBBatch dropped without commit - data discarded"
);
}
}
}
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
///
/// Supports:
@@ -646,49 +616,6 @@ impl<T: Table> Iterator for RocksTxIter<'_, T> {
}
}
/// `RocksDB` write strategy - internal implementation detail.
///
/// This enum encapsulates the choice between full transaction semantics
/// and high-throughput batch writes. Use [`RocksDBWriteMode::Transaction`] for
/// read-modify-write operations that need read-your-writes semantics.
/// Use [`RocksDBWriteMode::Batch`] for bulk sync operations where
/// read-your-writes is not needed.
pub enum RocksDBWriteMode<'a> {
/// Full transaction with read-your-writes, rollback support.
/// Use for read-modify-write operations.
Transaction(RocksTx<'a>),
/// Write-only batch for maximum throughput.
/// Use for bulk sync operations where read-your-writes is not needed.
Batch(RocksDBBatch<'a>),
}
impl fmt::Debug for RocksDBWriteMode<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Transaction(tx) => f.debug_tuple("Transaction").field(tx).finish(),
Self::Batch(batch) => f.debug_tuple("Batch").field(batch).finish(),
}
}
}
impl<'a> RocksDBWriteMode<'a> {
/// Puts a value into the specified table.
pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
match self {
Self::Transaction(tx) => tx.put::<T>(key, value),
Self::Batch(batch) => batch.put::<T>(key, value),
}
}
/// Commits the transaction or batch.
pub fn commit(self) -> ProviderResult<()> {
match self {
Self::Transaction(tx) => tx.commit(),
Self::Batch(batch) => batch.commit(),
}
}
}
/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
match level {
@@ -975,167 +902,4 @@ mod tests {
assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
}
}
#[test]
fn test_write_mode_batch() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create write mode using Batch
let batch = provider.batch();
let mut mode = RocksDBWriteMode::Batch(batch);
// Write via RocksDBWriteMode
let key = 42u64;
let value = b"test_via_mode".to_vec();
mode.put::<TestTable>(key, &value).unwrap();
// Commit via RocksDBWriteMode
mode.commit().unwrap();
// Verify data is visible
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(value));
}
#[test]
fn test_write_mode_transaction() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create write mode using Transaction
let tx = provider.tx();
let mut mode = RocksDBWriteMode::Transaction(tx);
// Write via RocksDBWriteMode
let key = 100u64;
let value = b"test_via_tx_mode".to_vec();
mode.put::<TestTable>(key, &value).unwrap();
// Commit via RocksDBWriteMode
mode.commit().unwrap();
// Verify data is visible
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(value));
}
#[test]
fn test_batch_empty_commit() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create an empty batch
let batch = provider.batch();
assert!(batch.is_empty());
assert_eq!(batch.len(), 0);
// Commit should succeed (no-op)
batch.commit().unwrap();
}
#[test]
fn test_batch_drop_without_commit_does_not_persist() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create a batch and add entries but DON'T commit
{
let mut batch = provider.batch();
for i in 0..5u64 {
let value = format!("dropped_value_{i}").into_bytes();
batch.put::<TestTable>(i, &value).unwrap();
}
// batch dropped here without commit - should log warning
}
// Data should NOT be persisted
for i in 0..5u64 {
assert_eq!(
provider.get::<TestTable>(i).unwrap(),
None,
"Dropped batch should not persist data"
);
}
}
#[test]
fn test_write_mode_debug_formatting() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Test Batch variant debug output
let batch = provider.batch();
let mode_batch = RocksDBWriteMode::Batch(batch);
let debug_str = format!("{:?}", mode_batch);
assert!(debug_str.contains("Batch"), "Debug should contain 'Batch': {debug_str}");
// Test Transaction variant debug output
let tx = provider.tx();
let mode_tx = RocksDBWriteMode::Transaction(tx);
let debug_str = format!("{:?}", mode_tx);
assert!(
debug_str.contains("Transaction"),
"Debug should contain 'Transaction': {debug_str}"
);
}
#[test]
fn test_batch_delete_operation() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// First, insert some data
let mut batch = provider.batch();
for i in 0..5u64 {
batch.put::<TestTable>(i, &vec![i as u8]).unwrap();
}
batch.commit().unwrap();
// Verify data exists
for i in 0..5u64 {
assert!(provider.get::<TestTable>(i).unwrap().is_some());
}
// Now delete via batch
let mut delete_batch = provider.batch();
for i in 0..5u64 {
delete_batch.delete::<TestTable>(i).unwrap();
}
delete_batch.commit().unwrap();
// Verify data is deleted
for i in 0..5u64 {
assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
}
}
#[test]
fn test_batch_overwrite_existing_key() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
let key = 42u64;
let initial_value = b"initial".to_vec();
let updated_value = b"updated".to_vec();
// Insert initial value
let mut batch = provider.batch();
batch.put::<TestTable>(key, &initial_value).unwrap();
batch.commit().unwrap();
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(initial_value));
// Overwrite with new value
let mut batch = provider.batch();
batch.put::<TestTable>(key, &updated_value).unwrap();
batch.commit().unwrap();
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(updated_value));
}
}

View File

@@ -0,0 +1,210 @@
//! Stub implementation of `RocksDB` provider.
//!
//! This module provides placeholder types that allow the code to compile when `RocksDB` is not
//! available (either on non-Unix platforms or when the `rocksdb` feature is not enabled).
//! Operations will produce errors if actually attempted.
use reth_db_api::table::{Encode, Table};
use reth_storage_errors::{
db::LogLevel,
provider::{ProviderError::UnsupportedProvider, ProviderResult},
};
use std::path::Path;
/// A stub `RocksDB` provider.
///
/// This type exists to allow code to compile when `RocksDB` is not available (either on non-Unix
/// platforms or when the `rocksdb` feature is not enabled). When using this stub, the
/// `transaction_hash_numbers_in_rocksdb` flag should be set to `false` to ensure all operations
/// route to MDBX instead.
#[derive(Debug, Clone)]
pub struct RocksDBProvider;
impl RocksDBProvider {
/// Creates a new stub `RocksDB` provider.
///
/// On non-Unix platforms, this returns an error indicating `RocksDB` is not supported.
pub fn new(_path: impl AsRef<Path>) -> ProviderResult<Self> {
Ok(Self)
}
/// Creates a new stub `RocksDB` provider builder.
pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
RocksDBBuilder::new(path)
}
/// Get a value from `RocksDB` (stub implementation).
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Get a value from `RocksDB` using pre-encoded key (stub implementation).
pub const fn get_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Put a value into `RocksDB` (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Put a value into `RocksDB` using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Delete a value from `RocksDB` (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Write a batch of operations (stub implementation).
pub fn write_batch<F>(&self, _f: F) -> ProviderResult<()>
where
F: FnOnce(&mut RocksDBBatch) -> ProviderResult<()>,
{
Err(UnsupportedProvider)
}
/// Creates a new transaction (stub implementation).
pub const fn tx(&self) -> RocksTx {
RocksTx
}
}
/// A stub batch writer for `RocksDB` on non-Unix platforms.
#[derive(Debug)]
pub struct RocksDBBatch;
impl RocksDBBatch {
/// Puts a value into the batch (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Puts a value into the batch using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Deletes a value from the batch (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
}
/// A stub builder for `RocksDB` on non-Unix platforms.
#[derive(Debug)]
pub struct RocksDBBuilder;
impl RocksDBBuilder {
/// Creates a new stub builder.
pub fn new<P: AsRef<Path>>(_path: P) -> Self {
Self
}
/// Adds a column family for a specific table type (stub implementation).
pub const fn with_table<T: Table>(self) -> Self {
self
}
/// Enables metrics (stub implementation).
pub const fn with_metrics(self) -> Self {
self
}
/// Enables `RocksDB` internal statistics collection (stub implementation).
pub const fn with_statistics(self) -> Self {
self
}
/// Sets the log level from `DatabaseArgs` configuration (stub implementation).
pub const fn with_database_log_level(self, _log_level: Option<LogLevel>) -> Self {
self
}
/// Sets a custom block cache size (stub implementation).
pub const fn with_block_cache_size(self, _capacity_bytes: usize) -> Self {
self
}
/// Build the `RocksDB` provider (stub implementation).
pub const fn build(self) -> ProviderResult<RocksDBProvider> {
Ok(RocksDBProvider)
}
}
/// A stub transaction for `RocksDB`.
#[derive(Debug)]
pub struct RocksTx;
impl RocksTx {
/// Gets a value from the specified table (stub implementation).
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Gets a value using pre-encoded key (stub implementation).
pub const fn get_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Puts a value into the specified table (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Puts a value using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Deletes a value from the specified table (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Creates an iterator for the specified table (stub implementation).
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
Err(UnsupportedProvider)
}
/// Creates an iterator starting from the given key (stub implementation).
pub fn iter_from<T: Table>(&self, _key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
Err(UnsupportedProvider)
}
/// Commits the transaction (stub implementation).
pub const fn commit(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Rolls back the transaction (stub implementation).
pub const fn rollback(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
}
/// A stub iterator for `RocksDB` transactions.
#[derive(Debug)]
pub struct RocksTxIter<'a, T> {
_marker: std::marker::PhantomData<(&'a (), T)>,
}

View File

@@ -1,11 +1,13 @@
use crate::{
providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
HashingWriter, ProviderFactory, TrieWriter,
};
use alloy_primitives::B256;
use reth_chainspec::{ChainSpec, MAINNET};
use reth_db::{
test_utils::{create_test_rw_db, create_test_static_files_dir, TempDatabase},
test_utils::{
create_test_rocksdb_dir, create_test_rw_db, create_test_static_files_dir, TempDatabase,
},
DatabaseEnv,
};
use reth_errors::ProviderResult;
@@ -54,11 +56,13 @@ pub fn create_test_provider_factory_with_node_types<N: NodeTypesForProvider>(
chain_spec: Arc<N::ChainSpec>,
) -> ProviderFactory<NodeTypesWithDBAdapter<N, Arc<TempDatabase<DatabaseEnv>>>> {
let (static_dir, _) = create_test_static_files_dir();
let (rocksdb_dir, _) = create_test_rocksdb_dir();
let db = create_test_rw_db();
ProviderFactory::new(
db,
chain_spec,
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBProvider::new(&rocksdb_dir).expect("failed to create test RocksDB provider"),
)
.expect("failed to create test provider factory")
}

View File

@@ -1,8 +1,8 @@
//! Additional testing support for `NoopProvider`.
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
providers::{RocksDBProvider, StaticFileProvider, StaticFileProviderRWRefMut},
RocksDBProviderFactory, StaticFileProviderFactory,
};
use reth_errors::{ProviderError, ProviderResult};
use reth_primitives_traits::NodePrimitives;
@@ -24,3 +24,9 @@ impl<C: Send + Sync, N: NodePrimitives> StaticFileProviderFactory for NoopProvid
Err(ProviderError::ReadOnlyStaticFileAccess)
}
}
impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<C, N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
RocksDBProvider::builder(PathBuf::default()).build().unwrap()
}
}

View File

@@ -2,8 +2,9 @@
use crate::{
AccountReader, BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, PruneCheckpointReader, StageCheckpointReader,
StateProviderFactory, StateReader, StaticFileProviderFactory, TrieReader,
DatabaseProviderFactory, HashedPostStateProvider, PruneCheckpointReader,
RocksDBProviderFactory, StageCheckpointReader, StateProviderFactory, StateReader,
StaticFileProviderFactory, TrieReader,
};
use reth_chain_state::{CanonStateSubscriptions, ForkChoiceSubscriptions};
use reth_node_types::{BlockTy, HeaderTy, NodeTypesWithDB, ReceiptTy, TxTy};
@@ -17,6 +18,7 @@ pub trait FullProvider<N: NodeTypesWithDB>:
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
> + NodePrimitivesProvider<Primitives = N::Primitives>
+ StaticFileProviderFactory<Primitives = N::Primitives>
+ RocksDBProviderFactory
+ BlockReaderIdExt<
Transaction = TxTy<N>,
Block = BlockTy<N>,
@@ -44,6 +46,7 @@ impl<T, N: NodeTypesWithDB> FullProvider<N> for T where
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
> + NodePrimitivesProvider<Primitives = N::Primitives>
+ StaticFileProviderFactory<Primitives = N::Primitives>
+ RocksDBProviderFactory
+ BlockReaderIdExt<
Transaction = TxTy<N>,
Block = BlockTy<N>,

View File

@@ -8,5 +8,8 @@ pub use reth_chainspec::ChainSpecProvider;
mod static_file_provider;
pub use static_file_provider::StaticFileProviderFactory;
mod rocksdb_provider;
pub use rocksdb_provider::RocksDBProviderFactory;
mod full;
pub use full::FullProvider;

View File

@@ -0,0 +1,9 @@
use crate::providers::RocksDBProvider;
/// `RocksDB` provider factory.
///
/// This trait provides access to the `RocksDB` provider
pub trait RocksDBProviderFactory {
/// Returns the `RocksDB` provider.
fn rocksdb_provider(&self) -> RocksDBProvider;
}

View File

@@ -145,9 +145,9 @@ where
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
/// Listeners for new _full_ pending transactions.
pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
/// Listeners for new transactions added to the pool.
transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
/// Listener for new blob transaction sidecars added to the pool.
blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
/// Metrics for the blob store
@@ -243,7 +243,12 @@ where
pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
let listener = PendingTransactionHashListener { sender, kind };
self.pending_transaction_listener.lock().push(listener);
let mut listeners = self.pending_transaction_listener.write();
// Clean up dead listeners before adding new one
listeners.retain(|l| !l.sender.is_closed());
listeners.push(listener);
rx
}
@@ -254,7 +259,12 @@ where
) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
let listener = TransactionListener { sender, kind };
self.transaction_listener.lock().push(listener);
let mut listeners = self.transaction_listener.write();
// Clean up dead listeners before adding new one
listeners.retain(|l| !l.sender.is_closed());
listeners.push(listener);
rx
}
/// Adds a new blob sidecar listener to the pool that gets notified about every new
@@ -475,6 +485,9 @@ where
/// Add a single validated transaction into the pool.
///
/// Returns the outcome and optionally metadata to be processed after the pool lock is
/// released.
///
/// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
/// come in through that function, either as a batch or `std::iter::once`.
fn add_transaction(
@@ -482,7 +495,7 @@ where
pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
origin: TransactionOrigin,
tx: TransactionValidationOutcome<T::Transaction>,
) -> PoolResult<AddedTransactionOutcome> {
) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
match tx {
TransactionValidationOutcome::Valid {
balance,
@@ -496,7 +509,7 @@ where
let transaction_id = TransactionId::new(sender_id, transaction.nonce());
// split the valid transaction and the blob sidecar if it has any
let (transaction, maybe_sidecar) = match transaction {
let (transaction, blob_sidecar) = match transaction {
ValidTransaction::Valid(tx) => (tx, None),
ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
debug_assert!(
@@ -516,50 +529,26 @@ where
authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
};
let added = pool.add_transaction(tx, balance, state_nonce, bytecode_hash)?;
let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
Ok(added) => added,
Err(err) => return (Err(err), None),
};
let hash = *added.hash();
let state = added.transaction_state();
// transaction was successfully inserted into the pool
if let Some(sidecar) = maybe_sidecar {
// notify blob sidecar listeners
self.on_new_blob_sidecar(&hash, &sidecar);
// store the sidecar in the blob store
self.insert_blob(hash, sidecar);
}
let meta = AddedTransactionMeta { added, blob_sidecar };
if let Some(replaced) = added.replaced_blob_transaction() {
debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
// delete the replaced transaction from the blob store
self.delete_blob(replaced);
}
// Notify about new pending transactions
if let Some(pending) = added.as_pending() {
self.on_new_pending_transaction(pending);
}
// Notify tx event listeners
self.notify_event_listeners(&added);
if let Some(discarded) = added.discarded_transactions() {
self.delete_discarded_blobs(discarded.iter());
}
// Notify listeners for _all_ transactions
self.on_new_transaction(added.into_new_transaction_event());
Ok(AddedTransactionOutcome { hash, state })
(Ok(AddedTransactionOutcome { hash, state }), Some(meta))
}
TransactionValidationOutcome::Invalid(tx, err) => {
let mut listener = self.event_listener.write();
listener.invalid(tx.hash());
Err(PoolError::new(*tx.hash(), err))
(Err(PoolError::new(*tx.hash(), err)), None)
}
TransactionValidationOutcome::Error(tx_hash, err) => {
let mut listener = self.event_listener.write();
listener.discarded(&tx_hash);
Err(PoolError::other(tx_hash, err))
(Err(PoolError::other(tx_hash, err)), None)
}
}
}
@@ -580,33 +569,46 @@ where
}
/// Adds all transactions in the iterator to the pool, returning a list of results.
///
/// Note: A large batch may lock the pool for a long time that blocks important operations
/// like updating the pool on canonical state changes. The caller should consider having
/// a max batch size to balance transaction insertions with other updates.
pub fn add_transactions(
&self,
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
// Process all transactions in one write lock, maintaining individual origins
let (mut added, discarded) = {
// Collect results and metadata while holding the pool write lock
let (mut results, added_metas, discarded) = {
let mut pool = self.pool.write();
let added = transactions
let mut added_metas = Vec::new();
let results = transactions
.into_iter()
.map(|tx| self.add_transaction(&mut pool, origin, tx))
.map(|tx| {
let (result, meta) = self.add_transaction(&mut pool, origin, tx);
// Only collect metadata for successful insertions
if result.is_ok() &&
let Some(meta) = meta
{
added_metas.push(meta);
}
result
})
.collect::<Vec<_>>();
// Enforce the pool size limits if at least one transaction was added successfully
let discarded = if added.iter().any(Result::is_ok) {
let discarded = if results.iter().any(Result::is_ok) {
pool.discard_worst()
} else {
Default::default()
};
(added, discarded)
(results, added_metas, discarded)
};
for meta in added_metas {
self.on_added_transaction(meta);
}
if !discarded.is_empty() {
// Delete any blobs associated with discarded blob transactions
self.delete_discarded_blobs(discarded.iter());
@@ -617,7 +619,7 @@ where
// A newly added transaction may be immediately discarded, so we need to
// adjust the result here
for res in &mut added {
for res in &mut results {
if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
discarded_hashes.contains(hash)
{
@@ -626,7 +628,42 @@ where
}
};
added
results
}
/// Process a transaction that was added to the pool.
///
/// Performs blob storage operations and sends all notifications. This should be called
/// after the pool write lock has been released to avoid blocking pool operations.
fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
// Handle blob sidecar storage and notifications for EIP-4844 transactions
if let Some(sidecar) = meta.blob_sidecar {
let hash = *meta.added.hash();
self.on_new_blob_sidecar(&hash, &sidecar);
self.insert_blob(hash, sidecar);
}
// Delete replaced blob sidecar if any
if let Some(replaced) = meta.added.replaced_blob_transaction() {
debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
self.delete_blob(replaced);
}
// Delete discarded blob sidecars if any, this doesnt do any IO.
if let Some(discarded) = meta.added.discarded_transactions() {
self.delete_discarded_blobs(discarded.iter());
}
// Notify pending transaction listeners
if let Some(pending) = meta.added.as_pending() {
self.on_new_pending_transaction(pending);
}
// Notify event listeners
self.notify_event_listeners(&meta.added);
// Notify new transaction listeners
self.on_new_transaction(meta.added.into_new_transaction_event());
}
/// Notify all listeners about a new pending transaction.
@@ -638,11 +675,23 @@ where
/// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
/// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
let mut transaction_listeners = self.pending_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| {
// broadcast all pending transactions to the listener
listener.send_all(pending.pending_transactions(listener.kind))
});
let mut needs_cleanup = false;
{
let listeners = self.pending_transaction_listener.read();
for listener in listeners.iter() {
if !listener.send_all(pending.pending_transactions(listener.kind)) {
needs_cleanup = true;
}
}
}
// Clean up dead listeners if we detected any closed channels
if needs_cleanup {
self.pending_transaction_listener
.write()
.retain(|listener| !listener.sender.is_closed());
}
}
/// Notify all listeners about a newly inserted pending transaction.
@@ -654,16 +703,29 @@ where
/// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
/// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
let mut transaction_listeners = self.transaction_listener.lock();
transaction_listeners.retain_mut(|listener| {
if listener.kind.is_propagate_only() && !event.transaction.propagate {
// only emit this hash to listeners that are only allowed to receive propagate only
// transactions, such as network
return !listener.sender.is_closed()
}
let mut needs_cleanup = false;
listener.send(event.clone())
});
{
let listeners = self.transaction_listener.read();
for listener in listeners.iter() {
if listener.kind.is_propagate_only() && !event.transaction.propagate {
if listener.sender.is_closed() {
needs_cleanup = true;
}
// Skip non-propagate transactions for propagate-only listeners
continue
}
if !listener.send(event.clone()) {
needs_cleanup = true;
}
}
}
// Clean up dead listeners if we detected any closed channels
if needs_cleanup {
self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
}
}
/// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
@@ -697,16 +759,33 @@ where
fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
// notify about promoted pending transactions
// emit hashes
self.pending_transaction_listener
.lock()
.retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
// notify about promoted pending transactions - emit hashes
let mut needs_pending_cleanup = false;
{
let listeners = self.pending_transaction_listener.read();
for listener in listeners.iter() {
if !listener.send_all(outcome.pending_transactions(listener.kind)) {
needs_pending_cleanup = true;
}
}
}
if needs_pending_cleanup {
self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
}
// emit full transactions
self.transaction_listener.lock().retain_mut(|listener| {
listener.send_all(outcome.full_pending_transactions(listener.kind))
});
let mut needs_tx_cleanup = false;
{
let listeners = self.transaction_listener.read();
for listener in listeners.iter() {
if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
needs_tx_cleanup = true;
}
}
}
if needs_tx_cleanup {
self.transaction_listener.write().retain(|l| !l.sender.is_closed());
}
let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
@@ -742,28 +821,46 @@ where
) {
// Notify about promoted pending transactions (similar to notify_on_new_state)
if !promoted.is_empty() {
self.pending_transaction_listener.lock().retain_mut(|listener| {
let promoted_hashes = promoted.iter().filter_map(|tx| {
if listener.kind.is_propagate_only() && !tx.propagate {
None
} else {
Some(*tx.hash())
let mut needs_pending_cleanup = false;
{
let listeners = self.pending_transaction_listener.read();
for listener in listeners.iter() {
let promoted_hashes = promoted.iter().filter_map(|tx| {
if listener.kind.is_propagate_only() && !tx.propagate {
None
} else {
Some(*tx.hash())
}
});
if !listener.send_all(promoted_hashes) {
needs_pending_cleanup = true;
}
});
listener.send_all(promoted_hashes)
});
}
}
if needs_pending_cleanup {
self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
}
// in this case we should also emit promoted transactions in full
self.transaction_listener.lock().retain_mut(|listener| {
let promoted_txs = promoted.iter().filter_map(|tx| {
if listener.kind.is_propagate_only() && !tx.propagate {
None
} else {
Some(NewTransactionEvent::pending(tx.clone()))
let mut needs_tx_cleanup = false;
{
let listeners = self.transaction_listener.read();
for listener in listeners.iter() {
let promoted_txs = promoted.iter().filter_map(|tx| {
if listener.kind.is_propagate_only() && !tx.propagate {
None
} else {
Some(NewTransactionEvent::pending(tx.clone()))
}
});
if !listener.send_all(promoted_txs) {
needs_tx_cleanup = true;
}
});
listener.send_all(promoted_txs)
});
}
}
if needs_tx_cleanup {
self.transaction_listener.write().retain(|l| !l.sender.is_closed());
}
}
{
@@ -1125,6 +1222,18 @@ impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
}
}
/// Metadata for a transaction that was added to the pool.
///
/// This holds all the data needed to complete post-insertion operations (notifications,
/// blob storage).
#[derive(Debug)]
struct AddedTransactionMeta<T: PoolTransaction> {
/// The transaction that was added to the pool
added: AddedTransaction<T>,
/// Optional blob sidecar for EIP-4844 transactions
blob_sidecar: Option<BlobTransactionSidecarVariant>,
}
/// Tracks an added transaction and all graph changes caused by adding it.
#[derive(Debug, Clone)]
pub struct AddedPendingTransaction<T: PoolTransaction> {

View File

@@ -85,8 +85,6 @@ use tracing::{trace, warn};
/// new --> |apply state changes| pool
/// ```
pub struct TxPool<T: TransactionOrdering> {
/// Contains the currently known information about the senders.
sender_info: FxHashMap<SenderId, SenderInfo>,
/// pending subpool
///
/// Holds transactions that are ready to be executed on the current state.
@@ -124,7 +122,6 @@ impl<T: TransactionOrdering> TxPool<T> {
/// Create a new graph pool instance.
pub fn new(ordering: T, config: PoolConfig) -> Self {
Self {
sender_info: Default::default(),
pending_pool: PendingPool::with_buffer(
ordering,
config.max_new_pending_txs_notifications,
@@ -165,7 +162,7 @@ impl<T: TransactionOrdering> TxPool<T> {
let mut last_consecutive_tx = None;
// ensure this operates on the most recent
if let Some(current) = self.sender_info.get(&on_chain.sender) {
if let Some(current) = self.all_transactions.sender_info.get(&on_chain.sender) {
on_chain.nonce = on_chain.nonce.max(current.state_nonce);
}
@@ -625,7 +622,7 @@ impl<T: TransactionOrdering> TxPool<T> {
let updates = self.all_transactions.update(&changed_senders);
// track changed accounts
self.sender_info.extend(changed_senders);
self.all_transactions.sender_info.extend(changed_senders);
// Process the sub-pool updates
let update = self.process_updates(updates);
@@ -743,7 +740,8 @@ impl<T: TransactionOrdering> TxPool<T> {
self.validate_auth(&tx, on_chain_nonce, on_chain_code_hash)?;
// Update sender info with balance and nonce
self.sender_info
self.all_transactions
.sender_info
.entry(tx.sender_id())
.or_default()
.update(on_chain_nonce, on_chain_balance);
@@ -938,6 +936,7 @@ impl<T: TransactionOrdering> TxPool<T> {
/// This will move/discard the given transaction according to the `PoolUpdate`
fn process_updates(&mut self, updates: Vec<PoolUpdate>) -> UpdateOutcome<T::Transaction> {
let mut outcome = UpdateOutcome::default();
let mut removed = 0;
for PoolUpdate { id, current, destination } in updates {
match destination {
Destination::Discard => {
@@ -945,7 +944,7 @@ impl<T: TransactionOrdering> TxPool<T> {
if let Some(tx) = self.prune_transaction_by_id(&id) {
outcome.discarded.push(tx);
}
self.metrics.removed_transactions.increment(1);
removed += 1;
}
Destination::Pool(move_to) => {
debug_assert_ne!(&move_to, &current, "destination must be different");
@@ -960,6 +959,10 @@ impl<T: TransactionOrdering> TxPool<T> {
}
}
if removed > 0 {
self.metrics.removed_transactions.increment(removed);
}
outcome
}
@@ -1324,6 +1327,8 @@ pub(crate) struct AllTransactions<T: PoolTransaction> {
by_hash: HashMap<TxHash, Arc<ValidPoolTransaction<T>>>,
/// _All_ transaction in the pool sorted by their sender and nonce pair.
txs: BTreeMap<TransactionId, PoolInternalTransaction<T>>,
/// Contains the currently known information about the senders.
sender_info: FxHashMap<SenderId, SenderInfo>,
/// Tracks the number of transactions by sender that are currently in the pool.
tx_counter: FxHashMap<SenderId, usize>,
/// The current block number the pool keeps track of.
@@ -1391,6 +1396,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
let count = entry.get_mut();
if *count == 1 {
entry.remove();
self.sender_info.remove(&sender);
self.metrics.all_transactions_by_all_senders.decrement(1.0);
return
}
@@ -2128,6 +2134,7 @@ impl<T: PoolTransaction> Default for AllTransactions<T> {
block_gas_limit: ETHEREUM_BLOCK_GAS_LIMIT_30M,
by_hash: Default::default(),
txs: Default::default(),
sender_info: Default::default(),
tx_counter: Default::default(),
last_seen_block_number: Default::default(),
last_seen_block_hash: Default::default(),
@@ -3614,7 +3621,7 @@ mod tests {
// update the tracked nonce
let mut info = SenderInfo::default();
info.update(8, U256::ZERO);
pool.sender_info.insert(sender_id, info);
pool.all_transactions.sender_info.insert(sender_id, info);
let next_tx =
pool.get_highest_consecutive_transaction_by_sender(sender_id.into_transaction_id(5));
assert_eq!(next_tx.map(|tx| tx.nonce()), Some(9), "Expected nonce 9 for on-chain nonce 8");

View File

@@ -76,12 +76,6 @@ pub fn prefix_set_lookups(c: &mut Criterion) {
test_data.clone(),
size,
);
prefix_set_bench::<VecBinarySearchPrefixSet>(
&mut group,
"`Vec` with binary search lookup",
test_data.clone(),
size,
);
}
}
@@ -207,43 +201,6 @@ mod implementations {
false
}
}
#[derive(Default)]
pub struct VecBinarySearchPrefixSet {
keys: Vec<Nibbles>,
sorted: bool,
}
impl PrefixSetMutAbstraction for VecBinarySearchPrefixSet {
type Frozen = Self;
fn insert(&mut self, key: Nibbles) {
self.sorted = false;
self.keys.push(key);
}
fn freeze(self) -> Self::Frozen {
self
}
}
impl PrefixSetAbstraction for VecBinarySearchPrefixSet {
fn contains(&mut self, prefix: Nibbles) -> bool {
if !self.sorted {
self.keys.sort();
self.sorted = true;
}
match self.keys.binary_search(&prefix) {
Ok(_) => true,
Err(idx) => match self.keys.get(idx) {
Some(key) => key.starts_with(&prefix),
None => false, // prefix > last key
},
}
}
}
#[derive(Default)]
pub struct VecCursorPrefixSet {
keys: Vec<Nibbles>,

View File

@@ -18,10 +18,7 @@ use reth_trie_sparse::{
SparseTrieUpdates,
};
use smallvec::SmallVec;
use std::{
cmp::{Ord, Ordering, PartialOrd},
sync::mpsc,
};
use std::cmp::{Ord, Ordering, PartialOrd};
use tracing::{debug, instrument, trace};
/// The maximum length of a path, in nibbles, which belongs to the upper subtrie of a
@@ -265,11 +262,9 @@ impl SparseTrieInterface for ParallelSparseTrie {
})
.collect();
let (tx, rx) = mpsc::channel();
// Zip the lower subtries and their corresponding node groups, and reveal lower subtrie
// nodes in parallel
lower_subtries
let results: Vec<_> = lower_subtries
.into_par_iter()
.zip(node_groups.into_par_iter())
.map(|((subtrie_idx, mut subtrie), nodes)| {
@@ -286,16 +281,12 @@ impl SparseTrieInterface for ParallelSparseTrie {
}
(subtrie_idx, subtrie, Ok(()))
})
.for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
.collect();
drop(tx);
// Take back all lower subtries which were sent to the rayon pool, collecting the last
// seen error in the process and returning that. If we don't fully drain the channel
// then we lose lower sparse tries, putting the whole ParallelSparseTrie in an
// inconsistent state.
// Put subtries back which were processed in the rayon pool, collecting the last
// seen error in the process and returning that.
let mut any_err = Ok(());
for (subtrie_idx, subtrie, res) in rx {
for (subtrie_idx, subtrie, res) in results {
self.lower_subtries[subtrie_idx] = LowerSparseSubtrie::Revealed(subtrie);
if res.is_err() {
any_err = res;
@@ -745,11 +736,9 @@ impl SparseTrieInterface for ParallelSparseTrie {
{
use rayon::iter::{IntoParallelIterator, ParallelIterator};
let (tx, rx) = mpsc::channel();
let branch_node_tree_masks = &self.branch_node_tree_masks;
let branch_node_hash_masks = &self.branch_node_hash_masks;
changed_subtries
let updated_subtries: Vec<_> = changed_subtries
.into_par_iter()
.map(|mut changed_subtrie| {
#[cfg(feature = "metrics")]
@@ -764,10 +753,9 @@ impl SparseTrieInterface for ParallelSparseTrie {
self.metrics.subtrie_hash_update_latency.record(start.elapsed());
changed_subtrie
})
.for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
.collect();
drop(tx);
self.insert_changed_subtries(rx);
self.insert_changed_subtries(updated_subtries);
}
}

View File

@@ -64,6 +64,7 @@ parking_lot.workspace = true
pretty_assertions.workspace = true
proptest-arbitrary-interop.workspace = true
proptest.workspace = true
rand.workspace = true
[features]
metrics = ["reth-metrics", "dep:metrics"]
@@ -84,6 +85,7 @@ serde = [
"revm-state/serde",
"parking_lot/serde",
"reth-ethereum-primitives/serde",
"rand/serde",
]
test-utils = [
"triehash",

View File

@@ -11,20 +11,19 @@ use reth_trie::{
proof_v2::StorageProofCalculator,
trie_cursor::{mock::MockTrieCursorFactory, TrieCursorFactory},
};
use reth_trie_common::{HashedPostState, HashedStorage, Nibbles};
use std::collections::BTreeMap;
use reth_trie_common::{HashedPostState, HashedStorage};
/// Generate test data for benchmarking.
///
/// Returns a tuple of:
/// - Hashed address for the storage trie
/// - `HashedPostState` with random storage slots
/// - Proof targets (Nibbles) that are 80% from existing slots, 20% random
/// - Proof targets as B256 (sorted) for V2 implementation
/// - Equivalent [`B256Set`] for legacy implementation
fn generate_test_data(
dataset_size: usize,
num_targets: usize,
) -> (B256, HashedPostState, Vec<Nibbles>, B256Set) {
) -> (B256, HashedPostState, Vec<B256>, B256Set) {
let mut runner = TestRunner::deterministic();
// Use a fixed hashed address for the storage trie
@@ -68,14 +67,8 @@ fn generate_test_data(
let target_b256s = targets_strategy.new_tree(&mut runner).unwrap().current();
// Convert B256 targets to sorted Nibbles for V2
let mut targets: Vec<Nibbles> = target_b256s
.iter()
.map(|b256| {
// SAFETY: B256 is exactly 32 bytes
unsafe { Nibbles::unpack_unchecked(b256.as_slice()) }
})
.collect();
// Sort B256 targets for V2 (storage_proof expects sorted targets)
let mut targets: Vec<B256> = target_b256s.clone();
targets.sort();
// Create B256Set for legacy
@@ -86,19 +79,42 @@ fn generate_test_data(
/// Create cursor factories from a `HashedPostState` for storage trie testing.
///
/// This mimics the test harness pattern from the `proof_v2` tests.
/// This mimics the test harness pattern from the `proof_v2` tests by using `StateRoot`
/// to generate `TrieUpdates` from the `HashedPostState`.
fn create_cursor_factories(
post_state: &HashedPostState,
) -> (MockTrieCursorFactory, MockHashedCursorFactory) {
// Ensure that there's a storage trie dataset for every storage trie, even if empty
let storage_trie_nodes: B256Map<BTreeMap<_, _>> =
post_state.storages.keys().copied().map(|addr| (addr, Default::default())).collect();
use reth_trie::{updates::StorageTrieUpdates, StateRoot};
// Create empty trie cursor factory to serve as the initial state for StateRoot
// Ensure that there's a storage trie dataset for every storage account
let storage_tries: B256Map<_> = post_state
.storages
.keys()
.copied()
.map(|addr| (addr, StorageTrieUpdates::default()))
.collect();
let empty_trie_cursor_factory =
MockTrieCursorFactory::from_trie_updates(reth_trie_common::updates::TrieUpdates {
storage_tries: storage_tries.clone(),
..Default::default()
});
// Create mock hashed cursor factory from the post state
let hashed_cursor_factory = MockHashedCursorFactory::from_hashed_post_state(post_state.clone());
// Create empty trie cursor factory (leaf-only calculator doesn't need trie nodes)
let trie_cursor_factory = MockTrieCursorFactory::new(BTreeMap::new(), storage_trie_nodes);
// Generate TrieUpdates using StateRoot
let (_root, mut trie_updates) =
StateRoot::new(empty_trie_cursor_factory, hashed_cursor_factory.clone())
.root_with_updates()
.expect("StateRoot should succeed");
// Continue using empty storage tries for each account
trie_updates.storage_tries = storage_tries;
// Initialize trie cursor factory from the generated TrieUpdates
let trie_cursor_factory = MockTrieCursorFactory::from_trie_updates(trie_updates);
(trie_cursor_factory, hashed_cursor_factory)
}
@@ -148,7 +164,7 @@ fn bench_proof_algos(c: &mut Criterion) {
|| targets.clone(),
|targets| {
proof_calculator
.storage_proof(hashed_address, targets.into_iter())
.storage_proof(hashed_address, targets)
.expect("Proof generation failed");
},
BatchSize::SmallInput,

View File

@@ -48,7 +48,7 @@ impl MockHashedCursorFactory {
.collect();
// Extract storages from post state
let hashed_storages: B256Map<BTreeMap<B256, U256>> = post_state
let mut hashed_storages: B256Map<BTreeMap<B256, U256>> = post_state
.storages
.into_iter()
.map(|(addr, hashed_storage)| {
@@ -62,6 +62,11 @@ impl MockHashedCursorFactory {
})
.collect();
// Ensure all accounts have at least an empty storage
for account in hashed_accounts.keys() {
hashed_storages.entry(*account).or_default();
}
Self::new(hashed_accounts, hashed_storages)
}

View File

@@ -428,15 +428,21 @@ mod tests {
/// Generate a sorted vector of (B256, U256) entries (including deletions as ZERO)
fn sorted_post_state_nodes_strategy() -> impl Strategy<Value = Vec<(B256, U256)>> {
prop::collection::vec((any::<u8>(), u256_strategy()), 0..20).prop_map(|entries| {
let mut result: Vec<(B256, U256)> = entries
.into_iter()
.map(|(byte, value)| (B256::repeat_byte(byte), value))
.collect();
result.sort_by(|a, b| a.0.cmp(&b.0));
result.dedup_by(|a, b| a.0 == b.0);
result
})
// Explicitly inject ZERO values to model post-state deletions.
prop::collection::vec((any::<u8>(), u256_strategy(), any::<bool>()), 0..20).prop_map(
|entries| {
let mut result: Vec<(B256, U256)> = entries
.into_iter()
.map(|(byte, value, is_deletion)| {
let effective_value = if is_deletion { U256::ZERO } else { value };
(B256::repeat_byte(byte), effective_value)
})
.collect();
result.sort_by(|a, b| a.0.cmp(&b.0));
result.dedup_by(|a, b| a.0 == b.0);
result
},
)
}
proptest! {

File diff suppressed because it is too large Load Diff

View File

@@ -25,7 +25,12 @@ pub(crate) enum ProofTrieBranchChild<RF> {
child: RlpNode,
},
/// A branch node whose children have already been flattened into [`RlpNode`]s.
Branch(BranchNode),
Branch {
/// The node itself, for use during RLP encoding.
node: BranchNode,
/// Bitmasks carried over from cached `BranchNodeCompact` values, if any.
masks: TrieMasks,
},
/// A node whose type is not known, as it has already been converted to an [`RlpNode`].
RlpNode(RlpNode),
}
@@ -64,7 +69,7 @@ impl<RF: DeferredValueEncoder> ProofTrieBranchChild<RF> {
ExtensionNodeRef::new(&short_key, child.as_slice()).encode(buf);
Ok((RlpNode::from_rlp(buf), None))
}
Self::Branch(branch_node) => {
Self::Branch { node: branch_node, .. } => {
branch_node.encode(buf);
Ok((RlpNode::from_rlp(buf), Some(branch_node.stack)))
}
@@ -98,8 +103,7 @@ impl<RF: DeferredValueEncoder> ProofTrieBranchChild<RF> {
Self::Extension { short_key, child } => {
(TrieNode::Extension(ExtensionNode { key: short_key, child }), TrieMasks::none())
}
// TODO store trie masks on branch
Self::Branch(branch_node) => (TrieNode::Branch(branch_node), TrieMasks::none()),
Self::Branch { node, masks } => (TrieNode::Branch(node), masks),
Self::RlpNode(_) => panic!("Cannot call `into_proof_trie_node` on RlpNode"),
};
@@ -111,7 +115,7 @@ impl<RF: DeferredValueEncoder> ProofTrieBranchChild<RF> {
pub(crate) fn short_key(&self) -> &Nibbles {
match self {
Self::Leaf { short_key, .. } | Self::Extension { short_key, .. } => short_key,
Self::Branch(_) | Self::RlpNode(_) => {
Self::Branch { .. } | Self::RlpNode(_) => {
static EMPTY_NIBBLES: Nibbles = Nibbles::new();
&EMPTY_NIBBLES
}
@@ -136,7 +140,7 @@ impl<RF: DeferredValueEncoder> ProofTrieBranchChild<RF> {
Self::Leaf { short_key, .. } | Self::Extension { short_key, .. } => {
*short_key = trim_nibbles_prefix(short_key, len);
}
Self::Branch(_) | Self::RlpNode(_) => {
Self::Branch { .. } | Self::RlpNode(_) => {
panic!("Cannot call `trim_short_key_prefix` on Branch or RlpNode")
}
}
@@ -153,14 +157,8 @@ pub(crate) struct ProofTrieBranch {
/// A mask tracking which child nibbles are set on the branch so far. There will be a single
/// child on the stack for each set bit.
pub(crate) state_mask: TrieMask,
/// A subset of `state_mask`. Each bit is set if the `state_mask` bit is set and:
/// - The child is a branch which is stored in the DB.
/// - The child is an extension whose child branch is stored in the DB.
#[expect(unused)]
pub(crate) tree_mask: TrieMask,
/// A subset of `state_mask`. Each bit is set if the hash for the child is cached in the DB.
#[expect(unused)]
pub(crate) hash_mask: TrieMask,
/// Bitmasks which are subsets of `state_mask`.
pub(crate) masks: TrieMasks,
}
/// Trims the first `len` nibbles from the head of the given `Nibbles`.

Some files were not shown because too many files have changed in this diff Show More