From 4a1916155752db84679315aa56175bf3f331adac Mon Sep 17 00:00:00 2001 From: Jonathan Underwood Date: Fri, 19 Jul 2024 04:15:15 +0900 Subject: [PATCH] Turn EthService into a Stream and log the events (#9622) Co-authored-by: Miguel T <88039515+mvares@users.noreply.github.com> --- crates/ethereum/engine/src/service.rs | 19 ++++++------------- crates/ethereum/node/src/launch.rs | 10 ++++++---- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/crates/ethereum/engine/src/service.rs b/crates/ethereum/engine/src/service.rs index bb7e8b06bb..89fd821c36 100644 --- a/crates/ethereum/engine/src/service.rs +++ b/crates/ethereum/engine/src/service.rs @@ -1,11 +1,11 @@ -use futures::{ready, StreamExt}; +use futures::{Stream, StreamExt}; use pin_project::pin_project; use reth_beacon_consensus::{BeaconEngineMessage, EthBeaconConsensus}; use reth_chainspec::ChainSpec; use reth_db_api::database::Database; use reth_engine_tree::{ backfill::PipelineSync, - chain::ChainOrchestrator, + chain::{ChainEvent, ChainOrchestrator}, download::BasicBlockDownloader, engine::{EngineApiEvent, EngineApiRequestHandler, EngineHandler, FromEngine}, }; @@ -14,7 +14,6 @@ use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersCli use reth_stages_api::Pipeline; use reth_tasks::TaskSpawner; use std::{ - future::Future, pin::Pin, sync::{mpsc::Sender, Arc}, task::{Context, Poll}, @@ -70,22 +69,16 @@ where } } -impl Future for EthService +impl Stream for EthService where DB: Database + 'static, Client: HeadersClient + BodiesClient + Clone + Unpin + 'static, { - type Output = Result<(), EthServiceError>; + type Item = ChainEvent; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Call poll on the inner orchestrator. + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut orchestrator = self.project().orchestrator; - loop { - match ready!(StreamExt::poll_next_unpin(&mut orchestrator, cx)) { - Some(_event) => continue, - None => return Poll::Ready(Ok(())), - } - } + StreamExt::poll_next_unpin(&mut orchestrator, cx) } } diff --git a/crates/ethereum/node/src/launch.rs b/crates/ethereum/node/src/launch.rs index 029e3960d9..5ddb757cf8 100644 --- a/crates/ethereum/node/src/launch.rs +++ b/crates/ethereum/node/src/launch.rs @@ -165,7 +165,7 @@ where let (_from_tree_tx, from_tree_rx) = unbounded_channel(); // Configure the consensus engine - let eth_service = EthService::new( + let mut eth_service = EthService::new( ctx.chain_spec(), network_client.clone(), // to tree @@ -243,8 +243,10 @@ where let (tx, rx) = oneshot::channel(); info!(target: "reth::cli", "Starting consensus engine"); ctx.task_executor().spawn_critical_blocking("consensus engine", async move { - let res = eth_service.await; - let _ = tx.send(res); + while let Some(event) = eth_service.next().await { + info!(target: "reth::cli", "Event: {event:?}"); + } + let _ = tx.send(()); }); let full_node = FullNode { @@ -265,7 +267,7 @@ where let handle = NodeHandle { node_exit_future: NodeExitFuture::new( - async { Ok(rx.await??) }, + async { Ok(rx.await?) }, full_node.config.debug.terminate, ), node: full_node,