mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 09:08:05 -05:00
fix: drive pipeline forever in debug.continuous (#2375)
Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
@@ -10,7 +10,7 @@ pub struct DebugArgs {
|
||||
/// Prompt the downloader to download blocks one at a time.
|
||||
///
|
||||
/// NOTE: This is for testing purposes only.
|
||||
#[arg(long = "debug.continuous", help_heading = "Debug")]
|
||||
#[arg(long = "debug.continuous", help_heading = "Debug", conflicts_with = "tip")]
|
||||
pub continuous: bool,
|
||||
|
||||
/// Flag indicating whether the node should be terminated after the pipeline sync.
|
||||
@@ -20,7 +20,7 @@ pub struct DebugArgs {
|
||||
/// Set the chain tip manually for testing purposes.
|
||||
///
|
||||
/// NOTE: This is a temporary flag
|
||||
#[arg(long = "debug.tip", help_heading = "Debug")]
|
||||
#[arg(long = "debug.tip", help_heading = "Debug", conflicts_with = "continuous")]
|
||||
pub tip: Option<H256>,
|
||||
|
||||
/// Runs the sync only up to the specified block.
|
||||
|
||||
@@ -158,7 +158,7 @@ impl Command {
|
||||
|
||||
debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");
|
||||
|
||||
init_genesis(db.clone(), self.chain.clone())?;
|
||||
let genesis_hash = init_genesis(db.clone(), self.chain.clone())?;
|
||||
|
||||
let consensus: Arc<dyn Consensus> = if self.auto_mine {
|
||||
debug!(target: "reth::cli", "Using auto seal");
|
||||
@@ -212,12 +212,25 @@ impl Command {
|
||||
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
|
||||
debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
|
||||
|
||||
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
|
||||
|
||||
// Forward genesis as forkchoice state to the consensus engine.
|
||||
// This will allow the downloader to start
|
||||
if self.debug.continuous {
|
||||
info!(target: "reth::cli", "Continuous sync mode enabled");
|
||||
let (tip_tx, _tip_rx) = oneshot::channel();
|
||||
let state = ForkchoiceState {
|
||||
head_block_hash: genesis_hash,
|
||||
finalized_block_hash: genesis_hash,
|
||||
safe_block_hash: genesis_hash,
|
||||
};
|
||||
consensus_engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated {
|
||||
state,
|
||||
payload_attrs: None,
|
||||
tx: tip_tx,
|
||||
})?;
|
||||
}
|
||||
|
||||
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
|
||||
|
||||
// Forward the `debug.tip` as forkchoice state to the consensus engine.
|
||||
// This will initiate the sync up to the provided tip.
|
||||
let _tip_rx = match self.debug.tip {
|
||||
@@ -307,6 +320,7 @@ impl Command {
|
||||
pipeline,
|
||||
blockchain_tree.clone(),
|
||||
self.debug.max_block,
|
||||
self.debug.continuous,
|
||||
payload_builder.clone(),
|
||||
consensus_engine_tx,
|
||||
consensus_engine_rx,
|
||||
|
||||
@@ -146,6 +146,9 @@ where
|
||||
/// Max block after which the consensus engine would terminate the sync. Used for debugging
|
||||
/// purposes.
|
||||
max_block: Option<BlockNumber>,
|
||||
/// If true, the engine will run the pipeline continuously, regardless of whether or not there
|
||||
/// is a new fork choice state.
|
||||
continuous: bool,
|
||||
/// The payload store.
|
||||
payload_builder: PayloadBuilderHandle,
|
||||
/// Consensus engine metrics.
|
||||
@@ -166,6 +169,7 @@ where
|
||||
pipeline: Pipeline<DB, U>,
|
||||
blockchain_tree: BT,
|
||||
max_block: Option<BlockNumber>,
|
||||
continuous: bool,
|
||||
payload_builder: PayloadBuilderHandle,
|
||||
) -> (Self, BeaconConsensusEngineHandle) {
|
||||
let (to_engine, rx) = mpsc::unbounded_channel();
|
||||
@@ -175,6 +179,7 @@ where
|
||||
pipeline,
|
||||
blockchain_tree,
|
||||
max_block,
|
||||
continuous,
|
||||
payload_builder,
|
||||
to_engine,
|
||||
rx,
|
||||
@@ -190,6 +195,7 @@ where
|
||||
pipeline: Pipeline<DB, U>,
|
||||
blockchain_tree: BT,
|
||||
max_block: Option<BlockNumber>,
|
||||
continuous: bool,
|
||||
payload_builder: PayloadBuilderHandle,
|
||||
to_engine: UnboundedSender<BeaconEngineMessage>,
|
||||
rx: UnboundedReceiver<BeaconEngineMessage>,
|
||||
@@ -205,6 +211,7 @@ where
|
||||
forkchoice_state: None,
|
||||
next_action: BeaconEngineAction::None,
|
||||
max_block,
|
||||
continuous,
|
||||
payload_builder,
|
||||
metrics: Metrics::default(),
|
||||
};
|
||||
@@ -421,13 +428,21 @@ where
|
||||
forkchoice_state: ForkchoiceState,
|
||||
) -> PipelineState<DB, U> {
|
||||
let next_action = std::mem::take(&mut self.next_action);
|
||||
if let BeaconEngineAction::RunPipeline(target) = next_action {
|
||||
|
||||
let (tip, should_run_pipeline) = match next_action {
|
||||
BeaconEngineAction::RunPipeline(target) => {
|
||||
let tip = match target {
|
||||
PipelineTarget::Head => forkchoice_state.head_block_hash,
|
||||
PipelineTarget::Safe => forkchoice_state.safe_block_hash,
|
||||
};
|
||||
(Some(tip), true)
|
||||
}
|
||||
BeaconEngineAction::None => (None, self.continuous),
|
||||
};
|
||||
|
||||
if should_run_pipeline {
|
||||
self.metrics.pipeline_runs.increment(1);
|
||||
let tip = match target {
|
||||
PipelineTarget::Head => forkchoice_state.head_block_hash,
|
||||
PipelineTarget::Safe => forkchoice_state.safe_block_hash,
|
||||
};
|
||||
trace!(target: "consensus::engine", ?tip, "Starting the pipeline");
|
||||
trace!(target: "consensus::engine", ?tip, continuous = tip.is_none(), "Starting the pipeline");
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let db = self.db.clone();
|
||||
self.task_spawner.spawn_critical_blocking(
|
||||
@@ -748,6 +763,7 @@ mod tests {
|
||||
pipeline,
|
||||
tree,
|
||||
None,
|
||||
false,
|
||||
payload_builder,
|
||||
);
|
||||
|
||||
|
||||
@@ -57,6 +57,13 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
/// Tell the pipeline to continuously run the pipeline, to accomodate continuous syncing /
|
||||
/// downloading.
|
||||
pub fn with_continuous(mut self) -> Self {
|
||||
self.pipeline.continuous = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the tip sender.
|
||||
pub fn with_tip_sender(mut self, tip_tx: watch::Sender<H256>) -> Self {
|
||||
self.pipeline.tip_tx = Some(tip_tx);
|
||||
|
||||
@@ -81,6 +81,7 @@ use sync_metrics::*;
|
||||
pub struct Pipeline<DB: Database, U: SyncStateUpdater> {
|
||||
stages: Vec<BoxedStage<DB>>,
|
||||
max_block: Option<BlockNumber>,
|
||||
continuous: bool,
|
||||
listeners: PipelineEventListeners,
|
||||
sync_state_updater: Option<U>,
|
||||
progress: PipelineProgress,
|
||||
@@ -100,6 +101,7 @@ impl<DB: Database, U: SyncStateUpdater> Default for Pipeline<DB, U> {
|
||||
Self {
|
||||
stages: Vec::new(),
|
||||
max_block: None,
|
||||
continuous: false,
|
||||
listeners: PipelineEventListeners::default(),
|
||||
sync_state_updater: None,
|
||||
progress: PipelineProgress::default(),
|
||||
@@ -159,14 +161,18 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume the pipeline and run it. Return the pipeline and its result as a future.
|
||||
/// Consume the pipeline and run it until it reaches the provided tip, if set. Return the
|
||||
/// pipeline and its result as a future.
|
||||
#[track_caller]
|
||||
pub fn run_as_fut(mut self, db: Arc<DB>, tip: H256) -> PipelineFut<DB, U> {
|
||||
pub fn run_as_fut(mut self, db: Arc<DB>, tip: Option<H256>) -> PipelineFut<DB, U> {
|
||||
// TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for
|
||||
// updating metrics.
|
||||
self.register_metrics(db.clone());
|
||||
Box::pin(async move {
|
||||
self.set_tip(tip);
|
||||
// NOTE: the tip should only be None if we are in continuous sync mode.
|
||||
if let Some(tip) = tip {
|
||||
self.set_tip(tip);
|
||||
}
|
||||
let result = self.run_loop(db).await;
|
||||
trace!(target: "sync::pipeline", ?tip, ?result, "Pipeline finished");
|
||||
(self, result)
|
||||
|
||||
Reference in New Issue
Block a user