From ccdaa74e41b7601fce62ceef3b3a916622257ab2 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Tue, 25 Apr 2023 13:39:51 -0400 Subject: [PATCH] fix: drive pipeline forever in debug.continuous (#2375) Co-authored-by: Roman Krasiuk --- bin/reth/src/args/debug_args.rs | 4 ++-- bin/reth/src/node/mod.rs | 20 +++++++++++++--- crates/consensus/beacon/src/engine/mod.rs | 28 ++++++++++++++++++----- crates/stages/src/pipeline/builder.rs | 7 ++++++ crates/stages/src/pipeline/mod.rs | 12 +++++++--- 5 files changed, 57 insertions(+), 14 deletions(-) diff --git a/bin/reth/src/args/debug_args.rs b/bin/reth/src/args/debug_args.rs index 2121bdd719..fd820271b1 100644 --- a/bin/reth/src/args/debug_args.rs +++ b/bin/reth/src/args/debug_args.rs @@ -10,7 +10,7 @@ pub struct DebugArgs { /// Prompt the downloader to download blocks one at a time. /// /// NOTE: This is for testing purposes only. - #[arg(long = "debug.continuous", help_heading = "Debug")] + #[arg(long = "debug.continuous", help_heading = "Debug", conflicts_with = "tip")] pub continuous: bool, /// Flag indicating whether the node should be terminated after the pipeline sync. @@ -20,7 +20,7 @@ pub struct DebugArgs { /// Set the chain tip manually for testing purposes. /// /// NOTE: This is a temporary flag - #[arg(long = "debug.tip", help_heading = "Debug")] + #[arg(long = "debug.tip", help_heading = "Debug", conflicts_with = "continuous")] pub tip: Option, /// Runs the sync only up to the specified block. diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 9fa7638b85..6ae21b2869 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -158,7 +158,7 @@ impl Command { debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis"); - init_genesis(db.clone(), self.chain.clone())?; + let genesis_hash = init_genesis(db.clone(), self.chain.clone())?; let consensus: Arc = if self.auto_mine { debug!(target: "reth::cli", "Using auto seal"); @@ -212,12 +212,25 @@ impl Command { info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID"); + let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + + // Forward genesis as forkchoice state to the consensus engine. + // This will allow the downloader to start if self.debug.continuous { info!(target: "reth::cli", "Continuous sync mode enabled"); + let (tip_tx, _tip_rx) = oneshot::channel(); + let state = ForkchoiceState { + head_block_hash: genesis_hash, + finalized_block_hash: genesis_hash, + safe_block_hash: genesis_hash, + }; + consensus_engine_tx.send(BeaconEngineMessage::ForkchoiceUpdated { + state, + payload_attrs: None, + tx: tip_tx, + })?; } - let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); - // Forward the `debug.tip` as forkchoice state to the consensus engine. // This will initiate the sync up to the provided tip. let _tip_rx = match self.debug.tip { @@ -307,6 +320,7 @@ impl Command { pipeline, blockchain_tree.clone(), self.debug.max_block, + self.debug.continuous, payload_builder.clone(), consensus_engine_tx, consensus_engine_rx, diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 56729d31e3..62a920c0bb 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -146,6 +146,9 @@ where /// Max block after which the consensus engine would terminate the sync. Used for debugging /// purposes. max_block: Option, + /// If true, the engine will run the pipeline continuously, regardless of whether or not there + /// is a new fork choice state. + continuous: bool, /// The payload store. payload_builder: PayloadBuilderHandle, /// Consensus engine metrics. @@ -166,6 +169,7 @@ where pipeline: Pipeline, blockchain_tree: BT, max_block: Option, + continuous: bool, payload_builder: PayloadBuilderHandle, ) -> (Self, BeaconConsensusEngineHandle) { let (to_engine, rx) = mpsc::unbounded_channel(); @@ -175,6 +179,7 @@ where pipeline, blockchain_tree, max_block, + continuous, payload_builder, to_engine, rx, @@ -190,6 +195,7 @@ where pipeline: Pipeline, blockchain_tree: BT, max_block: Option, + continuous: bool, payload_builder: PayloadBuilderHandle, to_engine: UnboundedSender, rx: UnboundedReceiver, @@ -205,6 +211,7 @@ where forkchoice_state: None, next_action: BeaconEngineAction::None, max_block, + continuous, payload_builder, metrics: Metrics::default(), }; @@ -421,13 +428,21 @@ where forkchoice_state: ForkchoiceState, ) -> PipelineState { let next_action = std::mem::take(&mut self.next_action); - if let BeaconEngineAction::RunPipeline(target) = next_action { + + let (tip, should_run_pipeline) = match next_action { + BeaconEngineAction::RunPipeline(target) => { + let tip = match target { + PipelineTarget::Head => forkchoice_state.head_block_hash, + PipelineTarget::Safe => forkchoice_state.safe_block_hash, + }; + (Some(tip), true) + } + BeaconEngineAction::None => (None, self.continuous), + }; + + if should_run_pipeline { self.metrics.pipeline_runs.increment(1); - let tip = match target { - PipelineTarget::Head => forkchoice_state.head_block_hash, - PipelineTarget::Safe => forkchoice_state.safe_block_hash, - }; - trace!(target: "consensus::engine", ?tip, "Starting the pipeline"); + trace!(target: "consensus::engine", ?tip, continuous = tip.is_none(), "Starting the pipeline"); let (tx, rx) = oneshot::channel(); let db = self.db.clone(); self.task_spawner.spawn_critical_blocking( @@ -748,6 +763,7 @@ mod tests { pipeline, tree, None, + false, payload_builder, ); diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index 37853708cd..9506fa21c0 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -57,6 +57,13 @@ where self } + /// Tell the pipeline to continuously run the pipeline, to accomodate continuous syncing / + /// downloading. + pub fn with_continuous(mut self) -> Self { + self.pipeline.continuous = true; + self + } + /// Set the tip sender. pub fn with_tip_sender(mut self, tip_tx: watch::Sender) -> Self { self.pipeline.tip_tx = Some(tip_tx); diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 7229093d40..3b31e8ff42 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -81,6 +81,7 @@ use sync_metrics::*; pub struct Pipeline { stages: Vec>, max_block: Option, + continuous: bool, listeners: PipelineEventListeners, sync_state_updater: Option, progress: PipelineProgress, @@ -100,6 +101,7 @@ impl Default for Pipeline { Self { stages: Vec::new(), max_block: None, + continuous: false, listeners: PipelineEventListeners::default(), sync_state_updater: None, progress: PipelineProgress::default(), @@ -159,14 +161,18 @@ where } } - /// Consume the pipeline and run it. Return the pipeline and its result as a future. + /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the + /// pipeline and its result as a future. #[track_caller] - pub fn run_as_fut(mut self, db: Arc, tip: H256) -> PipelineFut { + pub fn run_as_fut(mut self, db: Arc, tip: Option) -> PipelineFut { // TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for // updating metrics. self.register_metrics(db.clone()); Box::pin(async move { - self.set_tip(tip); + // NOTE: the tip should only be None if we are in continuous sync mode. + if let Some(tip) = tip { + self.set_tip(tip); + } let result = self.run_loop(db).await; trace!(target: "sync::pipeline", ?tip, ?result, "Pipeline finished"); (self, result)