mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 00:58:11 -05:00
fix: simplify SyncState and fix wrong update (#2256)
This commit is contained in:
@@ -1,7 +1,5 @@
|
||||
//! Traits used when interacting with the sync status of the network.
|
||||
|
||||
use reth_primitives::BlockNumber;
|
||||
|
||||
/// A type that provides information about whether the node is currently syncing and the network is
|
||||
/// currently serving syncing related requests.
|
||||
#[auto_impl::auto_impl(&, Arc, Box)]
|
||||
@@ -12,16 +10,11 @@ pub trait SyncStateProvider: Send + Sync {
|
||||
|
||||
/// An updater for updating the [SyncState] of the network.
|
||||
///
|
||||
/// The chain sync pipeline consists of several sequential Stages, like the `HeaderStage` for
|
||||
/// downloading bodies, or `ExecutionStage` for process all downloaded data.
|
||||
///
|
||||
/// Some stage transitions will result in an update of the [SyncState] of the network. For example,
|
||||
/// the transition from a download stage (`Headers`, `Bodies`) to a processing stage (`Sender
|
||||
/// Recovery`, `Execution`) marks a transition from [`SyncState::Downloading`] to
|
||||
/// [`SyncState::Executing`]. Since the execution takes some time, after the first pass the node
|
||||
/// will not be synced ([`SyncState::Idle`]) yet and instead transition back to download data, but
|
||||
/// now with a higher `block_target`. This cycle will continue until the node has caught up with the
|
||||
/// chain and will transition to [`SyncState::Idle`] sync.
|
||||
/// The node is either syncing, or it is idle.
|
||||
/// While syncing, the node will download data from the network and process it. The processing
|
||||
/// consists of several stages, like recovering senders, executing the blocks and indexing.
|
||||
/// Eventually the node reaches the `Finish` stage and will transition to [`SyncState::Idle`], it
|
||||
/// which point the node is considered fully synced.
|
||||
#[auto_impl::auto_impl(&, Arc, Box)]
|
||||
pub trait SyncStateUpdater: SyncStateProvider {
|
||||
/// Notifies about an [SyncState] update.
|
||||
@@ -35,21 +28,8 @@ pub enum SyncState {
|
||||
///
|
||||
/// The network just serves requests to keep up of the chain.
|
||||
Idle,
|
||||
/// Network is syncing and downloading up to the `target_block`.
|
||||
///
|
||||
/// This represents the headers and bodies stage.
|
||||
Downloading {
|
||||
/// The block to which the node is downloading state.
|
||||
target_block: BlockNumber,
|
||||
},
|
||||
/// All headers and bodies up to the `target_block` have been downloaded and are now being
|
||||
/// executed.
|
||||
///
|
||||
/// This represents stages that execute/recover the downloaded data.
|
||||
Executing {
|
||||
/// The block to which the node executes downloaded state.
|
||||
target_block: BlockNumber,
|
||||
},
|
||||
/// Network is syncing
|
||||
Syncing,
|
||||
}
|
||||
|
||||
impl SyncState {
|
||||
|
||||
@@ -741,7 +741,7 @@ mod tests {
|
||||
|
||||
tokio::task::spawn(network);
|
||||
|
||||
handle.update_sync_state(SyncState::Downloading { target_block: 100 });
|
||||
handle.update_sync_state(SyncState::Syncing);
|
||||
assert!(NetworkInfo::is_syncing(&handle));
|
||||
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
@@ -282,7 +282,7 @@ async fn test_connect_to_trusted_peer() {
|
||||
handle.add_trusted_peer(node.id, node.tcp_addr());
|
||||
|
||||
let h = handle.clone();
|
||||
h.update_sync_state(SyncState::Downloading { target_block: 100 });
|
||||
h.update_sync_state(SyncState::Syncing);
|
||||
|
||||
task::spawn(async move {
|
||||
loop {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::stages::{BODIES, HEADERS};
|
||||
use crate::stages::{BODIES, FINISH, HEADERS};
|
||||
use reth_db::{
|
||||
tables::SyncStage,
|
||||
transaction::{DbTx, DbTxMut},
|
||||
@@ -20,11 +20,16 @@ impl Display for StageId {
|
||||
}
|
||||
|
||||
impl StageId {
|
||||
/// Returns a flag indicating if it's a downloading stage
|
||||
/// Returns true if it's a downloading stage [HEADERS] or [BODIES
|
||||
pub fn is_downloading_stage(&self) -> bool {
|
||||
*self == HEADERS || *self == BODIES
|
||||
}
|
||||
|
||||
/// Returns true indicating if it's the finish stage [FINISH]
|
||||
pub fn is_finish(&self) -> bool {
|
||||
*self == FINISH
|
||||
}
|
||||
|
||||
/// Get the last committed progress of this stage.
|
||||
pub fn get_progress<'db>(&self, tx: &impl DbTx<'db>) -> Result<Option<BlockNumber>, DbError> {
|
||||
tx.get::<SyncStage>(self.0.to_string())
|
||||
|
||||
@@ -212,8 +212,11 @@ where
|
||||
|
||||
// Update sync state
|
||||
if let Some(ref updater) = self.sync_state_updater {
|
||||
let state = self.progress.current_sync_state(stage_id.is_downloading_stage());
|
||||
updater.update_sync_state(state);
|
||||
if stage_id.is_finish() {
|
||||
updater.update_sync_state(SyncState::Idle);
|
||||
} else {
|
||||
updater.update_sync_state(SyncState::Syncing);
|
||||
}
|
||||
}
|
||||
|
||||
trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
|
||||
@@ -234,7 +237,7 @@ where
|
||||
ControlFlow::Unwind { target, bad_block } => {
|
||||
// reset the sync state
|
||||
if let Some(ref updater) = self.sync_state_updater {
|
||||
updater.update_sync_state(SyncState::Downloading { target_block: target });
|
||||
updater.update_sync_state(SyncState::Syncing);
|
||||
}
|
||||
self.unwind(db.as_ref(), target, bad_block).await?;
|
||||
return Ok(ControlFlow::Unwind { target, bad_block })
|
||||
@@ -432,20 +435,6 @@ mod tests {
|
||||
assert_eq!(progress.maximum_progress, Some(20));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_states() {
|
||||
let mut progress = PipelineProgress::default();
|
||||
|
||||
// no progress, so we're idle
|
||||
assert_eq!(progress.current_sync_state(false), SyncState::Idle);
|
||||
assert_eq!(progress.current_sync_state(true), SyncState::Idle);
|
||||
|
||||
// progress and downloading/executing
|
||||
progress.update(1);
|
||||
assert_eq!(progress.current_sync_state(true), SyncState::Downloading { target_block: 1 });
|
||||
assert_eq!(progress.current_sync_state(false), SyncState::Executing { target_block: 1 });
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn progress_ctrl_flow() {
|
||||
let mut progress = PipelineProgress::default();
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use super::ctrl::ControlFlow;
|
||||
use crate::util::opt;
|
||||
use reth_interfaces::sync::SyncState;
|
||||
use reth_primitives::BlockNumber;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -20,15 +19,6 @@ impl PipelineProgress {
|
||||
self.maximum_progress = opt::max(self.maximum_progress, progress);
|
||||
}
|
||||
|
||||
/// Create a sync state from pipeline progress.
|
||||
pub(crate) fn current_sync_state(&self, downloading: bool) -> SyncState {
|
||||
match self.progress {
|
||||
Some(progress) if downloading => SyncState::Downloading { target_block: progress },
|
||||
Some(progress) => SyncState::Executing { target_block: progress },
|
||||
None => SyncState::Idle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get next control flow step
|
||||
pub(crate) fn next_ctrl(&self) -> ControlFlow {
|
||||
match self.progress {
|
||||
|
||||
Reference in New Issue
Block a user