mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
74 Commits
yk/either-
...
eth/70
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1f93ee97bb | ||
|
|
3adac16571 | ||
|
|
bbd51862d4 | ||
|
|
08a16a5bde | ||
|
|
f2c39db7a2 | ||
|
|
ae9e84d6e3 | ||
|
|
c51da593d1 | ||
|
|
0e08f9f56c | ||
|
|
7eef092110 | ||
|
|
40e8241bf5 | ||
|
|
dd9ff731e4 | ||
|
|
83f9d1837f | ||
|
|
68911e617b | ||
|
|
36ba6db029 | ||
|
|
fec4432d82 | ||
|
|
179da26305 | ||
|
|
b5e7a694d2 | ||
|
|
9489667814 | ||
|
|
004877ba59 | ||
|
|
a9e36923e1 | ||
|
|
74a3816611 | ||
|
|
5576d4547f | ||
|
|
21216e2f24 | ||
|
|
42c1e1afe1 | ||
|
|
5f7e87fa2a | ||
|
|
1b417dacc4 | ||
|
|
bb952be5b5 | ||
|
|
f927eec880 | ||
|
|
9c61f5568c | ||
|
|
662c0486a1 | ||
|
|
997848c2a1 | ||
|
|
155bdecf3b | ||
|
|
bafe9943fd | ||
|
|
ac21a606a0 | ||
|
|
8f940c2d69 | ||
|
|
1af8a03ab6 | ||
|
|
3ab7774c5e | ||
|
|
912ce282dd | ||
|
|
6a4e5eb409 | ||
|
|
fd3b778f04 | ||
|
|
5db0e519ca | ||
|
|
fc8931c30e | ||
|
|
72bfdfe6ca | ||
|
|
b6d0e04a16 | ||
|
|
35d214f616 | ||
|
|
ed093cd5a0 | ||
|
|
04d22f6469 | ||
|
|
26db968153 | ||
|
|
d2420598b0 | ||
|
|
0b4d697ef3 | ||
|
|
f257508554 | ||
|
|
c6628ae3a4 | ||
|
|
3f2f5baaca | ||
|
|
d44a500750 | ||
|
|
b7b3f327fc | ||
|
|
2c70f5157f | ||
|
|
6d364b1379 | ||
|
|
3b67d78e24 | ||
|
|
b5ccdb89ac | ||
|
|
3e4e764b5a | ||
|
|
39c725e66c | ||
|
|
19bdc02e57 | ||
|
|
c68bd5036c | ||
|
|
f49b9542f5 | ||
|
|
04824d304b | ||
|
|
0d38ba9fc5 | ||
|
|
63ac93a431 | ||
|
|
90492aa3cf | ||
|
|
dd7a10ba4a | ||
|
|
14453ccca5 | ||
|
|
13e9c00ae9 | ||
|
|
3b129f5a34 | ||
|
|
70480acd66 | ||
|
|
7856d256e6 |
6
.github/workflows/hive.yml
vendored
6
.github/workflows/hive.yml
vendored
@@ -67,7 +67,7 @@ jobs:
|
||||
chmod +x hive
|
||||
|
||||
- name: Upload hive assets
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: hive_assets
|
||||
path: ./hive_assets
|
||||
@@ -187,13 +187,13 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Download hive assets
|
||||
uses: actions/download-artifact@v6
|
||||
uses: actions/download-artifact@v7
|
||||
with:
|
||||
name: hive_assets
|
||||
path: /tmp
|
||||
|
||||
- name: Download reth image
|
||||
uses: actions/download-artifact@v6
|
||||
uses: actions/download-artifact@v7
|
||||
with:
|
||||
name: artifacts
|
||||
path: /tmp
|
||||
|
||||
2
.github/workflows/kurtosis-op.yml
vendored
2
.github/workflows/kurtosis-op.yml
vendored
@@ -41,7 +41,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Download reth image
|
||||
uses: actions/download-artifact@v6
|
||||
uses: actions/download-artifact@v7
|
||||
with:
|
||||
name: artifacts
|
||||
path: /tmp
|
||||
|
||||
2
.github/workflows/kurtosis.yml
vendored
2
.github/workflows/kurtosis.yml
vendored
@@ -39,7 +39,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Download reth image
|
||||
uses: actions/download-artifact@v6
|
||||
uses: actions/download-artifact@v7
|
||||
with:
|
||||
name: artifacts
|
||||
path: /tmp
|
||||
|
||||
8
.github/workflows/lint.yml
vendored
8
.github/workflows/lint.yml
vendored
@@ -245,12 +245,8 @@ jobs:
|
||||
|
||||
# Checks that selected crates can compile with power set of features
|
||||
features:
|
||||
name: features (${{ matrix.partition }}/${{ matrix.total_partitions }})
|
||||
name: features
|
||||
runs-on: depot-ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
partition: [1, 2]
|
||||
total_partitions: [2]
|
||||
timeout-minutes: 30
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -268,7 +264,7 @@ jobs:
|
||||
--package reth-primitives-traits \
|
||||
--package reth-primitives \
|
||||
--feature-powerset \
|
||||
--partition ${{ matrix.partition }}/${{ matrix.total_partitions }}
|
||||
--depth 2
|
||||
env:
|
||||
RUSTFLAGS: -D warnings
|
||||
|
||||
|
||||
2
.github/workflows/prepare-reth.yml
vendored
2
.github/workflows/prepare-reth.yml
vendored
@@ -50,7 +50,7 @@ jobs:
|
||||
|
||||
- name: Upload reth image
|
||||
id: upload
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: artifacts
|
||||
path: ./artifacts
|
||||
|
||||
6
.github/workflows/release.yml
vendored
6
.github/workflows/release.yml
vendored
@@ -144,14 +144,14 @@ jobs:
|
||||
|
||||
- name: Upload artifact
|
||||
if: ${{ github.event.inputs.dry_run != 'true' }}
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz
|
||||
path: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz
|
||||
|
||||
- name: Upload signature
|
||||
if: ${{ github.event.inputs.dry_run != 'true' }}
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz.asc
|
||||
path: ${{ matrix.build.binary }}-${{ needs.extract-version.outputs.VERSION }}-${{ matrix.configs.target }}.tar.gz.asc
|
||||
@@ -173,7 +173,7 @@ jobs:
|
||||
with:
|
||||
fetch-depth: 0
|
||||
- name: Download artifacts
|
||||
uses: actions/download-artifact@v6
|
||||
uses: actions/download-artifact@v7
|
||||
- name: Generate full changelog
|
||||
id: changelog
|
||||
run: |
|
||||
|
||||
6
.github/workflows/reproducible-build.yml
vendored
6
.github/workflows/reproducible-build.yml
vendored
@@ -42,7 +42,7 @@ jobs:
|
||||
echo "Binaries SHA256 on ${{ matrix.machine }}: $(cat checksum.sha256)"
|
||||
|
||||
- name: Upload the hash
|
||||
uses: actions/upload-artifact@v5
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: checksum-${{ matrix.machine }}
|
||||
path: |
|
||||
@@ -55,12 +55,12 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Download artifacts from machine-1
|
||||
uses: actions/download-artifact@v4
|
||||
uses: actions/download-artifact@v7
|
||||
with:
|
||||
name: checksum-machine-1
|
||||
path: machine-1/
|
||||
- name: Download artifacts from machine-2
|
||||
uses: actions/download-artifact@v4
|
||||
uses: actions/download-artifact@v7
|
||||
with:
|
||||
name: checksum-machine-2
|
||||
path: machine-2/
|
||||
|
||||
2
.github/workflows/update-superchain.yml
vendored
2
.github/workflows/update-superchain.yml
vendored
@@ -27,7 +27,7 @@ jobs:
|
||||
./fetch_superchain_config.sh
|
||||
|
||||
- name: Create Pull Request
|
||||
uses: peter-evans/create-pull-request@v7
|
||||
uses: peter-evans/create-pull-request@v8
|
||||
with:
|
||||
commit-message: "chore: update superchain config"
|
||||
title: "chore: update superchain config"
|
||||
|
||||
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -8639,6 +8639,7 @@ dependencies = [
|
||||
"derive_more",
|
||||
"futures-util",
|
||||
"metrics",
|
||||
"rayon",
|
||||
"reth-ethereum-forks",
|
||||
"reth-ethereum-primitives",
|
||||
"reth-execution-errors",
|
||||
@@ -8944,6 +8945,7 @@ dependencies = [
|
||||
"pin-project",
|
||||
"rand 0.8.5",
|
||||
"rand 0.9.2",
|
||||
"rayon",
|
||||
"reth-chainspec",
|
||||
"reth-consensus",
|
||||
"reth-discv4",
|
||||
@@ -10894,6 +10896,7 @@ dependencies = [
|
||||
"pretty_assertions",
|
||||
"proptest",
|
||||
"proptest-arbitrary-interop",
|
||||
"rand 0.9.2",
|
||||
"reth-ethereum-primitives",
|
||||
"reth-execution-errors",
|
||||
"reth-metrics",
|
||||
|
||||
2
Makefile
2
Makefile
@@ -521,5 +521,3 @@ pr:
|
||||
make update-book-cli && \
|
||||
cargo docs --document-private-items && \
|
||||
make test
|
||||
|
||||
check-features:
|
||||
|
||||
@@ -23,7 +23,7 @@ use reth_node_core::{
|
||||
dirs::{ChainPath, DataDirPath},
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, NodeTypesForProvider, StaticFileProvider},
|
||||
providers::{BlockchainProvider, NodeTypesForProvider, RocksDBProvider, StaticFileProvider},
|
||||
ProviderFactory, StaticFileProviderFactory,
|
||||
};
|
||||
use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget};
|
||||
@@ -75,10 +75,12 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
|
||||
let db_path = data_dir.db();
|
||||
let sf_path = data_dir.static_files();
|
||||
let rocksdb_path = data_dir.rocksdb();
|
||||
|
||||
if access.is_read_write() {
|
||||
reth_fs_util::create_dir_all(&db_path)?;
|
||||
reth_fs_util::create_dir_all(&sf_path)?;
|
||||
reth_fs_util::create_dir_all(&rocksdb_path)?;
|
||||
}
|
||||
|
||||
let config_path = self.config.clone().unwrap_or_else(|| data_dir.config());
|
||||
@@ -108,8 +110,13 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
StaticFileProvider::read_only(sf_path, false)?,
|
||||
),
|
||||
};
|
||||
// TransactionDB only support read-write mode
|
||||
let rocksdb_provider = RocksDBProvider::builder(data_dir.rocksdb())
|
||||
.with_database_log_level(self.db.log_level)
|
||||
.build()?;
|
||||
|
||||
let provider_factory = self.create_provider_factory(&config, db, sfp, access)?;
|
||||
let provider_factory =
|
||||
self.create_provider_factory(&config, db, sfp, rocksdb_provider, access)?;
|
||||
if access.is_read_write() {
|
||||
debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
|
||||
init_genesis_with_settings(&provider_factory, self.static_files.to_settings())?;
|
||||
@@ -128,6 +135,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
config: &Config,
|
||||
db: Arc<DatabaseEnv>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
access: AccessRights,
|
||||
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>>
|
||||
where
|
||||
@@ -138,6 +146,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
|
||||
db,
|
||||
self.chain.clone(),
|
||||
static_file_provider,
|
||||
rocksdb_provider,
|
||||
)?
|
||||
.with_prune_modes(prune_modes.clone());
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
|
||||
use clap::{Parser, Subcommand};
|
||||
use reth_chainspec::{EthChainSpec, EthereumHardforks};
|
||||
use reth_cli::chainspec::ChainSpecParser;
|
||||
use reth_cli_runner::CliContext;
|
||||
use reth_db::version::{get_db_version, DatabaseVersionError, DB_VERSION};
|
||||
use reth_db_common::DbTool;
|
||||
use std::{
|
||||
@@ -79,7 +80,10 @@ macro_rules! db_exec {
|
||||
|
||||
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
|
||||
/// Execute `db` command
|
||||
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(self) -> eyre::Result<()> {
|
||||
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
|
||||
self,
|
||||
ctx: CliContext,
|
||||
) -> eyre::Result<()> {
|
||||
let data_dir = self.env.datadir.clone().resolve_datadir(self.env.chain.chain());
|
||||
let db_path = data_dir.db();
|
||||
let static_files_path = data_dir.static_files();
|
||||
@@ -158,7 +162,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
|
||||
let access_rights =
|
||||
if command.dry_run { AccessRights::RO } else { AccessRights::RW };
|
||||
db_exec!(self.env, tool, N, access_rights, {
|
||||
command.execute(&tool)?;
|
||||
command.execute(&tool, ctx.task_executor.clone())?;
|
||||
});
|
||||
}
|
||||
Subcommands::StaticFileHeader(command) => {
|
||||
|
||||
@@ -18,6 +18,7 @@ use reth_node_metrics::{
|
||||
};
|
||||
use reth_provider::{providers::ProviderNodeTypes, ChainSpecProvider, StageCheckpointReader};
|
||||
use reth_stages::StageId;
|
||||
use reth_tasks::TaskExecutor;
|
||||
use reth_trie::{
|
||||
verify::{Output, Verifier},
|
||||
Nibbles,
|
||||
@@ -48,52 +49,37 @@ pub struct Command {
|
||||
|
||||
impl Command {
|
||||
/// Execute `db repair-trie` command
|
||||
pub fn execute<N: ProviderNodeTypes>(self, tool: &DbTool<N>) -> eyre::Result<()> {
|
||||
pub fn execute<N: ProviderNodeTypes>(
|
||||
self,
|
||||
tool: &DbTool<N>,
|
||||
task_executor: TaskExecutor,
|
||||
) -> eyre::Result<()> {
|
||||
// Set up metrics server if requested
|
||||
let _metrics_handle = if let Some(listen_addr) = self.metrics {
|
||||
// Spawn an OS thread with a single-threaded tokio runtime for the metrics server
|
||||
let chain_name = tool.provider_factory.chain_spec().chain().to_string();
|
||||
let executor = task_executor.clone();
|
||||
|
||||
let handle = std::thread::Builder::new().name("metrics-server".to_string()).spawn(
|
||||
move || {
|
||||
// Create a single-threaded tokio runtime
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create tokio runtime for metrics server");
|
||||
let handle = task_executor.spawn_critical("metrics server", async move {
|
||||
let config = MetricServerConfig::new(
|
||||
listen_addr,
|
||||
VersionInfo {
|
||||
version: version_metadata().cargo_pkg_version.as_ref(),
|
||||
build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
|
||||
cargo_features: version_metadata().vergen_cargo_features.as_ref(),
|
||||
git_sha: version_metadata().vergen_git_sha.as_ref(),
|
||||
target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
|
||||
build_profile: version_metadata().build_profile_name.as_ref(),
|
||||
},
|
||||
ChainSpecInfo { name: chain_name },
|
||||
executor,
|
||||
Hooks::builder().build(),
|
||||
);
|
||||
|
||||
let handle = runtime.handle().clone();
|
||||
runtime.block_on(async move {
|
||||
let task_manager = reth_tasks::TaskManager::new(handle.clone());
|
||||
let task_executor = task_manager.executor();
|
||||
|
||||
let config = MetricServerConfig::new(
|
||||
listen_addr,
|
||||
VersionInfo {
|
||||
version: version_metadata().cargo_pkg_version.as_ref(),
|
||||
build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
|
||||
cargo_features: version_metadata().vergen_cargo_features.as_ref(),
|
||||
git_sha: version_metadata().vergen_git_sha.as_ref(),
|
||||
target_triple: version_metadata()
|
||||
.vergen_cargo_target_triple
|
||||
.as_ref(),
|
||||
build_profile: version_metadata().build_profile_name.as_ref(),
|
||||
},
|
||||
ChainSpecInfo { name: chain_name },
|
||||
task_executor,
|
||||
Hooks::builder().build(),
|
||||
);
|
||||
|
||||
// Spawn the metrics server
|
||||
if let Err(e) = MetricServer::new(config).serve().await {
|
||||
tracing::error!("Metrics server error: {}", e);
|
||||
}
|
||||
|
||||
// Block forever to keep the runtime alive
|
||||
std::future::pending::<()>().await
|
||||
});
|
||||
},
|
||||
)?;
|
||||
// Spawn the metrics server
|
||||
if let Err(e) = MetricServer::new(config).serve().await {
|
||||
tracing::error!("Metrics server error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
Some(handle)
|
||||
} else {
|
||||
|
||||
@@ -189,7 +189,7 @@ impl<C: ChainSpecParser> DownloadArgs<C> {
|
||||
|
||||
let net = NetworkConfigBuilder::<N::NetworkPrimitives>::new(p2p_secret_key)
|
||||
.peer_config(config.peers_config_with_basic_nodes_from_file(None))
|
||||
.external_ip_resolver(self.network.nat)
|
||||
.external_ip_resolver(self.network.nat.clone())
|
||||
.network_id(self.network.network_id)
|
||||
.boot_nodes(boot_nodes.clone())
|
||||
.apply(|builder| {
|
||||
|
||||
@@ -9,7 +9,7 @@ use reth_evm::ConfigureEvm;
|
||||
use reth_node_builder::NodeTypesWithDB;
|
||||
use reth_node_core::dirs::{ChainPath, DataDirPath};
|
||||
use reth_provider::{
|
||||
providers::{ProviderNodeTypes, StaticFileProvider},
|
||||
providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
|
||||
DatabaseProviderFactory, ProviderFactory,
|
||||
};
|
||||
use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
|
||||
@@ -42,6 +42,7 @@ where
|
||||
Arc::new(output_db),
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -6,7 +6,7 @@ use reth_db_api::{database::Database, table::TableImporter, tables};
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_core::dirs::{ChainPath, DataDirPath};
|
||||
use reth_provider::{
|
||||
providers::{ProviderNodeTypes, StaticFileProvider},
|
||||
providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
|
||||
DatabaseProviderFactory, ProviderFactory,
|
||||
};
|
||||
use reth_stages::{stages::AccountHashingStage, Stage, StageCheckpoint, UnwindInput};
|
||||
@@ -39,6 +39,7 @@ pub(crate) async fn dump_hashing_account_stage<N: ProviderNodeTypes<DB = Arc<Dat
|
||||
Arc::new(output_db),
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -5,7 +5,7 @@ use reth_db_api::{database::Database, table::TableImporter, tables};
|
||||
use reth_db_common::DbTool;
|
||||
use reth_node_core::dirs::{ChainPath, DataDirPath};
|
||||
use reth_provider::{
|
||||
providers::{ProviderNodeTypes, StaticFileProvider},
|
||||
providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
|
||||
DatabaseProviderFactory, ProviderFactory,
|
||||
};
|
||||
use reth_stages::{stages::StorageHashingStage, Stage, StageCheckpoint, UnwindInput};
|
||||
@@ -29,6 +29,7 @@ pub(crate) async fn dump_hashing_storage_stage<N: ProviderNodeTypes<DB = Arc<Dat
|
||||
Arc::new(output_db),
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -12,7 +12,7 @@ use reth_evm::ConfigureEvm;
|
||||
use reth_exex::ExExManagerHandle;
|
||||
use reth_node_core::dirs::{ChainPath, DataDirPath};
|
||||
use reth_provider::{
|
||||
providers::{ProviderNodeTypes, StaticFileProvider},
|
||||
providers::{ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
|
||||
DatabaseProviderFactory, ProviderFactory,
|
||||
};
|
||||
use reth_stages::{
|
||||
@@ -62,6 +62,7 @@ where
|
||||
Arc::new(output_db),
|
||||
db_tool.chain(),
|
||||
StaticFileProvider::read_write(output_datadir.static_files())?,
|
||||
RocksDBProvider::builder(output_datadir.rocksdb()).build()?,
|
||||
)?,
|
||||
to,
|
||||
from,
|
||||
|
||||
@@ -97,6 +97,57 @@ impl CliRunner {
|
||||
command_res
|
||||
}
|
||||
|
||||
/// Executes a command in a blocking context with access to `CliContext`.
|
||||
///
|
||||
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
|
||||
pub fn run_blocking_command_until_exit<F, E>(
|
||||
self,
|
||||
command: impl FnOnce(CliContext) -> F + Send + 'static,
|
||||
) -> Result<(), E>
|
||||
where
|
||||
F: Future<Output = Result<(), E>> + Send + 'static,
|
||||
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
|
||||
AsyncCliRunner::new(self.tokio_runtime);
|
||||
|
||||
// Spawn the command on the blocking thread pool
|
||||
let handle = tokio_runtime.handle().clone();
|
||||
let command_handle =
|
||||
tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
|
||||
|
||||
// Wait for the command to complete or ctrl-c
|
||||
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
|
||||
&mut task_manager,
|
||||
run_until_ctrl_c(
|
||||
async move { command_handle.await.expect("Failed to join blocking task") },
|
||||
),
|
||||
));
|
||||
|
||||
if command_res.is_err() {
|
||||
error!(target: "reth::cli", "shutting down due to error");
|
||||
} else {
|
||||
debug!(target: "reth::cli", "shutting down gracefully");
|
||||
task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
|
||||
}
|
||||
|
||||
// Shutdown the runtime on a separate thread
|
||||
let (tx, rx) = mpsc::channel();
|
||||
std::thread::Builder::new()
|
||||
.name("tokio-runtime-shutdown".to_string())
|
||||
.spawn(move || {
|
||||
drop(tokio_runtime);
|
||||
let _ = tx.send(());
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
|
||||
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
|
||||
});
|
||||
|
||||
command_res
|
||||
}
|
||||
|
||||
/// Executes a regular future until completion or until external signal received.
|
||||
pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
|
||||
where
|
||||
|
||||
@@ -279,20 +279,28 @@ pub fn validate_against_parent_hash_number<H: BlockHeader>(
|
||||
header: &H,
|
||||
parent: &SealedHeader<H>,
|
||||
) -> Result<(), ConsensusError> {
|
||||
// Parent number is consistent.
|
||||
if parent.number() + 1 != header.number() {
|
||||
return Err(ConsensusError::ParentBlockNumberMismatch {
|
||||
parent_block_number: parent.number(),
|
||||
block_number: header.number(),
|
||||
})
|
||||
}
|
||||
|
||||
if parent.hash() != header.parent_hash() {
|
||||
return Err(ConsensusError::ParentHashMismatch(
|
||||
GotExpected { got: header.parent_hash(), expected: parent.hash() }.into(),
|
||||
))
|
||||
}
|
||||
|
||||
let Some(parent_number) = parent.number().checked_add(1) else {
|
||||
// parent block already reached the maximum
|
||||
return Err(ConsensusError::ParentBlockNumberMismatch {
|
||||
parent_block_number: parent.number(),
|
||||
block_number: u64::MAX,
|
||||
})
|
||||
};
|
||||
|
||||
// Parent number is consistent.
|
||||
if parent_number != header.number() {
|
||||
return Err(ConsensusError::ParentBlockNumberMismatch {
|
||||
parent_block_number: parent.number(),
|
||||
block_number: header.number(),
|
||||
})
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -110,6 +110,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
// Create database path and static files path
|
||||
let db_path = datadir.join("db");
|
||||
let static_files_path = datadir.join("static_files");
|
||||
let rocksdb_dir_path = datadir.join("rocksdb");
|
||||
|
||||
// Initialize the database using init_db (same as CLI import command)
|
||||
// Use the same database arguments as the node will use
|
||||
@@ -125,6 +126,7 @@ pub async fn setup_engine_with_chain_import(
|
||||
db.clone(),
|
||||
chain_spec.clone(),
|
||||
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())?,
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
|
||||
)?;
|
||||
|
||||
// Initialize genesis if needed
|
||||
@@ -311,6 +313,7 @@ mod tests {
|
||||
std::fs::create_dir_all(&datadir).unwrap();
|
||||
let db_path = datadir.join("db");
|
||||
let static_files_path = datadir.join("static_files");
|
||||
let rocksdb_dir_path = datadir.join("rocksdb");
|
||||
|
||||
// Import the chain
|
||||
{
|
||||
@@ -324,6 +327,9 @@ mod tests {
|
||||
chain_spec.clone(),
|
||||
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
|
||||
.unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
@@ -385,6 +391,9 @@ mod tests {
|
||||
chain_spec.clone(),
|
||||
reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
|
||||
.unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
@@ -472,11 +481,15 @@ mod tests {
|
||||
// Create static files path
|
||||
let static_files_path = datadir.join("static_files");
|
||||
|
||||
// Create rocksdb path
|
||||
let rocksdb_dir_path = datadir.join("rocksdb");
|
||||
|
||||
// Create a provider factory
|
||||
let provider_factory: ProviderFactory<MockNodeTypesWithDB> = ProviderFactory::new(
|
||||
db.clone(),
|
||||
chain_spec.clone(),
|
||||
reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
|
||||
@@ -101,7 +101,7 @@ pub struct TreeConfig {
|
||||
state_provider_metrics: bool,
|
||||
/// Cross-block cache size in bytes.
|
||||
cross_block_cache_size: u64,
|
||||
/// Whether the host has enough parallelism to run state root in parallel.
|
||||
/// Whether the host has enough parallelism to run state root task.
|
||||
has_enough_parallelism: bool,
|
||||
/// Whether multiproof task should chunk proof targets.
|
||||
multiproof_chunking_enabled: bool,
|
||||
@@ -403,17 +403,12 @@ impl TreeConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Setter for whether or not the host has enough parallelism to run state root in parallel.
|
||||
/// Setter for has enough parallelism.
|
||||
pub const fn with_has_enough_parallelism(mut self, has_enough_parallelism: bool) -> Self {
|
||||
self.has_enough_parallelism = has_enough_parallelism;
|
||||
self
|
||||
}
|
||||
|
||||
/// Whether or not the host has enough parallelism to run state root in parallel.
|
||||
pub const fn has_enough_parallelism(&self) -> bool {
|
||||
self.has_enough_parallelism
|
||||
}
|
||||
|
||||
/// Setter for state provider metrics.
|
||||
pub const fn with_state_provider_metrics(mut self, state_provider_metrics: bool) -> Self {
|
||||
self.state_provider_metrics = state_provider_metrics;
|
||||
|
||||
@@ -22,7 +22,8 @@ use reth_trie_common::HashedPostState;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
// Re-export [`ExecutionPayload`] moved to `reth_payload_primitives`
|
||||
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator};
|
||||
#[cfg(feature = "std")]
|
||||
pub use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
|
||||
pub use reth_payload_primitives::ExecutionPayload;
|
||||
|
||||
mod error;
|
||||
|
||||
@@ -16,7 +16,7 @@ reth-chain-state.workspace = true
|
||||
reth-chainspec = { workspace = true, optional = true }
|
||||
reth-consensus.workspace = true
|
||||
reth-db.workspace = true
|
||||
reth-engine-primitives.workspace = true
|
||||
reth-engine-primitives = { workspace = true, features = ["std"] }
|
||||
reth-errors.workspace = true
|
||||
reth-execution-types.workspace = true
|
||||
reth-evm = { workspace = true, features = ["metrics"] }
|
||||
|
||||
@@ -230,12 +230,12 @@ fn bench_state_root(c: &mut Criterion) {
|
||||
let mut handle = payload_processor.spawn(
|
||||
Default::default(),
|
||||
(
|
||||
core::iter::empty::<
|
||||
Vec::<
|
||||
Result<
|
||||
Recovered<TransactionSigned>,
|
||||
core::convert::Infallible,
|
||||
>,
|
||||
>(),
|
||||
>::new(),
|
||||
std::convert::identity,
|
||||
),
|
||||
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
|
||||
|
||||
@@ -21,7 +21,7 @@ use executor::WorkloadExecutor;
|
||||
use multiproof::{SparseTrieUpdate, *};
|
||||
use parking_lot::RwLock;
|
||||
use prewarm::PrewarmMetrics;
|
||||
use rayon::iter::{ParallelBridge, ParallelIterator};
|
||||
use rayon::prelude::*;
|
||||
use reth_engine_primitives::ExecutableTxIterator;
|
||||
use reth_evm::{
|
||||
execute::{ExecutableTxFor, WithTxEnv},
|
||||
@@ -318,36 +318,32 @@ where
|
||||
usize,
|
||||
) {
|
||||
let (transactions, convert) = transactions.into();
|
||||
let transactions = transactions.into_iter();
|
||||
// Get the transaction count for prewarming task
|
||||
// Use upper bound if available (more accurate), otherwise use lower bound
|
||||
let (lower, upper) = transactions.size_hint();
|
||||
let transaction_count_hint = upper.unwrap_or(lower);
|
||||
let transactions = transactions.into_par_iter();
|
||||
let transaction_count_hint = transactions.len();
|
||||
|
||||
// Spawn a task that iterates through all transactions in parallel and sends them to the
|
||||
// main task.
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let (ooo_tx, ooo_rx) = mpsc::channel();
|
||||
let (prewarm_tx, prewarm_rx) = mpsc::channel();
|
||||
let (execute_tx, execute_rx) = mpsc::channel();
|
||||
|
||||
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
|
||||
self.executor.spawn_blocking(move || {
|
||||
transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
|
||||
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
|
||||
let tx = convert(tx);
|
||||
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
|
||||
let _ = sender.send((idx, tx));
|
||||
// Only send Ok(_) variants to prewarming task.
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
}
|
||||
let _ = ooo_tx.send((idx, tx));
|
||||
});
|
||||
});
|
||||
|
||||
// Spawn a task that processes out-of-order transactions from the task above and sends them
|
||||
// to prewarming and execution tasks.
|
||||
let (prewarm_tx, prewarm_rx) = mpsc::channel();
|
||||
let (execute_tx, execute_rx) = mpsc::channel();
|
||||
// to the execution task in order.
|
||||
self.executor.spawn_blocking(move || {
|
||||
let mut next_for_execution = 0;
|
||||
let mut queue = BTreeMap::new();
|
||||
while let Ok((idx, tx)) = rx.recv() {
|
||||
// only send Ok(_) variants to prewarming task
|
||||
if let Ok(tx) = &tx {
|
||||
let _ = prewarm_tx.send(tx.clone());
|
||||
}
|
||||
|
||||
while let Ok((idx, tx)) = ooo_rx.recv() {
|
||||
if next_for_execution == idx {
|
||||
let _ = execute_tx.send(tx);
|
||||
next_for_execution += 1;
|
||||
@@ -1057,19 +1053,16 @@ mod tests {
|
||||
|
||||
let provider_factory = BlockchainProvider::new(factory).unwrap();
|
||||
|
||||
let mut handle =
|
||||
payload_processor.spawn(
|
||||
Default::default(),
|
||||
(
|
||||
core::iter::empty::<
|
||||
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
|
||||
>(),
|
||||
std::convert::identity,
|
||||
),
|
||||
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
|
||||
OverlayStateProviderFactory::new(provider_factory),
|
||||
&TreeConfig::default(),
|
||||
);
|
||||
let mut handle = payload_processor.spawn(
|
||||
Default::default(),
|
||||
(
|
||||
Vec::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>::new(),
|
||||
std::convert::identity,
|
||||
),
|
||||
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
|
||||
OverlayStateProviderFactory::new(provider_factory),
|
||||
&TreeConfig::default(),
|
||||
);
|
||||
|
||||
let mut state_hook = handle.state_hook();
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ use alloy_eip7928::BlockAccessList;
|
||||
use alloy_eips::{eip1898::BlockWithParent, NumHash};
|
||||
use alloy_evm::Evm;
|
||||
use alloy_primitives::B256;
|
||||
use rayon::prelude::*;
|
||||
use reth_chain_state::{CanonicalInMemoryState, DeferredTrieData, ExecutedBlock};
|
||||
use reth_consensus::{ConsensusError, FullConsensus};
|
||||
use reth_engine_primitives::{
|
||||
@@ -221,7 +222,7 @@ where
|
||||
.map_err(NewPayloadError::other)?
|
||||
.into();
|
||||
|
||||
let iter = Either::Left(iter.into_iter().map(Either::Left));
|
||||
let iter = Either::Left(iter.into_par_iter().map(Either::Left));
|
||||
let convert = move |tx| {
|
||||
let Either::Left(tx) = tx else { unreachable!() };
|
||||
convert(tx).map(Either::Left).map_err(Either::Left)
|
||||
@@ -231,8 +232,9 @@ where
|
||||
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
|
||||
}
|
||||
BlockOrPayload::Block(block) => {
|
||||
let iter =
|
||||
Either::Right(block.body().clone_transactions().into_iter().map(Either::Right));
|
||||
let iter = Either::Right(
|
||||
block.body().clone_transactions().into_par_iter().map(Either::Right),
|
||||
);
|
||||
let convert = move |tx: Either<_, N::SignedTx>| {
|
||||
let Either::Right(tx) = tx else { unreachable!() };
|
||||
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
|
||||
@@ -874,7 +876,7 @@ where
|
||||
/// Note: Use state root task only if prefix sets are empty, otherwise proof generation is
|
||||
/// too expensive because it requires walking all paths in every proof.
|
||||
const fn plan_state_root_computation(&self) -> StateRootStrategy {
|
||||
if self.config.state_root_fallback() || !self.config.has_enough_parallelism() {
|
||||
if self.config.state_root_fallback() {
|
||||
StateRootStrategy::Synchronous
|
||||
} else if self.config.use_state_root_task() {
|
||||
StateRootStrategy::StateRootTask
|
||||
|
||||
@@ -16,7 +16,7 @@ reth-primitives-traits.workspace = true
|
||||
reth-errors.workspace = true
|
||||
reth-chainspec.workspace = true
|
||||
reth-fs-util.workspace = true
|
||||
reth-engine-primitives.workspace = true
|
||||
reth-engine-primitives = { workspace = true, features = ["std"] }
|
||||
reth-engine-tree.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-revm.workspace = true
|
||||
|
||||
@@ -154,7 +154,9 @@ where
|
||||
Commands::ImportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
|
||||
Commands::ExportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
|
||||
Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
|
||||
Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
|
||||
Commands::Db(command) => {
|
||||
runner.run_blocking_command_until_exit(|ctx| command.execute::<N>(ctx))
|
||||
}
|
||||
Commands::Download(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
|
||||
Commands::Stage(command) => {
|
||||
runner.run_command_until_exit(|ctx| command.execute::<N, _>(ctx, components))
|
||||
|
||||
@@ -19,32 +19,37 @@ extern crate alloc;
|
||||
|
||||
use alloc::{borrow::Cow, sync::Arc};
|
||||
use alloy_consensus::Header;
|
||||
use alloy_eips::Decodable2718;
|
||||
pub use alloy_evm::EthEvm;
|
||||
use alloy_evm::{
|
||||
eth::{EthBlockExecutionCtx, EthBlockExecutorFactory},
|
||||
EthEvmFactory, FromRecoveredTx, FromTxWithEncoded,
|
||||
};
|
||||
use alloy_primitives::{Bytes, U256};
|
||||
use alloy_rpc_types_engine::ExecutionData;
|
||||
use core::{convert::Infallible, fmt::Debug};
|
||||
use reth_chainspec::{ChainSpec, EthChainSpec, EthereumHardforks, MAINNET};
|
||||
use reth_chainspec::{ChainSpec, EthChainSpec, MAINNET};
|
||||
use reth_ethereum_primitives::{Block, EthPrimitives, TransactionSigned};
|
||||
use reth_evm::{
|
||||
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEngineEvm, ConfigureEvm,
|
||||
EvmEnv, EvmEnvFor, EvmFactory, ExecutableTxIterator, ExecutionCtxFor, NextBlockEnvAttributes,
|
||||
TransactionEnv,
|
||||
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEvm, EvmEnv, EvmFactory,
|
||||
NextBlockEnvAttributes, TransactionEnv,
|
||||
};
|
||||
use reth_primitives_traits::{
|
||||
constants::MAX_TX_GAS_LIMIT_OSAKA, SealedBlock, SealedHeader, SignedTransaction, TxTy,
|
||||
};
|
||||
use reth_storage_errors::any::AnyError;
|
||||
use revm::{
|
||||
context::{BlockEnv, CfgEnv},
|
||||
context_interface::block::BlobExcessGasAndPrice,
|
||||
primitives::hardfork::SpecId,
|
||||
use reth_primitives_traits::{SealedBlock, SealedHeader};
|
||||
use revm::{context::BlockEnv, primitives::hardfork::SpecId};
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator};
|
||||
#[allow(unused_imports)]
|
||||
use {
|
||||
alloy_eips::Decodable2718,
|
||||
alloy_primitives::{Bytes, U256},
|
||||
alloy_rpc_types_engine::ExecutionData,
|
||||
reth_chainspec::EthereumHardforks,
|
||||
reth_evm::{EvmEnvFor, ExecutionCtxFor},
|
||||
reth_primitives_traits::{constants::MAX_TX_GAS_LIMIT_OSAKA, SignedTransaction, TxTy},
|
||||
reth_storage_errors::any::AnyError,
|
||||
revm::context::CfgEnv,
|
||||
revm::context_interface::block::BlobExcessGasAndPrice,
|
||||
};
|
||||
|
||||
pub use alloy_evm::EthEvm;
|
||||
|
||||
mod config;
|
||||
use alloy_evm::eth::spec::EthExecutorSpec;
|
||||
pub use config::{revm_spec, revm_spec_by_timestamp_and_block_number};
|
||||
@@ -206,6 +211,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl<ChainSpec, EvmF> ConfigureEngineEvm<ExecutionData> for EthEvmConfig<ChainSpec, EvmF>
|
||||
where
|
||||
ChainSpec: EthExecutorSpec + EthChainSpec<Header = Header> + Hardforks + 'static,
|
||||
@@ -286,7 +292,7 @@ where
|
||||
&self,
|
||||
payload: &ExecutionData,
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
|
||||
let txs = payload.payload.transactions().clone().into_iter();
|
||||
let txs = payload.payload.transactions().clone();
|
||||
let convert = |tx: Bytes| {
|
||||
let tx =
|
||||
TxTy::<Self::Primitives>::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?;
|
||||
|
||||
@@ -24,7 +24,7 @@ reth-provider.workspace = true
|
||||
reth-transaction-pool.workspace = true
|
||||
reth-network.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-evm-ethereum.workspace = true
|
||||
reth-evm-ethereum = { workspace = true, features = ["std"] }
|
||||
reth-rpc.workspace = true
|
||||
reth-rpc-api.workspace = true
|
||||
reth-rpc-eth-api.workspace = true
|
||||
@@ -35,7 +35,7 @@ reth-chainspec.workspace = true
|
||||
reth-revm = { workspace = true, features = ["std"] }
|
||||
reth-rpc-eth-types.workspace = true
|
||||
reth-engine-local.workspace = true
|
||||
reth-engine-primitives.workspace = true
|
||||
reth-engine-primitives = { workspace = true, features = ["std"] }
|
||||
reth-payload-primitives.workspace = true
|
||||
|
||||
# ethereum
|
||||
|
||||
@@ -118,13 +118,14 @@ impl EthereumNode {
|
||||
/// use reth_chainspec::ChainSpecBuilder;
|
||||
/// use reth_db::open_db_read_only;
|
||||
/// use reth_node_ethereum::EthereumNode;
|
||||
/// use reth_provider::providers::StaticFileProvider;
|
||||
/// use reth_provider::providers::{RocksDBProvider, StaticFileProvider};
|
||||
/// use std::sync::Arc;
|
||||
///
|
||||
/// let factory = EthereumNode::provider_factory_builder()
|
||||
/// .db(Arc::new(open_db_read_only("db", Default::default()).unwrap()))
|
||||
/// .chainspec(ChainSpecBuilder::mainnet().build().into())
|
||||
/// .static_file(StaticFileProvider::read_only("db/static_files", false).unwrap())
|
||||
/// .rocksdb_provider(RocksDBProvider::builder("db/rocksdb").build().unwrap())
|
||||
/// .build_provider_factory();
|
||||
/// ```
|
||||
pub fn provider_factory_builder() -> ProviderFactoryBuilder<Self> {
|
||||
|
||||
@@ -28,6 +28,7 @@ async fn testing_rpc_build_block_works() -> eyre::Result<()> {
|
||||
datadir: MaybePlatformPath::<DataDirPath>::from_str(tempdir.path().to_str().unwrap())
|
||||
.expect("valid datadir"),
|
||||
static_files_path: Some(tempdir.path().join("static")),
|
||||
rocksdb_path: Some(tempdir.path().join("rocksdb")),
|
||||
};
|
||||
let config = NodeConfig::test().with_datadir_args(datadir_args).with_rpc(rpc_args);
|
||||
let db = create_test_rw_db();
|
||||
|
||||
@@ -24,7 +24,7 @@ reth-payload-builder-primitives.workspace = true
|
||||
reth-payload-primitives.workspace = true
|
||||
reth-basic-payload-builder.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-evm-ethereum.workspace = true
|
||||
reth-evm-ethereum = { workspace = true, features = ["std"] }
|
||||
reth-errors.workspace = true
|
||||
reth-chainspec.workspace = true
|
||||
reth-payload-validator.workspace = true
|
||||
|
||||
@@ -32,6 +32,7 @@ auto_impl.workspace = true
|
||||
derive_more.workspace = true
|
||||
futures-util.workspace = true
|
||||
metrics = { workspace = true, optional = true }
|
||||
rayon = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
reth-ethereum-primitives.workspace = true
|
||||
@@ -40,6 +41,7 @@ reth-ethereum-forks.workspace = true
|
||||
[features]
|
||||
default = ["std"]
|
||||
std = [
|
||||
"dep:rayon",
|
||||
"reth-primitives-traits/std",
|
||||
"alloy-eips/std",
|
||||
"alloy-primitives/std",
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::{execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor};
|
||||
use rayon::prelude::*;
|
||||
|
||||
/// [`ConfigureEvm`] extension providing methods for executing payloads.
|
||||
pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
|
||||
@@ -21,7 +22,7 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
|
||||
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
|
||||
/// used to convert them to an executable transaction. This tuple is used in the engine to
|
||||
/// parallelize heavy work like decoding or recovery.
|
||||
pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static {
|
||||
pub trait ExecutableTxTuple: Into<(Self::IntoIter, Self::Convert)> + Send + 'static {
|
||||
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
|
||||
///
|
||||
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
|
||||
@@ -32,8 +33,10 @@ pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static
|
||||
/// Errors that may occur while recovering or decoding transactions.
|
||||
type Error: core::error::Error + Send + Sync + 'static;
|
||||
|
||||
/// Iterator over [`ExecutableTxTuple::Tx`]
|
||||
type Iter: Iterator<Item = Self::RawTx> + Send + 'static;
|
||||
/// Iterator over [`ExecutableTxTuple::Tx`].
|
||||
type IntoIter: IntoParallelIterator<Item = Self::RawTx, Iter: IndexedParallelIterator>
|
||||
+ Send
|
||||
+ 'static;
|
||||
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
|
||||
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
|
||||
/// and will be parallelized in the engine.
|
||||
@@ -45,14 +48,14 @@ where
|
||||
RawTx: Send + Sync + 'static,
|
||||
Tx: Clone + Send + Sync + 'static,
|
||||
Err: core::error::Error + Send + Sync + 'static,
|
||||
I: Iterator<Item = RawTx> + Send + 'static,
|
||||
I: IntoParallelIterator<Item = RawTx, Iter: IndexedParallelIterator> + Send + 'static,
|
||||
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
|
||||
{
|
||||
type RawTx = RawTx;
|
||||
type Tx = Tx;
|
||||
type Error = Err;
|
||||
|
||||
type Iter = I;
|
||||
type IntoIter = I;
|
||||
type Convert = F;
|
||||
}
|
||||
|
||||
|
||||
@@ -44,8 +44,10 @@ pub mod execute;
|
||||
mod aliases;
|
||||
pub use aliases::*;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
mod engine;
|
||||
pub use engine::{ConfigureEngineEvm, ExecutableTxIterator};
|
||||
#[cfg(feature = "std")]
|
||||
pub use engine::{ConfigureEngineEvm, ExecutableTxIterator, ExecutableTxTuple};
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
pub mod metrics;
|
||||
|
||||
@@ -20,7 +20,9 @@ use futures_util::FutureExt;
|
||||
use reth_chainspec::{ChainSpec, MAINNET};
|
||||
use reth_consensus::test_utils::TestConsensus;
|
||||
use reth_db::{
|
||||
test_utils::{create_test_rw_db, create_test_static_files_dir, TempDatabase},
|
||||
test_utils::{
|
||||
create_test_rocksdb_dir, create_test_rw_db, create_test_static_files_dir, TempDatabase,
|
||||
},
|
||||
DatabaseEnv,
|
||||
};
|
||||
use reth_db_common::init::init_genesis;
|
||||
@@ -50,7 +52,7 @@ use reth_node_ethereum::{
|
||||
use reth_payload_builder::noop::NoopPayloadBuilderService;
|
||||
use reth_primitives_traits::{Block as _, RecoveredBlock};
|
||||
use reth_provider::{
|
||||
providers::{BlockchainProvider, StaticFileProvider},
|
||||
providers::{BlockchainProvider, RocksDBProvider, StaticFileProvider},
|
||||
BlockReader, EthStorage, ProviderFactory,
|
||||
};
|
||||
use reth_tasks::TaskManager;
|
||||
@@ -239,11 +241,13 @@ pub async fn test_exex_context_with_chain_spec(
|
||||
let consensus = Arc::new(TestConsensus::default());
|
||||
|
||||
let (static_dir, _) = create_test_static_files_dir();
|
||||
let (rocksdb_dir, _) = create_test_rocksdb_dir();
|
||||
let db = create_test_rw_db();
|
||||
let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
|
||||
db,
|
||||
chain_spec.clone(),
|
||||
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
|
||||
RocksDBProvider::builder(rocksdb_dir.keep()).build().unwrap(),
|
||||
)?;
|
||||
|
||||
let genesis_hash = init_genesis(&provider_factory)?;
|
||||
|
||||
@@ -24,7 +24,7 @@ pub struct Discv4Config {
|
||||
/// The number of allowed consecutive failures for `FindNode` requests. Default: 5.
|
||||
pub max_find_node_failures: u8,
|
||||
/// The interval to use when checking for expired nodes that need to be re-pinged. Default:
|
||||
/// 10min.
|
||||
/// 10 seconds.
|
||||
pub ping_interval: Duration,
|
||||
/// The duration of we consider a ping timed out.
|
||||
pub ping_expiration: Duration,
|
||||
@@ -93,7 +93,7 @@ impl Discv4Config {
|
||||
/// Returns the corresponding [`ResolveNatInterval`], if a [`NatResolver`] and an interval was
|
||||
/// configured
|
||||
pub fn resolve_external_ip_interval(&self) -> Option<ResolveNatInterval> {
|
||||
let resolver = self.external_ip_resolver?;
|
||||
let resolver = self.external_ip_resolver.clone()?;
|
||||
let interval = self.resolve_external_ip_interval?;
|
||||
Some(ResolveNatInterval::interval_at(resolver, tokio::time::Instant::now(), interval))
|
||||
}
|
||||
@@ -275,10 +275,7 @@ impl Discv4ConfigBuilder {
|
||||
}
|
||||
|
||||
/// Configures if and how the external IP of the node should be resolved.
|
||||
pub const fn external_ip_resolver(
|
||||
&mut self,
|
||||
external_ip_resolver: Option<NatResolver>,
|
||||
) -> &mut Self {
|
||||
pub fn external_ip_resolver(&mut self, external_ip_resolver: Option<NatResolver>) -> &mut Self {
|
||||
self.config.external_ip_resolver = external_ip_resolver;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -625,10 +625,13 @@ impl Discv4Service {
|
||||
self.lookup_interval = tokio::time::interval(duration);
|
||||
}
|
||||
|
||||
/// Sets the external Ip to the configured external IP if [`NatResolver::ExternalIp`].
|
||||
/// Sets the external Ip to the configured external IP if [`NatResolver::ExternalIp`] or
|
||||
/// [`NatResolver::ExternalAddr`]. In the case of [`NatResolver::ExternalAddr`], it will return
|
||||
/// the first IP address found for the domain associated with the discv4 UDP port.
|
||||
fn resolve_external_ip(&mut self) {
|
||||
if let Some(r) = &self.resolve_external_ip_interval &&
|
||||
let Some(external_ip) = r.resolver().as_external_ip()
|
||||
let Some(external_ip) =
|
||||
r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
|
||||
{
|
||||
self.set_external_ip_addr(external_ip);
|
||||
}
|
||||
|
||||
@@ -169,7 +169,7 @@ impl NewPooledTransactionHashes {
|
||||
matches!(version, EthVersion::Eth67 | EthVersion::Eth66)
|
||||
}
|
||||
Self::Eth68(_) => {
|
||||
matches!(version, EthVersion::Eth68 | EthVersion::Eth69)
|
||||
matches!(version, EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,6 +100,16 @@ impl Capability {
|
||||
Self::eth(EthVersion::Eth68)
|
||||
}
|
||||
|
||||
/// Returns the [`EthVersion::Eth69`] capability.
|
||||
pub const fn eth_69() -> Self {
|
||||
Self::eth(EthVersion::Eth69)
|
||||
}
|
||||
|
||||
/// Returns the [`EthVersion::Eth70`] capability.
|
||||
pub const fn eth_70() -> Self {
|
||||
Self::eth(EthVersion::Eth70)
|
||||
}
|
||||
|
||||
/// Whether this is eth v66 protocol.
|
||||
#[inline]
|
||||
pub fn is_eth_v66(&self) -> bool {
|
||||
@@ -118,10 +128,26 @@ impl Capability {
|
||||
self.name == "eth" && self.version == 68
|
||||
}
|
||||
|
||||
/// Whether this is eth v69.
|
||||
#[inline]
|
||||
pub fn is_eth_v69(&self) -> bool {
|
||||
self.name == "eth" && self.version == 69
|
||||
}
|
||||
|
||||
/// Whether this is eth v70.
|
||||
#[inline]
|
||||
pub fn is_eth_v70(&self) -> bool {
|
||||
self.name == "eth" && self.version == 70
|
||||
}
|
||||
|
||||
/// Whether this is any eth version.
|
||||
#[inline]
|
||||
pub fn is_eth(&self) -> bool {
|
||||
self.is_eth_v66() || self.is_eth_v67() || self.is_eth_v68()
|
||||
self.is_eth_v66() ||
|
||||
self.is_eth_v67() ||
|
||||
self.is_eth_v68() ||
|
||||
self.is_eth_v69() ||
|
||||
self.is_eth_v70()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,7 +167,7 @@ impl From<EthVersion> for Capability {
|
||||
#[cfg(any(test, feature = "arbitrary"))]
|
||||
impl<'a> arbitrary::Arbitrary<'a> for Capability {
|
||||
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
|
||||
let version = u.int_in_range(66..=69)?; // Valid eth protocol versions are 66-69
|
||||
let version = u.int_in_range(66..=70)?; // Valid eth protocol versions are 66-70
|
||||
// Only generate valid eth protocol name for now since it's the only supported protocol
|
||||
Ok(Self::new_static("eth", version))
|
||||
}
|
||||
@@ -155,6 +181,8 @@ pub struct Capabilities {
|
||||
eth_66: bool,
|
||||
eth_67: bool,
|
||||
eth_68: bool,
|
||||
eth_69: bool,
|
||||
eth_70: bool,
|
||||
}
|
||||
|
||||
impl Capabilities {
|
||||
@@ -164,6 +192,8 @@ impl Capabilities {
|
||||
eth_66: value.iter().any(Capability::is_eth_v66),
|
||||
eth_67: value.iter().any(Capability::is_eth_v67),
|
||||
eth_68: value.iter().any(Capability::is_eth_v68),
|
||||
eth_69: value.iter().any(Capability::is_eth_v69),
|
||||
eth_70: value.iter().any(Capability::is_eth_v70),
|
||||
inner: value,
|
||||
}
|
||||
}
|
||||
@@ -182,7 +212,7 @@ impl Capabilities {
|
||||
/// Whether the peer supports `eth` sub-protocol.
|
||||
#[inline]
|
||||
pub const fn supports_eth(&self) -> bool {
|
||||
self.eth_68 || self.eth_67 || self.eth_66
|
||||
self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66
|
||||
}
|
||||
|
||||
/// Whether this peer supports eth v66 protocol.
|
||||
@@ -202,6 +232,18 @@ impl Capabilities {
|
||||
pub const fn supports_eth_v68(&self) -> bool {
|
||||
self.eth_68
|
||||
}
|
||||
|
||||
/// Whether this peer supports eth v69 protocol.
|
||||
#[inline]
|
||||
pub const fn supports_eth_v69(&self) -> bool {
|
||||
self.eth_69
|
||||
}
|
||||
|
||||
/// Whether this peer supports eth v70 protocol.
|
||||
#[inline]
|
||||
pub const fn supports_eth_v70(&self) -> bool {
|
||||
self.eth_70
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<Capability>> for Capabilities {
|
||||
@@ -224,6 +266,8 @@ impl Decodable for Capabilities {
|
||||
eth_66: inner.iter().any(Capability::is_eth_v66),
|
||||
eth_67: inner.iter().any(Capability::is_eth_v67),
|
||||
eth_68: inner.iter().any(Capability::is_eth_v68),
|
||||
eth_69: inner.iter().any(Capability::is_eth_v69),
|
||||
eth_70: inner.iter().any(Capability::is_eth_v70),
|
||||
inner,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
//! Implements Ethereum wire protocol for versions 66, 67, and 68.
|
||||
//! Implements Ethereum wire protocol for versions 66 through 70.
|
||||
//! Defines structs/enums for messages, request-response pairs, and broadcasts.
|
||||
//! Handles compatibility with [`EthVersion`].
|
||||
//!
|
||||
@@ -8,13 +8,13 @@
|
||||
|
||||
use super::{
|
||||
broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
|
||||
GetNodeData, GetPooledTransactions, GetReceipts, NewPooledTransactionHashes66,
|
||||
GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66,
|
||||
NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
|
||||
Transactions,
|
||||
};
|
||||
use crate::{
|
||||
status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
|
||||
RawCapabilityMessage, Receipts69, SharedTransactions,
|
||||
RawCapabilityMessage, Receipts69, Receipts70, SharedTransactions,
|
||||
};
|
||||
use alloc::{boxed::Box, string::String, sync::Arc};
|
||||
use alloy_primitives::{
|
||||
@@ -111,13 +111,29 @@ impl<N: NetworkPrimitives> ProtocolMessage<N> {
|
||||
}
|
||||
EthMessage::NodeData(RequestPair::decode(buf)?)
|
||||
}
|
||||
EthMessageID::GetReceipts => EthMessage::GetReceipts(RequestPair::decode(buf)?),
|
||||
EthMessageID::Receipts => {
|
||||
if version < EthVersion::Eth69 {
|
||||
EthMessage::Receipts(RequestPair::decode(buf)?)
|
||||
EthMessageID::GetReceipts => {
|
||||
if version >= EthVersion::Eth70 {
|
||||
EthMessage::GetReceipts70(RequestPair::decode(buf)?)
|
||||
} else {
|
||||
// with eth69, receipts no longer include the bloom
|
||||
EthMessage::Receipts69(RequestPair::decode(buf)?)
|
||||
EthMessage::GetReceipts(RequestPair::decode(buf)?)
|
||||
}
|
||||
}
|
||||
EthMessageID::Receipts => {
|
||||
match version {
|
||||
v if v >= EthVersion::Eth70 => {
|
||||
// eth/70 continues to omit bloom filters and adds the
|
||||
// `lastBlockIncomplete` flag, encoded as
|
||||
// `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`.
|
||||
EthMessage::Receipts70(RequestPair::decode(buf)?)
|
||||
}
|
||||
EthVersion::Eth69 => {
|
||||
// with eth69, receipts no longer include the bloom
|
||||
EthMessage::Receipts69(RequestPair::decode(buf)?)
|
||||
}
|
||||
_ => {
|
||||
// before eth69 we need to decode the bloom as well
|
||||
EthMessage::Receipts(RequestPair::decode(buf)?)
|
||||
}
|
||||
}
|
||||
}
|
||||
EthMessageID::BlockRangeUpdate => {
|
||||
@@ -205,6 +221,9 @@ impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMes
|
||||
///
|
||||
/// The `eth/69` announces the historical block range served by the node. Removes total difficulty
|
||||
/// information. And removes the Bloom field from receipts transferred over the protocol.
|
||||
///
|
||||
/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts.
|
||||
/// requests/responses.
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
@@ -259,6 +278,12 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
NodeData(RequestPair<NodeData>),
|
||||
/// Represents a `GetReceipts` request-response pair.
|
||||
GetReceipts(RequestPair<GetReceipts>),
|
||||
/// Represents a `GetReceipts` request for eth/70.
|
||||
///
|
||||
/// Note: Unlike earlier protocol versions, the eth/70 encoding for
|
||||
/// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps
|
||||
/// a [`RequestPair`], but with a custom inline encoding.
|
||||
GetReceipts70(RequestPair<GetReceipts70>),
|
||||
/// Represents a Receipts request-response pair.
|
||||
#[cfg_attr(
|
||||
feature = "serde",
|
||||
@@ -271,6 +296,16 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
|
||||
)]
|
||||
Receipts69(RequestPair<Receipts69<N::Receipt>>),
|
||||
/// Represents a Receipts request-response pair for eth/70.
|
||||
#[cfg_attr(
|
||||
feature = "serde",
|
||||
serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
|
||||
)]
|
||||
///
|
||||
/// Note: The eth/70 encoding for `Receipts` in EIP-7975 inlines the
|
||||
/// request id. The type still wraps a [`RequestPair`], but with a custom
|
||||
/// inline encoding.
|
||||
Receipts70(RequestPair<Receipts70<N::Receipt>>),
|
||||
/// Represents a `BlockRangeUpdate` message broadcast to the network.
|
||||
#[cfg_attr(
|
||||
feature = "serde",
|
||||
@@ -300,8 +335,8 @@ impl<N: NetworkPrimitives> EthMessage<N> {
|
||||
Self::PooledTransactions(_) => EthMessageID::PooledTransactions,
|
||||
Self::GetNodeData(_) => EthMessageID::GetNodeData,
|
||||
Self::NodeData(_) => EthMessageID::NodeData,
|
||||
Self::GetReceipts(_) => EthMessageID::GetReceipts,
|
||||
Self::Receipts(_) | Self::Receipts69(_) => EthMessageID::Receipts,
|
||||
Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts,
|
||||
Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts,
|
||||
Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
|
||||
Self::Other(msg) => EthMessageID::Other(msg.id as u8),
|
||||
}
|
||||
@@ -314,6 +349,7 @@ impl<N: NetworkPrimitives> EthMessage<N> {
|
||||
Self::GetBlockBodies(_) |
|
||||
Self::GetBlockHeaders(_) |
|
||||
Self::GetReceipts(_) |
|
||||
Self::GetReceipts70(_) |
|
||||
Self::GetPooledTransactions(_) |
|
||||
Self::GetNodeData(_)
|
||||
)
|
||||
@@ -326,11 +362,40 @@ impl<N: NetworkPrimitives> EthMessage<N> {
|
||||
Self::PooledTransactions(_) |
|
||||
Self::Receipts(_) |
|
||||
Self::Receipts69(_) |
|
||||
Self::Receipts70(_) |
|
||||
Self::BlockHeaders(_) |
|
||||
Self::BlockBodies(_) |
|
||||
Self::NodeData(_)
|
||||
)
|
||||
}
|
||||
|
||||
/// Converts the message types where applicable.
|
||||
///
|
||||
/// This handles up/downcasting where appropriate, for example for different receipt request
|
||||
/// types.
|
||||
pub fn map_versioned(mut self, version: EthVersion) -> Self {
|
||||
// For eth/70 peers we send `GetReceipts` using the new eth/70
|
||||
// encoding with `firstBlockReceiptIndex = 0`, while keeping the
|
||||
// user-facing `PeerRequest` API unchanged.
|
||||
if version >= EthVersion::Eth70 {
|
||||
return match self {
|
||||
EthMessage::GetReceipts(pair) => {
|
||||
let RequestPair { request_id, message } = pair;
|
||||
let req = RequestPair {
|
||||
request_id,
|
||||
message: GetReceipts70 {
|
||||
first_block_receipt_index: 0,
|
||||
block_hashes: message.0,
|
||||
},
|
||||
};
|
||||
EthMessage::GetReceipts70(req)
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
|
||||
@@ -351,8 +416,10 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
|
||||
Self::GetNodeData(request) => request.encode(out),
|
||||
Self::NodeData(data) => data.encode(out),
|
||||
Self::GetReceipts(request) => request.encode(out),
|
||||
Self::GetReceipts70(request) => request.encode(out),
|
||||
Self::Receipts(receipts) => receipts.encode(out),
|
||||
Self::Receipts69(receipt69) => receipt69.encode(out),
|
||||
Self::Receipts70(receipt70) => receipt70.encode(out),
|
||||
Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
|
||||
Self::Other(unknown) => out.put_slice(&unknown.payload),
|
||||
}
|
||||
@@ -374,8 +441,10 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
|
||||
Self::GetNodeData(request) => request.length(),
|
||||
Self::NodeData(data) => data.length(),
|
||||
Self::GetReceipts(request) => request.length(),
|
||||
Self::GetReceipts70(request) => request.length(),
|
||||
Self::Receipts(receipts) => receipts.length(),
|
||||
Self::Receipts69(receipt69) => receipt69.length(),
|
||||
Self::Receipts70(receipt70) => receipt70.length(),
|
||||
Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
|
||||
Self::Other(unknown) => unknown.length(),
|
||||
}
|
||||
|
||||
@@ -17,6 +17,42 @@ pub struct GetReceipts(
|
||||
pub Vec<B256>,
|
||||
);
|
||||
|
||||
/// Eth/70 `GetReceipts` request payload that supports partial receipt queries.
|
||||
///
|
||||
/// When used with eth/70, the request id is carried by the surrounding
|
||||
/// [`crate::message::RequestPair`], and the on-wire shape is the flattened list
|
||||
/// `firstBlockReceiptIndex, [blockhash₁, ...]`.
|
||||
///
|
||||
/// See also [eip-7975](https://eips.ethereum.org/EIPS/eip-7975)
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
pub struct GetReceipts70 {
|
||||
/// Index into the receipts of the first requested block hash.
|
||||
pub first_block_receipt_index: u64,
|
||||
/// The block hashes to request receipts for.
|
||||
pub block_hashes: Vec<B256>,
|
||||
}
|
||||
|
||||
impl alloy_rlp::Encodable for GetReceipts70 {
|
||||
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
|
||||
self.first_block_receipt_index.encode(out);
|
||||
self.block_hashes.encode(out);
|
||||
}
|
||||
|
||||
fn length(&self) -> usize {
|
||||
self.first_block_receipt_index.length() + self.block_hashes.length()
|
||||
}
|
||||
}
|
||||
|
||||
impl alloy_rlp::Decodable for GetReceipts70 {
|
||||
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
|
||||
let first_block_receipt_index = u64::decode(buf)?;
|
||||
let block_hashes = Vec::<B256>::decode(buf)?;
|
||||
Ok(Self { first_block_receipt_index, block_hashes })
|
||||
}
|
||||
}
|
||||
|
||||
/// The response to [`GetReceipts`], containing receipt lists that correspond to each block
|
||||
/// requested.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Default)]
|
||||
@@ -58,7 +94,13 @@ pub struct Receipts69<T = Receipt>(pub Vec<Vec<T>>);
|
||||
impl<T: TxReceipt> Receipts69<T> {
|
||||
/// Encodes all receipts with the bloom filter.
|
||||
///
|
||||
/// Note: This is an expensive operation that recalculates the bloom for each receipt.
|
||||
/// Eth/69 omits bloom filters on the wire, while some internal callers
|
||||
/// (and legacy APIs) still operate on [`Receipts`] with
|
||||
/// [`ReceiptWithBloom`]. This helper reconstructs the bloom locally from
|
||||
/// each receipt's logs so the older API can be used on top of eth/69 data.
|
||||
///
|
||||
/// Note: This is an expensive operation that recalculates the bloom for
|
||||
/// every receipt.
|
||||
pub fn into_with_bloom(self) -> Receipts<T> {
|
||||
Receipts(
|
||||
self.0
|
||||
@@ -75,6 +117,68 @@ impl<T: TxReceipt> From<Receipts69<T>> for Receipts<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Eth/70 `Receipts` response payload.
|
||||
///
|
||||
/// This is used in conjunction with [`crate::message::RequestPair`] to encode the full wire
|
||||
/// message `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`.
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
|
||||
pub struct Receipts70<T = Receipt> {
|
||||
/// Whether the receipts list for the last block is incomplete.
|
||||
pub last_block_incomplete: bool,
|
||||
/// Receipts grouped by block.
|
||||
pub receipts: Vec<Vec<T>>,
|
||||
}
|
||||
|
||||
impl<T> alloy_rlp::Encodable for Receipts70<T>
|
||||
where
|
||||
T: alloy_rlp::Encodable,
|
||||
{
|
||||
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
|
||||
self.last_block_incomplete.encode(out);
|
||||
self.receipts.encode(out);
|
||||
}
|
||||
|
||||
fn length(&self) -> usize {
|
||||
self.last_block_incomplete.length() + self.receipts.length()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> alloy_rlp::Decodable for Receipts70<T>
|
||||
where
|
||||
T: alloy_rlp::Decodable,
|
||||
{
|
||||
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
|
||||
let last_block_incomplete = bool::decode(buf)?;
|
||||
let receipts = Vec::<Vec<T>>::decode(buf)?;
|
||||
Ok(Self { last_block_incomplete, receipts })
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: TxReceipt> Receipts70<T> {
|
||||
/// Encodes all receipts with the bloom filter.
|
||||
///
|
||||
/// Just like eth/69, eth/70 does not transmit bloom filters over the wire.
|
||||
/// When higher layers still expect the older bloom-bearing [`Receipts`]
|
||||
/// type, this helper converts the eth/70 payload into that shape by
|
||||
/// recomputing the bloom locally from the contained receipts.
|
||||
///
|
||||
/// Note: This is an expensive operation that recalculates the bloom for
|
||||
/// every receipt.
|
||||
pub fn into_with_bloom(self) -> Receipts<T> {
|
||||
// Reuse the eth/69 helper, since both variants carry the same
|
||||
// receipt list shape (only eth/70 adds request metadata).
|
||||
Receipts69(self.receipts).into_with_bloom()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: TxReceipt> From<Receipts70<T>> for Receipts<T> {
|
||||
fn from(receipts: Receipts70<T>) -> Self {
|
||||
receipts.into_with_bloom()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -225,4 +329,70 @@ mod tests {
|
||||
let encoded = alloy_rlp::encode(&request);
|
||||
assert_eq!(encoded, data);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn encode_get_receipts70_inline_shape() {
|
||||
let req = RequestPair {
|
||||
request_id: 1111,
|
||||
message: GetReceipts70 {
|
||||
first_block_receipt_index: 0,
|
||||
block_hashes: vec![
|
||||
hex!("00000000000000000000000000000000000000000000000000000000deadc0de").into(),
|
||||
hex!("00000000000000000000000000000000000000000000000000000000feedbeef").into(),
|
||||
],
|
||||
},
|
||||
};
|
||||
|
||||
let mut out = vec![];
|
||||
req.encode(&mut out);
|
||||
|
||||
let mut buf = out.as_slice();
|
||||
let header = alloy_rlp::Header::decode(&mut buf).unwrap();
|
||||
let payload_start = buf.len();
|
||||
let request_id = u64::decode(&mut buf).unwrap();
|
||||
let first_block_receipt_index = u64::decode(&mut buf).unwrap();
|
||||
let block_hashes = Vec::<B256>::decode(&mut buf).unwrap();
|
||||
|
||||
assert!(buf.is_empty(), "buffer not fully consumed");
|
||||
assert_eq!(request_id, 1111);
|
||||
assert_eq!(first_block_receipt_index, 0);
|
||||
assert_eq!(block_hashes.len(), 2);
|
||||
// ensure payload length matches header
|
||||
assert_eq!(payload_start - buf.len(), header.payload_length);
|
||||
|
||||
let mut buf = out.as_slice();
|
||||
let decoded = RequestPair::<GetReceipts70>::decode(&mut buf).unwrap();
|
||||
assert!(buf.is_empty(), "buffer not fully consumed on decode");
|
||||
assert_eq!(decoded, req);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn encode_receipts70_inline_shape() {
|
||||
let payload: Receipts70<Receipt> =
|
||||
Receipts70 { last_block_incomplete: true, receipts: vec![vec![Receipt::default()]] };
|
||||
|
||||
let resp = RequestPair { request_id: 7, message: payload };
|
||||
|
||||
let mut out = vec![];
|
||||
resp.encode(&mut out);
|
||||
|
||||
let mut buf = out.as_slice();
|
||||
let header = alloy_rlp::Header::decode(&mut buf).unwrap();
|
||||
let payload_start = buf.len();
|
||||
let request_id = u64::decode(&mut buf).unwrap();
|
||||
let last_block_incomplete = bool::decode(&mut buf).unwrap();
|
||||
let receipts = Vec::<Vec<Receipt>>::decode(&mut buf).unwrap();
|
||||
|
||||
assert!(buf.is_empty(), "buffer not fully consumed");
|
||||
assert_eq!(payload_start - buf.len(), header.payload_length);
|
||||
assert_eq!(request_id, 7);
|
||||
assert!(last_block_incomplete);
|
||||
assert_eq!(receipts.len(), 1);
|
||||
assert_eq!(receipts[0].len(), 1);
|
||||
|
||||
let mut buf = out.as_slice();
|
||||
let decoded = RequestPair::<Receipts70>::decode(&mut buf).unwrap();
|
||||
assert!(buf.is_empty(), "buffer not fully consumed on decode");
|
||||
assert_eq!(decoded, resp);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use reth_codecs_derive::add_arbitrary_tests;
|
||||
/// unsupported fields are stripped out.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Copy)]
|
||||
pub struct UnifiedStatus {
|
||||
/// The eth protocol version (e.g. eth/66 to eth/69).
|
||||
/// The eth protocol version (e.g. eth/66 to eth/70).
|
||||
pub version: EthVersion,
|
||||
/// The chain ID identifying the peer’s network.
|
||||
pub chain: Chain,
|
||||
@@ -157,7 +157,7 @@ impl StatusBuilder {
|
||||
self.status
|
||||
}
|
||||
|
||||
/// Sets the eth protocol version (e.g., eth/66, eth/69).
|
||||
/// Sets the eth protocol version (e.g., eth/66, eth/70).
|
||||
pub const fn version(mut self, version: EthVersion) -> Self {
|
||||
self.status.version = version;
|
||||
self
|
||||
@@ -378,8 +378,8 @@ impl Debug for StatusEth69 {
|
||||
}
|
||||
}
|
||||
|
||||
/// `StatusMessage` can store either the Legacy version (with TD) or the
|
||||
/// eth/69 version (omits TD).
|
||||
/// `StatusMessage` can store either the Legacy version (with TD), or the eth/69+/eth/70 version
|
||||
/// (omits TD, includes block range).
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum StatusMessage {
|
||||
@@ -546,6 +546,24 @@ mod tests {
|
||||
assert_eq!(unified_status, roundtripped_unified_status);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn roundtrip_eth70() {
|
||||
let unified_status = UnifiedStatus::builder()
|
||||
.version(EthVersion::Eth70)
|
||||
.chain(Chain::mainnet())
|
||||
.genesis(MAINNET_GENESIS_HASH)
|
||||
.forkid(ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 })
|
||||
.blockhash(b256!("0xfeb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d"))
|
||||
.total_difficulty(None)
|
||||
.earliest_block(Some(1))
|
||||
.latest_block(Some(2))
|
||||
.build();
|
||||
|
||||
let status_message = unified_status.into_message();
|
||||
let roundtripped_unified_status = UnifiedStatus::from_message(status_message);
|
||||
assert_eq!(unified_status, roundtripped_unified_status);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn encode_eth69_status_message() {
|
||||
let expected = hex!("f8544501a0d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3c684b715077d8083ed14f2840112a880a0feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d");
|
||||
|
||||
@@ -27,6 +27,8 @@ pub enum EthVersion {
|
||||
Eth68 = 68,
|
||||
/// The `eth` protocol version 69.
|
||||
Eth69 = 69,
|
||||
/// The `eth` protocol version 70.
|
||||
Eth70 = 70,
|
||||
}
|
||||
|
||||
impl EthVersion {
|
||||
@@ -55,6 +57,11 @@ impl EthVersion {
|
||||
pub const fn is_eth69(&self) -> bool {
|
||||
matches!(self, Self::Eth69)
|
||||
}
|
||||
|
||||
/// Returns true if the version is eth/70
|
||||
pub const fn is_eth70(&self) -> bool {
|
||||
matches!(self, Self::Eth70)
|
||||
}
|
||||
}
|
||||
|
||||
/// RLP encodes `EthVersion` as a single byte (66-69).
|
||||
@@ -96,6 +103,7 @@ impl TryFrom<&str> for EthVersion {
|
||||
"67" => Ok(Self::Eth67),
|
||||
"68" => Ok(Self::Eth68),
|
||||
"69" => Ok(Self::Eth69),
|
||||
"70" => Ok(Self::Eth70),
|
||||
_ => Err(ParseVersionError(s.to_string())),
|
||||
}
|
||||
}
|
||||
@@ -120,6 +128,7 @@ impl TryFrom<u8> for EthVersion {
|
||||
67 => Ok(Self::Eth67),
|
||||
68 => Ok(Self::Eth68),
|
||||
69 => Ok(Self::Eth69),
|
||||
70 => Ok(Self::Eth70),
|
||||
_ => Err(ParseVersionError(u.to_string())),
|
||||
}
|
||||
}
|
||||
@@ -149,6 +158,7 @@ impl From<EthVersion> for &'static str {
|
||||
EthVersion::Eth67 => "67",
|
||||
EthVersion::Eth68 => "68",
|
||||
EthVersion::Eth69 => "69",
|
||||
EthVersion::Eth70 => "70",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -195,7 +205,7 @@ impl Decodable for ProtocolVersion {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{EthVersion, ParseVersionError};
|
||||
use super::EthVersion;
|
||||
use alloy_rlp::{Decodable, Encodable, Error as RlpError};
|
||||
use bytes::BytesMut;
|
||||
|
||||
@@ -205,7 +215,7 @@ mod tests {
|
||||
assert_eq!(EthVersion::Eth67, EthVersion::try_from("67").unwrap());
|
||||
assert_eq!(EthVersion::Eth68, EthVersion::try_from("68").unwrap());
|
||||
assert_eq!(EthVersion::Eth69, EthVersion::try_from("69").unwrap());
|
||||
assert_eq!(Err(ParseVersionError("70".to_string())), EthVersion::try_from("70"));
|
||||
assert_eq!(EthVersion::Eth70, EthVersion::try_from("70").unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -214,12 +224,18 @@ mod tests {
|
||||
assert_eq!(EthVersion::Eth67, "67".parse().unwrap());
|
||||
assert_eq!(EthVersion::Eth68, "68".parse().unwrap());
|
||||
assert_eq!(EthVersion::Eth69, "69".parse().unwrap());
|
||||
assert_eq!(Err(ParseVersionError("70".to_string())), "70".parse::<EthVersion>());
|
||||
assert_eq!(EthVersion::Eth70, "70".parse().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_eth_version_rlp_encode() {
|
||||
let versions = [EthVersion::Eth66, EthVersion::Eth67, EthVersion::Eth68, EthVersion::Eth69];
|
||||
let versions = [
|
||||
EthVersion::Eth66,
|
||||
EthVersion::Eth67,
|
||||
EthVersion::Eth68,
|
||||
EthVersion::Eth69,
|
||||
EthVersion::Eth70,
|
||||
];
|
||||
|
||||
for version in versions {
|
||||
let mut encoded = BytesMut::new();
|
||||
@@ -236,7 +252,7 @@ mod tests {
|
||||
(67_u8, Ok(EthVersion::Eth67)),
|
||||
(68_u8, Ok(EthVersion::Eth68)),
|
||||
(69_u8, Ok(EthVersion::Eth69)),
|
||||
(70_u8, Err(RlpError::Custom("invalid eth version"))),
|
||||
(70_u8, Ok(EthVersion::Eth70)),
|
||||
(65_u8, Err(RlpError::Custom("invalid eth version"))),
|
||||
];
|
||||
|
||||
|
||||
@@ -418,6 +418,8 @@ mod tests {
|
||||
Capability::new_static("eth", 66),
|
||||
Capability::new_static("eth", 67),
|
||||
Capability::new_static("eth", 68),
|
||||
Capability::new_static("eth", 69),
|
||||
Capability::new_static("eth", 70),
|
||||
]
|
||||
.into();
|
||||
|
||||
@@ -425,6 +427,8 @@ mod tests {
|
||||
assert!(capabilities.supports_eth_v66());
|
||||
assert!(capabilities.supports_eth_v67());
|
||||
assert!(capabilities.supports_eth_v68());
|
||||
assert!(capabilities.supports_eth_v69());
|
||||
assert!(capabilities.supports_eth_v70());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -260,10 +260,11 @@ mod tests {
|
||||
|
||||
assert_eq!(hello_encoded.len(), hello.length());
|
||||
}
|
||||
//TODO: add test for eth70 here once we have fully support it
|
||||
|
||||
#[test]
|
||||
fn test_default_protocols_include_eth69() {
|
||||
// ensure that the default protocol list includes Eth69 as the latest version
|
||||
fn test_default_protocols_still_include_eth69() {
|
||||
// ensure that older eth/69 remains advertised for compatibility
|
||||
let secret_key = SecretKey::new(&mut rand_08::thread_rng());
|
||||
let id = pk2id(&secret_key.public_key(SECP256K1));
|
||||
let hello = HelloMessageWithProtocols::builder(id).build();
|
||||
|
||||
@@ -19,7 +19,7 @@ pub use net_if::{NetInterfaceError, DEFAULT_NET_IF_NAME};
|
||||
use std::{
|
||||
fmt,
|
||||
future::{poll_fn, Future},
|
||||
net::{AddrParseError, IpAddr},
|
||||
net::{AddrParseError, IpAddr, ToSocketAddrs},
|
||||
pin::Pin,
|
||||
str::FromStr,
|
||||
task::{Context, Poll},
|
||||
@@ -38,7 +38,7 @@ const EXTERNAL_IP_APIS: &[&str] =
|
||||
&["https://ipinfo.io/ip", "https://icanhazip.com", "https://ifconfig.me"];
|
||||
|
||||
/// All builtin resolvers.
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default, Hash)]
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Default, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(SerializeDisplay, DeserializeFromStr))]
|
||||
pub enum NatResolver {
|
||||
/// Resolve with any available resolver.
|
||||
@@ -50,6 +50,14 @@ pub enum NatResolver {
|
||||
PublicIp,
|
||||
/// Use the given [`IpAddr`]
|
||||
ExternalIp(IpAddr),
|
||||
/// Use the given domain name as the external address to expose to peers.
|
||||
/// This is behaving essentially the same as [`NatResolver::ExternalIp`], but supports domain
|
||||
/// names. Domain names are resolved to IP addresses using the OS's resolver. The first IP
|
||||
/// address found is used.
|
||||
/// This may be useful in docker bridge networks where containers are usually queried by DNS
|
||||
/// instead of direct IP addresses.
|
||||
/// Note: the domain shouldn't include a port number. Only the IP address is resolved.
|
||||
ExternalAddr(String),
|
||||
/// Resolve external IP via the network interface.
|
||||
NetIf,
|
||||
/// Resolve nothing
|
||||
@@ -62,10 +70,17 @@ impl NatResolver {
|
||||
external_addr_with(self).await
|
||||
}
|
||||
|
||||
/// Returns the external ip, if it is [`NatResolver::ExternalIp`]
|
||||
pub const fn as_external_ip(self) -> Option<IpAddr> {
|
||||
/// Returns the fixed ip, if it is [`NatResolver::ExternalIp`] or [`NatResolver::ExternalAddr`].
|
||||
///
|
||||
/// In the case of [`NatResolver::ExternalAddr`], it will return the first IP address found for
|
||||
/// the domain.
|
||||
pub fn as_external_ip(self, port: u16) -> Option<IpAddr> {
|
||||
match self {
|
||||
Self::ExternalIp(ip) => Some(ip),
|
||||
Self::ExternalAddr(domain) => format!("{domain}:{port}")
|
||||
.to_socket_addrs()
|
||||
.ok()
|
||||
.and_then(|mut addrs| addrs.next().map(|addr| addr.ip())),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -78,6 +93,7 @@ impl fmt::Display for NatResolver {
|
||||
Self::Upnp => f.write_str("upnp"),
|
||||
Self::PublicIp => f.write_str("publicip"),
|
||||
Self::ExternalIp(ip) => write!(f, "extip:{ip}"),
|
||||
Self::ExternalAddr(domain) => write!(f, "extaddr:{domain}"),
|
||||
Self::NetIf => f.write_str("netif"),
|
||||
Self::None => f.write_str("none"),
|
||||
}
|
||||
@@ -106,12 +122,15 @@ impl FromStr for NatResolver {
|
||||
"publicip" | "public-ip" => Self::PublicIp,
|
||||
"netif" => Self::NetIf,
|
||||
s => {
|
||||
let Some(ip) = s.strip_prefix("extip:") else {
|
||||
if let Some(ip) = s.strip_prefix("extip:") {
|
||||
Self::ExternalIp(ip.parse()?)
|
||||
} else if let Some(domain) = s.strip_prefix("extaddr:") {
|
||||
Self::ExternalAddr(domain.to_string())
|
||||
} else {
|
||||
return Err(ParseNatResolverError::UnknownVariant(format!(
|
||||
"Unknown Nat Resolver: {s}"
|
||||
)))
|
||||
};
|
||||
Self::ExternalIp(ip.parse()?)
|
||||
)));
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(r)
|
||||
@@ -180,7 +199,7 @@ impl ResolveNatInterval {
|
||||
/// `None` if the attempt was unsuccessful.
|
||||
pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Option<IpAddr>> {
|
||||
if self.interval.poll_tick(cx).is_ready() {
|
||||
self.future = Some(Box::pin(self.resolver.external_addr()));
|
||||
self.future = Some(Box::pin(self.resolver.clone().external_addr()));
|
||||
}
|
||||
|
||||
if let Some(mut fut) = self.future.take() {
|
||||
@@ -212,6 +231,9 @@ pub async fn external_addr_with(resolver: NatResolver) -> Option<IpAddr> {
|
||||
);
|
||||
})
|
||||
.ok(),
|
||||
NatResolver::ExternalAddr(domain) => {
|
||||
domain.to_socket_addrs().ok().and_then(|mut addrs| addrs.next().map(|addr| addr.ip()))
|
||||
}
|
||||
NatResolver::None => None,
|
||||
}
|
||||
}
|
||||
@@ -245,7 +267,7 @@ async fn resolve_external_ip_url(url: &str) -> Option<IpAddr> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
@@ -267,6 +289,18 @@ mod tests {
|
||||
dbg!(ip);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn as_external_ip_test() {
|
||||
let resolver = NatResolver::ExternalAddr("localhost".to_string());
|
||||
let ip = resolver.as_external_ip(30303).expect("localhost should be resolvable");
|
||||
|
||||
if ip.is_ipv4() {
|
||||
assert_eq!(ip, IpAddr::V4(Ipv4Addr::LOCALHOST));
|
||||
} else {
|
||||
assert_eq!(ip, IpAddr::V6(Ipv6Addr::LOCALHOST));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_from_str() {
|
||||
assert_eq!(NatResolver::Any, "any".parse().unwrap());
|
||||
@@ -275,6 +309,6 @@ mod tests {
|
||||
let ip = NatResolver::ExternalIp(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
|
||||
let s = "extip:0.0.0.0";
|
||||
assert_eq!(ip, s.parse().unwrap());
|
||||
assert_eq!(ip.to_string().as_str(), s);
|
||||
assert_eq!(ip.to_string(), s);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
use reth_eth_wire_types::{
|
||||
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
|
||||
EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
|
||||
GetPooledTransactions, GetReceipts, NetworkPrimitives, NodeData, PooledTransactions, Receipts,
|
||||
Receipts69, UnifiedStatus,
|
||||
GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData,
|
||||
PooledTransactions, Receipts, Receipts69, Receipts70, UnifiedStatus,
|
||||
};
|
||||
use reth_ethereum_forks::ForkId;
|
||||
use reth_network_p2p::error::{RequestError, RequestResult};
|
||||
@@ -238,6 +238,15 @@ pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// The channel to send the response for receipts.
|
||||
response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
|
||||
},
|
||||
/// Requests receipts from the peer using eth/70 (supports `firstBlockReceiptIndex`).
|
||||
///
|
||||
/// The response should be sent through the channel.
|
||||
GetReceipts70 {
|
||||
/// The request for receipts.
|
||||
request: GetReceipts70,
|
||||
/// The channel to send the response for receipts.
|
||||
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
|
||||
},
|
||||
}
|
||||
|
||||
// === impl PeerRequest ===
|
||||
@@ -257,6 +266,7 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
|
||||
Self::GetNodeData { response, .. } => response.send(Err(err)).ok(),
|
||||
Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
|
||||
Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
|
||||
Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -281,6 +291,9 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
|
||||
Self::GetReceipts { request, .. } | Self::GetReceipts69 { request, .. } => {
|
||||
EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
|
||||
}
|
||||
Self::GetReceipts70 { request, .. } => {
|
||||
EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -66,6 +66,7 @@ tracing.workspace = true
|
||||
rustc-hash.workspace = true
|
||||
thiserror.workspace = true
|
||||
parking_lot.workspace = true
|
||||
rayon.workspace = true
|
||||
rand.workspace = true
|
||||
rand_08.workspace = true
|
||||
secp256k1 = { workspace = true, features = ["global-context", "std", "recovery"] }
|
||||
|
||||
@@ -433,7 +433,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
|
||||
pub fn external_ip_resolver(mut self, resolver: NatResolver) -> Self {
|
||||
self.discovery_v4_builder
|
||||
.get_or_insert_with(Discv4Config::builder)
|
||||
.external_ip_resolver(Some(resolver));
|
||||
.external_ip_resolver(Some(resolver.clone()));
|
||||
self.nat = Some(resolver);
|
||||
self
|
||||
}
|
||||
@@ -484,7 +484,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
|
||||
}
|
||||
|
||||
// Disable nat
|
||||
pub const fn disable_nat(mut self) -> Self {
|
||||
pub fn disable_nat(mut self) -> Self {
|
||||
self.nat = None;
|
||||
self
|
||||
}
|
||||
@@ -579,7 +579,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
|
||||
}
|
||||
|
||||
/// Sets the NAT resolver for external IP.
|
||||
pub const fn add_nat(mut self, nat: Option<NatResolver>) -> Self {
|
||||
pub fn add_nat(mut self, nat: Option<NatResolver>) -> Self {
|
||||
self.nat = nat;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -10,7 +10,8 @@ use alloy_rlp::Encodable;
|
||||
use futures::StreamExt;
|
||||
use reth_eth_wire::{
|
||||
BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
|
||||
GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69,
|
||||
GetReceipts, GetReceipts70, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
|
||||
Receipts69, Receipts70,
|
||||
};
|
||||
use reth_network_api::test_utils::PeersHandle;
|
||||
use reth_network_p2p::error::RequestResult;
|
||||
@@ -217,6 +218,66 @@ where
|
||||
let _ = response.send(Ok(Receipts69(receipts)));
|
||||
}
|
||||
|
||||
fn on_receipts70_request(
|
||||
&self,
|
||||
_peer_id: PeerId,
|
||||
request: GetReceipts70,
|
||||
response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
|
||||
) {
|
||||
self.metrics.eth_receipts_requests_received_total.increment(1);
|
||||
|
||||
let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
|
||||
|
||||
let mut receipts = Vec::new();
|
||||
let mut total_bytes = 0usize;
|
||||
let mut last_block_incomplete = false;
|
||||
|
||||
for (idx, hash) in block_hashes.into_iter().enumerate() {
|
||||
if idx >= MAX_RECEIPTS_SERVE {
|
||||
break
|
||||
}
|
||||
|
||||
let Some(mut block_receipts) =
|
||||
self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
|
||||
else {
|
||||
break
|
||||
};
|
||||
|
||||
if idx == 0 && first_block_receipt_index > 0 {
|
||||
let skip = first_block_receipt_index as usize;
|
||||
if skip >= block_receipts.len() {
|
||||
block_receipts.clear();
|
||||
} else {
|
||||
block_receipts.drain(0..skip);
|
||||
}
|
||||
}
|
||||
|
||||
let block_size = block_receipts.length();
|
||||
|
||||
if total_bytes + block_size <= SOFT_RESPONSE_LIMIT {
|
||||
total_bytes += block_size;
|
||||
receipts.push(block_receipts);
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut partial_block = Vec::new();
|
||||
for receipt in block_receipts {
|
||||
let receipt_size = receipt.length();
|
||||
if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT {
|
||||
break;
|
||||
}
|
||||
total_bytes += receipt_size;
|
||||
partial_block.push(receipt);
|
||||
}
|
||||
|
||||
receipts.push(partial_block);
|
||||
last_block_incomplete = true;
|
||||
break;
|
||||
}
|
||||
|
||||
let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
|
||||
where
|
||||
@@ -285,6 +346,9 @@ where
|
||||
IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
|
||||
this.on_receipts69_request(peer_id, request, response)
|
||||
}
|
||||
IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
|
||||
this.on_receipts70_request(peer_id, request, response)
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
@@ -359,4 +423,15 @@ pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// The channel sender for the response containing Receipts69.
|
||||
response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
|
||||
},
|
||||
/// Request Receipts from the peer using eth/70.
|
||||
///
|
||||
/// The response should be sent through the channel.
|
||||
GetReceipts70 {
|
||||
/// The ID of the peer to request receipts from.
|
||||
peer_id: PeerId,
|
||||
/// The specific receipts requested including the `firstBlockReceiptIndex`.
|
||||
request: GetReceipts70,
|
||||
/// The channel sender for the response containing Receipts70.
|
||||
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -532,6 +532,13 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
|
||||
response,
|
||||
})
|
||||
}
|
||||
PeerRequest::GetReceipts70 { request, response } => {
|
||||
self.delegate_eth_request(IncomingEthRequest::GetReceipts70 {
|
||||
peer_id,
|
||||
request,
|
||||
response,
|
||||
})
|
||||
}
|
||||
PeerRequest::GetPooledTransactions { request, response } => {
|
||||
self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
|
||||
peer_id,
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
|
||||
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
|
||||
|
||||
use crate::types::Receipts69;
|
||||
use crate::types::{Receipts69, Receipts70};
|
||||
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
|
||||
use alloy_primitives::{Bytes, B256};
|
||||
use futures::FutureExt;
|
||||
@@ -116,6 +116,11 @@ pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
/// The receiver channel for the response to a receipts request.
|
||||
response: oneshot::Receiver<RequestResult<Receipts69<N::Receipt>>>,
|
||||
},
|
||||
/// Represents a response to a request for receipts using eth/70.
|
||||
Receipts70 {
|
||||
/// The receiver channel for the response to a receipts request.
|
||||
response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
|
||||
},
|
||||
}
|
||||
|
||||
// === impl PeerResponse ===
|
||||
@@ -151,6 +156,10 @@ impl<N: NetworkPrimitives> PeerResponse<N> {
|
||||
Self::Receipts69 { response } => {
|
||||
poll_request!(response, Receipts69, cx)
|
||||
}
|
||||
Self::Receipts70 { response } => match ready!(response.poll_unpin(cx)) {
|
||||
Ok(res) => PeerResponseResult::Receipts70(res),
|
||||
Err(err) => PeerResponseResult::Receipts70(Err(err.into())),
|
||||
},
|
||||
};
|
||||
Poll::Ready(res)
|
||||
}
|
||||
@@ -171,6 +180,8 @@ pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
|
||||
Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
|
||||
/// Represents a result containing receipts or an error for eth/69.
|
||||
Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
|
||||
/// Represents a result containing receipts or an error for eth/70.
|
||||
Receipts70(RequestResult<Receipts70<N::Receipt>>),
|
||||
}
|
||||
|
||||
// === impl PeerResponseResult ===
|
||||
@@ -208,6 +219,13 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
|
||||
Self::Receipts69(resp) => {
|
||||
to_message!(resp, Receipts69, id)
|
||||
}
|
||||
Self::Receipts70(resp) => match resp {
|
||||
Ok(res) => {
|
||||
let request = RequestPair { request_id: id, message: res };
|
||||
Ok(EthMessage::Receipts70(request))
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,6 +238,7 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
|
||||
Self::NodeData(res) => res.as_ref().err(),
|
||||
Self::Receipts(res) => res.as_ref().err(),
|
||||
Self::Receipts69(res) => res.as_ref().err(),
|
||||
Self::Receipts70(res) => res.as_ref().err(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -237,7 +237,9 @@ impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
|
||||
discv4.node_record()
|
||||
} else if let Some(discv5) = self.inner.discv5.as_ref() {
|
||||
// for disv5 we must check if we have an external ip configured
|
||||
if let Some(external) = self.inner.nat.and_then(|nat| nat.as_external_ip()) {
|
||||
if let Some(external) =
|
||||
self.inner.nat.clone().and_then(|nat| nat.as_external_ip(discv5.local_port()))
|
||||
{
|
||||
NodeRecord::new((external, discv5.local_port()).into(), *self.peer_id())
|
||||
} else {
|
||||
// use the node record that discv5 tracks or use localhost
|
||||
@@ -252,9 +254,11 @@ impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
|
||||
// also use the tcp port
|
||||
.with_tcp_port(self.inner.listener_address.lock().port())
|
||||
} else {
|
||||
let external_ip = self.inner.nat.and_then(|nat| nat.as_external_ip());
|
||||
|
||||
let mut socket_addr = *self.inner.listener_address.lock();
|
||||
|
||||
let external_ip =
|
||||
self.inner.nat.clone().and_then(|nat| nat.as_external_ip(socket_addr.port()));
|
||||
|
||||
if let Some(ip) = external_ip {
|
||||
// if able to resolve external ip, use it instead and also set the local address
|
||||
socket_addr.set_ip(ip)
|
||||
|
||||
@@ -25,10 +25,10 @@ use futures::{stream::Fuse, SinkExt, StreamExt};
|
||||
use metrics::Gauge;
|
||||
use reth_eth_wire::{
|
||||
errors::{EthHandshakeError, EthStreamError},
|
||||
message::{EthBroadcastMessage, MessageError, RequestPair},
|
||||
message::{EthBroadcastMessage, MessageError},
|
||||
Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives, NewBlockPayload,
|
||||
};
|
||||
use reth_eth_wire_types::RawCapabilityMessage;
|
||||
use reth_eth_wire_types::{message::RequestPair, RawCapabilityMessage};
|
||||
use reth_metrics::common::mpsc::MeteredPollSender;
|
||||
use reth_network_api::PeerRequest;
|
||||
use reth_network_p2p::error::RequestError;
|
||||
@@ -270,12 +270,18 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
|
||||
on_request!(req, Receipts, GetReceipts)
|
||||
}
|
||||
}
|
||||
EthMessage::GetReceipts70(req) => {
|
||||
on_request!(req, Receipts70, GetReceipts70)
|
||||
}
|
||||
EthMessage::Receipts(resp) => {
|
||||
on_response!(resp, GetReceipts)
|
||||
}
|
||||
EthMessage::Receipts69(resp) => {
|
||||
on_response!(resp, GetReceipts69)
|
||||
}
|
||||
EthMessage::Receipts70(resp) => {
|
||||
on_response!(resp, GetReceipts70)
|
||||
}
|
||||
EthMessage::BlockRangeUpdate(msg) => {
|
||||
// Validate that earliest <= latest according to the spec
|
||||
if msg.earliest > msg.latest {
|
||||
@@ -311,9 +317,9 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
|
||||
/// Handle an internal peer request that will be sent to the remote.
|
||||
fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
|
||||
let request_id = self.next_id();
|
||||
|
||||
trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
|
||||
let msg = request.create_request_message(request_id);
|
||||
let mut msg = request.create_request_message(request_id).map_versioned(self.conn.version());
|
||||
|
||||
self.queued_outgoing.push_back(msg.into());
|
||||
let req = InflightRequest {
|
||||
request: RequestState::Waiting(request),
|
||||
@@ -368,6 +374,7 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
|
||||
fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult<N>) {
|
||||
match resp.try_into_message(id) {
|
||||
Ok(msg) => {
|
||||
let msg: EthMessage<N> = msg;
|
||||
self.queued_outgoing.push_back(msg.into());
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Transactions management for the p2p network.
|
||||
|
||||
use alloy_consensus::transaction::TxHashRef;
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
|
||||
/// Aggregation on configurable parameters for [`TransactionsManager`].
|
||||
pub mod config;
|
||||
@@ -1368,51 +1369,49 @@ where
|
||||
// tracks the quality of the given transactions
|
||||
let mut has_bad_transactions = false;
|
||||
|
||||
// 2. filter out transactions that are invalid or already pending import pre-size to avoid
|
||||
// reallocations
|
||||
let mut new_txs = Vec::with_capacity(transactions.len());
|
||||
for tx in transactions {
|
||||
match self.transactions_by_peers.entry(*tx.tx_hash()) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
// transaction was already inserted
|
||||
entry.get_mut().insert(peer_id);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
if self.bad_imports.contains(tx.tx_hash()) {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%tx.tx_hash(),
|
||||
client_version=%peer.client_version,
|
||||
"received a known bad transaction from peer"
|
||||
);
|
||||
has_bad_transactions = true;
|
||||
} else {
|
||||
// this is a new transaction that should be imported into the pool
|
||||
|
||||
// recover transaction
|
||||
let tx = match tx.try_into_recovered() {
|
||||
Ok(tx) => tx,
|
||||
Err(badtx) => {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%badtx.tx_hash(),
|
||||
client_version=%peer.client_version,
|
||||
"failed ecrecovery for transaction"
|
||||
);
|
||||
has_bad_transactions = true;
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
let pool_transaction = Pool::Transaction::from_pooled(tx);
|
||||
new_txs.push(pool_transaction);
|
||||
|
||||
entry.insert(HashSet::from([peer_id]));
|
||||
}
|
||||
}
|
||||
// Remove known and invalid transactions
|
||||
transactions.retain(|tx| {
|
||||
if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
|
||||
entry.get_mut().insert(peer_id);
|
||||
return false
|
||||
}
|
||||
if self.bad_imports.contains(tx.tx_hash()) {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%tx.tx_hash(),
|
||||
client_version=%peer.client_version,
|
||||
"received a known bad transaction from peer"
|
||||
);
|
||||
has_bad_transactions = true;
|
||||
return false;
|
||||
}
|
||||
true
|
||||
});
|
||||
|
||||
let txs_len = transactions.len();
|
||||
|
||||
let new_txs = transactions
|
||||
.into_par_iter()
|
||||
.filter_map(|tx| match tx.try_into_recovered() {
|
||||
Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
|
||||
Err(badtx) => {
|
||||
trace!(target: "net::tx",
|
||||
peer_id=format!("{peer_id:#}"),
|
||||
hash=%badtx.tx_hash(),
|
||||
client_version=%peer.client_version,
|
||||
"failed ecrecovery for transaction"
|
||||
);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
has_bad_transactions |= new_txs.len() != txs_len;
|
||||
|
||||
// Record the transactions as seen by the peer
|
||||
for tx in &new_txs {
|
||||
self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
|
||||
}
|
||||
new_txs.shrink_to_fit();
|
||||
|
||||
// 3. import new transactions as a batch to minimize lock contention on the underlying
|
||||
// pool
|
||||
@@ -1925,7 +1924,9 @@ impl PooledTransactionsHashesBuilder {
|
||||
fn new(version: EthVersion) -> Self {
|
||||
match version {
|
||||
EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
|
||||
EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
|
||||
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => {
|
||||
Self::Eth68(Default::default())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ use reth_node_metrics::{
|
||||
version::VersionInfo,
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
|
||||
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
|
||||
BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
|
||||
StageCheckpointReader, StaticFileProviderBuilder, StaticFileProviderFactory,
|
||||
};
|
||||
@@ -485,9 +485,19 @@ where
|
||||
.with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map())
|
||||
.build()?;
|
||||
|
||||
let factory =
|
||||
ProviderFactory::new(self.right().clone(), self.chain_spec(), static_file_provider)?
|
||||
.with_prune_modes(self.prune_modes());
|
||||
// Initialize RocksDB provider with metrics and statistics enabled
|
||||
let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
|
||||
.with_metrics()
|
||||
.with_statistics()
|
||||
.build()?;
|
||||
|
||||
let factory = ProviderFactory::new(
|
||||
self.right().clone(),
|
||||
self.chain_spec(),
|
||||
static_file_provider,
|
||||
rocksdb_provider,
|
||||
)?
|
||||
.with_prune_modes(self.prune_modes());
|
||||
|
||||
// Check for consistency between database and static files. If it fails, it unwinds to
|
||||
// the first block that's consistent between database and static files.
|
||||
|
||||
@@ -27,6 +27,10 @@ pub struct DatadirArgs {
|
||||
verbatim_doc_comment
|
||||
)]
|
||||
pub static_files_path: Option<PathBuf>,
|
||||
|
||||
/// The absolute path to store `RocksDB` database in.
|
||||
#[arg(long = "datadir.rocksdb", value_name = "PATH", verbatim_doc_comment)]
|
||||
pub rocksdb_path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl DatadirArgs {
|
||||
|
||||
@@ -337,7 +337,7 @@ impl NetworkArgs {
|
||||
|
||||
// Configure basic network stack
|
||||
NetworkConfigBuilder::<N>::new(secret_key)
|
||||
.external_ip_resolver(self.nat)
|
||||
.external_ip_resolver(self.nat.clone())
|
||||
.sessions_config(
|
||||
SessionsConfig::default().with_upscaled_event_buffer(peers_config.max_peers()),
|
||||
)
|
||||
@@ -399,7 +399,7 @@ impl NetworkArgs {
|
||||
}
|
||||
|
||||
/// Configures the [`NatResolver`]
|
||||
pub const fn with_nat_resolver(mut self, nat: NatResolver) -> Self {
|
||||
pub fn with_nat_resolver(mut self, nat: NatResolver) -> Self {
|
||||
self.nat = nat;
|
||||
self
|
||||
}
|
||||
@@ -782,10 +782,11 @@ mod tests {
|
||||
let tests = vec![0, 10];
|
||||
|
||||
for retries in tests {
|
||||
let retries_str = retries.to_string();
|
||||
let args = CommandParser::<NetworkArgs>::parse_from([
|
||||
"reth",
|
||||
"--dns-retries",
|
||||
retries.to_string().as_str(),
|
||||
retries_str.as_str(),
|
||||
])
|
||||
.args;
|
||||
|
||||
|
||||
@@ -301,6 +301,18 @@ impl<D> ChainPath<D> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the path to the `RocksDB` database directory for this chain.
|
||||
///
|
||||
/// `<DIR>/<CHAIN_ID>/rocksdb`
|
||||
pub fn rocksdb(&self) -> PathBuf {
|
||||
let datadir_args = &self.2;
|
||||
if let Some(rocksdb_path) = &datadir_args.rocksdb_path {
|
||||
rocksdb_path.clone()
|
||||
} else {
|
||||
self.data_dir().join("rocksdb")
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the path to the reth p2p secret key for this chain.
|
||||
///
|
||||
/// `<DIR>/<CHAIN_ID>/discovery-secret`
|
||||
|
||||
@@ -98,7 +98,9 @@ where
|
||||
runner.run_blocking_until_ctrl_c(command.execute::<OpNode>())
|
||||
}
|
||||
Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
|
||||
Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute::<OpNode>()),
|
||||
Commands::Db(command) => {
|
||||
runner.run_blocking_command_until_exit(|ctx| command.execute::<OpNode>(ctx))
|
||||
}
|
||||
Commands::Stage(command) => {
|
||||
runner.run_command_until_exit(|ctx| command.execute::<OpNode, _>(ctx, components))
|
||||
}
|
||||
|
||||
@@ -13,32 +13,38 @@ extern crate alloc;
|
||||
|
||||
use alloc::sync::Arc;
|
||||
use alloy_consensus::{BlockHeader, Header};
|
||||
use alloy_eips::Decodable2718;
|
||||
use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded};
|
||||
use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv};
|
||||
use alloy_primitives::{Bytes, U256};
|
||||
use core::fmt::Debug;
|
||||
use op_alloy_consensus::EIP1559ParamError;
|
||||
use op_alloy_rpc_types_engine::OpExecutionData;
|
||||
use op_revm::{OpSpecId, OpTransaction};
|
||||
use reth_chainspec::EthChainSpec;
|
||||
use reth_evm::{
|
||||
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEngineEvm, ConfigureEvm,
|
||||
EvmEnv, EvmEnvFor, ExecutableTxIterator, ExecutionCtxFor, TransactionEnv,
|
||||
eth::NextEvmEnvAttributes, precompiles::PrecompilesMap, ConfigureEvm, EvmEnv, TransactionEnv,
|
||||
};
|
||||
use reth_optimism_chainspec::OpChainSpec;
|
||||
use reth_optimism_forks::OpHardforks;
|
||||
use reth_optimism_primitives::{DepositReceipt, OpPrimitives};
|
||||
use reth_primitives_traits::{
|
||||
NodePrimitives, SealedBlock, SealedHeader, SignedTransaction, TxTy, WithEncoded,
|
||||
};
|
||||
use reth_storage_errors::any::AnyError;
|
||||
use revm::{
|
||||
context::{BlockEnv, CfgEnv, TxEnv},
|
||||
context_interface::block::BlobExcessGasAndPrice,
|
||||
primitives::hardfork::SpecId,
|
||||
use reth_primitives_traits::{NodePrimitives, SealedBlock, SealedHeader, SignedTransaction};
|
||||
use revm::context::{BlockEnv, TxEnv};
|
||||
|
||||
#[allow(unused_imports)]
|
||||
use {
|
||||
alloy_eips::Decodable2718,
|
||||
alloy_primitives::{Bytes, U256},
|
||||
op_alloy_rpc_types_engine::OpExecutionData,
|
||||
reth_evm::{EvmEnvFor, ExecutionCtxFor},
|
||||
reth_primitives_traits::{TxTy, WithEncoded},
|
||||
reth_storage_errors::any::AnyError,
|
||||
revm::{
|
||||
context::CfgEnv, context_interface::block::BlobExcessGasAndPrice,
|
||||
primitives::hardfork::SpecId,
|
||||
},
|
||||
};
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
use reth_evm::{ConfigureEngineEvm, ExecutableTxIterator};
|
||||
|
||||
mod config;
|
||||
pub use config::{revm_spec, revm_spec_by_timestamp_after_bedrock, OpNextBlockEnvAttributes};
|
||||
mod execute;
|
||||
@@ -200,6 +206,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl<ChainSpec, N, R> ConfigureEngineEvm<OpExecutionData> for OpEvmConfig<ChainSpec, N, R>
|
||||
where
|
||||
ChainSpec: EthChainSpec<Header = Header> + OpHardforks,
|
||||
@@ -265,7 +272,7 @@ where
|
||||
&self,
|
||||
payload: &OpExecutionData,
|
||||
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
|
||||
let transactions = payload.payload.transactions().clone().into_iter();
|
||||
let transactions = payload.payload.transactions().clone();
|
||||
let convert = |encoded: Bytes| {
|
||||
let tx = TxTy::<Self::Primitives>::decode_2718_exact(encoded.as_ref())
|
||||
.map_err(AnyError::new)?;
|
||||
|
||||
@@ -34,7 +34,7 @@ mod cache;
|
||||
mod test_utils;
|
||||
|
||||
mod ws;
|
||||
pub use ws::{WsConnect, WsFlashBlockStream};
|
||||
pub use ws::{FlashBlockDecoder, WsConnect, WsFlashBlockStream};
|
||||
|
||||
/// Receiver of the most recent [`PendingFlashBlock`] built out of [`FlashBlock`]s.
|
||||
///
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
pub use stream::{WsConnect, WsFlashBlockStream};
|
||||
|
||||
mod decoding;
|
||||
pub(crate) use decoding::FlashBlockDecoder;
|
||||
pub use decoding::FlashBlockDecoder;
|
||||
|
||||
mod stream;
|
||||
|
||||
@@ -34,7 +34,7 @@ reth-rpc-api.workspace = true
|
||||
|
||||
# op-reth
|
||||
reth-optimism-payload-builder.workspace = true
|
||||
reth-optimism-evm = { workspace = true, features = ["rpc"] }
|
||||
reth-optimism-evm = { workspace = true, features = ["std", "rpc"] }
|
||||
reth-optimism-rpc.workspace = true
|
||||
reth-optimism-storage.workspace = true
|
||||
reth-optimism-txpool.workspace = true
|
||||
|
||||
@@ -218,13 +218,14 @@ impl OpNode {
|
||||
/// use reth_db::open_db_read_only;
|
||||
/// use reth_optimism_chainspec::OpChainSpecBuilder;
|
||||
/// use reth_optimism_node::OpNode;
|
||||
/// use reth_provider::providers::StaticFileProvider;
|
||||
/// use reth_provider::providers::{RocksDBProvider, StaticFileProvider};
|
||||
/// use std::sync::Arc;
|
||||
///
|
||||
/// let factory = OpNode::provider_factory_builder()
|
||||
/// .db(Arc::new(open_db_read_only("db", Default::default()).unwrap()))
|
||||
/// .chainspec(OpChainSpecBuilder::base_mainnet().build().into())
|
||||
/// .static_file(StaticFileProvider::read_only("db/static_files", false).unwrap())
|
||||
/// .rocksdb_provider(RocksDBProvider::builder("db/rocksdb").build().unwrap())
|
||||
/// .build_provider_factory();
|
||||
/// ```
|
||||
pub fn provider_factory_builder() -> ProviderFactoryBuilder<Self> {
|
||||
|
||||
@@ -2342,7 +2342,7 @@ mod tests {
|
||||
$(
|
||||
let val: RethRpcModule = $s.parse().unwrap();
|
||||
assert_eq!(val, $v);
|
||||
assert_eq!(val.to_string().as_str(), $s);
|
||||
assert_eq!(val.to_string(), $s);
|
||||
)*
|
||||
};
|
||||
}
|
||||
|
||||
@@ -463,7 +463,7 @@ where
|
||||
/// Returns the most recent version of the payload that is available in the corresponding
|
||||
/// payload build process at the time of receiving this call.
|
||||
///
|
||||
/// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
|
||||
/// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_getpayloadv4>
|
||||
///
|
||||
/// Note:
|
||||
/// > Provider software MAY stop the corresponding build process after serving this call.
|
||||
@@ -933,7 +933,7 @@ where
|
||||
Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
|
||||
}
|
||||
|
||||
/// Handler for `engine_forkchoiceUpdatedV2`
|
||||
/// Handler for `engine_forkchoiceUpdatedV3`
|
||||
///
|
||||
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
|
||||
async fn fork_choice_updated_v3(
|
||||
|
||||
@@ -74,7 +74,11 @@ pub trait EthBlocks: LoadBlock<RpcConvert: RpcConvert<Primitives = Self::Primiti
|
||||
block_id: BlockId,
|
||||
) -> impl Future<Output = Result<Option<usize>, Self::Error>> + Send {
|
||||
async move {
|
||||
// If no pending block from provider, build the pending block locally.
|
||||
if block_id.is_pending() {
|
||||
if let Some(pending) = self.local_pending_block().await? {
|
||||
return Ok(Some(pending.block.body().transaction_count()));
|
||||
}
|
||||
// Pending block can be fetched directly without need for caching
|
||||
return Ok(self
|
||||
.provider()
|
||||
|
||||
@@ -71,7 +71,7 @@ pub trait TraceApiExt {
|
||||
|
||||
/// Returns a new stream that yields the traces the opcodes for the given blocks.
|
||||
///
|
||||
/// See also [`StreamExt::buffered`].
|
||||
/// See also [`StreamExt::buffer_unordered`].
|
||||
fn trace_block_opcode_gas_unordered<I, B>(
|
||||
&self,
|
||||
params: I,
|
||||
@@ -301,7 +301,7 @@ impl<T: TraceApiClient<TransactionRequest> + Sync> TraceApiExt for T {
|
||||
Err(err) => Err((err, block)),
|
||||
}
|
||||
}))
|
||||
.buffered(n);
|
||||
.buffer_unordered(n);
|
||||
TraceBlockOpcodeGasStream { stream: Box::pin(stream) }
|
||||
}
|
||||
|
||||
|
||||
@@ -324,11 +324,13 @@ impl<Provider, S: Stage<Provider> + ?Sized> StageExt<Provider> for S {}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use reth_chainspec::MAINNET;
|
||||
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
|
||||
use reth_db::test_utils::{
|
||||
create_test_rocksdb_dir, create_test_rw_db, create_test_static_files_dir,
|
||||
};
|
||||
use reth_db_api::{models::StoredBlockBodyIndices, tables, transaction::DbTxMut};
|
||||
use reth_provider::{
|
||||
test_utils::MockNodeTypesWithDB, ProviderFactory, StaticFileProviderBuilder,
|
||||
StaticFileProviderFactory, StaticFileSegment,
|
||||
providers::RocksDBProvider, test_utils::MockNodeTypesWithDB, ProviderFactory,
|
||||
StaticFileProviderBuilder, StaticFileProviderFactory, StaticFileSegment,
|
||||
};
|
||||
use reth_stages_types::StageCheckpoint;
|
||||
use reth_testing_utils::generators::{self, random_signed_tx};
|
||||
@@ -346,6 +348,7 @@ mod tests {
|
||||
.with_blocks_per_file(1)
|
||||
.build()
|
||||
.unwrap(),
|
||||
RocksDBProvider::builder(create_test_rocksdb_dir().0.keep()).build().unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use alloy_primitives::{keccak256, Address, BlockNumber, TxHash, TxNumber, B256};
|
||||
use reth_chainspec::MAINNET;
|
||||
use reth_db::{
|
||||
test_utils::{create_test_rw_db, create_test_rw_db_with_path, create_test_static_files_dir},
|
||||
test_utils::{
|
||||
create_test_rocksdb_dir, create_test_rw_db, create_test_rw_db_with_path,
|
||||
create_test_static_files_dir,
|
||||
},
|
||||
DatabaseEnv,
|
||||
};
|
||||
use reth_db_api::{
|
||||
@@ -17,7 +20,9 @@ use reth_db_api::{
|
||||
use reth_ethereum_primitives::{Block, EthPrimitives, Receipt};
|
||||
use reth_primitives_traits::{Account, SealedBlock, SealedHeader, StorageEntry};
|
||||
use reth_provider::{
|
||||
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
|
||||
providers::{
|
||||
RocksDBProvider, StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter,
|
||||
},
|
||||
test_utils::MockNodeTypesWithDB,
|
||||
HistoryWriter, ProviderError, ProviderFactory, StaticFileProviderFactory, StatsReader,
|
||||
};
|
||||
@@ -38,12 +43,14 @@ impl Default for TestStageDB {
|
||||
/// Create a new instance of [`TestStageDB`]
|
||||
fn default() -> Self {
|
||||
let (static_dir, static_dir_path) = create_test_static_files_dir();
|
||||
let (_, rocksdb_dir_path) = create_test_rocksdb_dir();
|
||||
Self {
|
||||
temp_static_files_dir: static_dir,
|
||||
factory: ProviderFactory::new(
|
||||
create_test_rw_db(),
|
||||
MAINNET.clone(),
|
||||
StaticFileProvider::read_write(static_dir_path).unwrap(),
|
||||
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
|
||||
)
|
||||
.expect("failed to create test provider factory"),
|
||||
}
|
||||
@@ -53,6 +60,7 @@ impl Default for TestStageDB {
|
||||
impl TestStageDB {
|
||||
pub fn new(path: &Path) -> Self {
|
||||
let (static_dir, static_dir_path) = create_test_static_files_dir();
|
||||
let (_, rocksdb_dir_path) = create_test_rocksdb_dir();
|
||||
|
||||
Self {
|
||||
temp_static_files_dir: static_dir,
|
||||
@@ -60,6 +68,7 @@ impl TestStageDB {
|
||||
create_test_rw_db_with_path(path),
|
||||
MAINNET.clone(),
|
||||
StaticFileProvider::read_write(static_dir_path).unwrap(),
|
||||
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
|
||||
)
|
||||
.expect("failed to create test provider factory"),
|
||||
}
|
||||
|
||||
@@ -711,7 +711,7 @@ mod tests {
|
||||
};
|
||||
use reth_provider::{
|
||||
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
|
||||
ProviderFactory,
|
||||
ProviderFactory, RocksDBProviderFactory,
|
||||
};
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
@@ -756,6 +756,7 @@ mod tests {
|
||||
fn fail_init_inconsistent_db() {
|
||||
let factory = create_test_provider_factory_with_chain_spec(SEPOLIA.clone());
|
||||
let static_file_provider = factory.static_file_provider();
|
||||
let rocksdb_provider = factory.rocksdb_provider();
|
||||
init_genesis(&factory).unwrap();
|
||||
|
||||
// Try to init db with a different genesis block
|
||||
@@ -764,6 +765,7 @@ mod tests {
|
||||
factory.into_db(),
|
||||
MAINNET.clone(),
|
||||
static_file_provider,
|
||||
rocksdb_provider,
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
@@ -159,6 +159,14 @@ pub mod test_utils {
|
||||
(temp_dir, path)
|
||||
}
|
||||
|
||||
/// Create `rocksdb` path for testing
|
||||
#[track_caller]
|
||||
pub fn create_test_rocksdb_dir() -> (TempDir, PathBuf) {
|
||||
let temp_dir = TempDir::with_prefix("reth-test-rocksdb-").expect(ERROR_TEMPDIR);
|
||||
let path = temp_dir.path().to_path_buf();
|
||||
(temp_dir, path)
|
||||
}
|
||||
|
||||
/// Get a temporary directory path to use for the database
|
||||
pub fn tempdir_path() -> PathBuf {
|
||||
let builder = tempfile::Builder::new().prefix("reth-test-").rand_bytes(8).tempdir();
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use std::{marker::PhantomData, ops::Range};
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
use crate::providers::rocksdb::RocksDBWriteMode;
|
||||
use crate::providers::rocksdb::RocksDBBatch;
|
||||
use crate::{
|
||||
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
|
||||
StaticFileProviderFactory,
|
||||
@@ -37,11 +37,11 @@ type EitherWriterTy<'a, P, T> = EitherWriter<
|
||||
>;
|
||||
|
||||
// Helper types so constructors stay exported even when RocksDB feature is off.
|
||||
// RocksDBWriteMode encapsulates the choice between Transaction and Batch writes.
|
||||
// Historical data tables use a write-only RocksDB batch (no read-your-writes needed).
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
type RocksWriteModeArg<'a> = crate::providers::rocksdb::RocksDBWriteMode<'a>;
|
||||
type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
type RocksWriteModeArg<'a> = ();
|
||||
type RocksBatchArg<'a> = ();
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
|
||||
@@ -55,12 +55,9 @@ pub enum EitherWriter<'a, CURSOR, N> {
|
||||
Database(CURSOR),
|
||||
/// Write to static file
|
||||
StaticFile(StaticFileProviderRWRefMut<'a, N>),
|
||||
/// Write to `RocksDB` (transaction or batch - internal detail).
|
||||
///
|
||||
/// Uses [`RocksDBWriteMode`] to encapsulate the choice between full transaction
|
||||
/// semantics and high-throughput batch writes.
|
||||
/// Write to `RocksDB` using a write-only batch (historical tables).
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
RocksDB(RocksDBWriteMode<'a>),
|
||||
RocksDB(RocksDBBatch<'a>),
|
||||
}
|
||||
|
||||
impl<'a> EitherWriter<'a, (), ()> {
|
||||
@@ -131,11 +128,9 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
}
|
||||
|
||||
/// Creates a new [`EitherWriter`] for storages history based on storage settings.
|
||||
///
|
||||
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
|
||||
pub fn new_storages_history<P>(
|
||||
provider: &P,
|
||||
_rocksdb_mode: RocksWriteModeArg<'a>,
|
||||
_rocksdb_batch: RocksBatchArg<'a>,
|
||||
) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
|
||||
where
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
@@ -143,37 +138,16 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storages_history_in_rocksdb {
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
|
||||
}
|
||||
|
||||
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
|
||||
}
|
||||
|
||||
/// Creates a new [`EitherWriter`] for accounts history based on storage settings.
|
||||
///
|
||||
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
|
||||
pub fn new_accounts_history<P>(
|
||||
provider: &P,
|
||||
_rocksdb_mode: RocksWriteModeArg<'a>,
|
||||
) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
|
||||
where
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
P::Tx: DbTxMut,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().account_history_in_rocksdb {
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
|
||||
}
|
||||
|
||||
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
|
||||
}
|
||||
|
||||
/// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
|
||||
///
|
||||
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
|
||||
pub fn new_transaction_hash_numbers<P>(
|
||||
provider: &P,
|
||||
_rocksdb_mode: RocksWriteModeArg<'a>,
|
||||
_rocksdb_batch: RocksBatchArg<'a>,
|
||||
) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
|
||||
where
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
@@ -181,13 +155,30 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
|
||||
}
|
||||
|
||||
Ok(EitherWriter::Database(
|
||||
provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
|
||||
))
|
||||
}
|
||||
|
||||
/// Creates a new [`EitherWriter`] for account history based on storage settings.
|
||||
pub fn new_accounts_history<P>(
|
||||
provider: &P,
|
||||
_rocksdb_batch: RocksBatchArg<'a>,
|
||||
) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
|
||||
where
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
P::Tx: DbTxMut,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().account_history_in_rocksdb {
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
|
||||
}
|
||||
|
||||
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
|
||||
@@ -217,24 +208,6 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
|
||||
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
}
|
||||
|
||||
/// Commits `RocksDB` writes if this is a `RocksDB` writer.
|
||||
///
|
||||
/// For [`Self::Database`] and [`Self::StaticFile`], this is a no-op as they use
|
||||
/// different commit patterns (MDBX transaction commit, static file sync).
|
||||
///
|
||||
/// # Commit Order
|
||||
///
|
||||
/// Call this AFTER the outer MDBX transaction commits successfully. This ensures
|
||||
/// that if `RocksDB` commit fails, the primary data (MDBX) is still intact and
|
||||
/// the `RocksDB` data (which is derived) can be rebuilt.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pub fn commit(self) -> ProviderResult<()> {
|
||||
match self {
|
||||
Self::Database(_) | Self::StaticFile(_) => Ok(()),
|
||||
Self::RocksDB(mode) => mode.commit(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
|
||||
@@ -391,6 +364,26 @@ impl<'a> EitherReader<'a, (), ()> {
|
||||
PhantomData,
|
||||
))
|
||||
}
|
||||
|
||||
/// Creates a new [`EitherReader`] for account history based on storage settings.
|
||||
pub fn new_accounts_history<P>(
|
||||
provider: &P,
|
||||
_rocksdb_tx: RocksTxRefArg<'a>,
|
||||
) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
|
||||
where
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
P::Tx: DbTx,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().account_history_in_rocksdb {
|
||||
return Ok(EitherReader::RocksDB(_rocksdb_tx));
|
||||
}
|
||||
|
||||
Ok(EitherReader::Database(
|
||||
provider.tx_ref().cursor_read::<tables::AccountsHistory>()?,
|
||||
PhantomData,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
use crate::{
|
||||
providers::{
|
||||
ConsistentProvider, ProviderNodeTypes, StaticFileProvider, StaticFileProviderRWRefMut,
|
||||
ConsistentProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider,
|
||||
StaticFileProviderRWRefMut,
|
||||
},
|
||||
AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
|
||||
BlockSource, CanonChainTracker, CanonStateNotifications, CanonStateSubscriptions,
|
||||
ChainSpecProvider, ChainStateBlockReader, ChangeSetReader, DatabaseProviderFactory,
|
||||
HashedPostStateProvider, HeaderProvider, ProviderError, ProviderFactory, PruneCheckpointReader,
|
||||
ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StateProviderBox,
|
||||
StateProviderFactory, StateReader, StaticFileProviderFactory, TransactionVariant,
|
||||
TransactionsProvider, TrieReader,
|
||||
ReceiptProvider, ReceiptProviderIdExt, RocksDBProviderFactory, StageCheckpointReader,
|
||||
StateProviderBox, StateProviderFactory, StateReader, StaticFileProviderFactory,
|
||||
TransactionVariant, TransactionsProvider, TrieReader,
|
||||
};
|
||||
use alloy_consensus::transaction::TransactionMeta;
|
||||
use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag};
|
||||
@@ -176,6 +177,12 @@ impl<N: ProviderNodeTypes> StaticFileProviderFactory for BlockchainProvider<N> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider {
|
||||
self.database.rocksdb_provider()
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {
|
||||
type Header = HeaderTy<N>;
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//! up to the intended build target.
|
||||
|
||||
use crate::{
|
||||
providers::{NodeTypesForProvider, StaticFileProvider},
|
||||
providers::{NodeTypesForProvider, RocksDBProvider, StaticFileProvider},
|
||||
ProviderFactory,
|
||||
};
|
||||
use reth_db::{
|
||||
@@ -104,11 +104,12 @@ impl<N> ProviderFactoryBuilder<N> {
|
||||
where
|
||||
N: NodeTypesForProvider,
|
||||
{
|
||||
let ReadOnlyConfig { db_dir, db_args, static_files_dir, watch_static_files } =
|
||||
let ReadOnlyConfig { db_dir, db_args, static_files_dir, rocksdb_dir, watch_static_files } =
|
||||
config.into();
|
||||
self.db(Arc::new(open_db_read_only(db_dir, db_args)?))
|
||||
.chainspec(chainspec)
|
||||
.static_file(StaticFileProvider::read_only(static_files_dir, watch_static_files)?)
|
||||
.rocksdb_provider(RocksDBProvider::builder(&rocksdb_dir).build()?)
|
||||
.build_provider_factory()
|
||||
.map_err(Into::into)
|
||||
}
|
||||
@@ -120,7 +121,7 @@ impl<N> Default for ProviderFactoryBuilder<N> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Settings for how to open the database and static files.
|
||||
/// Settings for how to open the database, static files, and `RocksDB`.
|
||||
///
|
||||
/// The default derivation from a path assumes the path is the datadir:
|
||||
/// [`ReadOnlyConfig::from_datadir`]
|
||||
@@ -132,6 +133,8 @@ pub struct ReadOnlyConfig {
|
||||
pub db_args: DatabaseArguments,
|
||||
/// The path to the static file dir
|
||||
pub static_files_dir: PathBuf,
|
||||
/// The path to the `RocksDB` directory
|
||||
pub rocksdb_dir: PathBuf,
|
||||
/// Whether the static files should be watched for changes.
|
||||
pub watch_static_files: bool,
|
||||
}
|
||||
@@ -144,6 +147,7 @@ impl ReadOnlyConfig {
|
||||
/// ```text
|
||||
/// -`datadir`
|
||||
/// |__db
|
||||
/// |__rocksdb
|
||||
/// |__static_files
|
||||
/// ```
|
||||
///
|
||||
@@ -151,7 +155,13 @@ impl ReadOnlyConfig {
|
||||
/// [`StaticFileProvider::read_only`]
|
||||
pub fn from_datadir(datadir: impl AsRef<Path>) -> Self {
|
||||
let datadir = datadir.as_ref();
|
||||
Self::from_dirs(datadir.join("db"), datadir.join("static_files"))
|
||||
Self {
|
||||
db_dir: datadir.join("db"),
|
||||
db_args: Default::default(),
|
||||
static_files_dir: datadir.join("static_files"),
|
||||
rocksdb_dir: datadir.join("rocksdb"),
|
||||
watch_static_files: true,
|
||||
}
|
||||
}
|
||||
|
||||
/// Disables long-lived read transaction safety guarantees.
|
||||
@@ -169,7 +179,8 @@ impl ReadOnlyConfig {
|
||||
///
|
||||
/// ```text
|
||||
/// - db
|
||||
/// -static_files
|
||||
/// - rocksdb
|
||||
/// - static_files
|
||||
/// ```
|
||||
///
|
||||
/// By default this watches the static file directory for changes, see also
|
||||
@@ -180,13 +191,10 @@ impl ReadOnlyConfig {
|
||||
/// If the path does not exist
|
||||
pub fn from_db_dir(db_dir: impl AsRef<Path>) -> Self {
|
||||
let db_dir = db_dir.as_ref();
|
||||
let static_files_dir = std::fs::canonicalize(db_dir)
|
||||
.unwrap()
|
||||
.parent()
|
||||
.unwrap()
|
||||
.to_path_buf()
|
||||
.join("static_files");
|
||||
Self::from_dirs(db_dir, static_files_dir)
|
||||
let datadir = std::fs::canonicalize(db_dir).unwrap().parent().unwrap().to_path_buf();
|
||||
let static_files_dir = datadir.join("static_files");
|
||||
let rocksdb_dir = datadir.join("rocksdb");
|
||||
Self::from_dirs(db_dir, static_files_dir, rocksdb_dir)
|
||||
}
|
||||
|
||||
/// Creates the config for the given paths.
|
||||
@@ -194,11 +202,16 @@ impl ReadOnlyConfig {
|
||||
///
|
||||
/// By default this watches the static file directory for changes, see also
|
||||
/// [`StaticFileProvider::read_only`]
|
||||
pub fn from_dirs(db_dir: impl AsRef<Path>, static_files_dir: impl AsRef<Path>) -> Self {
|
||||
pub fn from_dirs(
|
||||
db_dir: impl AsRef<Path>,
|
||||
static_files_dir: impl AsRef<Path>,
|
||||
rocksdb_dir: impl AsRef<Path>,
|
||||
) -> Self {
|
||||
Self {
|
||||
static_files_dir: static_files_dir.as_ref().into(),
|
||||
db_dir: db_dir.as_ref().into(),
|
||||
db_args: Default::default(),
|
||||
static_files_dir: static_files_dir.as_ref().into(),
|
||||
rocksdb_dir: rocksdb_dir.as_ref().into(),
|
||||
watch_static_files: true,
|
||||
}
|
||||
}
|
||||
@@ -317,7 +330,37 @@ impl<N, Val1, Val2, Val3> TypesAnd3<N, Val1, Val2, Val3> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<N, DB> TypesAnd3<N, DB, Arc<N::ChainSpec>, StaticFileProvider<N::Primitives>>
|
||||
impl<N, DB, C> TypesAnd3<N, DB, Arc<C>, StaticFileProvider<N::Primitives>>
|
||||
where
|
||||
N: NodeTypes,
|
||||
{
|
||||
/// Configures the `RocksDB` provider.
|
||||
pub fn rocksdb_provider(
|
||||
self,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
) -> TypesAnd4<N, DB, Arc<C>, StaticFileProvider<N::Primitives>, RocksDBProvider> {
|
||||
TypesAnd4::new(self.val_1, self.val_2, self.val_3, rocksdb_provider)
|
||||
}
|
||||
}
|
||||
|
||||
/// This is staging type that contains the configured types and _four_ values.
|
||||
#[derive(Debug)]
|
||||
pub struct TypesAnd4<N, Val1, Val2, Val3, Val4> {
|
||||
_types: PhantomData<N>,
|
||||
val_1: Val1,
|
||||
val_2: Val2,
|
||||
val_3: Val3,
|
||||
val_4: Val4,
|
||||
}
|
||||
|
||||
impl<N, Val1, Val2, Val3, Val4> TypesAnd4<N, Val1, Val2, Val3, Val4> {
|
||||
/// Creates a new instance with the given types and four values.
|
||||
pub fn new(val_1: Val1, val_2: Val2, val_3: Val3, val_4: Val4) -> Self {
|
||||
Self { _types: Default::default(), val_1, val_2, val_3, val_4 }
|
||||
}
|
||||
}
|
||||
|
||||
impl<N, DB> TypesAnd4<N, DB, Arc<N::ChainSpec>, StaticFileProvider<N::Primitives>, RocksDBProvider>
|
||||
where
|
||||
N: NodeTypesForProvider,
|
||||
DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
|
||||
@@ -326,7 +369,7 @@ where
|
||||
pub fn build_provider_factory(
|
||||
self,
|
||||
) -> ProviderResult<ProviderFactory<NodeTypesWithDBAdapter<N, DB>>> {
|
||||
let Self { _types, val_1, val_2, val_3 } = self;
|
||||
ProviderFactory::new(val_1, val_2, val_3)
|
||||
let Self { _types, val_1, val_2, val_3, val_4 } = self;
|
||||
ProviderFactory::new(val_1, val_2, val_3, val_4)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
use crate::{
|
||||
providers::{
|
||||
state::latest::LatestStateProvider, NodeTypesForProvider, StaticFileProvider,
|
||||
StaticFileProviderRWRefMut,
|
||||
state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
|
||||
StaticFileProvider, StaticFileProviderRWRefMut,
|
||||
},
|
||||
to_range,
|
||||
traits::{BlockSource, ReceiptProvider},
|
||||
BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
|
||||
EitherWriterDestination, HashedPostStateProvider, HeaderProvider, HeaderSyncGapProvider,
|
||||
MetadataProvider, ProviderError, PruneCheckpointReader, StageCheckpointReader,
|
||||
StateProviderBox, StaticFileProviderFactory, StaticFileWriter, TransactionVariant,
|
||||
TransactionsProvider,
|
||||
MetadataProvider, ProviderError, PruneCheckpointReader, RocksDBProviderFactory,
|
||||
StageCheckpointReader, StateProviderBox, StaticFileProviderFactory, StaticFileWriter,
|
||||
TransactionVariant, TransactionsProvider,
|
||||
};
|
||||
use alloy_consensus::transaction::TransactionMeta;
|
||||
use alloy_eips::BlockHashOrNumber;
|
||||
@@ -72,6 +72,8 @@ pub struct ProviderFactory<N: NodeTypesWithDB> {
|
||||
storage: Arc<N::Storage>,
|
||||
/// Storage configuration settings for this node
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
/// `RocksDB` provider
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
}
|
||||
|
||||
impl<N: NodeTypesForProvider> ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>> {
|
||||
@@ -87,6 +89,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
|
||||
db: N::DB,
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
) -> ProviderResult<Self> {
|
||||
// Load storage settings from database at init time. Creates a temporary provider
|
||||
// to read persisted settings, falling back to legacy defaults if none exist.
|
||||
@@ -100,6 +103,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Arc::new(RwLock::new(legacy_settings)),
|
||||
rocksdb_provider.clone(),
|
||||
)
|
||||
.storage_settings()?
|
||||
.unwrap_or(legacy_settings);
|
||||
@@ -111,6 +115,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
|
||||
prune_modes: PruneModes::default(),
|
||||
storage: Default::default(),
|
||||
storage_settings: Arc::new(RwLock::new(storage_settings)),
|
||||
rocksdb_provider,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -144,6 +149,12 @@ impl<N: NodeTypesWithDB> StorageSettingsCache for ProviderFactory<N> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider {
|
||||
self.rocksdb_provider.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {
|
||||
/// Create new database provider by passing a path. [`ProviderFactory`] will own the database
|
||||
/// instance.
|
||||
@@ -152,11 +163,13 @@ impl<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
args: DatabaseArguments,
|
||||
static_file_provider: StaticFileProvider<N::Primitives>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
) -> RethResult<Self> {
|
||||
Self::new(
|
||||
Arc::new(init_db(path, args).map_err(RethError::msg)?),
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
rocksdb_provider,
|
||||
)
|
||||
.map_err(RethError::Provider)
|
||||
}
|
||||
@@ -178,6 +191,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
|
||||
self.prune_modes.clone(),
|
||||
self.storage.clone(),
|
||||
self.storage_settings.clone(),
|
||||
self.rocksdb_provider.clone(),
|
||||
))
|
||||
}
|
||||
|
||||
@@ -194,6 +208,7 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
|
||||
self.prune_modes.clone(),
|
||||
self.storage.clone(),
|
||||
self.storage_settings.clone(),
|
||||
self.rocksdb_provider.clone(),
|
||||
)))
|
||||
}
|
||||
|
||||
@@ -595,8 +610,15 @@ where
|
||||
N: NodeTypesWithDB<DB: fmt::Debug, ChainSpec: fmt::Debug, Storage: fmt::Debug>,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let Self { db, chain_spec, static_file_provider, prune_modes, storage, storage_settings } =
|
||||
self;
|
||||
let Self {
|
||||
db,
|
||||
chain_spec,
|
||||
static_file_provider,
|
||||
prune_modes,
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
} = self;
|
||||
f.debug_struct("ProviderFactory")
|
||||
.field("db", &db)
|
||||
.field("chain_spec", &chain_spec)
|
||||
@@ -604,6 +626,7 @@ where
|
||||
.field("prune_modes", &prune_modes)
|
||||
.field("storage", &storage)
|
||||
.field("storage_settings", &*storage_settings.read())
|
||||
.field("rocksdb_provider", &rocksdb_provider)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -617,6 +640,7 @@ impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
|
||||
prune_modes: self.prune_modes.clone(),
|
||||
storage: self.storage.clone(),
|
||||
storage_settings: self.storage_settings.clone(),
|
||||
rocksdb_provider: self.rocksdb_provider.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -635,7 +659,7 @@ mod tests {
|
||||
use reth_chainspec::ChainSpecBuilder;
|
||||
use reth_db::{
|
||||
mdbx::DatabaseArguments,
|
||||
test_utils::{create_test_static_files_dir, ERROR_TEMPDIR},
|
||||
test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
|
||||
};
|
||||
use reth_db_api::tables;
|
||||
use reth_primitives_traits::SignerRecoverable;
|
||||
@@ -674,11 +698,13 @@ mod tests {
|
||||
fn provider_factory_with_database_path() {
|
||||
let chain_spec = ChainSpecBuilder::mainnet().build();
|
||||
let (_static_dir, static_dir_path) = create_test_static_files_dir();
|
||||
let (_, rocksdb_path) = create_test_rocksdb_dir();
|
||||
let factory = ProviderFactory::<MockNodeTypesWithDB<DatabaseEnv>>::new_with_database_path(
|
||||
tempfile::TempDir::new().expect(ERROR_TEMPDIR).keep(),
|
||||
Arc::new(chain_spec),
|
||||
DatabaseArguments::new(Default::default()),
|
||||
StaticFileProvider::read_write(static_dir_path).unwrap(),
|
||||
RocksDBProvider::builder(&rocksdb_path).build().unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
let provider = factory.provider().unwrap();
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::{
|
||||
},
|
||||
providers::{
|
||||
database::{chain::ChainStorage, metrics},
|
||||
rocksdb::RocksDBProvider,
|
||||
static_file::StaticFileWriter,
|
||||
NodeTypesForProvider, StaticFileProvider,
|
||||
},
|
||||
@@ -16,10 +17,10 @@ use crate::{
|
||||
DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
|
||||
HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
|
||||
LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
|
||||
PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader,
|
||||
StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader, StorageReader,
|
||||
StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
|
||||
TrieReader, TrieWriter,
|
||||
PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, RocksDBProviderFactory,
|
||||
StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
|
||||
StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
|
||||
TransactionsProviderExt, TrieReader, TrieWriter,
|
||||
};
|
||||
use alloy_consensus::{
|
||||
transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
|
||||
@@ -164,6 +165,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
storage: Arc<N::Storage>,
|
||||
/// Storage configuration settings for this node
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
/// `RocksDB` provider
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
/// Minimum distance from tip required for pruning
|
||||
minimum_pruning_distance: u64,
|
||||
}
|
||||
@@ -251,6 +254,13 @@ impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
|
||||
/// Returns the `RocksDB` provider.
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider {
|
||||
self.rocksdb_provider.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
|
||||
for DatabaseProvider<TX, N>
|
||||
{
|
||||
@@ -270,6 +280,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
prune_modes: PruneModes,
|
||||
storage: Arc<N::Storage>,
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
) -> Self {
|
||||
Self {
|
||||
tx,
|
||||
@@ -278,6 +289,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
prune_modes,
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
}
|
||||
}
|
||||
@@ -523,6 +535,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
prune_modes: PruneModes,
|
||||
storage: Arc<N::Storage>,
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
) -> Self {
|
||||
Self {
|
||||
tx,
|
||||
@@ -531,6 +544,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
prune_modes,
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,10 +31,11 @@ pub use consistent::ConsistentProvider;
|
||||
|
||||
// RocksDB currently only supported on Unix platforms
|
||||
// Windows support is planned for future releases
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
#[cfg_attr(all(unix, feature = "rocksdb"), path = "rocksdb/mod.rs")]
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), path = "rocksdb_stub.rs")]
|
||||
pub(crate) mod rocksdb;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pub use rocksdb::{RocksDBBuilder, RocksDBProvider, RocksDBWriteMode, RocksTx};
|
||||
|
||||
pub use rocksdb::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};
|
||||
|
||||
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy
|
||||
/// [`ProviderNodeTypes`].
|
||||
|
||||
@@ -2,7 +2,4 @@
|
||||
|
||||
mod metrics;
|
||||
mod provider;
|
||||
pub use provider::{RocksDBBuilder, RocksDBProvider, RocksDBWriteMode, RocksTx};
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) use provider::RocksDBBatch;
|
||||
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};
|
||||
|
||||
@@ -18,7 +18,6 @@ use std::{
|
||||
sync::Arc,
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::warn;
|
||||
|
||||
/// Default cache size for `RocksDB` block cache (128 MB).
|
||||
const DEFAULT_CACHE_SIZE: usize = 128 << 20;
|
||||
@@ -35,6 +34,11 @@ const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;
|
||||
/// Default bloom filter bits per key (~1% false positive rate).
|
||||
const DEFAULT_BLOOM_FILTER_BITS: f64 = 10.0;
|
||||
|
||||
/// Default buffer capacity for compression in batches.
|
||||
/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
|
||||
/// reducing the first few reallocations without over-allocating.
|
||||
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;
|
||||
|
||||
/// Builder for [`RocksDBProvider`].
|
||||
pub struct RocksDBBuilder {
|
||||
path: PathBuf,
|
||||
@@ -152,8 +156,10 @@ impl RocksDBBuilder {
|
||||
}
|
||||
|
||||
/// Sets the log level from `DatabaseArgs` configuration.
|
||||
pub const fn with_database_log_level(mut self, log_level: LogLevel) -> Self {
|
||||
self.log_level = convert_log_level(log_level);
|
||||
pub const fn with_database_log_level(mut self, log_level: Option<LogLevel>) -> Self {
|
||||
if let Some(level) = log_level {
|
||||
self.log_level = convert_log_level(level);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
@@ -260,23 +266,15 @@ impl RocksDBProvider {
|
||||
RocksTx { inner, provider: self }
|
||||
}
|
||||
|
||||
/// Creates a new batch for manual commit.
|
||||
/// Creates a new batch for atomic writes.
|
||||
///
|
||||
/// Use [`Self::write_batch`] for closure-based atomic writes.
|
||||
/// Use this method when the batch needs to be held by [`EitherWriter`](crate::EitherWriter).
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```ignore
|
||||
/// let batch = provider.batch();
|
||||
/// batch.put::<SomeTable>(key, &value)?;
|
||||
/// batch.commit()?;
|
||||
/// ```
|
||||
/// Use this method when the batch needs to be held by [`crate::EitherWriter`].
|
||||
pub fn batch(&self) -> RocksDBBatch<'_> {
|
||||
RocksDBBatch {
|
||||
provider: self,
|
||||
inner: WriteBatchWithTransaction::<true>::default(),
|
||||
buf: Vec::new(),
|
||||
buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -375,31 +373,21 @@ impl RocksDBProvider {
|
||||
where
|
||||
F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
|
||||
{
|
||||
// Note: Using "Batch" as table name for batch operations across multiple tables
|
||||
self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
|
||||
let mut batch_handle = RocksDBBatch {
|
||||
provider: this,
|
||||
inner: WriteBatchWithTransaction::<true>::default(),
|
||||
buf: Vec::new(),
|
||||
};
|
||||
|
||||
let mut batch_handle = this.batch();
|
||||
f(&mut batch_handle)?;
|
||||
|
||||
// Take ownership of inner to prevent Drop from logging a warning
|
||||
let batch = std::mem::take(&mut batch_handle.inner);
|
||||
this.0.db.write(batch).map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})
|
||||
batch_handle.commit()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle for building a batch of operations atomically.
|
||||
///
|
||||
/// Uses `WriteBatchWithTransaction<true>` for compatibility with `TransactionDB`.
|
||||
/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
|
||||
/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
|
||||
/// where you don't need to read back uncommitted data within the same operation
|
||||
/// (e.g., history index writes).
|
||||
#[must_use = "batch must be committed"]
|
||||
pub struct RocksDBBatch<'a> {
|
||||
provider: &'a RocksDBProvider,
|
||||
inner: WriteBatchWithTransaction<true>,
|
||||
@@ -447,14 +435,8 @@ impl<'a> RocksDBBatch<'a> {
|
||||
/// Commits the batch to the database.
|
||||
///
|
||||
/// This consumes the batch and writes all operations atomically to `RocksDB`.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error if the write fails.
|
||||
pub fn commit(mut self) -> ProviderResult<()> {
|
||||
// Take ownership of inner to prevent Drop from logging a warning
|
||||
let batch = std::mem::take(&mut self.inner);
|
||||
self.provider.0.db.write(batch).map_err(|e| {
|
||||
pub fn commit(self) -> ProviderResult<()> {
|
||||
self.provider.0.db.write_opt(self.inner, &WriteOptions::default()).map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
@@ -462,7 +444,7 @@ impl<'a> RocksDBBatch<'a> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the number of operations in this batch.
|
||||
/// Returns the number of write operations (puts + deletes) queued in this batch.
|
||||
pub fn len(&self) -> usize {
|
||||
self.inner.len()
|
||||
}
|
||||
@@ -473,18 +455,6 @@ impl<'a> RocksDBBatch<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RocksDBBatch<'_> {
|
||||
fn drop(&mut self) {
|
||||
if !self.inner.is_empty() {
|
||||
warn!(
|
||||
target: "reth::storage",
|
||||
batch_len = %self.inner.len(),
|
||||
"RocksDBBatch dropped without commit - data discarded"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
|
||||
///
|
||||
/// Supports:
|
||||
@@ -646,49 +616,6 @@ impl<T: Table> Iterator for RocksTxIter<'_, T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// `RocksDB` write strategy - internal implementation detail.
|
||||
///
|
||||
/// This enum encapsulates the choice between full transaction semantics
|
||||
/// and high-throughput batch writes. Use [`RocksDBWriteMode::Transaction`] for
|
||||
/// read-modify-write operations that need read-your-writes semantics.
|
||||
/// Use [`RocksDBWriteMode::Batch`] for bulk sync operations where
|
||||
/// read-your-writes is not needed.
|
||||
pub enum RocksDBWriteMode<'a> {
|
||||
/// Full transaction with read-your-writes, rollback support.
|
||||
/// Use for read-modify-write operations.
|
||||
Transaction(RocksTx<'a>),
|
||||
/// Write-only batch for maximum throughput.
|
||||
/// Use for bulk sync operations where read-your-writes is not needed.
|
||||
Batch(RocksDBBatch<'a>),
|
||||
}
|
||||
|
||||
impl fmt::Debug for RocksDBWriteMode<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Transaction(tx) => f.debug_tuple("Transaction").field(tx).finish(),
|
||||
Self::Batch(batch) => f.debug_tuple("Batch").field(batch).finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> RocksDBWriteMode<'a> {
|
||||
/// Puts a value into the specified table.
|
||||
pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
|
||||
match self {
|
||||
Self::Transaction(tx) => tx.put::<T>(key, value),
|
||||
Self::Batch(batch) => batch.put::<T>(key, value),
|
||||
}
|
||||
}
|
||||
|
||||
/// Commits the transaction or batch.
|
||||
pub fn commit(self) -> ProviderResult<()> {
|
||||
match self {
|
||||
Self::Transaction(tx) => tx.commit(),
|
||||
Self::Batch(batch) => batch.commit(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
|
||||
const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
|
||||
match level {
|
||||
@@ -975,167 +902,4 @@ mod tests {
|
||||
assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_mode_batch() {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider =
|
||||
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
|
||||
|
||||
// Create write mode using Batch
|
||||
let batch = provider.batch();
|
||||
let mut mode = RocksDBWriteMode::Batch(batch);
|
||||
|
||||
// Write via RocksDBWriteMode
|
||||
let key = 42u64;
|
||||
let value = b"test_via_mode".to_vec();
|
||||
mode.put::<TestTable>(key, &value).unwrap();
|
||||
|
||||
// Commit via RocksDBWriteMode
|
||||
mode.commit().unwrap();
|
||||
|
||||
// Verify data is visible
|
||||
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(value));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_mode_transaction() {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider =
|
||||
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
|
||||
|
||||
// Create write mode using Transaction
|
||||
let tx = provider.tx();
|
||||
let mut mode = RocksDBWriteMode::Transaction(tx);
|
||||
|
||||
// Write via RocksDBWriteMode
|
||||
let key = 100u64;
|
||||
let value = b"test_via_tx_mode".to_vec();
|
||||
mode.put::<TestTable>(key, &value).unwrap();
|
||||
|
||||
// Commit via RocksDBWriteMode
|
||||
mode.commit().unwrap();
|
||||
|
||||
// Verify data is visible
|
||||
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(value));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_batch_empty_commit() {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider =
|
||||
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
|
||||
|
||||
// Create an empty batch
|
||||
let batch = provider.batch();
|
||||
assert!(batch.is_empty());
|
||||
assert_eq!(batch.len(), 0);
|
||||
|
||||
// Commit should succeed (no-op)
|
||||
batch.commit().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_batch_drop_without_commit_does_not_persist() {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider =
|
||||
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
|
||||
|
||||
// Create a batch and add entries but DON'T commit
|
||||
{
|
||||
let mut batch = provider.batch();
|
||||
for i in 0..5u64 {
|
||||
let value = format!("dropped_value_{i}").into_bytes();
|
||||
batch.put::<TestTable>(i, &value).unwrap();
|
||||
}
|
||||
// batch dropped here without commit - should log warning
|
||||
}
|
||||
|
||||
// Data should NOT be persisted
|
||||
for i in 0..5u64 {
|
||||
assert_eq!(
|
||||
provider.get::<TestTable>(i).unwrap(),
|
||||
None,
|
||||
"Dropped batch should not persist data"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_mode_debug_formatting() {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider =
|
||||
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
|
||||
|
||||
// Test Batch variant debug output
|
||||
let batch = provider.batch();
|
||||
let mode_batch = RocksDBWriteMode::Batch(batch);
|
||||
let debug_str = format!("{:?}", mode_batch);
|
||||
assert!(debug_str.contains("Batch"), "Debug should contain 'Batch': {debug_str}");
|
||||
|
||||
// Test Transaction variant debug output
|
||||
let tx = provider.tx();
|
||||
let mode_tx = RocksDBWriteMode::Transaction(tx);
|
||||
let debug_str = format!("{:?}", mode_tx);
|
||||
assert!(
|
||||
debug_str.contains("Transaction"),
|
||||
"Debug should contain 'Transaction': {debug_str}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_batch_delete_operation() {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider =
|
||||
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
|
||||
|
||||
// First, insert some data
|
||||
let mut batch = provider.batch();
|
||||
for i in 0..5u64 {
|
||||
batch.put::<TestTable>(i, &vec![i as u8]).unwrap();
|
||||
}
|
||||
batch.commit().unwrap();
|
||||
|
||||
// Verify data exists
|
||||
for i in 0..5u64 {
|
||||
assert!(provider.get::<TestTable>(i).unwrap().is_some());
|
||||
}
|
||||
|
||||
// Now delete via batch
|
||||
let mut delete_batch = provider.batch();
|
||||
for i in 0..5u64 {
|
||||
delete_batch.delete::<TestTable>(i).unwrap();
|
||||
}
|
||||
delete_batch.commit().unwrap();
|
||||
|
||||
// Verify data is deleted
|
||||
for i in 0..5u64 {
|
||||
assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_batch_overwrite_existing_key() {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider =
|
||||
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
|
||||
|
||||
let key = 42u64;
|
||||
let initial_value = b"initial".to_vec();
|
||||
let updated_value = b"updated".to_vec();
|
||||
|
||||
// Insert initial value
|
||||
let mut batch = provider.batch();
|
||||
batch.put::<TestTable>(key, &initial_value).unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(initial_value));
|
||||
|
||||
// Overwrite with new value
|
||||
let mut batch = provider.batch();
|
||||
batch.put::<TestTable>(key, &updated_value).unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(updated_value));
|
||||
}
|
||||
}
|
||||
|
||||
210
crates/storage/provider/src/providers/rocksdb_stub.rs
Normal file
210
crates/storage/provider/src/providers/rocksdb_stub.rs
Normal file
@@ -0,0 +1,210 @@
|
||||
//! Stub implementation of `RocksDB` provider.
|
||||
//!
|
||||
//! This module provides placeholder types that allow the code to compile when `RocksDB` is not
|
||||
//! available (either on non-Unix platforms or when the `rocksdb` feature is not enabled).
|
||||
//! Operations will produce errors if actually attempted.
|
||||
|
||||
use reth_db_api::table::{Encode, Table};
|
||||
use reth_storage_errors::{
|
||||
db::LogLevel,
|
||||
provider::{ProviderError::UnsupportedProvider, ProviderResult},
|
||||
};
|
||||
use std::path::Path;
|
||||
|
||||
/// A stub `RocksDB` provider.
|
||||
///
|
||||
/// This type exists to allow code to compile when `RocksDB` is not available (either on non-Unix
|
||||
/// platforms or when the `rocksdb` feature is not enabled). When using this stub, the
|
||||
/// `transaction_hash_numbers_in_rocksdb` flag should be set to `false` to ensure all operations
|
||||
/// route to MDBX instead.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RocksDBProvider;
|
||||
|
||||
impl RocksDBProvider {
|
||||
/// Creates a new stub `RocksDB` provider.
|
||||
///
|
||||
/// On non-Unix platforms, this returns an error indicating `RocksDB` is not supported.
|
||||
pub fn new(_path: impl AsRef<Path>) -> ProviderResult<Self> {
|
||||
Ok(Self)
|
||||
}
|
||||
|
||||
/// Creates a new stub `RocksDB` provider builder.
|
||||
pub fn builder(path: impl AsRef<Path>) -> RocksDBBuilder {
|
||||
RocksDBBuilder::new(path)
|
||||
}
|
||||
|
||||
/// Get a value from `RocksDB` (stub implementation).
|
||||
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Get a value from `RocksDB` using pre-encoded key (stub implementation).
|
||||
pub const fn get_encoded<T: Table>(
|
||||
&self,
|
||||
_key: &<T::Key as Encode>::Encoded,
|
||||
) -> ProviderResult<Option<T::Value>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Put a value into `RocksDB` (stub implementation).
|
||||
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Put a value into `RocksDB` using pre-encoded key (stub implementation).
|
||||
pub const fn put_encoded<T: Table>(
|
||||
&self,
|
||||
_key: &<T::Key as Encode>::Encoded,
|
||||
_value: &T::Value,
|
||||
) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Delete a value from `RocksDB` (stub implementation).
|
||||
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Write a batch of operations (stub implementation).
|
||||
pub fn write_batch<F>(&self, _f: F) -> ProviderResult<()>
|
||||
where
|
||||
F: FnOnce(&mut RocksDBBatch) -> ProviderResult<()>,
|
||||
{
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Creates a new transaction (stub implementation).
|
||||
pub const fn tx(&self) -> RocksTx {
|
||||
RocksTx
|
||||
}
|
||||
}
|
||||
|
||||
/// A stub batch writer for `RocksDB` on non-Unix platforms.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBBatch;
|
||||
|
||||
impl RocksDBBatch {
|
||||
/// Puts a value into the batch (stub implementation).
|
||||
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Puts a value into the batch using pre-encoded key (stub implementation).
|
||||
pub const fn put_encoded<T: Table>(
|
||||
&self,
|
||||
_key: &<T::Key as Encode>::Encoded,
|
||||
_value: &T::Value,
|
||||
) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Deletes a value from the batch (stub implementation).
|
||||
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
}
|
||||
|
||||
/// A stub builder for `RocksDB` on non-Unix platforms.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBBuilder;
|
||||
|
||||
impl RocksDBBuilder {
|
||||
/// Creates a new stub builder.
|
||||
pub fn new<P: AsRef<Path>>(_path: P) -> Self {
|
||||
Self
|
||||
}
|
||||
|
||||
/// Adds a column family for a specific table type (stub implementation).
|
||||
pub const fn with_table<T: Table>(self) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables metrics (stub implementation).
|
||||
pub const fn with_metrics(self) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Enables `RocksDB` internal statistics collection (stub implementation).
|
||||
pub const fn with_statistics(self) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the log level from `DatabaseArgs` configuration (stub implementation).
|
||||
pub const fn with_database_log_level(self, _log_level: Option<LogLevel>) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets a custom block cache size (stub implementation).
|
||||
pub const fn with_block_cache_size(self, _capacity_bytes: usize) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
/// Build the `RocksDB` provider (stub implementation).
|
||||
pub const fn build(self) -> ProviderResult<RocksDBProvider> {
|
||||
Ok(RocksDBProvider)
|
||||
}
|
||||
}
|
||||
|
||||
/// A stub transaction for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksTx;
|
||||
|
||||
impl RocksTx {
|
||||
/// Gets a value from the specified table (stub implementation).
|
||||
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Gets a value using pre-encoded key (stub implementation).
|
||||
pub const fn get_encoded<T: Table>(
|
||||
&self,
|
||||
_key: &<T::Key as Encode>::Encoded,
|
||||
) -> ProviderResult<Option<T::Value>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Puts a value into the specified table (stub implementation).
|
||||
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Puts a value using pre-encoded key (stub implementation).
|
||||
pub const fn put_encoded<T: Table>(
|
||||
&self,
|
||||
_key: &<T::Key as Encode>::Encoded,
|
||||
_value: &T::Value,
|
||||
) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Deletes a value from the specified table (stub implementation).
|
||||
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Creates an iterator for the specified table (stub implementation).
|
||||
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Creates an iterator starting from the given key (stub implementation).
|
||||
pub fn iter_from<T: Table>(&self, _key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Commits the transaction (stub implementation).
|
||||
pub const fn commit(self) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Rolls back the transaction (stub implementation).
|
||||
pub const fn rollback(self) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
}
|
||||
|
||||
/// A stub iterator for `RocksDB` transactions.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksTxIter<'a, T> {
|
||||
_marker: std::marker::PhantomData<(&'a (), T)>,
|
||||
}
|
||||
@@ -1,11 +1,13 @@
|
||||
use crate::{
|
||||
providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
|
||||
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
|
||||
HashingWriter, ProviderFactory, TrieWriter,
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
use reth_chainspec::{ChainSpec, MAINNET};
|
||||
use reth_db::{
|
||||
test_utils::{create_test_rw_db, create_test_static_files_dir, TempDatabase},
|
||||
test_utils::{
|
||||
create_test_rocksdb_dir, create_test_rw_db, create_test_static_files_dir, TempDatabase,
|
||||
},
|
||||
DatabaseEnv,
|
||||
};
|
||||
use reth_errors::ProviderResult;
|
||||
@@ -54,11 +56,13 @@ pub fn create_test_provider_factory_with_node_types<N: NodeTypesForProvider>(
|
||||
chain_spec: Arc<N::ChainSpec>,
|
||||
) -> ProviderFactory<NodeTypesWithDBAdapter<N, Arc<TempDatabase<DatabaseEnv>>>> {
|
||||
let (static_dir, _) = create_test_static_files_dir();
|
||||
let (rocksdb_dir, _) = create_test_rocksdb_dir();
|
||||
let db = create_test_rw_db();
|
||||
ProviderFactory::new(
|
||||
db,
|
||||
chain_spec,
|
||||
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
|
||||
RocksDBProvider::new(&rocksdb_dir).expect("failed to create test RocksDB provider"),
|
||||
)
|
||||
.expect("failed to create test provider factory")
|
||||
}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
//! Additional testing support for `NoopProvider`.
|
||||
|
||||
use crate::{
|
||||
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
|
||||
StaticFileProviderFactory,
|
||||
providers::{RocksDBProvider, StaticFileProvider, StaticFileProviderRWRefMut},
|
||||
RocksDBProviderFactory, StaticFileProviderFactory,
|
||||
};
|
||||
use reth_errors::{ProviderError, ProviderResult};
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
@@ -24,3 +24,9 @@ impl<C: Send + Sync, N: NodePrimitives> StaticFileProviderFactory for NoopProvid
|
||||
Err(ProviderError::ReadOnlyStaticFileAccess)
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<C, N> {
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider {
|
||||
RocksDBProvider::builder(PathBuf::default()).build().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,8 +2,9 @@
|
||||
|
||||
use crate::{
|
||||
AccountReader, BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader,
|
||||
DatabaseProviderFactory, HashedPostStateProvider, PruneCheckpointReader, StageCheckpointReader,
|
||||
StateProviderFactory, StateReader, StaticFileProviderFactory, TrieReader,
|
||||
DatabaseProviderFactory, HashedPostStateProvider, PruneCheckpointReader,
|
||||
RocksDBProviderFactory, StageCheckpointReader, StateProviderFactory, StateReader,
|
||||
StaticFileProviderFactory, TrieReader,
|
||||
};
|
||||
use reth_chain_state::{CanonStateSubscriptions, ForkChoiceSubscriptions};
|
||||
use reth_node_types::{BlockTy, HeaderTy, NodeTypesWithDB, ReceiptTy, TxTy};
|
||||
@@ -17,6 +18,7 @@ pub trait FullProvider<N: NodeTypesWithDB>:
|
||||
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
|
||||
> + NodePrimitivesProvider<Primitives = N::Primitives>
|
||||
+ StaticFileProviderFactory<Primitives = N::Primitives>
|
||||
+ RocksDBProviderFactory
|
||||
+ BlockReaderIdExt<
|
||||
Transaction = TxTy<N>,
|
||||
Block = BlockTy<N>,
|
||||
@@ -44,6 +46,7 @@ impl<T, N: NodeTypesWithDB> FullProvider<N> for T where
|
||||
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
|
||||
> + NodePrimitivesProvider<Primitives = N::Primitives>
|
||||
+ StaticFileProviderFactory<Primitives = N::Primitives>
|
||||
+ RocksDBProviderFactory
|
||||
+ BlockReaderIdExt<
|
||||
Transaction = TxTy<N>,
|
||||
Block = BlockTy<N>,
|
||||
|
||||
@@ -8,5 +8,8 @@ pub use reth_chainspec::ChainSpecProvider;
|
||||
mod static_file_provider;
|
||||
pub use static_file_provider::StaticFileProviderFactory;
|
||||
|
||||
mod rocksdb_provider;
|
||||
pub use rocksdb_provider::RocksDBProviderFactory;
|
||||
|
||||
mod full;
|
||||
pub use full::FullProvider;
|
||||
|
||||
9
crates/storage/provider/src/traits/rocksdb_provider.rs
Normal file
9
crates/storage/provider/src/traits/rocksdb_provider.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
use crate::providers::RocksDBProvider;
|
||||
|
||||
/// `RocksDB` provider factory.
|
||||
///
|
||||
/// This trait provides access to the `RocksDB` provider
|
||||
pub trait RocksDBProviderFactory {
|
||||
/// Returns the `RocksDB` provider.
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider;
|
||||
}
|
||||
@@ -145,9 +145,9 @@ where
|
||||
/// Manages listeners for transaction state change events.
|
||||
event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
|
||||
/// Listeners for new _full_ pending transactions.
|
||||
pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
|
||||
pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
|
||||
/// Listeners for new transactions added to the pool.
|
||||
transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
|
||||
transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
|
||||
/// Listener for new blob transaction sidecars added to the pool.
|
||||
blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
|
||||
/// Metrics for the blob store
|
||||
@@ -243,7 +243,12 @@ where
|
||||
pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
|
||||
let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
|
||||
let listener = PendingTransactionHashListener { sender, kind };
|
||||
self.pending_transaction_listener.lock().push(listener);
|
||||
|
||||
let mut listeners = self.pending_transaction_listener.write();
|
||||
// Clean up dead listeners before adding new one
|
||||
listeners.retain(|l| !l.sender.is_closed());
|
||||
listeners.push(listener);
|
||||
|
||||
rx
|
||||
}
|
||||
|
||||
@@ -254,7 +259,12 @@ where
|
||||
) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
|
||||
let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
|
||||
let listener = TransactionListener { sender, kind };
|
||||
self.transaction_listener.lock().push(listener);
|
||||
|
||||
let mut listeners = self.transaction_listener.write();
|
||||
// Clean up dead listeners before adding new one
|
||||
listeners.retain(|l| !l.sender.is_closed());
|
||||
listeners.push(listener);
|
||||
|
||||
rx
|
||||
}
|
||||
/// Adds a new blob sidecar listener to the pool that gets notified about every new
|
||||
@@ -475,6 +485,9 @@ where
|
||||
|
||||
/// Add a single validated transaction into the pool.
|
||||
///
|
||||
/// Returns the outcome and optionally metadata to be processed after the pool lock is
|
||||
/// released.
|
||||
///
|
||||
/// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
|
||||
/// come in through that function, either as a batch or `std::iter::once`.
|
||||
fn add_transaction(
|
||||
@@ -482,7 +495,7 @@ where
|
||||
pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
|
||||
origin: TransactionOrigin,
|
||||
tx: TransactionValidationOutcome<T::Transaction>,
|
||||
) -> PoolResult<AddedTransactionOutcome> {
|
||||
) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
|
||||
match tx {
|
||||
TransactionValidationOutcome::Valid {
|
||||
balance,
|
||||
@@ -496,7 +509,7 @@ where
|
||||
let transaction_id = TransactionId::new(sender_id, transaction.nonce());
|
||||
|
||||
// split the valid transaction and the blob sidecar if it has any
|
||||
let (transaction, maybe_sidecar) = match transaction {
|
||||
let (transaction, blob_sidecar) = match transaction {
|
||||
ValidTransaction::Valid(tx) => (tx, None),
|
||||
ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
|
||||
debug_assert!(
|
||||
@@ -516,50 +529,26 @@ where
|
||||
authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
|
||||
};
|
||||
|
||||
let added = pool.add_transaction(tx, balance, state_nonce, bytecode_hash)?;
|
||||
let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
|
||||
Ok(added) => added,
|
||||
Err(err) => return (Err(err), None),
|
||||
};
|
||||
let hash = *added.hash();
|
||||
let state = added.transaction_state();
|
||||
|
||||
// transaction was successfully inserted into the pool
|
||||
if let Some(sidecar) = maybe_sidecar {
|
||||
// notify blob sidecar listeners
|
||||
self.on_new_blob_sidecar(&hash, &sidecar);
|
||||
// store the sidecar in the blob store
|
||||
self.insert_blob(hash, sidecar);
|
||||
}
|
||||
let meta = AddedTransactionMeta { added, blob_sidecar };
|
||||
|
||||
if let Some(replaced) = added.replaced_blob_transaction() {
|
||||
debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
|
||||
// delete the replaced transaction from the blob store
|
||||
self.delete_blob(replaced);
|
||||
}
|
||||
|
||||
// Notify about new pending transactions
|
||||
if let Some(pending) = added.as_pending() {
|
||||
self.on_new_pending_transaction(pending);
|
||||
}
|
||||
|
||||
// Notify tx event listeners
|
||||
self.notify_event_listeners(&added);
|
||||
|
||||
if let Some(discarded) = added.discarded_transactions() {
|
||||
self.delete_discarded_blobs(discarded.iter());
|
||||
}
|
||||
|
||||
// Notify listeners for _all_ transactions
|
||||
self.on_new_transaction(added.into_new_transaction_event());
|
||||
|
||||
Ok(AddedTransactionOutcome { hash, state })
|
||||
(Ok(AddedTransactionOutcome { hash, state }), Some(meta))
|
||||
}
|
||||
TransactionValidationOutcome::Invalid(tx, err) => {
|
||||
let mut listener = self.event_listener.write();
|
||||
listener.invalid(tx.hash());
|
||||
Err(PoolError::new(*tx.hash(), err))
|
||||
(Err(PoolError::new(*tx.hash(), err)), None)
|
||||
}
|
||||
TransactionValidationOutcome::Error(tx_hash, err) => {
|
||||
let mut listener = self.event_listener.write();
|
||||
listener.discarded(&tx_hash);
|
||||
Err(PoolError::other(tx_hash, err))
|
||||
(Err(PoolError::other(tx_hash, err)), None)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -580,33 +569,46 @@ where
|
||||
}
|
||||
|
||||
/// Adds all transactions in the iterator to the pool, returning a list of results.
|
||||
///
|
||||
/// Note: A large batch may lock the pool for a long time that blocks important operations
|
||||
/// like updating the pool on canonical state changes. The caller should consider having
|
||||
/// a max batch size to balance transaction insertions with other updates.
|
||||
pub fn add_transactions(
|
||||
&self,
|
||||
origin: TransactionOrigin,
|
||||
transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
|
||||
) -> Vec<PoolResult<AddedTransactionOutcome>> {
|
||||
// Process all transactions in one write lock, maintaining individual origins
|
||||
let (mut added, discarded) = {
|
||||
// Collect results and metadata while holding the pool write lock
|
||||
let (mut results, added_metas, discarded) = {
|
||||
let mut pool = self.pool.write();
|
||||
let added = transactions
|
||||
let mut added_metas = Vec::new();
|
||||
|
||||
let results = transactions
|
||||
.into_iter()
|
||||
.map(|tx| self.add_transaction(&mut pool, origin, tx))
|
||||
.map(|tx| {
|
||||
let (result, meta) = self.add_transaction(&mut pool, origin, tx);
|
||||
|
||||
// Only collect metadata for successful insertions
|
||||
if result.is_ok() &&
|
||||
let Some(meta) = meta
|
||||
{
|
||||
added_metas.push(meta);
|
||||
}
|
||||
|
||||
result
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Enforce the pool size limits if at least one transaction was added successfully
|
||||
let discarded = if added.iter().any(Result::is_ok) {
|
||||
let discarded = if results.iter().any(Result::is_ok) {
|
||||
pool.discard_worst()
|
||||
} else {
|
||||
Default::default()
|
||||
};
|
||||
|
||||
(added, discarded)
|
||||
(results, added_metas, discarded)
|
||||
};
|
||||
|
||||
for meta in added_metas {
|
||||
self.on_added_transaction(meta);
|
||||
}
|
||||
|
||||
if !discarded.is_empty() {
|
||||
// Delete any blobs associated with discarded blob transactions
|
||||
self.delete_discarded_blobs(discarded.iter());
|
||||
@@ -617,7 +619,7 @@ where
|
||||
|
||||
// A newly added transaction may be immediately discarded, so we need to
|
||||
// adjust the result here
|
||||
for res in &mut added {
|
||||
for res in &mut results {
|
||||
if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
|
||||
discarded_hashes.contains(hash)
|
||||
{
|
||||
@@ -626,7 +628,42 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
added
|
||||
results
|
||||
}
|
||||
|
||||
/// Process a transaction that was added to the pool.
|
||||
///
|
||||
/// Performs blob storage operations and sends all notifications. This should be called
|
||||
/// after the pool write lock has been released to avoid blocking pool operations.
|
||||
fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
|
||||
// Handle blob sidecar storage and notifications for EIP-4844 transactions
|
||||
if let Some(sidecar) = meta.blob_sidecar {
|
||||
let hash = *meta.added.hash();
|
||||
self.on_new_blob_sidecar(&hash, &sidecar);
|
||||
self.insert_blob(hash, sidecar);
|
||||
}
|
||||
|
||||
// Delete replaced blob sidecar if any
|
||||
if let Some(replaced) = meta.added.replaced_blob_transaction() {
|
||||
debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
|
||||
self.delete_blob(replaced);
|
||||
}
|
||||
|
||||
// Delete discarded blob sidecars if any, this doesnt do any IO.
|
||||
if let Some(discarded) = meta.added.discarded_transactions() {
|
||||
self.delete_discarded_blobs(discarded.iter());
|
||||
}
|
||||
|
||||
// Notify pending transaction listeners
|
||||
if let Some(pending) = meta.added.as_pending() {
|
||||
self.on_new_pending_transaction(pending);
|
||||
}
|
||||
|
||||
// Notify event listeners
|
||||
self.notify_event_listeners(&meta.added);
|
||||
|
||||
// Notify new transaction listeners
|
||||
self.on_new_transaction(meta.added.into_new_transaction_event());
|
||||
}
|
||||
|
||||
/// Notify all listeners about a new pending transaction.
|
||||
@@ -638,11 +675,23 @@ where
|
||||
/// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
|
||||
/// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
|
||||
pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
|
||||
let mut transaction_listeners = self.pending_transaction_listener.lock();
|
||||
transaction_listeners.retain_mut(|listener| {
|
||||
// broadcast all pending transactions to the listener
|
||||
listener.send_all(pending.pending_transactions(listener.kind))
|
||||
});
|
||||
let mut needs_cleanup = false;
|
||||
|
||||
{
|
||||
let listeners = self.pending_transaction_listener.read();
|
||||
for listener in listeners.iter() {
|
||||
if !listener.send_all(pending.pending_transactions(listener.kind)) {
|
||||
needs_cleanup = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up dead listeners if we detected any closed channels
|
||||
if needs_cleanup {
|
||||
self.pending_transaction_listener
|
||||
.write()
|
||||
.retain(|listener| !listener.sender.is_closed());
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify all listeners about a newly inserted pending transaction.
|
||||
@@ -654,16 +703,29 @@ where
|
||||
/// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
|
||||
/// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
|
||||
pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
|
||||
let mut transaction_listeners = self.transaction_listener.lock();
|
||||
transaction_listeners.retain_mut(|listener| {
|
||||
if listener.kind.is_propagate_only() && !event.transaction.propagate {
|
||||
// only emit this hash to listeners that are only allowed to receive propagate only
|
||||
// transactions, such as network
|
||||
return !listener.sender.is_closed()
|
||||
}
|
||||
let mut needs_cleanup = false;
|
||||
|
||||
listener.send(event.clone())
|
||||
});
|
||||
{
|
||||
let listeners = self.transaction_listener.read();
|
||||
for listener in listeners.iter() {
|
||||
if listener.kind.is_propagate_only() && !event.transaction.propagate {
|
||||
if listener.sender.is_closed() {
|
||||
needs_cleanup = true;
|
||||
}
|
||||
// Skip non-propagate transactions for propagate-only listeners
|
||||
continue
|
||||
}
|
||||
|
||||
if !listener.send(event.clone()) {
|
||||
needs_cleanup = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up dead listeners if we detected any closed channels
|
||||
if needs_cleanup {
|
||||
self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
|
||||
@@ -697,16 +759,33 @@ where
|
||||
fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
|
||||
trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
|
||||
|
||||
// notify about promoted pending transactions
|
||||
// emit hashes
|
||||
self.pending_transaction_listener
|
||||
.lock()
|
||||
.retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
|
||||
// notify about promoted pending transactions - emit hashes
|
||||
let mut needs_pending_cleanup = false;
|
||||
{
|
||||
let listeners = self.pending_transaction_listener.read();
|
||||
for listener in listeners.iter() {
|
||||
if !listener.send_all(outcome.pending_transactions(listener.kind)) {
|
||||
needs_pending_cleanup = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if needs_pending_cleanup {
|
||||
self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
|
||||
}
|
||||
|
||||
// emit full transactions
|
||||
self.transaction_listener.lock().retain_mut(|listener| {
|
||||
listener.send_all(outcome.full_pending_transactions(listener.kind))
|
||||
});
|
||||
let mut needs_tx_cleanup = false;
|
||||
{
|
||||
let listeners = self.transaction_listener.read();
|
||||
for listener in listeners.iter() {
|
||||
if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
|
||||
needs_tx_cleanup = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if needs_tx_cleanup {
|
||||
self.transaction_listener.write().retain(|l| !l.sender.is_closed());
|
||||
}
|
||||
|
||||
let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
|
||||
|
||||
@@ -742,28 +821,46 @@ where
|
||||
) {
|
||||
// Notify about promoted pending transactions (similar to notify_on_new_state)
|
||||
if !promoted.is_empty() {
|
||||
self.pending_transaction_listener.lock().retain_mut(|listener| {
|
||||
let promoted_hashes = promoted.iter().filter_map(|tx| {
|
||||
if listener.kind.is_propagate_only() && !tx.propagate {
|
||||
None
|
||||
} else {
|
||||
Some(*tx.hash())
|
||||
let mut needs_pending_cleanup = false;
|
||||
{
|
||||
let listeners = self.pending_transaction_listener.read();
|
||||
for listener in listeners.iter() {
|
||||
let promoted_hashes = promoted.iter().filter_map(|tx| {
|
||||
if listener.kind.is_propagate_only() && !tx.propagate {
|
||||
None
|
||||
} else {
|
||||
Some(*tx.hash())
|
||||
}
|
||||
});
|
||||
if !listener.send_all(promoted_hashes) {
|
||||
needs_pending_cleanup = true;
|
||||
}
|
||||
});
|
||||
listener.send_all(promoted_hashes)
|
||||
});
|
||||
}
|
||||
}
|
||||
if needs_pending_cleanup {
|
||||
self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
|
||||
}
|
||||
|
||||
// in this case we should also emit promoted transactions in full
|
||||
self.transaction_listener.lock().retain_mut(|listener| {
|
||||
let promoted_txs = promoted.iter().filter_map(|tx| {
|
||||
if listener.kind.is_propagate_only() && !tx.propagate {
|
||||
None
|
||||
} else {
|
||||
Some(NewTransactionEvent::pending(tx.clone()))
|
||||
let mut needs_tx_cleanup = false;
|
||||
{
|
||||
let listeners = self.transaction_listener.read();
|
||||
for listener in listeners.iter() {
|
||||
let promoted_txs = promoted.iter().filter_map(|tx| {
|
||||
if listener.kind.is_propagate_only() && !tx.propagate {
|
||||
None
|
||||
} else {
|
||||
Some(NewTransactionEvent::pending(tx.clone()))
|
||||
}
|
||||
});
|
||||
if !listener.send_all(promoted_txs) {
|
||||
needs_tx_cleanup = true;
|
||||
}
|
||||
});
|
||||
listener.send_all(promoted_txs)
|
||||
});
|
||||
}
|
||||
}
|
||||
if needs_tx_cleanup {
|
||||
self.transaction_listener.write().retain(|l| !l.sender.is_closed());
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
@@ -1125,6 +1222,18 @@ impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Metadata for a transaction that was added to the pool.
|
||||
///
|
||||
/// This holds all the data needed to complete post-insertion operations (notifications,
|
||||
/// blob storage).
|
||||
#[derive(Debug)]
|
||||
struct AddedTransactionMeta<T: PoolTransaction> {
|
||||
/// The transaction that was added to the pool
|
||||
added: AddedTransaction<T>,
|
||||
/// Optional blob sidecar for EIP-4844 transactions
|
||||
blob_sidecar: Option<BlobTransactionSidecarVariant>,
|
||||
}
|
||||
|
||||
/// Tracks an added transaction and all graph changes caused by adding it.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AddedPendingTransaction<T: PoolTransaction> {
|
||||
|
||||
@@ -85,8 +85,6 @@ use tracing::{trace, warn};
|
||||
/// new --> |apply state changes| pool
|
||||
/// ```
|
||||
pub struct TxPool<T: TransactionOrdering> {
|
||||
/// Contains the currently known information about the senders.
|
||||
sender_info: FxHashMap<SenderId, SenderInfo>,
|
||||
/// pending subpool
|
||||
///
|
||||
/// Holds transactions that are ready to be executed on the current state.
|
||||
@@ -124,7 +122,6 @@ impl<T: TransactionOrdering> TxPool<T> {
|
||||
/// Create a new graph pool instance.
|
||||
pub fn new(ordering: T, config: PoolConfig) -> Self {
|
||||
Self {
|
||||
sender_info: Default::default(),
|
||||
pending_pool: PendingPool::with_buffer(
|
||||
ordering,
|
||||
config.max_new_pending_txs_notifications,
|
||||
@@ -165,7 +162,7 @@ impl<T: TransactionOrdering> TxPool<T> {
|
||||
let mut last_consecutive_tx = None;
|
||||
|
||||
// ensure this operates on the most recent
|
||||
if let Some(current) = self.sender_info.get(&on_chain.sender) {
|
||||
if let Some(current) = self.all_transactions.sender_info.get(&on_chain.sender) {
|
||||
on_chain.nonce = on_chain.nonce.max(current.state_nonce);
|
||||
}
|
||||
|
||||
@@ -625,7 +622,7 @@ impl<T: TransactionOrdering> TxPool<T> {
|
||||
let updates = self.all_transactions.update(&changed_senders);
|
||||
|
||||
// track changed accounts
|
||||
self.sender_info.extend(changed_senders);
|
||||
self.all_transactions.sender_info.extend(changed_senders);
|
||||
|
||||
// Process the sub-pool updates
|
||||
let update = self.process_updates(updates);
|
||||
@@ -743,7 +740,8 @@ impl<T: TransactionOrdering> TxPool<T> {
|
||||
self.validate_auth(&tx, on_chain_nonce, on_chain_code_hash)?;
|
||||
|
||||
// Update sender info with balance and nonce
|
||||
self.sender_info
|
||||
self.all_transactions
|
||||
.sender_info
|
||||
.entry(tx.sender_id())
|
||||
.or_default()
|
||||
.update(on_chain_nonce, on_chain_balance);
|
||||
@@ -938,6 +936,7 @@ impl<T: TransactionOrdering> TxPool<T> {
|
||||
/// This will move/discard the given transaction according to the `PoolUpdate`
|
||||
fn process_updates(&mut self, updates: Vec<PoolUpdate>) -> UpdateOutcome<T::Transaction> {
|
||||
let mut outcome = UpdateOutcome::default();
|
||||
let mut removed = 0;
|
||||
for PoolUpdate { id, current, destination } in updates {
|
||||
match destination {
|
||||
Destination::Discard => {
|
||||
@@ -945,7 +944,7 @@ impl<T: TransactionOrdering> TxPool<T> {
|
||||
if let Some(tx) = self.prune_transaction_by_id(&id) {
|
||||
outcome.discarded.push(tx);
|
||||
}
|
||||
self.metrics.removed_transactions.increment(1);
|
||||
removed += 1;
|
||||
}
|
||||
Destination::Pool(move_to) => {
|
||||
debug_assert_ne!(&move_to, ¤t, "destination must be different");
|
||||
@@ -960,6 +959,10 @@ impl<T: TransactionOrdering> TxPool<T> {
|
||||
}
|
||||
}
|
||||
|
||||
if removed > 0 {
|
||||
self.metrics.removed_transactions.increment(removed);
|
||||
}
|
||||
|
||||
outcome
|
||||
}
|
||||
|
||||
@@ -1324,6 +1327,8 @@ pub(crate) struct AllTransactions<T: PoolTransaction> {
|
||||
by_hash: HashMap<TxHash, Arc<ValidPoolTransaction<T>>>,
|
||||
/// _All_ transaction in the pool sorted by their sender and nonce pair.
|
||||
txs: BTreeMap<TransactionId, PoolInternalTransaction<T>>,
|
||||
/// Contains the currently known information about the senders.
|
||||
sender_info: FxHashMap<SenderId, SenderInfo>,
|
||||
/// Tracks the number of transactions by sender that are currently in the pool.
|
||||
tx_counter: FxHashMap<SenderId, usize>,
|
||||
/// The current block number the pool keeps track of.
|
||||
@@ -1391,6 +1396,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
|
||||
let count = entry.get_mut();
|
||||
if *count == 1 {
|
||||
entry.remove();
|
||||
self.sender_info.remove(&sender);
|
||||
self.metrics.all_transactions_by_all_senders.decrement(1.0);
|
||||
return
|
||||
}
|
||||
@@ -2128,6 +2134,7 @@ impl<T: PoolTransaction> Default for AllTransactions<T> {
|
||||
block_gas_limit: ETHEREUM_BLOCK_GAS_LIMIT_30M,
|
||||
by_hash: Default::default(),
|
||||
txs: Default::default(),
|
||||
sender_info: Default::default(),
|
||||
tx_counter: Default::default(),
|
||||
last_seen_block_number: Default::default(),
|
||||
last_seen_block_hash: Default::default(),
|
||||
@@ -3614,7 +3621,7 @@ mod tests {
|
||||
// update the tracked nonce
|
||||
let mut info = SenderInfo::default();
|
||||
info.update(8, U256::ZERO);
|
||||
pool.sender_info.insert(sender_id, info);
|
||||
pool.all_transactions.sender_info.insert(sender_id, info);
|
||||
let next_tx =
|
||||
pool.get_highest_consecutive_transaction_by_sender(sender_id.into_transaction_id(5));
|
||||
assert_eq!(next_tx.map(|tx| tx.nonce()), Some(9), "Expected nonce 9 for on-chain nonce 8");
|
||||
|
||||
@@ -76,12 +76,6 @@ pub fn prefix_set_lookups(c: &mut Criterion) {
|
||||
test_data.clone(),
|
||||
size,
|
||||
);
|
||||
prefix_set_bench::<VecBinarySearchPrefixSet>(
|
||||
&mut group,
|
||||
"`Vec` with binary search lookup",
|
||||
test_data.clone(),
|
||||
size,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,43 +201,6 @@ mod implementations {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct VecBinarySearchPrefixSet {
|
||||
keys: Vec<Nibbles>,
|
||||
sorted: bool,
|
||||
}
|
||||
|
||||
impl PrefixSetMutAbstraction for VecBinarySearchPrefixSet {
|
||||
type Frozen = Self;
|
||||
|
||||
fn insert(&mut self, key: Nibbles) {
|
||||
self.sorted = false;
|
||||
self.keys.push(key);
|
||||
}
|
||||
|
||||
fn freeze(self) -> Self::Frozen {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl PrefixSetAbstraction for VecBinarySearchPrefixSet {
|
||||
fn contains(&mut self, prefix: Nibbles) -> bool {
|
||||
if !self.sorted {
|
||||
self.keys.sort();
|
||||
self.sorted = true;
|
||||
}
|
||||
|
||||
match self.keys.binary_search(&prefix) {
|
||||
Ok(_) => true,
|
||||
Err(idx) => match self.keys.get(idx) {
|
||||
Some(key) => key.starts_with(&prefix),
|
||||
None => false, // prefix > last key
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct VecCursorPrefixSet {
|
||||
keys: Vec<Nibbles>,
|
||||
|
||||
@@ -18,10 +18,7 @@ use reth_trie_sparse::{
|
||||
SparseTrieUpdates,
|
||||
};
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
cmp::{Ord, Ordering, PartialOrd},
|
||||
sync::mpsc,
|
||||
};
|
||||
use std::cmp::{Ord, Ordering, PartialOrd};
|
||||
use tracing::{debug, instrument, trace};
|
||||
|
||||
/// The maximum length of a path, in nibbles, which belongs to the upper subtrie of a
|
||||
@@ -265,11 +262,9 @@ impl SparseTrieInterface for ParallelSparseTrie {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
// Zip the lower subtries and their corresponding node groups, and reveal lower subtrie
|
||||
// nodes in parallel
|
||||
lower_subtries
|
||||
let results: Vec<_> = lower_subtries
|
||||
.into_par_iter()
|
||||
.zip(node_groups.into_par_iter())
|
||||
.map(|((subtrie_idx, mut subtrie), nodes)| {
|
||||
@@ -286,16 +281,12 @@ impl SparseTrieInterface for ParallelSparseTrie {
|
||||
}
|
||||
(subtrie_idx, subtrie, Ok(()))
|
||||
})
|
||||
.for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
|
||||
.collect();
|
||||
|
||||
drop(tx);
|
||||
|
||||
// Take back all lower subtries which were sent to the rayon pool, collecting the last
|
||||
// seen error in the process and returning that. If we don't fully drain the channel
|
||||
// then we lose lower sparse tries, putting the whole ParallelSparseTrie in an
|
||||
// inconsistent state.
|
||||
// Put subtries back which were processed in the rayon pool, collecting the last
|
||||
// seen error in the process and returning that.
|
||||
let mut any_err = Ok(());
|
||||
for (subtrie_idx, subtrie, res) in rx {
|
||||
for (subtrie_idx, subtrie, res) in results {
|
||||
self.lower_subtries[subtrie_idx] = LowerSparseSubtrie::Revealed(subtrie);
|
||||
if res.is_err() {
|
||||
any_err = res;
|
||||
@@ -745,11 +736,9 @@ impl SparseTrieInterface for ParallelSparseTrie {
|
||||
{
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let branch_node_tree_masks = &self.branch_node_tree_masks;
|
||||
let branch_node_hash_masks = &self.branch_node_hash_masks;
|
||||
changed_subtries
|
||||
let updated_subtries: Vec<_> = changed_subtries
|
||||
.into_par_iter()
|
||||
.map(|mut changed_subtrie| {
|
||||
#[cfg(feature = "metrics")]
|
||||
@@ -764,10 +753,9 @@ impl SparseTrieInterface for ParallelSparseTrie {
|
||||
self.metrics.subtrie_hash_update_latency.record(start.elapsed());
|
||||
changed_subtrie
|
||||
})
|
||||
.for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
|
||||
.collect();
|
||||
|
||||
drop(tx);
|
||||
self.insert_changed_subtries(rx);
|
||||
self.insert_changed_subtries(updated_subtries);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -64,6 +64,7 @@ parking_lot.workspace = true
|
||||
pretty_assertions.workspace = true
|
||||
proptest-arbitrary-interop.workspace = true
|
||||
proptest.workspace = true
|
||||
rand.workspace = true
|
||||
|
||||
[features]
|
||||
metrics = ["reth-metrics", "dep:metrics"]
|
||||
@@ -84,6 +85,7 @@ serde = [
|
||||
"revm-state/serde",
|
||||
"parking_lot/serde",
|
||||
"reth-ethereum-primitives/serde",
|
||||
"rand/serde",
|
||||
]
|
||||
test-utils = [
|
||||
"triehash",
|
||||
|
||||
@@ -11,20 +11,19 @@ use reth_trie::{
|
||||
proof_v2::StorageProofCalculator,
|
||||
trie_cursor::{mock::MockTrieCursorFactory, TrieCursorFactory},
|
||||
};
|
||||
use reth_trie_common::{HashedPostState, HashedStorage, Nibbles};
|
||||
use std::collections::BTreeMap;
|
||||
use reth_trie_common::{HashedPostState, HashedStorage};
|
||||
|
||||
/// Generate test data for benchmarking.
|
||||
///
|
||||
/// Returns a tuple of:
|
||||
/// - Hashed address for the storage trie
|
||||
/// - `HashedPostState` with random storage slots
|
||||
/// - Proof targets (Nibbles) that are 80% from existing slots, 20% random
|
||||
/// - Proof targets as B256 (sorted) for V2 implementation
|
||||
/// - Equivalent [`B256Set`] for legacy implementation
|
||||
fn generate_test_data(
|
||||
dataset_size: usize,
|
||||
num_targets: usize,
|
||||
) -> (B256, HashedPostState, Vec<Nibbles>, B256Set) {
|
||||
) -> (B256, HashedPostState, Vec<B256>, B256Set) {
|
||||
let mut runner = TestRunner::deterministic();
|
||||
|
||||
// Use a fixed hashed address for the storage trie
|
||||
@@ -68,14 +67,8 @@ fn generate_test_data(
|
||||
|
||||
let target_b256s = targets_strategy.new_tree(&mut runner).unwrap().current();
|
||||
|
||||
// Convert B256 targets to sorted Nibbles for V2
|
||||
let mut targets: Vec<Nibbles> = target_b256s
|
||||
.iter()
|
||||
.map(|b256| {
|
||||
// SAFETY: B256 is exactly 32 bytes
|
||||
unsafe { Nibbles::unpack_unchecked(b256.as_slice()) }
|
||||
})
|
||||
.collect();
|
||||
// Sort B256 targets for V2 (storage_proof expects sorted targets)
|
||||
let mut targets: Vec<B256> = target_b256s.clone();
|
||||
targets.sort();
|
||||
|
||||
// Create B256Set for legacy
|
||||
@@ -86,19 +79,42 @@ fn generate_test_data(
|
||||
|
||||
/// Create cursor factories from a `HashedPostState` for storage trie testing.
|
||||
///
|
||||
/// This mimics the test harness pattern from the `proof_v2` tests.
|
||||
/// This mimics the test harness pattern from the `proof_v2` tests by using `StateRoot`
|
||||
/// to generate `TrieUpdates` from the `HashedPostState`.
|
||||
fn create_cursor_factories(
|
||||
post_state: &HashedPostState,
|
||||
) -> (MockTrieCursorFactory, MockHashedCursorFactory) {
|
||||
// Ensure that there's a storage trie dataset for every storage trie, even if empty
|
||||
let storage_trie_nodes: B256Map<BTreeMap<_, _>> =
|
||||
post_state.storages.keys().copied().map(|addr| (addr, Default::default())).collect();
|
||||
use reth_trie::{updates::StorageTrieUpdates, StateRoot};
|
||||
|
||||
// Create empty trie cursor factory to serve as the initial state for StateRoot
|
||||
// Ensure that there's a storage trie dataset for every storage account
|
||||
let storage_tries: B256Map<_> = post_state
|
||||
.storages
|
||||
.keys()
|
||||
.copied()
|
||||
.map(|addr| (addr, StorageTrieUpdates::default()))
|
||||
.collect();
|
||||
|
||||
let empty_trie_cursor_factory =
|
||||
MockTrieCursorFactory::from_trie_updates(reth_trie_common::updates::TrieUpdates {
|
||||
storage_tries: storage_tries.clone(),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
// Create mock hashed cursor factory from the post state
|
||||
let hashed_cursor_factory = MockHashedCursorFactory::from_hashed_post_state(post_state.clone());
|
||||
|
||||
// Create empty trie cursor factory (leaf-only calculator doesn't need trie nodes)
|
||||
let trie_cursor_factory = MockTrieCursorFactory::new(BTreeMap::new(), storage_trie_nodes);
|
||||
// Generate TrieUpdates using StateRoot
|
||||
let (_root, mut trie_updates) =
|
||||
StateRoot::new(empty_trie_cursor_factory, hashed_cursor_factory.clone())
|
||||
.root_with_updates()
|
||||
.expect("StateRoot should succeed");
|
||||
|
||||
// Continue using empty storage tries for each account
|
||||
trie_updates.storage_tries = storage_tries;
|
||||
|
||||
// Initialize trie cursor factory from the generated TrieUpdates
|
||||
let trie_cursor_factory = MockTrieCursorFactory::from_trie_updates(trie_updates);
|
||||
|
||||
(trie_cursor_factory, hashed_cursor_factory)
|
||||
}
|
||||
@@ -148,7 +164,7 @@ fn bench_proof_algos(c: &mut Criterion) {
|
||||
|| targets.clone(),
|
||||
|targets| {
|
||||
proof_calculator
|
||||
.storage_proof(hashed_address, targets.into_iter())
|
||||
.storage_proof(hashed_address, targets)
|
||||
.expect("Proof generation failed");
|
||||
},
|
||||
BatchSize::SmallInput,
|
||||
|
||||
@@ -48,7 +48,7 @@ impl MockHashedCursorFactory {
|
||||
.collect();
|
||||
|
||||
// Extract storages from post state
|
||||
let hashed_storages: B256Map<BTreeMap<B256, U256>> = post_state
|
||||
let mut hashed_storages: B256Map<BTreeMap<B256, U256>> = post_state
|
||||
.storages
|
||||
.into_iter()
|
||||
.map(|(addr, hashed_storage)| {
|
||||
@@ -62,6 +62,11 @@ impl MockHashedCursorFactory {
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Ensure all accounts have at least an empty storage
|
||||
for account in hashed_accounts.keys() {
|
||||
hashed_storages.entry(*account).or_default();
|
||||
}
|
||||
|
||||
Self::new(hashed_accounts, hashed_storages)
|
||||
}
|
||||
|
||||
|
||||
@@ -428,15 +428,21 @@ mod tests {
|
||||
|
||||
/// Generate a sorted vector of (B256, U256) entries (including deletions as ZERO)
|
||||
fn sorted_post_state_nodes_strategy() -> impl Strategy<Value = Vec<(B256, U256)>> {
|
||||
prop::collection::vec((any::<u8>(), u256_strategy()), 0..20).prop_map(|entries| {
|
||||
let mut result: Vec<(B256, U256)> = entries
|
||||
.into_iter()
|
||||
.map(|(byte, value)| (B256::repeat_byte(byte), value))
|
||||
.collect();
|
||||
result.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
result.dedup_by(|a, b| a.0 == b.0);
|
||||
result
|
||||
})
|
||||
// Explicitly inject ZERO values to model post-state deletions.
|
||||
prop::collection::vec((any::<u8>(), u256_strategy(), any::<bool>()), 0..20).prop_map(
|
||||
|entries| {
|
||||
let mut result: Vec<(B256, U256)> = entries
|
||||
.into_iter()
|
||||
.map(|(byte, value, is_deletion)| {
|
||||
let effective_value = if is_deletion { U256::ZERO } else { value };
|
||||
(B256::repeat_byte(byte), effective_value)
|
||||
})
|
||||
.collect();
|
||||
result.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
result.dedup_by(|a, b| a.0 == b.0);
|
||||
result
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
proptest! {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -25,7 +25,12 @@ pub(crate) enum ProofTrieBranchChild<RF> {
|
||||
child: RlpNode,
|
||||
},
|
||||
/// A branch node whose children have already been flattened into [`RlpNode`]s.
|
||||
Branch(BranchNode),
|
||||
Branch {
|
||||
/// The node itself, for use during RLP encoding.
|
||||
node: BranchNode,
|
||||
/// Bitmasks carried over from cached `BranchNodeCompact` values, if any.
|
||||
masks: TrieMasks,
|
||||
},
|
||||
/// A node whose type is not known, as it has already been converted to an [`RlpNode`].
|
||||
RlpNode(RlpNode),
|
||||
}
|
||||
@@ -64,7 +69,7 @@ impl<RF: DeferredValueEncoder> ProofTrieBranchChild<RF> {
|
||||
ExtensionNodeRef::new(&short_key, child.as_slice()).encode(buf);
|
||||
Ok((RlpNode::from_rlp(buf), None))
|
||||
}
|
||||
Self::Branch(branch_node) => {
|
||||
Self::Branch { node: branch_node, .. } => {
|
||||
branch_node.encode(buf);
|
||||
Ok((RlpNode::from_rlp(buf), Some(branch_node.stack)))
|
||||
}
|
||||
@@ -98,8 +103,7 @@ impl<RF: DeferredValueEncoder> ProofTrieBranchChild<RF> {
|
||||
Self::Extension { short_key, child } => {
|
||||
(TrieNode::Extension(ExtensionNode { key: short_key, child }), TrieMasks::none())
|
||||
}
|
||||
// TODO store trie masks on branch
|
||||
Self::Branch(branch_node) => (TrieNode::Branch(branch_node), TrieMasks::none()),
|
||||
Self::Branch { node, masks } => (TrieNode::Branch(node), masks),
|
||||
Self::RlpNode(_) => panic!("Cannot call `into_proof_trie_node` on RlpNode"),
|
||||
};
|
||||
|
||||
@@ -111,7 +115,7 @@ impl<RF: DeferredValueEncoder> ProofTrieBranchChild<RF> {
|
||||
pub(crate) fn short_key(&self) -> &Nibbles {
|
||||
match self {
|
||||
Self::Leaf { short_key, .. } | Self::Extension { short_key, .. } => short_key,
|
||||
Self::Branch(_) | Self::RlpNode(_) => {
|
||||
Self::Branch { .. } | Self::RlpNode(_) => {
|
||||
static EMPTY_NIBBLES: Nibbles = Nibbles::new();
|
||||
&EMPTY_NIBBLES
|
||||
}
|
||||
@@ -136,7 +140,7 @@ impl<RF: DeferredValueEncoder> ProofTrieBranchChild<RF> {
|
||||
Self::Leaf { short_key, .. } | Self::Extension { short_key, .. } => {
|
||||
*short_key = trim_nibbles_prefix(short_key, len);
|
||||
}
|
||||
Self::Branch(_) | Self::RlpNode(_) => {
|
||||
Self::Branch { .. } | Self::RlpNode(_) => {
|
||||
panic!("Cannot call `trim_short_key_prefix` on Branch or RlpNode")
|
||||
}
|
||||
}
|
||||
@@ -153,14 +157,8 @@ pub(crate) struct ProofTrieBranch {
|
||||
/// A mask tracking which child nibbles are set on the branch so far. There will be a single
|
||||
/// child on the stack for each set bit.
|
||||
pub(crate) state_mask: TrieMask,
|
||||
/// A subset of `state_mask`. Each bit is set if the `state_mask` bit is set and:
|
||||
/// - The child is a branch which is stored in the DB.
|
||||
/// - The child is an extension whose child branch is stored in the DB.
|
||||
#[expect(unused)]
|
||||
pub(crate) tree_mask: TrieMask,
|
||||
/// A subset of `state_mask`. Each bit is set if the hash for the child is cached in the DB.
|
||||
#[expect(unused)]
|
||||
pub(crate) hash_mask: TrieMask,
|
||||
/// Bitmasks which are subsets of `state_mask`.
|
||||
pub(crate) masks: TrieMasks,
|
||||
}
|
||||
|
||||
/// Trims the first `len` nibbles from the head of the given `Nibbles`.
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user