From e87960ea8d581a6cb5084751a48cdea1d73161a9 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Wed, 12 Apr 2023 09:42:01 +0300 Subject: [PATCH] fix(engine): pipeline run conditions (#2193) --- crates/consensus/beacon/src/engine/mod.rs | 67 ++++++++++++----------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index bd89f261ea..d1e9e3f0c6 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -8,7 +8,7 @@ use reth_interfaces::{ Error, }; use reth_miner::PayloadStore; -use reth_primitives::{BlockHash, BlockNumber, Header, SealedBlock, H256}; +use reth_primitives::{BlockNumber, Header, SealedBlock, H256}; use reth_rpc_types::engine::{ EngineRpcError, ExecutionPayload, ExecutionPayloadEnvelope, ForkchoiceUpdated, PayloadAttributes, PayloadId, PayloadStatus, PayloadStatusEnum, @@ -319,11 +319,6 @@ where .then_some(H256::zero()); let status = match error { Error::Execution(ExecutorError::PendingBlockIsInFuture { .. }) => { - if let Some(state) = self.forkchoice_state { - if self.get_block_number(state.head_block_hash)?.is_none() { - self.require_pipeline_run(PipelineTarget::Head); - } - } PayloadStatusEnum::Syncing } error => PayloadStatusEnum::Invalid { validation_error: error.to_string() }, @@ -371,12 +366,23 @@ where /// If the finalized block is missing from the database, trigger the pipeline run. fn restore_tree_if_possible( &mut self, - finalized_hash: BlockHash, + state: ForkchoiceState, ) -> Result<(), reth_interfaces::Error> { - match self.get_block_number(finalized_hash)? { - Some(number) => self.blockchain_tree.restore_canonical_hashes(number)?, - None => self.require_pipeline_run(PipelineTarget::Head), + let needs_pipeline_run = match self.get_block_number(state.finalized_block_hash)? { + Some(number) => { + // Attempt to restore the tree. + self.blockchain_tree.restore_canonical_hashes(number)?; + + // After restoring the tree, check if the head block is missing. + self.db + .view(|tx| tx.get::(state.head_block_hash))?? + .is_none() + } + None => true, }; + if needs_pipeline_run { + self.require_pipeline_run(PipelineTarget::Head); + } Ok(()) } @@ -510,9 +516,7 @@ where }; // Update the state and hashes of the blockchain tree if possible - if let Err(error) = - this.restore_tree_if_possible(forkchoice_state.finalized_block_hash) - { + if let Err(error) = this.restore_tree_if_possible(forkchoice_state) { error!(target: "consensus::engine", ?error, "Error restoring blockchain tree"); return Poll::Ready(Err(error.into())) } @@ -917,7 +921,6 @@ mod tests { VecDeque::from([ Ok(ExecOutput { done: true, stage_progress: 0 }), Ok(ExecOutput { done: true, stage_progress: 0 }), - Ok(ExecOutput { done: true, stage_progress: 0 }), ]), Vec::default(), ); @@ -928,27 +931,25 @@ mod tests { let mut engine_rx = spawn_consensus_engine(consensus_engine); - let invalid_forkchoice_state = ForkchoiceState { - head_block_hash: H256::random(), + let next_head = random_block(2, Some(block1.hash), None, Some(0)); + let next_forkchoice_state = ForkchoiceState { + head_block_hash: next_head.hash, finalized_block_hash: block1.hash, ..Default::default() }; - let rx = env.send_forkchoice_updated(invalid_forkchoice_state); - let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + let invalid_rx = env.send_forkchoice_updated(next_forkchoice_state); - let rx = env.send_forkchoice_updated(invalid_forkchoice_state); - let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); - assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + // Insert next head immediately after sending forkchoice update + insert_blocks(env.db.as_ref(), [&next_head].into_iter()); - let rx_valid = env.send_forkchoice_updated(ForkchoiceState { - head_block_hash: H256::random(), - finalized_block_hash: block1.hash, - ..Default::default() - }); let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); - assert_matches!(rx_valid.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert_matches!(invalid_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + + let valid_rx = env.send_forkchoice_updated(next_forkchoice_state); + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid) + .with_latest_valid_hash(next_head.hash); + assert_matches!(valid_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } @@ -972,7 +973,7 @@ mod tests { let block1 = random_block(1, Some(genesis.hash), None, Some(0)); insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); - let _engine = spawn_consensus_engine(consensus_engine); + let engine = spawn_consensus_engine(consensus_engine); let rx = env.send_forkchoice_updated(ForkchoiceState { head_block_hash: H256::random(), @@ -981,7 +982,7 @@ mod tests { }); let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - drop(_engine); + drop(engine); } #[tokio::test] @@ -1007,10 +1008,10 @@ mod tests { insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); - let _engine = spawn_consensus_engine(consensus_engine); + let engine = spawn_consensus_engine(consensus_engine); let rx = env.send_forkchoice_updated(ForkchoiceState { - head_block_hash: H256::random(), + head_block_hash: block1.hash, finalized_block_hash: block1.hash, ..Default::default() }); @@ -1027,7 +1028,7 @@ mod tests { }) .with_latest_valid_hash(H256::zero()); assert_matches!(rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - drop(_engine); + drop(engine); } }