From 536381d02d76850e9921bc1bd923cd8974ae2ac1 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 23 Jul 2024 18:52:10 +0100 Subject: [PATCH] feat(exex): stream parallelism on backfill factory level (#9734) --- crates/exex/exex/src/backfill/factory.rs | 14 ++++++++++++++ crates/exex/exex/src/backfill/job.rs | 4 +++- crates/exex/exex/src/backfill/stream.rs | 2 +- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/crates/exex/exex/src/backfill/factory.rs b/crates/exex/exex/src/backfill/factory.rs index 3190fcaae0..6e845e2409 100644 --- a/crates/exex/exex/src/backfill/factory.rs +++ b/crates/exex/exex/src/backfill/factory.rs @@ -6,6 +6,8 @@ use reth_primitives::BlockNumber; use reth_prune_types::PruneModes; use reth_stages_api::ExecutionStageThresholds; +use super::stream::DEFAULT_PARALLELISM; + /// Factory for creating new backfill jobs. #[derive(Debug, Clone)] pub struct BackfillJobFactory { @@ -13,6 +15,7 @@ pub struct BackfillJobFactory { provider: P, prune_modes: PruneModes, thresholds: ExecutionStageThresholds, + stream_parallelism: usize, } impl BackfillJobFactory { @@ -23,6 +26,7 @@ impl BackfillJobFactory { provider, prune_modes: PruneModes::none(), thresholds: ExecutionStageThresholds::default(), + stream_parallelism: DEFAULT_PARALLELISM, } } @@ -37,6 +41,15 @@ impl BackfillJobFactory { self.thresholds = thresholds; self } + + /// Sets the stream parallelism. + /// + /// Configures the [`BackFillJobStream`](super::stream::BackFillJobStream) created via + /// [`BackfillJob::into_stream`]. + pub const fn with_stream_parallelism(mut self, stream_parallelism: usize) -> Self { + self.stream_parallelism = stream_parallelism; + self + } } impl BackfillJobFactory { @@ -48,6 +61,7 @@ impl BackfillJobFactory { prune_modes: self.prune_modes.clone(), range, thresholds: self.thresholds.clone(), + stream_parallelism: self.stream_parallelism, } } } diff --git a/crates/exex/exex/src/backfill/job.rs b/crates/exex/exex/src/backfill/job.rs index 64bb3a2642..e3c04815bb 100644 --- a/crates/exex/exex/src/backfill/job.rs +++ b/crates/exex/exex/src/backfill/job.rs @@ -28,6 +28,7 @@ pub struct BackfillJob { pub(crate) prune_modes: PruneModes, pub(crate) thresholds: ExecutionStageThresholds, pub(crate) range: RangeInclusive, + pub(crate) stream_parallelism: usize, } impl Iterator for BackfillJob @@ -148,7 +149,8 @@ impl BackfillJob { E: BlockExecutorProvider + Clone + 'static, P: HeaderProvider + BlockReader + StateProviderFactory + Clone + 'static, { - BackFillJobStream::new(self.into_single_blocks()) + let parallelism = self.stream_parallelism; + BackFillJobStream::new(self.into_single_blocks()).with_parallelism(parallelism) } } diff --git a/crates/exex/exex/src/backfill/stream.rs b/crates/exex/exex/src/backfill/stream.rs index cd0141c6bb..5529301bcc 100644 --- a/crates/exex/exex/src/backfill/stream.rs +++ b/crates/exex/exex/src/backfill/stream.rs @@ -19,7 +19,7 @@ type BackfillTasks = FuturesOrdered< >; /// The default parallelism for active tasks in [`BackFillJobStream`]. -const DEFAULT_PARALLELISM: usize = 4; +pub(crate) const DEFAULT_PARALLELISM: usize = 4; /// Stream for processing backfill jobs asynchronously. ///