refactor: split reth-stages into reth-stages-api and reth-stages (#7666)

Co-authored-by: Oliver Nordbjerg <hi@notbjerg.me>
This commit is contained in:
Abner Zheng
2024-04-19 19:35:20 +08:00
committed by GitHub
parent f14bf14d19
commit defe5ff0af
47 changed files with 212 additions and 114 deletions

View File

@@ -0,0 +1,170 @@
use reth_interfaces::{
consensus, db::DatabaseError as DbError, executor, p2p::error::DownloadError, RethError,
};
use reth_primitives::{BlockNumber, SealedHeader, StaticFileSegment, TxNumber};
use reth_provider::ProviderError;
use thiserror::Error;
use crate::PipelineEvent;
use tokio::sync::mpsc::error::SendError;
/// Represents the specific error type within a block error.
///
/// Carried by [StageError::Block] to distinguish whether the block failed
/// consensus validation or execution.
#[derive(Error, Debug)]
pub enum BlockErrorKind {
    /// The block encountered a validation error.
    #[error("validation error: {0}")]
    Validation(#[from] consensus::ConsensusError),
    /// The block encountered an execution error.
    #[error("execution error: {0}")]
    Execution(#[from] executor::BlockExecutionError),
}
/// A stage execution error.
///
/// See [StageError::is_fatal] for which variants halt the pipeline versus
/// trigger a recoverable restart.
#[derive(Error, Debug)]
pub enum StageError {
    /// The stage encountered an error related to a block.
    #[error("stage encountered an error in block #{number}: {error}", number = block.number)]
    Block {
        /// The block that caused the error.
        block: Box<SealedHeader>,
        /// The specific error type, either consensus or execution error.
        #[source]
        error: BlockErrorKind,
    },
    /// The stage encountered a downloader error where the responses cannot be attached to the
    /// current head.
    #[error(
        "stage encountered inconsistent chain: \
         downloaded header #{header_number} ({header_hash}) is detached from \
         local head #{head_number} ({head_hash}): {error}",
        header_number = header.number,
        header_hash = header.hash(),
        head_number = local_head.number,
        head_hash = local_head.hash(),
    )]
    DetachedHead {
        /// The local head we attempted to attach to.
        local_head: Box<SealedHeader>,
        /// The header we attempted to attach.
        header: Box<SealedHeader>,
        /// The error that occurred when attempting to attach the header.
        #[source]
        error: Box<consensus::ConsensusError>,
    },
    /// The headers stage is missing sync gap.
    #[error("missing sync gap")]
    MissingSyncGap,
    /// The stage encountered a database error.
    #[error("internal database error occurred: {0}")]
    Database(#[from] DbError),
    /// Invalid pruning configuration.
    #[error(transparent)]
    PruningConfiguration(#[from] reth_primitives::PruneSegmentError),
    /// Invalid checkpoint passed to the stage.
    #[error("invalid stage checkpoint: {0}")]
    StageCheckpoint(u64),
    /// Missing download buffer on stage execution.
    /// Returned if stage execution was called without polling for readiness.
    #[error("missing download buffer")]
    MissingDownloadBuffer,
    /// Download channel closed.
    #[error("download channel closed")]
    ChannelClosed,
    /// The stage encountered a database integrity error.
    #[error("database integrity error occurred: {0}")]
    DatabaseIntegrity(#[from] ProviderError),
    /// Invalid download response. Applicable for stages which
    /// rely on external downloaders.
    #[error("invalid download response: {0}")]
    Download(#[from] DownloadError),
    /// Database is ahead of static file data.
    #[error("missing static file data for block number: {number}", number = block.number)]
    MissingStaticFileData {
        /// Starting block with missing data.
        block: Box<SealedHeader>,
        /// Static File segment
        segment: StaticFileSegment,
    },
    /// Unrecoverable inconsistency error related to a transaction number in a static file segment.
    #[error(
        "inconsistent transaction number for {segment}. db: {database}, static_file: {static_file}"
    )]
    InconsistentTxNumber {
        /// Static File segment where this error was encountered.
        segment: StaticFileSegment,
        /// Expected database transaction number.
        database: TxNumber,
        /// Expected static file transaction number.
        static_file: TxNumber,
    },
    /// Unrecoverable inconsistency error related to a block number in a static file segment.
    #[error("inconsistent block number for {segment}. db: {database}, static_file: {static_file}")]
    InconsistentBlockNumber {
        /// Static File segment where this error was encountered.
        segment: StaticFileSegment,
        /// Expected database block number.
        database: BlockNumber,
        /// Expected static file block number.
        static_file: BlockNumber,
    },
    /// Internal error
    #[error(transparent)]
    Internal(#[from] RethError),
    /// The stage encountered a recoverable error.
    ///
    /// These types of errors are caught by the [Pipeline][crate::Pipeline] and trigger a restart
    /// of the stage.
    #[error(transparent)]
    Recoverable(Box<dyn std::error::Error + Send + Sync>),
    /// The stage encountered a fatal error.
    ///
    /// These types of errors stop the pipeline.
    #[error(transparent)]
    Fatal(Box<dyn std::error::Error + Send + Sync>),
}
impl StageError {
    /// If the error is fatal the pipeline will stop.
    ///
    /// Written as the inverse check: the recoverable variants (block-related
    /// failures, detached heads, pruning misconfiguration, missing static file
    /// data and explicitly [StageError::Recoverable] errors) are listed, and
    /// every other variant is considered fatal.
    pub fn is_fatal(&self) -> bool {
        !matches!(
            self,
            StageError::Block { .. } |
                StageError::DetachedHead { .. } |
                StageError::PruningConfiguration(_) |
                StageError::MissingStaticFileData { .. } |
                StageError::Recoverable(_)
        )
    }
}
impl From<std::io::Error> for StageError {
fn from(source: std::io::Error) -> Self {
StageError::Fatal(Box::new(source))
}
}
/// A pipeline execution error.
///
/// Wraps errors surfaced by individual stages as well as failures of the
/// pipeline machinery itself (database access, event delivery).
#[derive(Error, Debug)]
pub enum PipelineError {
    /// The pipeline encountered an irrecoverable error in one of the stages.
    #[error(transparent)]
    Stage(#[from] StageError),
    /// The pipeline encountered a database error.
    #[error(transparent)]
    Database(#[from] DbError),
    /// Provider error.
    #[error(transparent)]
    Provider(#[from] ProviderError),
    /// The pipeline encountered an error while trying to send an event.
    #[error("pipeline encountered an error while trying to send an event")]
    Channel(#[from] Box<SendError<PipelineEvent>>),
    /// Internal error
    #[error(transparent)]
    Internal(#[from] RethError),
}

View File

@@ -0,0 +1,14 @@
//! Staged syncing primitives for reth.

mod error;
mod metrics;
mod pipeline;
mod stage;

#[allow(missing_docs)]
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

mod util;

// Re-export the public surface of each module. `metrics` is re-exported with
// the same relative-path form as the other modules for consistency.
pub use error::*;
pub use metrics::*;
pub use pipeline::*;
pub use stage::*;

View File

@@ -0,0 +1,112 @@
use crate::metrics::SyncMetrics;
use reth_primitives::{
constants::MGAS_TO_GAS,
stage::{StageCheckpoint, StageId},
BlockNumber,
};
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::trace;
/// Alias type for metric producers to use.
pub type MetricEventsSender = UnboundedSender<MetricEvent>;

/// Collection of metric events.
///
/// Produced by stages/pipeline and consumed by [MetricsListener].
#[derive(Clone, Copy, Debug)]
pub enum MetricEvent {
    /// Sync reached new height. All stage checkpoints are updated.
    SyncHeight {
        /// Maximum height measured in block number that sync reached.
        height: BlockNumber,
    },
    /// Stage reached new checkpoint.
    StageCheckpoint {
        /// Stage ID.
        stage_id: StageId,
        /// Stage checkpoint.
        checkpoint: StageCheckpoint,
        /// Maximum known block number reachable by this stage.
        /// If specified, `entities_total` metric is updated.
        max_block_number: Option<BlockNumber>,
    },
    /// Execution stage processed some amount of gas.
    ExecutionStageGas {
        /// Gas processed.
        gas: u64,
    },
}
/// Metrics routine that listens to new metric events on the `events_rx` receiver.
/// Upon receiving new event, related metrics are updated.
#[derive(Debug)]
pub struct MetricsListener {
    /// Incoming stream of [MetricEvent]s; drained by the [Future] impl.
    events_rx: UnboundedReceiver<MetricEvent>,
    /// Underlying per-stage sync metrics updated by this listener.
    pub sync_metrics: SyncMetrics,
}
impl MetricsListener {
    /// Creates a new [MetricsListener] with the provided receiver of [MetricEvent].
    pub fn new(events_rx: UnboundedReceiver<MetricEvent>) -> Self {
        Self { sync_metrics: SyncMetrics::default(), events_rx }
    }

    /// Applies a single metric event to the underlying [SyncMetrics].
    fn handle_event(&mut self, event: MetricEvent) {
        trace!(target: "sync::metrics", ?event, "Metric event received");
        match event {
            MetricEvent::SyncHeight { height } => {
                // Fan out: a new sync height updates the checkpoint of every
                // known stage by re-dispatching per-stage checkpoint events.
                for stage_id in StageId::ALL {
                    let checkpoint =
                        StageCheckpoint { block_number: height, stage_checkpoint: None };
                    self.handle_event(MetricEvent::StageCheckpoint {
                        stage_id,
                        checkpoint,
                        max_block_number: Some(height),
                    });
                }
            }
            MetricEvent::StageCheckpoint { stage_id, checkpoint, max_block_number } => {
                let metrics = self.sync_metrics.get_stage_metrics(stage_id);
                metrics.checkpoint.set(checkpoint.block_number as f64);

                // Prefer the checkpoint's own entity counts; fall back to
                // block numbers when the stage reports none.
                let (processed, total) = if let Some(entities) = checkpoint.entities() {
                    (entities.processed, Some(entities.total))
                } else {
                    (checkpoint.block_number, max_block_number)
                };

                metrics.entities_processed.set(processed as f64);
                if let Some(total) = total {
                    metrics.entities_total.set(total as f64);
                }
            }
            MetricEvent::ExecutionStageGas { gas } => self
                .sync_metrics
                .execution_stage
                .mgas_processed_total
                .increment(gas / MGAS_TO_GAS),
        }
    }
}
impl Future for MetricsListener {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let listener = self.get_mut();

        // Drain every buffered event; park (via `ready!`) when the channel is
        // empty, and resolve once all senders are dropped.
        loop {
            match ready!(listener.events_rx.poll_recv(cx)) {
                Some(event) => listener.handle_event(event),
                // Channel has closed: no further metric events will arrive.
                None => return Poll::Ready(()),
            }
        }
    }
}

View File

@@ -0,0 +1,5 @@
// Event listener that folds metric events into the sync metric gauges/counters.
mod listener;
// Metric type definitions (per-stage and execution-stage metrics).
mod sync_metrics;

pub use listener::{MetricEvent, MetricEventsSender, MetricsListener};
// Private glob so the metric types resolve as `crate::metrics::*` within the crate.
use sync_metrics::*;

View File

@@ -0,0 +1,40 @@
use reth_metrics::{
metrics::{Counter, Gauge},
Metrics,
};
use reth_primitives::stage::StageId;
use std::collections::HashMap;
/// Container for all sync-related metrics.
#[derive(Debug, Default)]
pub struct SyncMetrics {
    /// Lazily-initialized per-stage metrics; see [SyncMetrics::get_stage_metrics].
    pub stages: HashMap<StageId, StageMetrics>,
    /// Metrics specific to the execution stage.
    pub execution_stage: ExecutionStageMetrics,
}
impl SyncMetrics {
    /// Returns existing or initializes a new instance of [StageMetrics] for the provided [StageId].
    pub fn get_stage_metrics(&mut self, stage_id: StageId) -> &mut StageMetrics {
        self.stages.entry(stage_id).or_insert_with(|| {
            // Label the metrics with the stage name so each stage is
            // distinguishable in the exported metrics.
            StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])
        })
    }
}
/// Per-stage sync metrics, exported under the `sync` scope with a `stage` label.
#[derive(Metrics)]
#[metrics(scope = "sync")]
pub struct StageMetrics {
    /// The block number of the last commit for a stage.
    pub checkpoint: Gauge,
    /// The number of processed entities of the last commit for a stage, if applicable.
    pub entities_processed: Gauge,
    /// The number of total entities of the last commit for a stage, if applicable.
    pub entities_total: Gauge,
}
/// Execution stage metrics.
#[derive(Metrics)]
#[metrics(scope = "sync.execution")]
pub struct ExecutionStageMetrics {
    /// The total amount of gas processed (in millions)
    pub mgas_processed_total: Counter,
}

View File

@@ -0,0 +1,103 @@
use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageSet};
use reth_db::database::Database;
use reth_primitives::{stage::StageId, BlockNumber, B256};
use reth_provider::ProviderFactory;
use reth_static_file::StaticFileProducer;
use tokio::sync::watch;
/// Builds a [`Pipeline`].
#[must_use = "call `build` to construct the pipeline"]
pub struct PipelineBuilder<DB>
where
    DB: Database,
{
    /// All configured stages in the order they will be executed.
    stages: Vec<BoxedStage<DB>>,
    /// The maximum block number to sync to.
    max_block: Option<BlockNumber>,
    /// Sender half of the watch channel for the current chain tip to sync to.
    tip_tx: Option<watch::Sender<B256>>,
    /// Optional sender through which stages report metric events.
    metrics_tx: Option<MetricEventsSender>,
}
impl<DB> PipelineBuilder<DB>
where
DB: Database,
{
/// Add a stage to the pipeline.
pub fn add_stage<S>(mut self, stage: S) -> Self
where
S: Stage<DB> + 'static,
{
self.stages.push(Box::new(stage));
self
}
/// Add a set of stages to the pipeline.
///
/// Stages can be grouped into a set by using a [`StageSet`].
///
/// To customize the stages in the set (reorder, disable, insert a stage) call
/// [`builder`][StageSet::builder] on the set which will convert it to a
/// [`StageSetBuilder`][crate::StageSetBuilder].
pub fn add_stages<Set: StageSet<DB>>(mut self, set: Set) -> Self {
for stage in set.builder().build() {
self.stages.push(stage);
}
self
}
/// Set the target block.
///
/// Once this block is reached, the pipeline will stop.
pub fn with_max_block(mut self, block: BlockNumber) -> Self {
self.max_block = Some(block);
self
}
/// Set the tip sender.
pub fn with_tip_sender(mut self, tip_tx: watch::Sender<B256>) -> Self {
self.tip_tx = Some(tip_tx);
self
}
/// Set the metric events sender.
pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
self.metrics_tx = Some(metrics_tx);
self
}
/// Builds the final [`Pipeline`] using the given database.
pub fn build(
self,
provider_factory: ProviderFactory<DB>,
static_file_producer: StaticFileProducer<DB>,
) -> Pipeline<DB> {
let Self { stages, max_block, tip_tx, metrics_tx } = self;
Pipeline {
provider_factory,
stages,
max_block,
static_file_producer,
tip_tx,
listeners: Default::default(),
progress: Default::default(),
metrics_tx,
}
}
}
impl<DB: Database> Default for PipelineBuilder<DB> {
    /// An empty builder: no stages, no block limit, no channels configured.
    fn default() -> Self {
        Self { stages: Vec::new(), max_block: None, tip_tx: None, metrics_tx: None }
    }
}
impl<DB: Database> std::fmt::Debug for PipelineBuilder<DB> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Stages are shown by ID only (the boxed stages themselves are not
        // `Debug`); the channel fields are intentionally omitted.
        f.debug_struct("PipelineBuilder")
            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
            .field("max_block", &self.max_block)
            .finish()
    }
}

View File

@@ -0,0 +1,46 @@
use reth_primitives::{BlockNumber, SealedHeader};
/// Determines the control flow during pipeline execution.
///
/// See [Pipeline::run_loop](crate::Pipeline::run_loop) for more information.
#[derive(Debug, Eq, PartialEq)]
pub enum ControlFlow {
    /// An unwind was requested and must be performed before continuing.
    Unwind {
        /// The block to unwind to.
        target: BlockNumber,
        /// The block that caused the unwind.
        bad_block: Box<SealedHeader>,
    },
    /// The pipeline made progress.
    Continue {
        /// Block number reached by the stage.
        block_number: BlockNumber,
    },
    /// Pipeline made no progress
    NoProgress {
        /// Block number reached by the stage.
        block_number: Option<BlockNumber>,
    },
}
impl ControlFlow {
    /// Whether the pipeline should continue executing stages.
    ///
    /// Everything except an unwind request lets the pipeline proceed.
    pub fn should_continue(&self) -> bool {
        !self.is_unwind()
    }

    /// Returns true if the control flow is unwind.
    pub fn is_unwind(&self) -> bool {
        matches!(self, ControlFlow::Unwind { .. })
    }

    /// Returns the pipeline block number the stage reached, if the state is not `Unwind`.
    pub fn block_number(&self) -> Option<BlockNumber> {
        match self {
            ControlFlow::Continue { block_number } => Some(*block_number),
            ControlFlow::NoProgress { block_number } => *block_number,
            ControlFlow::Unwind { .. } => None,
        }
    }
}

View File

@@ -0,0 +1,91 @@
use crate::stage::{ExecOutput, UnwindInput, UnwindOutput};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
BlockNumber,
};
use std::fmt::{Display, Formatter};
/// An event emitted by a [Pipeline][crate::Pipeline].
///
/// It is possible for multiple of these events to be emitted over the duration of a pipeline's
/// execution since:
///
/// - Other stages may ask the pipeline to unwind
/// - The pipeline will loop indefinitely unless a target block is set
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PipelineEvent {
    /// Emitted when a stage is about to be prepared for a run.
    Prepare {
        /// Pipeline stages progress.
        pipeline_stages_progress: PipelineStagesProgress,
        /// The stage that is about to be run.
        stage_id: StageId,
        /// The previous checkpoint of the stage.
        checkpoint: Option<StageCheckpoint>,
        /// The block number up to which the stage is running, if known.
        target: Option<BlockNumber>,
    },
    /// Emitted when a stage is about to be run.
    Run {
        /// Pipeline stages progress.
        pipeline_stages_progress: PipelineStagesProgress,
        /// The stage that is about to be run.
        stage_id: StageId,
        /// The previous checkpoint of the stage.
        checkpoint: Option<StageCheckpoint>,
        /// The block number up to which the stage is running, if known.
        target: Option<BlockNumber>,
    },
    /// Emitted when a stage has run a single time.
    Ran {
        /// Pipeline stages progress.
        pipeline_stages_progress: PipelineStagesProgress,
        /// The stage that was run.
        stage_id: StageId,
        /// The result of executing the stage.
        result: ExecOutput,
    },
    /// Emitted when a stage is about to be unwound.
    Unwind {
        /// The stage that is about to be unwound.
        stage_id: StageId,
        /// The unwind parameters.
        input: UnwindInput,
    },
    /// Emitted when a stage has been unwound.
    Unwound {
        /// The stage that was unwound.
        stage_id: StageId,
        /// The result of unwinding the stage.
        result: UnwindOutput,
    },
    /// Emitted when a stage encounters an error either during execution or unwinding.
    Error {
        /// The stage that encountered an error.
        stage_id: StageId,
    },
    /// Emitted when a stage was skipped due to its run conditions not being met:
    ///
    /// - The stage might have progressed beyond the point of our target block
    /// - The stage might not need to be unwound since it has not progressed past the unwind target
    /// - The stage requires that the pipeline has reached the tip, but it has not done so yet
    Skipped {
        /// The stage that was skipped.
        stage_id: StageId,
    },
}
/// Pipeline stages progress.
///
/// Rendered by its [Display] impl as `current/total`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PipelineStagesProgress {
    /// 1-indexed ID of the stage that is about to be run out of total stages in the pipeline.
    pub current: usize,
    /// Total number of stages in the pipeline.
    pub total: usize,
}
impl Display for PipelineStagesProgress {
    /// Formats the progress as `current/total`, e.g. `3/12`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { current, total } = self;
        write!(f, "{current}/{total}")
    }
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,28 @@
use crate::{util::opt, ControlFlow};
use reth_primitives::BlockNumber;
/// Tracks the block numbers reached by stages over the course of a pipeline run.
#[derive(Debug, Default)]
pub struct PipelineProgress {
    /// Block number reached by the stage.
    pub block_number: Option<BlockNumber>,
    /// The maximum block number achieved by any stage during the execution of the pipeline.
    pub maximum_block_number: Option<BlockNumber>,
    /// The minimum block number achieved by any stage during the execution of the pipeline.
    pub minimum_block_number: Option<BlockNumber>,
}
impl PipelineProgress {
    /// Records a block number reached by a stage, updating the running
    /// minimum/maximum across the run.
    pub(crate) fn update(&mut self, block_number: BlockNumber) {
        self.block_number = Some(block_number);
        // NOTE(review): assumes `opt::min`/`opt::max` treat `None` as "unset"
        // (returning `Some(block_number)`) rather than as an extreme value —
        // confirm against `util::opt`.
        self.minimum_block_number = opt::min(self.minimum_block_number, block_number);
        self.maximum_block_number = opt::max(self.maximum_block_number, block_number);
    }

    /// Get next control flow step
    ///
    /// `Continue` if any block number was recorded, `NoProgress` otherwise.
    pub(crate) fn next_ctrl(&self) -> ControlFlow {
        match self.block_number {
            Some(block_number) => ControlFlow::Continue { block_number },
            None => ControlFlow::NoProgress { block_number: None },
        }
    }
}

View File

@@ -0,0 +1,221 @@
use crate::Stage;
use reth_db::database::Database;
use reth_primitives::stage::StageId;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
};
/// Combines multiple [`Stage`]s into a single unit.
///
/// A [`StageSet`] is a logical chunk of stages that depend on each other. It is up to the
/// individual stage sets to determine what kind of configuration they expose.
///
/// Individual stages in the set can be added, removed and overridden using [`StageSetBuilder`].
pub trait StageSet<DB: Database>: Sized {
    /// Configures the stages in the set.
    fn builder(self) -> StageSetBuilder<DB>;

    /// Overrides the given [`Stage`], if it is in this set.
    ///
    /// # Panics
    ///
    /// Panics if the [`Stage`] is not in this set.
    fn set<S: Stage<DB> + 'static>(self, stage: S) -> StageSetBuilder<DB> {
        self.builder().set(stage)
    }
}
/// A stage and its enabled flag, as tracked by [`StageSetBuilder`].
struct StageEntry<DB> {
    /// The boxed stage implementation.
    stage: Box<dyn Stage<DB>>,
    /// Whether [`StageSetBuilder::build`] will include this stage in the output.
    enabled: bool,
}
impl<DB: Database> Debug for StageEntry<DB> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // The boxed `dyn Stage` has no `Debug` bound, so only its ID is shown.
        f.debug_struct("StageEntry")
            .field("stage", &self.stage.id())
            .field("enabled", &self.enabled)
            .finish()
    }
}
/// Helper to create and configure a [`StageSet`].
///
/// The builder provides ordering helpers to ensure that stages that depend on each other are added
/// to the final sync pipeline before/after their dependencies.
///
/// Stages inside the set can be disabled, enabled, overridden and reordered.
pub struct StageSetBuilder<DB> {
    /// Stage entries keyed by stage ID.
    stages: HashMap<StageId, StageEntry<DB>>,
    /// Execution order of the stage IDs; each ID also has an entry in `stages`.
    order: Vec<StageId>,
}
impl<DB: Database> Default for StageSetBuilder<DB> {
    /// An empty builder: no stages registered, no ordering.
    fn default() -> Self {
        Self { stages: Default::default(), order: Default::default() }
    }
}
impl<DB: Database> Debug for StageSetBuilder<DB> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StageSetBuilder")
            .field("stages", &self.stages)
            .field("order", &self.order)
            .finish()
    }
}
impl<DB> StageSetBuilder<DB>
where
    DB: Database,
{
    /// Returns the position of `stage_id` in the current ordering.
    ///
    /// # Panics
    ///
    /// Panics if the stage is not in this set.
    fn index_of(&self, stage_id: StageId) -> usize {
        let index = self.order.iter().position(|&id| id == stage_id);

        index.unwrap_or_else(|| panic!("Stage does not exist in set: {stage_id}"))
    }

    /// Inserts `stage` into the map, and — if an entry with the same ID already
    /// existed — removes the stage's previous position from `order`, keeping
    /// only the occurrence at `added_at_index` so each ID appears once.
    fn upsert_stage_state(&mut self, stage: Box<dyn Stage<DB>>, added_at_index: usize) {
        let stage_id = stage.id();
        if self.stages.insert(stage.id(), StageEntry { stage, enabled: true }).is_some() {
            if let Some(to_remove) = self
                .order
                .iter()
                .enumerate()
                .find(|(i, id)| *i != added_at_index && **id == stage_id)
                .map(|(i, _)| i)
            {
                self.order.remove(to_remove);
            }
        }
    }

    /// Overrides the given [`Stage`], if it is in this set.
    ///
    /// # Panics
    ///
    /// Panics if the [`Stage`] is not in this set.
    pub fn set<S: Stage<DB> + 'static>(mut self, stage: S) -> Self {
        let entry = self
            .stages
            .get_mut(&stage.id())
            .unwrap_or_else(|| panic!("Stage does not exist in set: {}", stage.id()));
        entry.stage = Box::new(stage);
        self
    }

    /// Adds the given [`Stage`] at the end of this set.
    ///
    /// If the stage was already in the group, it is removed from its previous place.
    pub fn add_stage<S: Stage<DB> + 'static>(mut self, stage: S) -> Self {
        let target_index = self.order.len();
        self.order.push(stage.id());
        self.upsert_stage_state(Box::new(stage), target_index);
        self
    }

    /// Adds the given [`StageSet`] to the end of this set.
    ///
    /// If a stage is in both sets, it is removed from its previous place in this set. Because of
    /// this, it is advisable to merge sets first and re-order stages after if needed.
    pub fn add_set<Set: StageSet<DB>>(mut self, set: Set) -> Self {
        for stage in set.builder().build() {
            let target_index = self.order.len();
            self.order.push(stage.id());
            self.upsert_stage_state(stage, target_index);
        }
        self
    }

    /// Adds the given [`Stage`] before the stage with the given [`StageId`].
    ///
    /// If the stage was already in the group, it is removed from its previous place.
    ///
    /// # Panics
    ///
    /// Panics if the dependency stage is not in this set.
    pub fn add_before<S: Stage<DB> + 'static>(mut self, stage: S, before: StageId) -> Self {
        let target_index = self.index_of(before);
        self.order.insert(target_index, stage.id());
        self.upsert_stage_state(Box::new(stage), target_index);
        self
    }

    /// Adds the given [`Stage`] after the stage with the given [`StageId`].
    ///
    /// If the stage was already in the group, it is removed from its previous place.
    ///
    /// # Panics
    ///
    /// Panics if the dependency stage is not in this set.
    pub fn add_after<S: Stage<DB> + 'static>(mut self, stage: S, after: StageId) -> Self {
        let target_index = self.index_of(after) + 1;
        self.order.insert(target_index, stage.id());
        self.upsert_stage_state(Box::new(stage), target_index);
        self
    }

    /// Enables the given stage.
    ///
    /// All stages within a [`StageSet`] are enabled by default.
    ///
    /// # Panics
    ///
    /// Panics if the stage is not in this set.
    pub fn enable(mut self, stage_id: StageId) -> Self {
        let entry =
            self.stages.get_mut(&stage_id).expect("Cannot enable a stage that is not in the set.");
        entry.enabled = true;
        self
    }

    /// Disables the given stage.
    ///
    /// The disabled [`Stage`] keeps its place in the set, so it can be used for ordering with
    /// [`StageSetBuilder::add_before`] or [`StageSetBuilder::add_after`], or it can be re-enabled.
    ///
    /// All stages within a [`StageSet`] are enabled by default.
    ///
    /// # Panics
    ///
    /// Panics if the stage is not in this set.
    pub fn disable(mut self, stage_id: StageId) -> Self {
        let entry =
            self.stages.get_mut(&stage_id).expect("Cannot disable a stage that is not in the set.");
        entry.enabled = false;
        self
    }

    /// Disables the given stage if the given closure returns true.
    ///
    /// See [Self::disable]
    pub fn disable_if<F>(self, stage_id: StageId, f: F) -> Self
    where
        F: FnOnce() -> bool,
    {
        if f() {
            return self.disable(stage_id)
        }
        self
    }

    /// Consumes the builder and returns the contained [`Stage`]s in the order specified.
    ///
    /// Disabled stages are filtered out.
    pub fn build(mut self) -> Vec<Box<dyn Stage<DB>>> {
        let mut stages = Vec::new();
        for id in &self.order {
            if let Some(entry) = self.stages.remove(id) {
                if entry.enabled {
                    stages.push(entry.stage);
                }
            }
        }
        stages
    }
}
impl<DB: Database> StageSet<DB> for StageSetBuilder<DB> {
    // A builder is already a configured set, so it is its own builder.
    fn builder(self) -> StageSetBuilder<DB> {
        self
    }
}

View File

@@ -0,0 +1,261 @@
use crate::error::StageError;
use reth_db::database::Database;
use reth_primitives::{
stage::{StageCheckpoint, StageId},
BlockNumber, TxNumber,
};
use reth_provider::{BlockReader, DatabaseProviderRW, ProviderError, TransactionsProvider};
use std::{
cmp::{max, min},
future::{poll_fn, Future},
ops::{Range, RangeInclusive},
task::{Context, Poll},
};
/// Stage execution input, see [Stage::execute].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct ExecInput {
    /// The target block number the stage needs to execute towards.
    pub target: Option<BlockNumber>,
    /// The checkpoint of this stage the last time it was executed.
    pub checkpoint: Option<StageCheckpoint>,
}
impl ExecInput {
    /// Return the checkpoint of the stage or default.
    pub fn checkpoint(&self) -> StageCheckpoint {
        self.checkpoint.unwrap_or_default()
    }

    /// Return the next block number after the current
    /// +1 is needed to skip the present block and always start from block number 1, not 0.
    pub fn next_block(&self) -> BlockNumber {
        let current_block = self.checkpoint();
        current_block.block_number + 1
    }

    /// Returns `true` if the target block number has already been reached.
    pub fn target_reached(&self) -> bool {
        self.checkpoint().block_number >= self.target()
    }

    /// Return the target block number or default.
    pub fn target(&self) -> BlockNumber {
        self.target.unwrap_or_default()
    }

    /// Return next block range that needs to be executed.
    pub fn next_block_range(&self) -> RangeInclusive<BlockNumber> {
        let (range, _) = self.next_block_range_with_threshold(u64::MAX);
        range
    }

    /// Return true if this is the first block range to execute.
    pub fn is_first_range(&self) -> bool {
        self.checkpoint.is_none()
    }

    /// Return the next block range to execute.
    /// Return pair of the block range and if this is final block range.
    pub fn next_block_range_with_threshold(
        &self,
        threshold: u64,
    ) -> (RangeInclusive<BlockNumber>, bool) {
        let current_block = self.checkpoint();
        let start = current_block.block_number + 1;
        let target = self.target();
        // Cap the range at `threshold` blocks past the checkpoint; saturate so
        // `u64::MAX` means "no cap".
        let end = min(target, current_block.block_number.saturating_add(threshold));
        let is_final_range = end == target;
        (start..=end, is_final_range)
    }

    /// Return the next block range determined the number of transactions within it.
    /// This function walks the block indices until either the end of the range is reached or
    /// the number of transactions exceeds the threshold.
    ///
    /// Returns `(tx_range, block_range, is_final_range)`.
    pub fn next_block_range_with_transaction_threshold<DB: Database>(
        &self,
        provider: &DatabaseProviderRW<DB>,
        tx_threshold: u64,
    ) -> Result<(Range<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError> {
        let start_block = self.next_block();
        let target_block = self.target();

        let start_block_body = provider
            .block_body_indices(start_block)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?;
        let first_tx_num = start_block_body.first_tx_num();

        let target_block_body = provider
            .block_body_indices(target_block)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(target_block))?;

        // number of transactions left to execute.
        let all_tx_cnt = target_block_body.next_tx_num() - first_tx_num;

        if all_tx_cnt == 0 {
            // if there is no more transaction return back.
            return Ok((first_tx_num..first_tx_num, start_block..=target_block, true))
        }

        // get block of this tx
        let (end_block, is_final_range, next_tx_num) = if all_tx_cnt <= tx_threshold {
            (target_block, true, target_block_body.next_tx_num())
        } else {
            // get tx block number. next_tx_num in this case will be less than all_tx_cnt.
            // So we are sure that transaction must exist.
            let end_block_number = provider
                .transaction_block(first_tx_num + tx_threshold)?
                .expect("block of tx must exist");
            // we want to get range of all transactions of this block, so we are fetching block
            // body.
            let end_block_body = provider
                .block_body_indices(end_block_number)?
                // Report the block that was actually looked up (`end_block_number`),
                // not the overall target, so the error points at the missing entry.
                .ok_or(ProviderError::BlockBodyIndicesNotFound(end_block_number))?;
            (end_block_number, false, end_block_body.next_tx_num())
        };

        let tx_range = first_tx_num..next_tx_num;
        Ok((tx_range, start_block..=end_block, is_final_range))
    }
}
/// Stage unwind input, see [Stage::unwind].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct UnwindInput {
    /// The current highest checkpoint of the stage.
    pub checkpoint: StageCheckpoint,
    /// The block to unwind to.
    pub unwind_to: BlockNumber,
    /// The bad block that caused the unwind, if any.
    pub bad_block: Option<BlockNumber>,
}
impl UnwindInput {
    /// Return next block range that needs to be unwound.
    pub fn unwind_block_range(&self) -> RangeInclusive<BlockNumber> {
        self.unwind_block_range_with_threshold(u64::MAX).0
    }

    /// Return the next block range to unwind and the block we're unwinding to.
    ///
    /// Returns `(range, unwind_to, is_final_range)`, where `is_final_range` is
    /// `true` once the capped range reaches the requested `unwind_to` target.
    pub fn unwind_block_range_with_threshold(
        &self,
        threshold: u64,
    ) -> (RangeInclusive<BlockNumber>, BlockNumber, bool) {
        // +1 is to skip the block we're unwinding to
        let mut start = self.unwind_to + 1;
        let end = self.checkpoint;

        // Cap the amount of work per call to at most `threshold` blocks below
        // the checkpoint; saturate so `u64::MAX` means "no cap".
        start = max(start, end.block_number.saturating_sub(threshold));

        // `start >= self.unwind_to + 1 >= 1`, so this cannot underflow.
        let unwind_to = start - 1;

        let is_final_range = unwind_to == self.unwind_to;
        (start..=end.block_number, unwind_to, is_final_range)
    }
}
/// The output of a stage execution.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecOutput {
    /// How far the stage got.
    pub checkpoint: StageCheckpoint,
    /// Whether or not the stage is done.
    pub done: bool,
}
impl ExecOutput {
    /// Mark the stage as done, checkpointing at the given place.
    pub fn done(checkpoint: StageCheckpoint) -> Self {
        Self { checkpoint, done: true }
    }
}
/// The output of a stage unwinding.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct UnwindOutput {
    /// The checkpoint at which the stage has unwound to.
    pub checkpoint: StageCheckpoint,
}
/// A stage is a segmented part of the syncing process of the node.
///
/// Each stage takes care of a well-defined task, such as downloading headers or executing
/// transactions, and persist their results to a database.
///
/// Stages must have a unique [ID][StageId] and implement a way to "roll forwards"
/// ([Stage::execute]) and a way to "roll back" ([Stage::unwind]).
///
/// Stages are executed as part of a pipeline where they are executed serially.
///
/// Stages receive [`DatabaseProviderRW`].
#[auto_impl::auto_impl(Box)]
pub trait Stage<DB: Database>: Send + Sync {
    /// Get the ID of the stage.
    ///
    /// Stage IDs must be unique.
    fn id(&self) -> StageId;

    /// Returns `Poll::Ready(Ok(()))` when the stage is ready to execute the given range.
    ///
    /// This method is heavily inspired by [tower](https://crates.io/crates/tower)'s `Service` trait.
    /// Any asynchronous tasks or communication should be handled in `poll_ready`, e.g. moving
    /// downloaded items from downloaders to an internal buffer in the stage.
    ///
    /// If the stage has any pending external state, then `Poll::Pending` is returned.
    ///
    /// If `Poll::Ready(Err(_))` is returned, the stage may not be able to execute anymore
    /// depending on the specific error. In that case, an unwind must be issued instead.
    ///
    /// Once `Poll::Ready(Ok(()))` is returned, the stage may be executed once using `execute`.
    /// Until the stage has been executed, repeated calls to `poll_ready` must return either
    /// `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))`.
    ///
    /// Note that `poll_ready` may reserve shared resources that are consumed in a subsequent call
    /// of `execute`, e.g. internal buffers. It is crucial for implementations to not assume that
    /// `execute` will always be invoked and to ensure that those resources are appropriately
    /// released if the stage is dropped before `execute` is called.
    ///
    /// For the same reason, it is also important that any shared resources do not exhibit
    /// unbounded growth on repeated calls to `poll_ready`.
    ///
    /// Unwinds may happen without consulting `poll_ready` first.
    fn poll_execute_ready(
        &mut self,
        _cx: &mut Context<'_>,
        _input: ExecInput,
    ) -> Poll<Result<(), StageError>> {
        // Default: stages with no external state are always ready.
        Poll::Ready(Ok(()))
    }

    /// Execute the stage.
    /// It is expected that the stage will write all necessary data to the database
    /// upon invoking this method.
    fn execute(
        &mut self,
        provider: &DatabaseProviderRW<DB>,
        input: ExecInput,
    ) -> Result<ExecOutput, StageError>;

    /// Unwind the stage.
    fn unwind(
        &mut self,
        provider: &DatabaseProviderRW<DB>,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError>;
}
/// [Stage] trait extension.
pub trait StageExt<DB: Database>: Stage<DB> {
    /// Utility extension for the `Stage` trait that invokes `Stage::poll_execute_ready`
    /// by wrapping it in a [poll_fn] future, so it can be `.await`ed directly.
    /// For more information see [Stage::poll_execute_ready].
    fn execute_ready(
        &mut self,
        input: ExecInput,
    ) -> impl Future<Output = Result<(), StageError>> + Send {
        poll_fn(move |cx| self.poll_execute_ready(cx, input))
    }
}

/// Blanket implementation so every [Stage] gets the [StageExt] helpers for free.
impl<DB: Database, S: Stage<DB>> StageExt<DB> for S {}

View File

@@ -0,0 +1,2 @@
/// Module containing the [`TestStage`] implementation.
mod stage;
pub use stage::TestStage;

View File

@@ -0,0 +1,67 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_primitives::stage::StageId;
use reth_provider::DatabaseProviderRW;
use std::collections::VecDeque;
/// A test stage that returns pre-scripted results for execution and unwinding.
#[derive(Debug)]
pub struct TestStage {
    // Identifier reported by `Stage::id`.
    id: StageId,
    // Results returned by successive `execute` calls, consumed front-to-back.
    exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
    // Results returned by successive `unwind` calls, consumed front-to-back.
    unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
}
impl TestStage {
    /// Creates a test stage with the given ID and no scripted results.
    pub fn new(id: StageId) -> Self {
        Self {
            id,
            exec_outputs: VecDeque::default(),
            unwind_outputs: VecDeque::default(),
        }
    }

    /// Replaces the queue of scripted execution results.
    pub fn with_exec(self, exec_outputs: VecDeque<Result<ExecOutput, StageError>>) -> Self {
        Self { exec_outputs, ..self }
    }

    /// Replaces the queue of scripted unwind results.
    pub fn with_unwind(
        self,
        unwind_outputs: VecDeque<Result<UnwindOutput, StageError>>,
    ) -> Self {
        Self { unwind_outputs, ..self }
    }

    /// Appends a single scripted execution result.
    pub fn add_exec(mut self, result: Result<ExecOutput, StageError>) -> Self {
        self.exec_outputs.push_back(result);
        self
    }

    /// Appends a single scripted unwind result.
    pub fn add_unwind(mut self, result: Result<UnwindOutput, StageError>) -> Self {
        self.unwind_outputs.push_back(result);
        self
    }
}
impl<DB: Database> Stage<DB> for TestStage {
    fn id(&self) -> StageId {
        self.id
    }

    fn execute(
        &mut self,
        _: &DatabaseProviderRW<DB>,
        _input: ExecInput,
    ) -> Result<ExecOutput, StageError> {
        // Hand out the next scripted result; exhausting the queue means the
        // test drove the stage more times than it scripted.
        match self.exec_outputs.pop_front() {
            Some(result) => result,
            None => panic!("Test stage {} executed too many times.", self.id),
        }
    }

    fn unwind(
        &mut self,
        _: &DatabaseProviderRW<DB>,
        _input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        // Same scheme as `execute`, but for the unwind script.
        match self.unwind_outputs.pop_front() {
            Some(result) => result,
            None => panic!("Test stage {} unwound too many times.", self.id),
        }
    }
}

View File

@@ -0,0 +1,34 @@
pub(crate) mod opt {
    /// Get an [Option] with the maximum value, compared between the passed in value and the inner
    /// value of the [Option]. If the [Option] is `None`, then an option containing the passed in
    /// value will be returned.
    pub(crate) fn max<T: Ord + Copy>(a: Option<T>, b: T) -> Option<T> {
        Some(match a {
            Some(v) => v.max(b),
            None => b,
        })
    }

    /// Get an [Option] with the minimum value, compared between the passed in value and the inner
    /// value of the [Option]. If the [Option] is `None`, then an option containing the passed in
    /// value will be returned.
    pub(crate) fn min<T: Ord + Copy>(a: Option<T>, b: T) -> Option<T> {
        Some(match a {
            Some(v) => v.min(b),
            None => b,
        })
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn opt_max() {
            assert_eq!(max(None, 5), Some(5));
            assert_eq!(max(Some(1), 5), Some(5));
            assert_eq!(max(Some(10), 5), Some(10));
        }

        #[test]
        fn opt_min() {
            assert_eq!(min(None, 5), Some(5));
            assert_eq!(min(Some(1), 5), Some(1));
            assert_eq!(min(Some(10), 5), Some(5));
        }
    }
}