feat: add basic request loop (#8836)

This commit is contained in:
Matthias Seitz
2024-06-15 01:03:05 +02:00
committed by GitHub
parent fcf1b1878e
commit 2726a347cd
6 changed files with 67 additions and 17 deletions

1
Cargo.lock generated
View File

@@ -6889,6 +6889,7 @@ dependencies = [
"reth-metrics",
"reth-network-p2p",
"reth-payload-builder",
"reth-payload-primitives",
"reth-payload-validator",
"reth-primitives",
"reth-provider",

View File

@@ -16,6 +16,7 @@ reth-primitives.workspace = true
reth-blockchain-tree.workspace = true
reth-blockchain-tree-api.workspace = true
reth-ethereum-consensus.workspace = true
reth-payload-primitives.workspace = true
reth-stages-api.workspace = true
reth-errors.workspace = true
reth-db.workspace = true

View File

@@ -1,6 +1,6 @@
use crate::pipeline::{PipelineAction, PipelineEvent, PipelineHandler};
use futures::Stream;
use reth_primitives::stage::PipelineTarget;
use reth_stages_api::PipelineTarget;
use std::{
pin::Pin,
task::{Context, Poll},
@@ -60,7 +60,7 @@ where
///
/// Polls the `ChainOrchestrator` for the next event.
#[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent> {
fn poll_next_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent> {
let this = self.get_mut();
// This loop polls the components

View File

@@ -4,6 +4,10 @@ use crate::{
chain::{ChainHandler, FromOrchestrator, HandlerEvent, OrchestratorState},
tree::EngineApiTreeHandler,
};
use futures::{
stream::Fuse,
Stream, StreamExt,
};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_primitives::{SealedBlockWithSenders, B256};
use std::{
@@ -25,9 +29,8 @@ use std::{
/// It is responsible for handling the following:
/// - Downloading blocks on demand from the network if requested by the [`EngineApiRequestHandler`].
///
/// The core logic is part of the [EngineRequestHandler], which is responsible for processing the
/// The core logic is part of the [`EngineRequestHandler`], which is responsible for processing the
/// incoming requests.
#[derive(Debug)]
pub struct EngineHandler<T>
where
T: EngineRequestHandler,
@@ -37,12 +40,10 @@ where
/// This type is responsible for processing incoming requests.
handler: T,
/// Receiver for incoming requests that need to be processed.
// TODO add stream type for T::Request,
incoming_requests: (),
// TODO maybe use generic?
incoming_requests: Fuse<Pin<Box<dyn Stream<Item = T::Request> + Send + Sync + 'static>>>,
/// Access to the network sync to download blocks on demand.
network_sync: (),
/// Requests that are buffered and need to be processed.
buffered_events: VecDeque<()>,
}
impl<T> ChainHandler for EngineHandler<T>
@@ -50,11 +51,52 @@ where
T: EngineRequestHandler,
{
fn on_event(&mut self, event: FromOrchestrator) {
todo!()
// delegate event to the handler
self.handler.on_event(event.into());
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent> {
todo!()
loop {
// drain the handler
loop {
match self.handler.poll(cx) {
Poll::Ready(ev) => {
match ev {
RequestHandlerEvent::Idle => break,
RequestHandlerEvent::HandlerEvent(ev) => {
match ev {
HandlerEvent::Pipeline(target) => {
// bubble up pipeline request
// TODO: clear downloads in progress
return Poll::Ready(HandlerEvent::Pipeline(target))
}
HandlerEvent::WriteAccessPaused => {}
HandlerEvent::WriteAccessAcquired => {}
}
}
RequestHandlerEvent::Download(_) => {
// TODO delegate to network sync
}
}
}
Poll::Pending => break,
}
}
let mut progress = false;
if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) {
// delegate new received request to the handler
self.handler.on_event(FromEngine::Request(req));
progress = true;
}
// TODO poll network sync
if !progress {
return Poll::Pending;
}
}
}
}
@@ -67,7 +109,7 @@ pub trait EngineRequestHandler: Send + Sync {
fn on_event(&mut self, event: FromEngine<Self::Request>);
/// Advances the handler.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent>;
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent>;
}
/// An [`EngineRequestHandler`] that processes engine API requests.
@@ -147,7 +189,7 @@ where
todo!()
}
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent> {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent> {
// advance tree tasks, trigger
todo!()
}
@@ -160,6 +202,12 @@ pub enum FromEngine<Req> {
DownloadedBlocks(Vec<SealedBlockWithSenders>),
}
impl<Req> From<FromOrchestrator> for FromEngine<Req> {
fn from(event: FromOrchestrator) -> Self {
Self::Event(event)
}
}
#[derive(Debug)]
pub enum RequestHandlerEvent {
Idle,

View File

@@ -7,8 +7,7 @@
//!
//! These modes are mutually exclusive and the node can only be in one mode at a time.
use reth_primitives::stage::PipelineTarget;
use reth_stages_api::{ControlFlow, PipelineError};
use reth_stages_api::{ControlFlow, PipelineError, PipelineTarget};
use std::task::{Context, Poll};
/// A handler for the pipeline.

View File

@@ -5,6 +5,7 @@ use reth_blockchain_tree::BlockBuffer;
use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk};
use reth_engine_primitives::EngineTypes;
use reth_errors::ProviderResult;
use reth_payload_primitives::PayloadTypes;
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{Address, Block, BlockNumber, SealedBlock, SealedBlockWithSenders, B256};
use reth_provider::BlockReader;
@@ -129,7 +130,7 @@ pub trait EngineApiTreeHandler: Send + Sync + Clone {
fn on_forkchoice_updated(
&self,
state: ForkchoiceState,
attrs: Option<<Self::Engine as EngineTypes>::PayloadAttributes>,
attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
) -> TreeOutcome<Result<OnForkChoiceUpdated, String>>;
}
@@ -155,7 +156,7 @@ impl<T> TreeOutcome<T> {
}
}
/// Events that can be emitted by the [EngineApiTreeHandler].
/// Events that can be emitted by the [`EngineApiTreeHandler`].
#[derive(Debug)]
pub enum TreeEvent {
/// Pipeline action is needed.
@@ -402,7 +403,7 @@ where
fn on_forkchoice_updated(
&self,
state: ForkchoiceState,
attrs: Option<<Self::Engine as EngineTypes>::PayloadAttributes>,
attrs: Option<<Self::Engine as PayloadTypes>::PayloadAttributes>,
) -> TreeOutcome<Result<OnForkChoiceUpdated, String>> {
todo!()
}