diff --git a/crates/consensus/auto-seal/src/task.rs b/crates/consensus/auto-seal/src/task.rs index 06f637d389..5217661c4c 100644 --- a/crates/consensus/auto-seal/src/task.rs +++ b/crates/consensus/auto-seal/src/task.rs @@ -1,12 +1,10 @@ use crate::{mode::MiningMode, Storage}; -use futures_util::{future::BoxFuture, FutureExt, StreamExt}; -use reth_beacon_consensus::BeaconEngineMessage; +use futures_util::{future::BoxFuture, FutureExt}; +use reth_beacon_consensus::{BeaconEngineMessage, ForkchoiceStatus}; use reth_interfaces::consensus::ForkchoiceState; use reth_primitives::{ constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS, ETHEREUM_BLOCK_GAS_LIMIT}, - proofs, - stage::StageId, - Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom, + proofs, Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom, SealedBlockWithSenders, EMPTY_OMMER_ROOT, U256, }; use reth_provider::{CanonChainTracker, CanonStateNotificationSender, Chain, StateProviderFactory}; @@ -26,7 +24,7 @@ use std::{ }; use tokio::sync::{mpsc::UnboundedSender, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{debug, trace, warn}; +use tracing::{debug, error, trace, warn}; /// A Future that listens for new ready transactions and puts new blocks into storage pub struct MiningTask { @@ -117,7 +115,7 @@ where let client = this.client.clone(); let chain_spec = Arc::clone(&this.chain_spec); let pool = this.pool.clone(); - let mut events = this.pipe_line_events.take(); + let events = this.pipe_line_events.take(); let canon_state_notification = this.canon_state_notification.clone(); // Create the mining future that creates a block, notifies the engine that drives @@ -226,29 +224,38 @@ where }; drop(storage); - // send the new update to the engine, this will trigger the pipeline to - // download the block, execute it and store it in the database. - let (tx, _rx) = oneshot::channel(); - let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs: None, - tx, - }); - debug!(target: "consensus::auto", ?state, "sent fork choice update"); + // TODO: make this a future + // await the fcu call rx for SYNCING, then wait for a VALID response + loop { + // send the new update to the engine, this will trigger the engine + // to download and execute the block we just inserted + let (tx, rx) = oneshot::channel(); + let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { + state, + payload_attrs: None, + tx, + }); + debug!(target: "consensus::auto", ?state, "Sent fork choice update"); - // wait for the pipeline to finish - if let Some(events) = events.as_mut() { - debug!(target: "consensus::auto", "waiting for finish stage event..."); - // wait for the finish stage to - loop { - if let Some(PipelineEvent::Running { stage_id, .. }) = - events.next().await - { - if stage_id == StageId::Finish { - debug!(target: "consensus::auto", "received finish stage event"); - break + match rx.await.unwrap() { + Ok(fcu_response) => { + match fcu_response.forkchoice_status() { + ForkchoiceStatus::Valid => break, + ForkchoiceStatus::Invalid => { + error!(target: "consensus::auto", ?fcu_response, "Forkchoice update returned invalid response"); + return None + } + ForkchoiceStatus::Syncing => { + debug!(target: "consensus::auto", ?fcu_response, "Forkchoice update returned SYNCING, waiting for VALID"); + // wait for the next fork choice update + continue + } } } + Err(err) => { + error!(target: "consensus::auto", ?err, "Autoseal fork choice update failed"); + return None + } } } diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index 7223cb621f..2e5e542aab 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -40,7 +40,7 @@ impl OnForkChoiceUpdated { } /// Returns the determined status of the received ForkchoiceState. - pub(crate) fn forkchoice_status(&self) -> ForkchoiceStatus { + pub fn forkchoice_status(&self) -> ForkchoiceStatus { self.forkchoice_status } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 9403597278..f1f38cb0fb 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -67,6 +67,7 @@ mod handle; pub use handle::BeaconConsensusEngineHandle; mod forkchoice; +pub use forkchoice::ForkchoiceStatus; mod metrics; pub(crate) mod prune; pub(crate) mod sync;