From e708cb4e93b920dbf2e304e86eb4efe06ec66745 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 13 Jun 2024 21:24:08 +0200 Subject: [PATCH] feat: main control loop for chain orchestrator (#8798) Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> --- crates/engine/tree/src/chain.rs | 110 +++++++++++++++++++++++------ crates/engine/tree/src/engine.rs | 2 +- crates/engine/tree/src/lib.rs | 2 + crates/engine/tree/src/pipeline.rs | 40 +++++++++++ crates/engine/tree/src/tree.rs | 2 +- 5 files changed, 133 insertions(+), 23 deletions(-) create mode 100644 crates/engine/tree/src/pipeline.rs diff --git a/crates/engine/tree/src/chain.rs b/crates/engine/tree/src/chain.rs index 7ba76c318e..cb9797ab0e 100644 --- a/crates/engine/tree/src/chain.rs +++ b/crates/engine/tree/src/chain.rs @@ -1,4 +1,6 @@ +use crate::pipeline::{PipelineAction, PipelineEvent, PipelineHandler}; use futures::Stream; +use reth_primitives::stage::PipelineTarget; use std::{ pin::Pin, task::{Context, Poll}, @@ -15,8 +17,8 @@ use std::{ /// It polls the given `handler`, which is responsible for advancing the chain, how is up to the /// handler. However, due to database restrictions (e.g. exclusive write access), following /// invariants apply: -/// - If the handler requests a pipeline run (e.g. [`PipelineAction::SyncPipeline`]), the handler -/// must ensure that while the pipeline is running, no other write access is granted. +/// - If the handler requests a pipeline run (e.g. [`PipelineAction::Start`]), the handler must +/// ensure that while the pipeline is running, no other write access is granted. /// - At any time the [`ChainOrchestrator`] can request exclusive write access to the database /// (e.g. if pruning is required), but will not do so until the handler has acknowledged the /// request for write access. @@ -26,21 +28,23 @@ use std::{ /// [`ChainHandler::on_event`]. #[must_use = "Stream does nothing unless polled"] #[derive(Debug)] -pub struct ChainOrchestrator +pub struct ChainOrchestrator where T: ChainHandler, + P: PipelineHandler, { /// The handler for advancing the chain. handler: T, /// Controls pipeline sync. - pipeline: (), + pipeline: P, /// Additional hooks (e.g. pruning) that can require exclusive access to the database. hooks: (), } -impl ChainOrchestrator +impl ChainOrchestrator where - T: ChainHandler, + T: ChainHandler + Unpin, + P: PipelineHandler + Unpin, { /// Returns the handler pub const fn handler(&self) -> &T { @@ -57,13 +61,71 @@ where /// Polls the `ChainOrchestrator` for the next event. #[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))] fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - todo!("do we need this?") + let this = self.get_mut(); + + // This loop polls the components + // + // 1. Polls the pipeline to completion, if active. + // 2. Advances the chain by polling the handler. + 'outer: loop { + // try to poll the pipeline to completion, if active + match this.pipeline.poll(cx) { + Poll::Ready(pipeline_event) => match pipeline_event { + PipelineEvent::Idle => {} + PipelineEvent::Started(_) => { + // notify handler that pipeline started + this.handler.on_event(FromOrchestrator::PipelineStarted); + return Poll::Ready(ChainEvent::PipelineStarted); + } + PipelineEvent::Finished(res) => { + return match res { + Ok(event) => { + tracing::debug!(?event, "pipeline finished"); + // notify handler that pipeline finished + this.handler.on_event(FromOrchestrator::PipelineFinished); + Poll::Ready(ChainEvent::PipelineFinished) + } + Err(err) => { + tracing::error!( %err, "pipeline failed"); + Poll::Ready(ChainEvent::FatalError) + } + } + } + }, + Poll::Pending => {} + } + + // drain the handler + loop { + // poll the handler for the next event + match this.handler.poll(cx) { + Poll::Ready(handler_event) => { + match handler_event { + HandlerEvent::Pipeline(target) => { + // trigger pipeline and start polling it + this.pipeline.on_action(PipelineAction::Start(target)); + continue 'outer + } + HandlerEvent::WriteAccessPaused => {} + HandlerEvent::WriteAccessAcquired => {} + } + } + Poll::Pending => { + // no more events to process + break 'outer + } + } + } + } + + Poll::Pending } } -impl Stream for ChainOrchestrator +impl Stream for ChainOrchestrator where - T: ChainHandler, + T: ChainHandler + Unpin, + P: PipelineHandler + Unpin, { type Item = ChainEvent; @@ -72,6 +134,14 @@ where } } +/// Represents the sync mode the chain is operating in. +#[derive(Debug, Default)] +enum SyncMode { + #[default] + Handler, + Pipeline, +} + /// Event emitted by the [`ChainOrchestrator`] /// /// These are meant to be used for observability and debugging purposes. @@ -79,6 +149,10 @@ where pub enum ChainEvent { /// Pipeline sync started PipelineStarted, + /// Pipeline sync finished + PipelineFinished, + /// Fatal error + FatalError, } /// A trait that advances the chain by handling actions. @@ -89,25 +163,17 @@ pub trait ChainHandler: Send + Sync { fn on_event(&mut self, event: FromOrchestrator); /// Polls for actions that [`ChainOrchestrator`] should handle. - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; + fn poll(&mut self, cx: &mut Context<'_>) -> Poll; } /// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`]. #[derive(Clone, Debug)] pub enum HandlerEvent { - Pipeline(PipelineAction), + Pipeline(PipelineTarget), /// Ack paused write access to the database WriteAccessPaused, /// Operating in write-access mode - WriteAccess, -} - -#[derive(Clone, Debug)] -pub enum PipelineAction { - /// Start pipeline sync - SyncPipeline, - /// Unwind via the pipeline - UnwindPipeline, + WriteAccessAcquired, } /// Internal events issued by the [`ChainOrchestrator`]. @@ -118,7 +184,9 @@ pub enum FromOrchestrator { /// Orchestrator no longer requires exclusive write access to the database. ReleaseWriteHookAccess, /// Invoked when pipeline sync finished - OnPipelineOutcome, + PipelineFinished, + /// Invoked when pipeline started + PipelineStarted, } /// Represents the state of the chain. diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index 7b67ab6550..5c48cf1a26 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -53,7 +53,7 @@ where todo!() } - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(&mut self, cx: &mut Context<'_>) -> Poll { todo!() } } diff --git a/crates/engine/tree/src/lib.rs b/crates/engine/tree/src/lib.rs index 640b93675a..7480d8da47 100644 --- a/crates/engine/tree/src/lib.rs +++ b/crates/engine/tree/src/lib.rs @@ -16,5 +16,7 @@ pub use reth_blockchain_tree_api::*; pub mod chain; /// Engine Api chain handler support. pub mod engine; +/// Support for managing the pipeline. +pub mod pipeline; /// Support for interacting with the blockchain tree. pub mod tree; diff --git a/crates/engine/tree/src/pipeline.rs b/crates/engine/tree/src/pipeline.rs new file mode 100644 index 0000000000..ee16b745d8 --- /dev/null +++ b/crates/engine/tree/src/pipeline.rs @@ -0,0 +1,40 @@ +//! It is expected that the node has two sync modes: +//! +//! - Pipeline sync: Sync to a certain block height in stages, e.g. download data from p2p then +//! execute that range. +//! - Live sync: In this mode the nodes is keeping up with the latest tip and listens for new +//! requests from the consensus client. +//! +//! These modes are mutually exclusive and the node can only be in one mode at a time. + +use reth_primitives::stage::PipelineTarget; +use reth_stages_api::{ControlFlow, PipelineError}; +use std::task::{Context, Poll}; + +/// A handler for the pipeline. +pub trait PipelineHandler: Send + Sync { + /// Performs an action on the pipeline. + fn on_action(&mut self, event: PipelineAction); + + /// Polls the pipeline for completion. + fn poll(&mut self, cx: &mut Context<'_>) -> Poll; +} + +/// The actions that can be performed on the pipeline. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PipelineAction { + /// Start the pipeline with the given target. + Start(PipelineTarget), +} + +/// The events that can be emitted by the pipeline. +#[derive(Debug)] +pub enum PipelineEvent { + Idle, + /// Pipeline started syncing + Started(PipelineTarget), + /// Pipeline finished + /// + /// If this is returned, the pipeline is idle. + Finished(Result), +} diff --git a/crates/engine/tree/src/tree.rs b/crates/engine/tree/src/tree.rs index fbb99f2531..507c75a6d8 100644 --- a/crates/engine/tree/src/tree.rs +++ b/crates/engine/tree/src/tree.rs @@ -1,4 +1,4 @@ -use crate::{chain::PipelineAction, engine::DownloadRequest}; +use crate::{engine::DownloadRequest, pipeline::PipelineAction}; use parking_lot::{Mutex, MutexGuard, RwLock}; use reth_beacon_consensus::{ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated}; use reth_blockchain_tree::BlockBuffer;