mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
main
...
dan/static
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db6dd12b8b |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7740,6 +7740,7 @@ dependencies = [
|
||||
"reth-network",
|
||||
"reth-network-p2p",
|
||||
"reth-network-peers",
|
||||
"reth-nippy-jar",
|
||||
"reth-node-api",
|
||||
"reth-node-builder",
|
||||
"reth-node-core",
|
||||
|
||||
@@ -31,6 +31,7 @@ reth-etl.workspace = true
|
||||
reth-evm.workspace = true
|
||||
reth-exex.workspace = true
|
||||
reth-fs-util.workspace = true
|
||||
reth-nippy-jar.workspace = true
|
||||
reth-net-nat.workspace = true
|
||||
reth-network = { workspace = true, features = ["serde"] }
|
||||
reth-network-p2p.workspace = true
|
||||
|
||||
269
crates/cli/commands/src/db/extract.rs
Normal file
269
crates/cli/commands/src/db/extract.rs
Normal file
@@ -0,0 +1,269 @@
|
||||
use clap::Parser;
|
||||
use itertools::Itertools;
|
||||
use reth_db::static_file::iter_static_files;
|
||||
use reth_nippy_jar::{NippyJar, NippyJarWriter};
|
||||
use reth_provider::{providers::ProviderNodeTypes, StaticFileProviderFactory};
|
||||
use reth_static_file_types::{
|
||||
ChangesetOffset, ChangesetOffsetWriter, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
|
||||
};
|
||||
use std::{fs, path::PathBuf};
|
||||
use tracing::{info, warn};
|
||||
|
||||
/// The arguments for the `reth db extract` command
|
||||
#[derive(Parser, Debug)]
|
||||
pub struct Command {
|
||||
/// Number of most recent blocks to extract from each segment.
|
||||
#[arg(long, short)]
|
||||
num_blocks: u64,
|
||||
|
||||
/// Output directory for the extracted static files.
|
||||
#[arg(long, short)]
|
||||
output: PathBuf,
|
||||
|
||||
/// Only extract specific segments (default: all).
|
||||
#[arg(long, value_enum)]
|
||||
segments: Vec<StaticFileSegment>,
|
||||
}
|
||||
|
||||
impl Command {
|
||||
/// Execute `db extract` command
|
||||
pub fn execute<N: ProviderNodeTypes>(
|
||||
self,
|
||||
tool: &reth_db_common::DbTool<N>,
|
||||
) -> eyre::Result<()> {
|
||||
eyre::ensure!(self.num_blocks > 0, "num_blocks must be greater than 0");
|
||||
|
||||
let static_file_provider = tool.provider_factory.static_file_provider();
|
||||
|
||||
// Ensure the output directory is not the same as the source static files directory
|
||||
let source_dir = fs::canonicalize(static_file_provider.directory())?;
|
||||
reth_fs_util::create_dir_all(&self.output)?;
|
||||
let output_dir = fs::canonicalize(&self.output)?;
|
||||
eyre::ensure!(
|
||||
source_dir != output_dir,
|
||||
"Output directory must differ from the source static files directory"
|
||||
);
|
||||
|
||||
if let Err(err) = static_file_provider.check_consistency(&tool.provider_factory.provider()?)
|
||||
{
|
||||
warn!("Error checking consistency of static files: {err}");
|
||||
}
|
||||
|
||||
let all_static_files = iter_static_files(static_file_provider.directory())?;
|
||||
|
||||
let segments: Vec<StaticFileSegment> = if self.segments.is_empty() {
|
||||
StaticFileSegment::iter().collect()
|
||||
} else {
|
||||
self.segments.clone()
|
||||
};
|
||||
|
||||
for segment in segments {
|
||||
let ranges = match all_static_files.get(segment) {
|
||||
Some(ranges) if !ranges.is_empty() => ranges,
|
||||
_ => {
|
||||
info!("No static files found for segment: {segment}, skipping");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Find the highest block across all jars for this segment
|
||||
let highest_block = ranges.iter().map(|(range, _)| range.end()).max().unwrap();
|
||||
|
||||
let start_block = highest_block.saturating_sub(self.num_blocks - 1);
|
||||
|
||||
info!(
|
||||
"Extracting {segment} blocks {start_block}..={highest_block} to {}",
|
||||
self.output.display()
|
||||
);
|
||||
|
||||
let output_range = SegmentRangeInclusive::new(start_block, highest_block);
|
||||
let output_path = self.output.join(segment.filename(&output_range));
|
||||
|
||||
// Create a new jar matching the source segment's configuration
|
||||
let header = SegmentHeader::new(output_range, None, None, segment);
|
||||
let mut jar = NippyJar::new(segment.columns(), &output_path, header);
|
||||
|
||||
// Match compression from the source
|
||||
if segment.is_headers() {
|
||||
jar = jar.with_lz4();
|
||||
}
|
||||
|
||||
let mut writer = NippyJarWriter::new(jar)?;
|
||||
|
||||
// Changeset offsets to write for change-based segments
|
||||
let mut changeset_offsets: Vec<ChangesetOffset> = Vec::new();
|
||||
|
||||
// Track block/tx ranges for the output header
|
||||
let mut first_block = None;
|
||||
let mut last_block = None;
|
||||
let mut first_tx = None;
|
||||
let mut last_tx = None;
|
||||
let mut total_rows: u64 = 0;
|
||||
|
||||
// Iterate source jars in block order, extracting rows in range
|
||||
let sorted_ranges = ranges.iter().sorted_by_key(|(range, _)| range.start());
|
||||
|
||||
for (block_range, _source_header) in sorted_ranges {
|
||||
// Skip jars entirely before our range
|
||||
if block_range.end() < start_block {
|
||||
continue;
|
||||
}
|
||||
// Stop if jar starts after our range
|
||||
if block_range.start() > highest_block {
|
||||
break;
|
||||
}
|
||||
|
||||
let fixed_block_range =
|
||||
static_file_provider.find_fixed_range(segment, block_range.start());
|
||||
|
||||
let jar_provider = static_file_provider
|
||||
.get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
|
||||
.ok_or_else(|| {
|
||||
eyre::eyre!(
|
||||
"Failed to get segment provider for {segment} at range {block_range}"
|
||||
)
|
||||
})?;
|
||||
|
||||
let source_header = jar_provider.user_header().clone();
|
||||
let mut cursor = jar_provider.cursor()?;
|
||||
|
||||
if segment.is_change_based() {
|
||||
// For change-based segments, we need the offsets to know which rows
|
||||
// belong to which block
|
||||
let offsets = jar_provider.read_changeset_offsets()?.ok_or_else(|| {
|
||||
eyre::eyre!(
|
||||
"Missing changeset offsets for {segment} at range {block_range}"
|
||||
)
|
||||
})?;
|
||||
|
||||
for (offset_index, offset) in offsets.iter().enumerate() {
|
||||
let block_number = block_range.start() + offset_index as u64;
|
||||
let in_range = block_number >= start_block && block_number <= highest_block;
|
||||
|
||||
if in_range {
|
||||
// Track the row offset in the output jar
|
||||
let new_offset = ChangesetOffset::new(total_rows, offset.num_changes());
|
||||
changeset_offsets.push(new_offset);
|
||||
|
||||
if first_block.is_none() {
|
||||
first_block = Some(block_number);
|
||||
}
|
||||
last_block = Some(block_number);
|
||||
}
|
||||
|
||||
for _ in 0..offset.num_changes() {
|
||||
let row = cursor.next_row()?.ok_or_else(|| {
|
||||
eyre::eyre!("Unexpected EOF in {segment} at {block_range}")
|
||||
})?;
|
||||
|
||||
if in_range {
|
||||
for col in &row {
|
||||
writer.append_column(Some(Ok(col)))?;
|
||||
}
|
||||
total_rows += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if segment.is_tx_based() {
|
||||
// For tx-based segments (Transactions, Receipts, TransactionSenders),
|
||||
// rows correspond to transaction numbers. Since we can't map blocks to
|
||||
// tx rows without body indices, we include all rows from overlapping jars.
|
||||
// The output block range is set to the full coverage of overlapping jars.
|
||||
let source_block_range = source_header.block_range().ok_or_else(|| {
|
||||
eyre::eyre!("Missing block range in {segment} header at {block_range}")
|
||||
})?;
|
||||
let source_tx_range = source_header.tx_range().ok_or_else(|| {
|
||||
eyre::eyre!("Missing tx range in {segment} header at {block_range}")
|
||||
})?;
|
||||
|
||||
let mut row_index: u64 = 0;
|
||||
while let Some(row) = cursor.next_row()? {
|
||||
let tx_num = source_tx_range.start() + row_index;
|
||||
|
||||
for col in &row {
|
||||
writer.append_column(Some(Ok(col)))?;
|
||||
}
|
||||
|
||||
if first_tx.is_none() {
|
||||
first_tx = Some(tx_num);
|
||||
}
|
||||
last_tx = Some(tx_num);
|
||||
total_rows += 1;
|
||||
row_index += 1;
|
||||
}
|
||||
|
||||
// Use the full source jar's block range (not clamped), since we copy
|
||||
// all rows from overlapping jars
|
||||
if first_block.is_none() {
|
||||
first_block = Some(source_block_range.start());
|
||||
}
|
||||
last_block = Some(source_block_range.end());
|
||||
} else {
|
||||
// Block-based segments (Headers) - one row per block
|
||||
let mut block_number = block_range.start();
|
||||
while let Some(row) = cursor.next_row()? {
|
||||
let actual_block = match source_header.block_range() {
|
||||
Some(br) => br.start() + (block_number - block_range.start()),
|
||||
None => block_number,
|
||||
};
|
||||
|
||||
if actual_block >= start_block && actual_block <= highest_block {
|
||||
for col in &row {
|
||||
writer.append_column(Some(Ok(col)))?;
|
||||
}
|
||||
total_rows += 1;
|
||||
|
||||
if first_block.is_none() {
|
||||
first_block = Some(actual_block);
|
||||
}
|
||||
last_block = Some(actual_block);
|
||||
}
|
||||
|
||||
block_number += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Drop provider before removing from cache
|
||||
drop(jar_provider);
|
||||
static_file_provider.remove_cached_provider(segment, fixed_block_range.end());
|
||||
}
|
||||
|
||||
// Update the output header with actual ranges
|
||||
{
|
||||
let header = writer.user_header_mut();
|
||||
if let (Some(first), Some(last)) = (first_block, last_block) {
|
||||
header.set_block_range(first, last);
|
||||
}
|
||||
if let (Some(first), Some(last)) = (first_tx, last_tx) {
|
||||
header.set_tx_range(first, last);
|
||||
}
|
||||
if segment.is_change_based() {
|
||||
header.set_changeset_offsets_len(changeset_offsets.len() as u64);
|
||||
}
|
||||
}
|
||||
|
||||
// For change-based segments, write the sidecar *before* finalizing the header.
|
||||
// This matches the durability contract: sidecar first, header last.
|
||||
writer.sync_all()?;
|
||||
|
||||
if segment.is_change_based() && !changeset_offsets.is_empty() {
|
||||
let csoff_path = output_path.with_extension("csoff");
|
||||
let mut csoff_writer = ChangesetOffsetWriter::new(&csoff_path, 0)?;
|
||||
csoff_writer.append_many(&changeset_offsets)?;
|
||||
csoff_writer.sync()?;
|
||||
}
|
||||
|
||||
writer.finalize()?;
|
||||
|
||||
info!(
|
||||
"Extracted {total_rows} rows for {segment} (blocks {}..={})",
|
||||
first_block.unwrap_or(0),
|
||||
last_block.unwrap_or(0)
|
||||
);
|
||||
}
|
||||
|
||||
info!("Extraction complete. Output: {}", self.output.display());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,7 @@ mod checksum;
|
||||
mod clear;
|
||||
mod copy;
|
||||
mod diff;
|
||||
mod extract;
|
||||
mod get;
|
||||
mod list;
|
||||
mod prune_checkpoints;
|
||||
@@ -49,6 +50,8 @@ pub enum Subcommands {
|
||||
Copy(copy::Command),
|
||||
/// Create a diff between two database tables or two entire databases.
|
||||
Diff(diff::Command),
|
||||
/// Extracts the most recent N blocks of data from each static file segment
|
||||
Extract(extract::Command),
|
||||
/// Gets the content of a table for the given key
|
||||
Get(get::Command),
|
||||
/// Deletes all database entries
|
||||
@@ -144,6 +147,11 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
|
||||
command.execute(&tool)?;
|
||||
});
|
||||
}
|
||||
Subcommands::Extract(command) => {
|
||||
db_exec!(self.env, tool, N, AccessRights::RO, {
|
||||
command.execute(&tool)?;
|
||||
});
|
||||
}
|
||||
Subcommands::Get(command) => {
|
||||
db_exec!(self.env, tool, N, AccessRights::RO, {
|
||||
command.execute(&tool)?;
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
- [`reth db checksum rocksdb`](./reth/db/checksum/rocksdb.mdx)
|
||||
- [`reth db copy`](./reth/db/copy.mdx)
|
||||
- [`reth db diff`](./reth/db/diff.mdx)
|
||||
- [`reth db extract`](./reth/db/extract.mdx)
|
||||
- [`reth db get`](./reth/db/get.mdx)
|
||||
- [`reth db get mdbx`](./reth/db/get/mdbx.mdx)
|
||||
- [`reth db get static-file`](./reth/db/get/static-file.mdx)
|
||||
|
||||
@@ -14,6 +14,7 @@ Commands:
|
||||
checksum Calculates the content checksum of a table or static file segment
|
||||
copy Copies the MDBX database to a new location (bundled mdbx_copy)
|
||||
diff Create a diff between two database tables or two entire databases
|
||||
extract Extracts the most recent N blocks of data from each static file segment
|
||||
get Gets the content of a table for the given key
|
||||
drop Deletes all database entries
|
||||
clear Deletes all table entries
|
||||
|
||||
183
docs/vocs/docs/pages/cli/reth/db/extract.mdx
Normal file
183
docs/vocs/docs/pages/cli/reth/db/extract.mdx
Normal file
@@ -0,0 +1,183 @@
|
||||
# reth db extract
|
||||
|
||||
Extracts the most recent N blocks of data from each static file segment
|
||||
|
||||
```bash
|
||||
$ reth db extract --help
|
||||
```
|
||||
```txt
|
||||
Usage: reth db extract [OPTIONS] --num-blocks <NUM_BLOCKS> --output <OUTPUT>
|
||||
|
||||
Options:
|
||||
-n, --num-blocks <NUM_BLOCKS>
|
||||
Number of most recent blocks to extract from each segment
|
||||
|
||||
-o, --output <OUTPUT>
|
||||
Output directory for the extracted static files
|
||||
|
||||
--segments <SEGMENTS>
|
||||
Only extract specific segments (default: all)
|
||||
|
||||
Possible values:
|
||||
- headers: Static File segment responsible for the `CanonicalHeaders`, `Headers`, `HeaderTerminalDifficulties` tables
|
||||
- transactions: Static File segment responsible for the `Transactions` table
|
||||
- receipts: Static File segment responsible for the `Receipts` table
|
||||
- transaction-senders: Static File segment responsible for the `TransactionSenders` table
|
||||
- account-change-sets: Static File segment responsible for the `AccountChangeSets` table
|
||||
- storage-change-sets: Static File segment responsible for the `StorageChangeSets` table
|
||||
|
||||
-h, --help
|
||||
Print help (see a summary with '-h')
|
||||
|
||||
Datadir:
|
||||
--chain <CHAIN_OR_PATH>
|
||||
The chain this node is running.
|
||||
Possible values are either a built-in chain or the path to a chain specification file.
|
||||
|
||||
Built-in chains:
|
||||
mainnet, sepolia, holesky, hoodi, dev
|
||||
|
||||
[default: mainnet]
|
||||
|
||||
Logging:
|
||||
--log.stdout.format <FORMAT>
|
||||
The format to use for logs written to stdout
|
||||
|
||||
Possible values:
|
||||
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
|
||||
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
|
||||
- terminal: Represents terminal-friendly formatting for logs
|
||||
|
||||
[default: terminal]
|
||||
|
||||
--log.stdout.filter <FILTER>
|
||||
The filter to use for logs written to stdout
|
||||
|
||||
[default: ""]
|
||||
|
||||
--log.file.format <FORMAT>
|
||||
The format to use for logs written to the log file
|
||||
|
||||
Possible values:
|
||||
- json: Represents JSON formatting for logs. This format outputs log records as JSON objects, making it suitable for structured logging
|
||||
- log-fmt: Represents logfmt (key=value) formatting for logs. This format is concise and human-readable, typically used in command-line applications
|
||||
- terminal: Represents terminal-friendly formatting for logs
|
||||
|
||||
[default: terminal]
|
||||
|
||||
--log.file.filter <FILTER>
|
||||
The filter to use for logs written to the log file
|
||||
|
||||
[default: debug]
|
||||
|
||||
--log.file.directory <PATH>
|
||||
The path to put log files in
|
||||
|
||||
[default: <CACHE_DIR>/logs]
|
||||
|
||||
--log.file.name <NAME>
|
||||
The prefix name of the log files
|
||||
|
||||
[default: reth.log]
|
||||
|
||||
--log.file.max-size <SIZE>
|
||||
The maximum size (in MB) of one log file
|
||||
|
||||
[default: 200]
|
||||
|
||||
--log.file.max-files <COUNT>
|
||||
The maximum amount of log files that will be stored. If set to 0, background file logging is disabled.
|
||||
|
||||
Default: 5 for `node` command, 0 for non-node utility subcommands.
|
||||
|
||||
--log.journald
|
||||
Write logs to journald
|
||||
|
||||
--log.journald.filter <FILTER>
|
||||
The filter to use for logs written to journald
|
||||
|
||||
[default: error]
|
||||
|
||||
--color <COLOR>
|
||||
Sets whether or not the formatter emits ANSI terminal escape codes for colors and other text formatting
|
||||
|
||||
Possible values:
|
||||
- always: Colors on
|
||||
- auto: Auto-detect
|
||||
- never: Colors off
|
||||
|
||||
[default: always]
|
||||
|
||||
--logs-otlp[=<URL>]
|
||||
Enable `Opentelemetry` logs export to an OTLP endpoint.
|
||||
|
||||
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/logs` - gRPC: `http://localhost:4317`
|
||||
|
||||
Example: --logs-otlp=http://collector:4318/v1/logs
|
||||
|
||||
[env: OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=]
|
||||
|
||||
--logs-otlp.filter <FILTER>
|
||||
Set a filter directive for the OTLP logs exporter. This controls the verbosity of logs sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
|
||||
|
||||
Example: --logs-otlp.filter=info,reth=debug
|
||||
|
||||
Defaults to INFO if not specified.
|
||||
|
||||
[default: info]
|
||||
|
||||
Display:
|
||||
-v, --verbosity...
|
||||
Set the minimum log level.
|
||||
|
||||
-v Errors
|
||||
-vv Warnings
|
||||
-vvv Info
|
||||
-vvvv Debug
|
||||
-vvvvv Traces (warning: very verbose!)
|
||||
|
||||
-q, --quiet
|
||||
Silence all log output
|
||||
|
||||
Tracing:
|
||||
--tracing-otlp[=<URL>]
|
||||
Enable `Opentelemetry` tracing export to an OTLP endpoint.
|
||||
|
||||
If no value provided, defaults based on protocol: - HTTP: `http://localhost:4318/v1/traces` - gRPC: `http://localhost:4317`
|
||||
|
||||
Example: --tracing-otlp=http://collector:4318/v1/traces
|
||||
|
||||
[env: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=]
|
||||
|
||||
--tracing-otlp-protocol <PROTOCOL>
|
||||
OTLP transport protocol to use for exporting traces and logs.
|
||||
|
||||
- `http`: expects endpoint path to end with `/v1/traces` or `/v1/logs` - `grpc`: expects endpoint without a path
|
||||
|
||||
Defaults to HTTP if not specified.
|
||||
|
||||
Possible values:
|
||||
- http: HTTP/Protobuf transport, port 4318, requires `/v1/traces` path
|
||||
- grpc: gRPC transport, port 4317
|
||||
|
||||
[env: OTEL_EXPORTER_OTLP_PROTOCOL=]
|
||||
[default: http]
|
||||
|
||||
--tracing-otlp.filter <FILTER>
|
||||
Set a filter directive for the OTLP tracer. This controls the verbosity of spans and events sent to the OTLP endpoint. It follows the same syntax as the `RUST_LOG` environment variable.
|
||||
|
||||
Example: --tracing-otlp.filter=info,reth=debug,hyper_util=off
|
||||
|
||||
Defaults to TRACE if not specified.
|
||||
|
||||
[default: debug]
|
||||
|
||||
--tracing-otlp.sample-ratio <RATIO>
|
||||
Trace sampling ratio to control the percentage of traces to export.
|
||||
|
||||
Valid range: 0.0 to 1.0 - 1.0, default: Sample all traces - 0.01: Sample 1% of traces - 0.0: Disable sampling
|
||||
|
||||
Example: --tracing-otlp.sample-ratio=0.0.
|
||||
|
||||
[env: OTEL_TRACES_SAMPLER_ARG=]
|
||||
```
|
||||
@@ -73,6 +73,10 @@ export const rethCliSidebar: SidebarItem = {
|
||||
text: "reth db diff",
|
||||
link: "/cli/reth/db/diff"
|
||||
},
|
||||
{
|
||||
text: "reth db extract",
|
||||
link: "/cli/reth/db/extract"
|
||||
},
|
||||
{
|
||||
text: "reth db get",
|
||||
link: "/cli/reth/db/get",
|
||||
|
||||
Reference in New Issue
Block a user