Turn EthService into a Stream and log the events (#9622)

Co-authored-by: Miguel T <88039515+mvares@users.noreply.github.com>
This commit is contained in:
Jonathan Underwood
2024-07-19 04:15:15 +09:00
committed by GitHub
parent 820d3da2f6
commit 4a19161557
2 changed files with 12 additions and 17 deletions

View File

@@ -1,11 +1,11 @@
use futures::{ready, StreamExt};
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use reth_beacon_consensus::{BeaconEngineMessage, EthBeaconConsensus};
use reth_chainspec::ChainSpec;
use reth_db_api::database::Database;
use reth_engine_tree::{
backfill::PipelineSync,
chain::ChainOrchestrator,
chain::{ChainEvent, ChainOrchestrator},
download::BasicBlockDownloader,
engine::{EngineApiEvent, EngineApiRequestHandler, EngineHandler, FromEngine},
};
@@ -14,7 +14,6 @@ use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersCli
use reth_stages_api::Pipeline;
use reth_tasks::TaskSpawner;
use std::{
future::Future,
pin::Pin,
sync::{mpsc::Sender, Arc},
task::{Context, Poll},
@@ -70,22 +69,16 @@ where
}
}
impl<DB, Client> Future for EthService<DB, Client>
impl<DB, Client> Stream for EthService<DB, Client>
where
DB: Database + 'static,
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
type Output = Result<(), EthServiceError>;
type Item = ChainEvent<EngineApiEvent>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Call poll on the inner orchestrator.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut orchestrator = self.project().orchestrator;
loop {
match ready!(StreamExt::poll_next_unpin(&mut orchestrator, cx)) {
Some(_event) => continue,
None => return Poll::Ready(Ok(())),
}
}
StreamExt::poll_next_unpin(&mut orchestrator, cx)
}
}

View File

@@ -165,7 +165,7 @@ where
let (_from_tree_tx, from_tree_rx) = unbounded_channel();
// Configure the consensus engine
let eth_service = EthService::new(
let mut eth_service = EthService::new(
ctx.chain_spec(),
network_client.clone(),
// to tree
@@ -243,8 +243,10 @@ where
let (tx, rx) = oneshot::channel();
info!(target: "reth::cli", "Starting consensus engine");
ctx.task_executor().spawn_critical_blocking("consensus engine", async move {
let res = eth_service.await;
let _ = tx.send(res);
while let Some(event) = eth_service.next().await {
info!(target: "reth::cli", "Event: {event:?}");
}
let _ = tx.send(());
});
let full_node = FullNode {
@@ -265,7 +267,7 @@ where
let handle = NodeHandle {
node_exit_future: NodeExitFuture::new(
async { Ok(rx.await??) },
async { Ok(rx.await?) },
full_node.config.debug.terminate,
),
node: full_node,