mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-27 16:18:08 -05:00
engine: improve backfill state tracking (#9820)
This commit is contained in:
@@ -15,6 +15,37 @@ use std::task::{ready, Context, Poll};
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::trace;
|
||||
|
||||
/// Represents the state of the backfill synchronization process.
|
||||
#[derive(Debug, PartialEq, Eq, Default)]
|
||||
pub enum BackfillSyncState {
|
||||
/// The node is not performing any backfill synchronization.
|
||||
/// This is the initial or default state.
|
||||
#[default]
|
||||
Idle,
|
||||
/// A backfill synchronization has been requested or planned, but processing has not started
|
||||
/// yet.
|
||||
Pending,
|
||||
/// The node is actively engaged in backfill synchronization.
|
||||
Active,
|
||||
}
|
||||
|
||||
impl BackfillSyncState {
|
||||
/// Returns true if the state is idle.
|
||||
pub const fn is_idle(&self) -> bool {
|
||||
matches!(self, Self::Idle)
|
||||
}
|
||||
|
||||
/// Returns true if the state is pending.
|
||||
pub const fn is_pending(&self) -> bool {
|
||||
matches!(self, Self::Pending)
|
||||
}
|
||||
|
||||
/// Returns true if the state is active.
|
||||
pub const fn is_active(&self) -> bool {
|
||||
matches!(self, Self::Active)
|
||||
}
|
||||
}
|
||||
|
||||
/// Backfill sync mode functionality.
|
||||
pub trait BackfillSync: Send + Sync {
|
||||
/// Performs a backfill action.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
backfill::BackfillAction,
|
||||
backfill::{BackfillAction, BackfillSyncState},
|
||||
chain::FromOrchestrator,
|
||||
engine::{DownloadRequest, EngineApiEvent, FromEngine},
|
||||
persistence::PersistenceHandle,
|
||||
@@ -379,8 +379,8 @@ pub struct EngineApiTreeHandlerImpl<P, E, T: EngineTypes> {
|
||||
persistence: PersistenceHandle,
|
||||
/// Tracks the state changes of the persistence task.
|
||||
persistence_state: PersistenceState,
|
||||
/// Flag indicating whether the node is currently syncing via backfill.
|
||||
is_backfill_active: bool,
|
||||
/// Flag indicating the state of the node's backfill synchronization process.
|
||||
backfill_sync_state: BackfillSyncState,
|
||||
/// Keeps track of the state of the canonical chain that isn't persisted yet.
|
||||
/// This is intended to be accessed from external sources, such as rpc.
|
||||
canonical_in_memory_state: CanonicalInMemoryState,
|
||||
@@ -417,7 +417,7 @@ where
|
||||
outgoing,
|
||||
persistence,
|
||||
persistence_state: PersistenceState::default(),
|
||||
is_backfill_active: false,
|
||||
backfill_sync_state: BackfillSyncState::Idle,
|
||||
state,
|
||||
canonical_in_memory_state,
|
||||
payload_builder,
|
||||
@@ -509,7 +509,7 @@ where
|
||||
FromEngine::Event(event) => match event {
|
||||
FromOrchestrator::BackfillSyncStarted => {
|
||||
debug!(target: "consensus::engine", "received backfill sync started event");
|
||||
self.is_backfill_active = true;
|
||||
self.backfill_sync_state = BackfillSyncState::Active;
|
||||
}
|
||||
FromOrchestrator::BackfillSyncFinished(ctrl) => {
|
||||
self.on_backfill_sync_finished(ctrl);
|
||||
@@ -563,7 +563,7 @@ where
|
||||
/// This will also try to connect the buffered blocks.
|
||||
fn on_backfill_sync_finished(&mut self, ctrl: ControlFlow) {
|
||||
debug!(target: "consensus::engine", "received backfill sync finished event");
|
||||
self.is_backfill_active = false;
|
||||
self.backfill_sync_state = BackfillSyncState::Idle;
|
||||
|
||||
// Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
|
||||
if let ControlFlow::Unwind { bad_block, .. } = ctrl {
|
||||
@@ -1043,7 +1043,7 @@ where
|
||||
return None
|
||||
}
|
||||
|
||||
if self.is_backfill_active {
|
||||
if !self.backfill_sync_state.is_idle() {
|
||||
return None
|
||||
}
|
||||
|
||||
@@ -1243,7 +1243,7 @@ where
|
||||
return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
|
||||
}
|
||||
|
||||
if self.is_backfill_active {
|
||||
if !self.backfill_sync_state.is_idle() {
|
||||
// We can only process new forkchoice updates if the pipeline is idle, since it requires
|
||||
// exclusive access to the database
|
||||
trace!(target: "consensus::engine", "Pipeline is syncing, skipping forkchoice update");
|
||||
@@ -1398,7 +1398,7 @@ where
|
||||
return Ok(TreeOutcome::new(status))
|
||||
}
|
||||
|
||||
let status = if self.is_backfill_active {
|
||||
let status = if !self.backfill_sync_state.is_idle() {
|
||||
self.buffer_block_without_senders(block).unwrap();
|
||||
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
|
||||
} else {
|
||||
@@ -1735,7 +1735,7 @@ mod tests {
|
||||
get_default_test_harness(PERSISTENCE_THRESHOLD);
|
||||
|
||||
// set backfill active
|
||||
tree.is_backfill_active = true;
|
||||
tree.backfill_sync_state = BackfillSyncState::Active;
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
tree.on_engine_message(FromEngine::Request(BeaconEngineMessage::ForkchoiceUpdated {
|
||||
@@ -1764,7 +1764,7 @@ mod tests {
|
||||
TestHarness::holesky();
|
||||
|
||||
// set backfill active
|
||||
tree.is_backfill_active = true;
|
||||
tree.backfill_sync_state = BackfillSyncState::Active;
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
tree.on_engine_message(FromEngine::Request(BeaconEngineMessage::NewPayload {
|
||||
|
||||
Reference in New Issue
Block a user