diff --git a/Cargo.lock b/Cargo.lock index f02ea437e5..3a24135ceb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6830,6 +6830,7 @@ version = "0.1.0-alpha.16" dependencies = [ "assert_matches", "clap", + "rayon", "reth-db", "reth-interfaces", "reth-nippy-jar", diff --git a/crates/snapshot/Cargo.toml b/crates/snapshot/Cargo.toml index a082c01135..a17c1d0ac6 100644 --- a/crates/snapshot/Cargo.toml +++ b/crates/snapshot/Cargo.toml @@ -26,6 +26,7 @@ tokio = { workspace = true, features = ["sync"] } thiserror.workspace = true tracing.workspace = true clap = { workspace = true, features = ["derive"], optional = true } +rayon.workspace = true [dev-dependencies] # reth diff --git a/crates/snapshot/src/segments/mod.rs b/crates/snapshot/src/segments/mod.rs index 40a55377d3..f88d8503ce 100644 --- a/crates/snapshot/src/segments/mod.rs +++ b/crates/snapshot/src/segments/mod.rs @@ -27,7 +27,7 @@ use std::{ops::RangeInclusive, path::Path, sync::Arc}; pub(crate) type Rows = [Vec>; COLUMNS]; /// A segment represents a snapshotting of some portion of the data. -pub trait Segment { +pub trait Segment: Send + Sync { /// Returns the [`SnapshotSegment`]. fn segment(&self) -> SnapshotSegment; diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index c77bccbfb9..147fd94d01 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -1,6 +1,7 @@ //! Support for snapshotting. use crate::{segments, segments::Segment}; +use rayon::prelude::*; use reth_db::database::Database; use reth_interfaces::RethResult; use reth_primitives::{snapshot::HighestSnapshots, BlockNumber, PruneModes}; @@ -77,7 +78,7 @@ impl Snapshotter { /// /// For each [Some] target in [SnapshotTargets], initializes a corresponding [Segment] and runs /// it with the provided block range using [SnapshotProvider] and a read-only database - /// transaction from [ProviderFactory]. + /// transaction from [ProviderFactory]. All segments are run in parallel. /// /// NOTE: it doesn't delete the data from database, and the actual deleting (aka pruning) logic /// lives in the `prune` crate. @@ -100,18 +101,20 @@ impl Snapshotter { segments.push((Box::new(segments::Receipts), block_range)); } - for (segment, block_range) in &segments { + segments.par_iter().try_for_each(|(segment, block_range)| -> RethResult<()> { debug!(target: "snapshot", segment = %segment.segment(), ?block_range, "Snapshotting segment"); let start = Instant::now(); // Create a new database transaction on every segment to prevent long-lived read-only // transactions - let provider = self.provider_factory.provider()?; + let provider = self.provider_factory.provider()?.disable_long_read_transaction_safety(); segment.snapshot(provider, self.snapshot_provider.clone(), block_range.clone())?; let elapsed = start.elapsed(); // TODO(alexey): track in metrics debug!(target: "snapshot", segment = %segment.segment(), ?block_range, ?elapsed, "Finished snapshotting segment"); - } + + Ok(()) + })?; self.snapshot_provider.commit()?; for (segment, block_range) in segments {