From 447e3cab325cbf1975e59f9c83dde0ea1862dbdd Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 5 Aug 2024 18:50:24 +0200 Subject: [PATCH] test: live sync transition, tree requests required blocks (#10082) --- crates/engine/tree/src/tree/mod.rs | 256 +++++++++++++++++------------ 1 file changed, 148 insertions(+), 108 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 4d967cf509..0f71d9dd37 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -741,12 +741,10 @@ where else { return }; - if sync_target_state.finalized_block_hash.is_zero() { // no finalized block, can't check distance return } - // get the block number of the finalized block, if we have it let newest_finalized = self .state @@ -2078,6 +2076,92 @@ mod tests { self.tree.provider.add_state_root(block.state_root); self.tree.insert_block(block) } + + async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into) { + let fcu_status = fcu_status.into(); + + self.send_fcu(block_hash, fcu_status).await; + + self.check_fcu(block_hash, fcu_status).await; + } + + async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into) { + let fcu_state = self.fcu_state(block_hash); + + let (tx, rx) = oneshot::channel(); + self.tree.on_engine_message(FromEngine::Request( + BeaconEngineMessage::ForkchoiceUpdated { + state: fcu_state, + payload_attrs: None, + tx, + }, + )); + + let response = rx.await.unwrap().unwrap().await.unwrap(); + let fcu_status = fcu_status.into(); + match fcu_status { + ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()), + ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()), + ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()), + } + } + + async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into) { + let fcu_state = self.fcu_state(block_hash); + + // check for ForkchoiceUpdated event + let event = self.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated( + state, + status, + )) => { + assert_eq!(state, fcu_state); + assert_eq!(status, fcu_status.into()); + } + _ => panic!("Unexpected event: {:#?}", event), + } + } + + const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState { + ForkchoiceState { + head_block_hash: block_hash, + safe_block_hash: block_hash, + finalized_block_hash: block_hash, + } + } + + async fn insert_chain( + &mut self, + mut chain: impl Iterator + Clone, + ) { + for block in chain.clone() { + self.insert_block(block.clone()).unwrap(); + + // check for CanonicalBlockAdded events, we expect chain.len() blocks added + let event = self.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BeaconConsensus( + BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _), + ) => { + assert!(chain.any(|b| b.hash() == block.hash())); + } + _ => panic!("Unexpected event: {:#?}", event), + } + } + } + + async fn check_canon_commit(&mut self, hash: B256) { + let event = self.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BeaconConsensus( + BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _), + ) => { + assert_eq!(header.hash(), hash); + } + _ => panic!("Unexpected event: {:#?}", event), + } + } } #[tokio::test] @@ -2452,35 +2536,9 @@ mod tests { let missing_block = test_block_builder .generate_random_block(6, test_harness.blocks.last().unwrap().block().hash()); - let fcu_state = ForkchoiceState { - head_block_hash: missing_block.hash(), - safe_block_hash: B256::ZERO, - finalized_block_hash: B256::ZERO, - }; + test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await; - let (tx, rx) = oneshot::channel(); - test_harness.tree.on_engine_message(FromEngine::Request( - BeaconEngineMessage::ForkchoiceUpdated { state: fcu_state, payload_attrs: None, tx }, - )); - - let response = rx.await.unwrap().unwrap().await.unwrap(); - - assert!(response.payload_status.is_syncing()); - - // we receive first an EngineApiEvent::BeaconConsensus event mirroring the FCU. - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated( - state, - status, - )) => { - assert_eq!(state, fcu_state); - assert_eq!(status, PayloadStatusEnum::Syncing.into()); - } - _ => panic!("Unexpected event: {:#?}", event), - } - - // then we receive an EngineApiEvent::Download event to get the missing block. + // after FCU we receive an EngineApiEvent::Download event to get the missing block. let event = test_harness.from_tree_rx.recv().await.unwrap(); match event { EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => { @@ -2499,53 +2557,14 @@ mod tests { let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); test_harness = test_harness.with_blocks(base_chain.clone()); - // create FCU for the tip of the base chain - let fcu_state = ForkchoiceState { - head_block_hash: base_chain.last().unwrap().block().hash(), - safe_block_hash: B256::ZERO, - finalized_block_hash: B256::ZERO, - }; - - let (tx, rx) = oneshot::channel(); - test_harness.tree.on_engine_message(FromEngine::Request( - BeaconEngineMessage::ForkchoiceUpdated { state: fcu_state, payload_attrs: None, tx }, - )); - - let response = rx.await.unwrap().unwrap().await.unwrap(); - assert!(response.payload_status.is_valid()); - - // check for ForkchoiceUpdated event - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated( - state, - status, - )) => { - assert_eq!(state, fcu_state); - assert_eq!(status, ForkchoiceStatus::Valid); - } - _ => panic!("Unexpected event: {:#?}", event), - } + test_harness + .fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid) + .await; // extend main chain let main_chain = test_harness.block_builder.create_fork(base_chain[0].block(), 3); - for (index, block) in main_chain.iter().enumerate() { - test_harness.insert_block(block.clone()).unwrap(); - } - - // check for CanonicalBlockAdded events, we expect main_chain.len() blocks added - for index in 0..main_chain.len() { - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus( - BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _), - ) => { - assert!(main_chain.iter().any(|b| b.hash() == block.hash())); - } - _ => panic!("Unexpected event: {:#?}", event), - } - } + test_harness.insert_chain(main_chain.iter()).await; } #[tokio::test] @@ -2557,25 +2576,13 @@ mod tests { test_harness = test_harness.with_blocks(main_chain.clone()); let fork_chain = test_harness.block_builder.create_fork(main_chain[2].block(), 3); + // add fork blocks to the tree for (index, block) in fork_chain.iter().enumerate() { test_harness.insert_block(block.clone()).unwrap(); } - // create FCU for the tip of the fork - let fcu_state = ForkchoiceState { - head_block_hash: fork_chain.last().unwrap().hash(), - safe_block_hash: B256::ZERO, - finalized_block_hash: B256::ZERO, - }; - - let (tx, rx) = oneshot::channel(); - test_harness.tree.on_engine_message(FromEngine::Request( - BeaconEngineMessage::ForkchoiceUpdated { state: fcu_state, payload_attrs: None, tx }, - )); - - let response = rx.await.unwrap().unwrap().await.unwrap(); - assert!(response.payload_status.is_valid()); + test_harness.send_fcu(fork_chain.last().unwrap().hash(), ForkchoiceStatus::Valid).await; // check for ForkBlockAdded events, we expect fork_chain.len() blocks added for index in 0..fork_chain.len() { @@ -2591,28 +2598,9 @@ mod tests { } // check for CanonicalChainCommitted event - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus( - BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _), - ) => { - assert_eq!(header.hash(), fork_chain.last().unwrap().hash()); - } - _ => panic!("Unexpected event: {:#?}", event), - } + test_harness.check_canon_commit(fork_chain.last().unwrap().hash()).await; - // check for ForkchoiceUpdated event - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated( - state, - status, - )) => { - assert_eq!(state, fcu_state); - assert_eq!(status, ForkchoiceStatus::Valid); - } - _ => panic!("Unexpected event: {:#?}", event), - } + test_harness.check_fcu(fork_chain.last().unwrap().hash(), ForkchoiceStatus::Valid).await; // new head is the tip of the fork chain assert_eq!( @@ -2620,4 +2608,56 @@ mod tests { fork_chain.last().unwrap().hash() ); } + + #[tokio::test] + async fn test_engine_tree_live_sync_transition_required_blocks_requested() { + reth_tracing::init_test_tracing(); + + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + test_harness + .fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid) + .await; + + // extend main chain but don't insert the blocks + let main_chain = test_harness.block_builder.create_fork(base_chain[0].block(), 10); + + let main_chain_last_hash = main_chain.last().unwrap().hash(); + test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await; + + test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await; + + // create event for backfill finished + let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue { + block_number: MIN_BLOCKS_FOR_PIPELINE_RUN + 1, + }); + + test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)); + + let event = test_harness.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => { + assert_eq!(hash_set, HashSet::from([main_chain_last_hash])); + } + _ => panic!("Unexpected event: {:#?}", event), + } + + test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain + .last() + .unwrap() + .clone()])); + + let event = test_harness.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => { + assert_eq!(total_blocks, (main_chain.len() - 1) as u64); + assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash); + } + _ => panic!("Unexpected event: {:#?}", event), + } + } }