feat: implement EngineApiRequestHandler::poll (#9670)

Co-authored-by: Federico Gimenez <fgimenez@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2024-07-20 12:44:24 +02:00
committed by GitHub
parent 1c131219c1
commit c8e6e379d9
3 changed files with 27 additions and 11 deletions

View File

@@ -113,9 +113,9 @@ where
match this.handler.poll(cx) {
Poll::Ready(handler_event) => {
match handler_event {
HandlerEvent::BackfillSync(target) => {
// trigger backfill sync and start polling it
this.backfill_sync.on_action(BackfillAction::Start(target));
HandlerEvent::BackfillSync(action) => {
// forward action to backfill_sync
this.backfill_sync.on_action(action);
continue 'outer
}
HandlerEvent::Event(ev) => {
@@ -187,8 +187,8 @@ pub trait ChainHandler: Send + Sync {
/// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
pub enum HandlerEvent<T> {
/// Request to start a backfill sync
BackfillSync(PipelineTarget),
/// Request an action to backfill sync
BackfillSync(BackfillAction),
/// Other event emitted by the handler
Event(T),
}

View File

@@ -12,7 +12,7 @@ use reth_primitives::{SealedBlockWithSenders, B256};
use std::{
collections::HashSet,
sync::mpsc::Sender,
task::{Context, Poll},
task::{ready, Context, Poll},
};
use tokio::sync::mpsc::UnboundedReceiver;
@@ -177,7 +177,7 @@ impl<T> EngineRequestHandler for EngineApiRequestHandler<T>
where
T: EngineTypes,
{
type Event = EngineApiEvent;
type Event = BeaconConsensusEngineEvent;
type Request = BeaconEngineMessage<T>;
fn on_event(&mut self, event: FromEngine<Self::Request>) {
@@ -186,7 +186,23 @@ where
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
todo!("poll tree")
let Some(ev) = ready!(self.from_tree.poll_recv(cx)) else { return Poll::Pending };
let ev = match ev {
EngineApiEvent::BeaconConsensus(ev) => {
RequestHandlerEvent::HandlerEvent(HandlerEvent::Event(ev))
}
EngineApiEvent::FromTree(ev) => match ev {
TreeEvent::BackfillAction(target) => {
RequestHandlerEvent::HandlerEvent(HandlerEvent::BackfillSync(target))
}
TreeEvent::Download(download) => RequestHandlerEvent::Download(download),
TreeEvent::TreeAction(ev) => {
// TODO revise this
return Poll::Pending
}
},
};
Poll::Ready(ev)
}
}

View File

@@ -1,13 +1,13 @@
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use reth_beacon_consensus::{BeaconEngineMessage, EthBeaconConsensus};
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage, EthBeaconConsensus};
use reth_chainspec::ChainSpec;
use reth_db_api::database::Database;
use reth_engine_tree::{
backfill::PipelineSync,
chain::{ChainEvent, ChainOrchestrator},
download::BasicBlockDownloader,
engine::{EngineApiEvent, EngineApiRequestHandler, EngineHandler},
engine::{EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::EngineApiTreeHandlerImpl,
};
@@ -96,7 +96,7 @@ where
DB: Database + 'static,
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
type Item = ChainEvent<EngineApiEvent>;
type Item = ChainEvent<BeaconConsensusEngineEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut orchestrator = self.project().orchestrator;