feat: main control loop for chain orchestrator (#8798)

Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2024-06-13 21:24:08 +02:00
committed by GitHub
parent 4047f5f6e3
commit e708cb4e93
5 changed files with 133 additions and 23 deletions

View File

@@ -1,4 +1,6 @@
use crate::pipeline::{PipelineAction, PipelineEvent, PipelineHandler};
use futures::Stream;
use reth_primitives::stage::PipelineTarget;
use std::{
pin::Pin,
task::{Context, Poll},
@@ -15,8 +17,8 @@ use std::{
/// It polls the given `handler`, which is responsible for advancing the chain, how is up to the
/// handler. However, due to database restrictions (e.g. exclusive write access), following
/// invariants apply:
/// - If the handler requests a pipeline run (e.g. [`PipelineAction::SyncPipeline`]), the handler
/// must ensure that while the pipeline is running, no other write access is granted.
/// - If the handler requests a pipeline run (e.g. [`PipelineAction::Start`]), the handler must
/// ensure that while the pipeline is running, no other write access is granted.
/// - At any time the [`ChainOrchestrator`] can request exclusive write access to the database
/// (e.g. if pruning is required), but will not do so until the handler has acknowledged the
/// request for write access.
@@ -26,21 +28,23 @@ use std::{
/// [`ChainHandler::on_event`].
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ChainOrchestrator<T>
pub struct ChainOrchestrator<T, P>
where
T: ChainHandler,
P: PipelineHandler,
{
/// The handler for advancing the chain.
handler: T,
/// Controls pipeline sync.
pipeline: (),
pipeline: P,
/// Additional hooks (e.g. pruning) that can require exclusive access to the database.
hooks: (),
}
impl<T> ChainOrchestrator<T>
impl<T, P> ChainOrchestrator<T, P>
where
T: ChainHandler,
T: ChainHandler + Unpin,
P: PipelineHandler + Unpin,
{
/// Returns the handler
pub const fn handler(&self) -> &T {
@@ -57,13 +61,71 @@ where
/// Polls the `ChainOrchestrator` for the next event.
#[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent> {
todo!("do we need this?")
let this = self.get_mut();
// This loop polls the components
//
// 1. Polls the pipeline to completion, if active.
// 2. Advances the chain by polling the handler.
'outer: loop {
// try to poll the pipeline to completion, if active
match this.pipeline.poll(cx) {
Poll::Ready(pipeline_event) => match pipeline_event {
PipelineEvent::Idle => {}
PipelineEvent::Started(_) => {
// notify handler that pipeline started
this.handler.on_event(FromOrchestrator::PipelineStarted);
return Poll::Ready(ChainEvent::PipelineStarted);
}
PipelineEvent::Finished(res) => {
return match res {
Ok(event) => {
tracing::debug!(?event, "pipeline finished");
// notify handler that pipeline finished
this.handler.on_event(FromOrchestrator::PipelineFinished);
Poll::Ready(ChainEvent::PipelineFinished)
}
Err(err) => {
tracing::error!( %err, "pipeline failed");
Poll::Ready(ChainEvent::FatalError)
}
}
}
},
Poll::Pending => {}
}
// drain the handler
loop {
// poll the handler for the next event
match this.handler.poll(cx) {
Poll::Ready(handler_event) => {
match handler_event {
HandlerEvent::Pipeline(target) => {
// trigger pipeline and start polling it
this.pipeline.on_action(PipelineAction::Start(target));
continue 'outer
}
HandlerEvent::WriteAccessPaused => {}
HandlerEvent::WriteAccessAcquired => {}
}
}
Poll::Pending => {
// no more events to process
break 'outer
}
}
}
}
Poll::Pending
}
}
impl<T> Stream for ChainOrchestrator<T>
impl<T, P> Stream for ChainOrchestrator<T, P>
where
T: ChainHandler,
T: ChainHandler + Unpin,
P: PipelineHandler + Unpin,
{
type Item = ChainEvent;
@@ -72,6 +134,14 @@ where
}
}
/// Represents the sync mode the chain is operating in.
#[derive(Debug, Default)]
enum SyncMode {
#[default]
Handler,
Pipeline,
}
/// Event emitted by the [`ChainOrchestrator`]
///
/// These are meant to be used for observability and debugging purposes.
@@ -79,6 +149,10 @@ where
pub enum ChainEvent {
/// Pipeline sync started
PipelineStarted,
/// Pipeline sync finished
PipelineFinished,
/// Fatal error
FatalError,
}
/// A trait that advances the chain by handling actions.
@@ -89,25 +163,17 @@ pub trait ChainHandler: Send + Sync {
fn on_event(&mut self, event: FromOrchestrator);
/// Polls for actions that [`ChainOrchestrator`] should handle.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<HandlerEvent>;
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent>;
}
/// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
pub enum HandlerEvent {
Pipeline(PipelineAction),
Pipeline(PipelineTarget),
/// Ack paused write access to the database
WriteAccessPaused,
/// Operating in write-access mode
WriteAccess,
}
#[derive(Clone, Debug)]
pub enum PipelineAction {
/// Start pipeline sync
SyncPipeline,
/// Unwind via the pipeline
UnwindPipeline,
WriteAccessAcquired,
}
/// Internal events issued by the [`ChainOrchestrator`].
@@ -118,7 +184,9 @@ pub enum FromOrchestrator {
/// Orchestrator no longer requires exclusive write access to the database.
ReleaseWriteHookAccess,
/// Invoked when pipeline sync finished
OnPipelineOutcome,
PipelineFinished,
/// Invoked when pipeline started
PipelineStarted,
}
/// Represents the state of the chain.

View File

@@ -53,7 +53,7 @@ where
todo!()
}
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<HandlerEvent> {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent> {
todo!()
}
}

View File

@@ -16,5 +16,7 @@ pub use reth_blockchain_tree_api::*;
pub mod chain;
/// Engine Api chain handler support.
pub mod engine;
/// Support for managing the pipeline.
pub mod pipeline;
/// Support for interacting with the blockchain tree.
pub mod tree;

View File

@@ -0,0 +1,40 @@
//! It is expected that the node has two sync modes:
//!
//! - Pipeline sync: Sync to a certain block height in stages, e.g. download data from p2p then
//! execute that range.
//! - Live sync: In this mode the nodes is keeping up with the latest tip and listens for new
//! requests from the consensus client.
//!
//! These modes are mutually exclusive and the node can only be in one mode at a time.
use reth_primitives::stage::PipelineTarget;
use reth_stages_api::{ControlFlow, PipelineError};
use std::task::{Context, Poll};
/// A handler for the pipeline.
pub trait PipelineHandler: Send + Sync {
/// Performs an action on the pipeline.
fn on_action(&mut self, event: PipelineAction);
/// Polls the pipeline for completion.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PipelineEvent>;
}
/// The actions that can be performed on the pipeline.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineAction {
/// Start the pipeline with the given target.
Start(PipelineTarget),
}
/// The events that can be emitted by the pipeline.
#[derive(Debug)]
pub enum PipelineEvent {
Idle,
/// Pipeline started syncing
Started(PipelineTarget),
/// Pipeline finished
///
/// If this is returned, the pipeline is idle.
Finished(Result<ControlFlow, PipelineError>),
}

View File

@@ -1,4 +1,4 @@
use crate::{chain::PipelineAction, engine::DownloadRequest};
use crate::{engine::DownloadRequest, pipeline::PipelineAction};
use parking_lot::{Mutex, MutexGuard, RwLock};
use reth_beacon_consensus::{ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated};
use reth_blockchain_tree::BlockBuffer;