feat(consensus): spawn pipeline (#1965)

This commit is contained in:
Roman Krasiuk
2023-03-24 14:49:01 +02:00
committed by GitHub
parent 37752ccdaf
commit c4bc65aa5d
6 changed files with 62 additions and 13 deletions

1
Cargo.lock generated
View File

@@ -4493,6 +4493,7 @@ dependencies = [
"reth-provider",
"reth-rpc-types",
"reth-stages",
"reth-tasks",
"reth-tracing",
"thiserror",
"tokio",

View File

@@ -285,8 +285,13 @@ impl Command {
ctx.task_executor.spawn(events::handle_events(Some(network.clone()), events));
let beacon_consensus_engine =
self.build_consensus_engine(db.clone(), consensus, pipeline, consensus_engine_rx)?;
let beacon_consensus_engine = self.build_consensus_engine(
db.clone(),
&ctx.task_executor,
consensus,
pipeline,
consensus_engine_rx,
)?;
// Run consensus engine
let (rx, tx) = tokio::sync::oneshot::channel();
@@ -360,10 +365,11 @@ impl Command {
fn build_consensus_engine<DB, U, C>(
&self,
db: Arc<DB>,
task_executor: &TaskExecutor,
consensus: C,
pipeline: Pipeline<DB, U>,
message_rx: UnboundedReceiver<BeaconEngineMessage>,
) -> eyre::Result<BeaconConsensusEngine<DB, U, C, Factory>>
) -> eyre::Result<BeaconConsensusEngine<DB, TaskExecutor, U, C, Factory>>
where
DB: Database + Unpin + 'static,
U: SyncStateUpdater + Unpin + 'static,
@@ -374,7 +380,14 @@ impl Command {
TreeExternals::new(db.clone(), consensus, executor_factory, self.chain.clone());
let blockchain_tree = BlockchainTree::new(tree_externals, BlockchainTreeConfig::default())?;
Ok(BeaconConsensusEngine::new(db, pipeline, blockchain_tree, message_rx, self.max_block))
Ok(BeaconConsensusEngine::new(
db,
task_executor.clone(),
pipeline,
blockchain_tree,
message_rx,
self.max_block,
))
}
fn load_config(&self) -> eyre::Result<Config> {

View File

@@ -16,6 +16,7 @@ reth-db = { path = "../../storage/db" }
reth-provider = { path = "../../storage/provider" }
reth-executor = { path = "../../executor" }
reth-rpc-types = { path = "../../rpc/rpc-types" }
reth-tasks = { path = "../../tasks" }
# async
tokio = { version = "1.21.2", features = ["sync"] }

View File

@@ -11,6 +11,9 @@ pub enum BeaconEngineError {
/// Forkchoice zero hash head received.
#[error("Received zero hash as forkchoice head")]
ForkchoiceEmptyHead,
/// Pipeline channel closed.
#[error("Pipeline channel closed")]
PipelineChannelClosed,
/// Encountered a payload error.
#[error(transparent)]
Payload(#[from] PayloadError),

View File

@@ -13,12 +13,13 @@ use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
};
use reth_stages::Pipeline;
use reth_tasks::TaskSpawner;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;
@@ -48,15 +49,18 @@ pub use pipeline_state::PipelineState;
///
/// If the future is polled more than once. Leads to undefined state.
#[must_use = "Future does nothing unless polled"]
pub struct BeaconConsensusEngine<DB, U, C, EF>
pub struct BeaconConsensusEngine<DB, TS, U, C, EF>
where
DB: Database,
TS: TaskSpawner,
U: SyncStateUpdater,
C: Consensus,
EF: ExecutorFactory,
{
/// The database handle.
db: Arc<DB>,
/// Task spawner for spawning the pipeline.
task_spawner: TS,
/// The current state of the pipeline.
/// Must always be [Some] unless the state is being reevaluated.
/// The pipeline is used for historical sync by setting the current forkchoice head.
@@ -75,9 +79,10 @@ where
max_block: Option<BlockNumber>,
}
impl<DB, U, C, EF> BeaconConsensusEngine<DB, U, C, EF>
impl<DB, TS, U, C, EF> BeaconConsensusEngine<DB, TS, U, C, EF>
where
DB: Database + Unpin + 'static,
TS: TaskSpawner,
U: SyncStateUpdater + 'static,
C: Consensus,
EF: ExecutorFactory + 'static,
@@ -88,6 +93,7 @@ where
/// handle the messages received from the Consensus Layer.
pub fn new(
db: Arc<DB>,
task_spawner: TS,
pipeline: Pipeline<DB, U>,
blockchain_tree: BlockchainTree<DB, C, EF>,
message_rx: UnboundedReceiver<BeaconEngineMessage>,
@@ -95,6 +101,7 @@ where
) -> Self {
Self {
db,
task_spawner,
pipeline_state: Some(PipelineState::Idle(pipeline)),
blockchain_tree,
message_rx: UnboundedReceiverStream::new(message_rx),
@@ -232,7 +239,16 @@ where
PipelineTarget::Safe => forkchoice_state.safe_block_hash,
};
trace!(target: "consensus::engine", ?tip, "Starting the pipeline");
PipelineState::Running(pipeline.run_as_fut(self.db.clone(), tip))
let (tx, rx) = oneshot::channel();
let db = self.db.clone();
self.task_spawner.spawn_critical(
"pipeline",
Box::pin(async move {
let result = pipeline.run_as_fut(db, tip).await;
let _ = tx.send(result);
}),
);
PipelineState::Running(rx)
} else {
PipelineState::Idle(pipeline)
}
@@ -274,9 +290,10 @@ where
/// local forkchoice state, it will launch the pipeline to sync to the head hash.
/// While the pipeline is syncing, the consensus engine will keep processing messages from the
/// receiver and forwarding them to the blockchain tree.
impl<DB, U, C, EF> Future for BeaconConsensusEngine<DB, U, C, EF>
impl<DB, TS, U, C, EF> Future for BeaconConsensusEngine<DB, TS, U, C, EF>
where
DB: Database + Unpin + 'static,
TS: TaskSpawner + Unpin,
U: SyncStateUpdater + Unpin + 'static,
C: Consensus + Unpin,
EF: ExecutorFactory + Unpin + 'static,
@@ -322,7 +339,7 @@ where
let next_state = match this.pipeline_state.take().expect("pipeline state is set") {
PipelineState::Running(mut fut) => {
match fut.poll_unpin(cx) {
Poll::Ready((pipeline, result)) => {
Poll::Ready(Ok((pipeline, result))) => {
if let Err(error) = result {
return Poll::Ready(Err(error.into()))
}
@@ -356,6 +373,10 @@ where
// Get next pipeline state.
this.next_pipeline_state(pipeline, forkchoice_state)
}
Poll::Ready(Err(error)) => {
error!(target: "consensus::engine", ?error, "Failed to receive pipeline result");
return Poll::Ready(Err(BeaconEngineError::PipelineChannelClosed))
}
Poll::Pending => {
this.pipeline_state = Some(PipelineState::Running(fut));
return Poll::Pending
@@ -409,6 +430,7 @@ mod tests {
use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET};
use reth_provider::Transaction;
use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError};
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, time::Duration};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
@@ -418,6 +440,7 @@ mod tests {
type TestBeaconConsensusEngine = BeaconConsensusEngine<
Env<WriteMap>,
TokioTaskExecutor,
NoopSyncStateUpdate,
TestConsensus,
TestExecutorFactory,
@@ -488,7 +511,14 @@ mod tests {
let (sync_tx, sync_rx) = unbounded_channel();
(
BeaconConsensusEngine::new(db.clone(), pipeline, tree, sync_rx, None),
BeaconConsensusEngine::new(
db.clone(),
TokioTaskExecutor::default(),
pipeline,
tree,
sync_rx,
None,
),
TestEnv::new(db, tip_rx, sync_tx),
)
}

View File

@@ -1,6 +1,7 @@
use reth_db::database::Database;
use reth_interfaces::sync::SyncStateUpdater;
use reth_stages::{Pipeline, PipelineFut};
use reth_stages::{Pipeline, PipelineWithResult};
use tokio::sync::oneshot;
/// The possible pipeline states within the sync controller.
///
@@ -15,7 +16,7 @@ pub enum PipelineState<DB: Database, U: SyncStateUpdater> {
/// Pipeline is idle.
Idle(Pipeline<DB, U>),
/// Pipeline is running.
Running(PipelineFut<DB, U>),
Running(oneshot::Receiver<PipelineWithResult<DB, U>>),
}
impl<DB: Database, U: SyncStateUpdater> PipelineState<DB, U> {