mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-09 22:45:57 -05:00
feat(exex): stream parallelism on backfill factory level (#9734)
This commit is contained in:
@@ -6,6 +6,8 @@ use reth_primitives::BlockNumber;
|
||||
use reth_prune_types::PruneModes;
|
||||
use reth_stages_api::ExecutionStageThresholds;
|
||||
|
||||
use super::stream::DEFAULT_PARALLELISM;
|
||||
|
||||
/// Factory for creating new backfill jobs.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BackfillJobFactory<E, P> {
|
||||
@@ -13,6 +15,7 @@ pub struct BackfillJobFactory<E, P> {
|
||||
provider: P,
|
||||
prune_modes: PruneModes,
|
||||
thresholds: ExecutionStageThresholds,
|
||||
stream_parallelism: usize,
|
||||
}
|
||||
|
||||
impl<E, P> BackfillJobFactory<E, P> {
|
||||
@@ -23,6 +26,7 @@ impl<E, P> BackfillJobFactory<E, P> {
|
||||
provider,
|
||||
prune_modes: PruneModes::none(),
|
||||
thresholds: ExecutionStageThresholds::default(),
|
||||
stream_parallelism: DEFAULT_PARALLELISM,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,6 +41,15 @@ impl<E, P> BackfillJobFactory<E, P> {
|
||||
self.thresholds = thresholds;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the stream parallelism.
|
||||
///
|
||||
/// Configures the [`BackFillJobStream`](super::stream::BackFillJobStream) created via
|
||||
/// [`BackfillJob::into_stream`].
|
||||
pub const fn with_stream_parallelism(mut self, stream_parallelism: usize) -> Self {
|
||||
self.stream_parallelism = stream_parallelism;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: Clone, P: Clone> BackfillJobFactory<E, P> {
|
||||
@@ -48,6 +61,7 @@ impl<E: Clone, P: Clone> BackfillJobFactory<E, P> {
|
||||
prune_modes: self.prune_modes.clone(),
|
||||
range,
|
||||
thresholds: self.thresholds.clone(),
|
||||
stream_parallelism: self.stream_parallelism,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ pub struct BackfillJob<E, P> {
|
||||
pub(crate) prune_modes: PruneModes,
|
||||
pub(crate) thresholds: ExecutionStageThresholds,
|
||||
pub(crate) range: RangeInclusive<BlockNumber>,
|
||||
pub(crate) stream_parallelism: usize,
|
||||
}
|
||||
|
||||
impl<E, P> Iterator for BackfillJob<E, P>
|
||||
@@ -148,7 +149,8 @@ impl<E, P> BackfillJob<E, P> {
|
||||
E: BlockExecutorProvider + Clone + 'static,
|
||||
P: HeaderProvider + BlockReader + StateProviderFactory + Clone + 'static,
|
||||
{
|
||||
BackFillJobStream::new(self.into_single_blocks())
|
||||
let parallelism = self.stream_parallelism;
|
||||
BackFillJobStream::new(self.into_single_blocks()).with_parallelism(parallelism)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ type BackfillTasks = FuturesOrdered<
|
||||
>;
|
||||
|
||||
/// The default parallelism for active tasks in [`BackFillJobStream`].
|
||||
const DEFAULT_PARALLELISM: usize = 4;
|
||||
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
|
||||
|
||||
/// Stream for processing backfill jobs asynchronously.
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user