Merge remote-tracking branch 'origin/alexey/snapshot-stage' into alexey/snapshots-all

This commit is contained in:
Alexey Shekhirin
2024-01-29 18:44:08 +00:00
4 changed files with 10 additions and 5 deletions

1
Cargo.lock generated
View File

@@ -6830,6 +6830,7 @@ version = "0.1.0-alpha.16"
dependencies = [
"assert_matches",
"clap",
"rayon",
"reth-db",
"reth-interfaces",
"reth-nippy-jar",

View File

@@ -26,6 +26,7 @@ tokio = { workspace = true, features = ["sync"] }
thiserror.workspace = true
tracing.workspace = true
clap = { workspace = true, features = ["derive"], optional = true }
rayon.workspace = true
[dev-dependencies]
# reth

View File

@@ -27,7 +27,7 @@ use std::{ops::RangeInclusive, path::Path, sync::Arc};
pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
/// A segment represents a snapshotting of some portion of the data.
pub trait Segment<DB: Database> {
pub trait Segment<DB: Database>: Send + Sync {
/// Returns the [`SnapshotSegment`].
fn segment(&self) -> SnapshotSegment;

View File

@@ -1,6 +1,7 @@
//! Support for snapshotting.
use crate::{segments, segments::Segment};
use rayon::prelude::*;
use reth_db::database::Database;
use reth_interfaces::RethResult;
use reth_primitives::{snapshot::HighestSnapshots, BlockNumber, PruneModes};
@@ -77,7 +78,7 @@ impl<DB: Database> Snapshotter<DB> {
///
/// For each [Some] target in [SnapshotTargets], initializes a corresponding [Segment] and runs
/// it with the provided block range using [SnapshotProvider] and a read-only database
/// transaction from [ProviderFactory].
/// transaction from [ProviderFactory]. All segments are run in parallel.
///
/// NOTE: it doesn't delete the data from database, and the actual deleting (aka pruning) logic
/// lives in the `prune` crate.
@@ -100,18 +101,20 @@ impl<DB: Database> Snapshotter<DB> {
segments.push((Box::new(segments::Receipts), block_range));
}
for (segment, block_range) in &segments {
segments.par_iter().try_for_each(|(segment, block_range)| -> RethResult<()> {
debug!(target: "snapshot", segment = %segment.segment(), ?block_range, "Snapshotting segment");
let start = Instant::now();
// Create a new database transaction on every segment to prevent long-lived read-only
// transactions
let provider = self.provider_factory.provider()?;
let provider = self.provider_factory.provider()?.disable_long_read_transaction_safety();
segment.snapshot(provider, self.snapshot_provider.clone(), block_range.clone())?;
let elapsed = start.elapsed(); // TODO(alexey): track in metrics
debug!(target: "snapshot", segment = %segment.segment(), ?block_range, ?elapsed, "Finished snapshotting segment");
}
Ok(())
})?;
self.snapshot_provider.commit()?;
for (segment, block_range) in segments {