test: live sync transition, tree requests required blocks (#10082)

This commit is contained in:
Federico Gimenez
2024-08-05 18:50:24 +02:00
committed by GitHub
parent 735738dfff
commit 447e3cab32

View File

@@ -741,12 +741,10 @@ where
else {
return
};
if sync_target_state.finalized_block_hash.is_zero() {
// no finalized block, can't check distance
return
}
// get the block number of the finalized block, if we have it
let newest_finalized = self
.state
@@ -2078,6 +2076,92 @@ mod tests {
self.tree.provider.add_state_root(block.state_root);
self.tree.insert_block(block)
}
async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
let fcu_status = fcu_status.into();
self.send_fcu(block_hash, fcu_status).await;
self.check_fcu(block_hash, fcu_status).await;
}
async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
let fcu_state = self.fcu_state(block_hash);
let (tx, rx) = oneshot::channel();
self.tree.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: fcu_state,
payload_attrs: None,
tx,
},
));
let response = rx.await.unwrap().unwrap().await.unwrap();
let fcu_status = fcu_status.into();
match fcu_status {
ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()),
ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()),
ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()),
}
}
async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
let fcu_state = self.fcu_state(block_hash);
// check for ForkchoiceUpdated event
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
state,
status,
)) => {
assert_eq!(state, fcu_state);
assert_eq!(status, fcu_status.into());
}
_ => panic!("Unexpected event: {:#?}", event),
}
}
const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState {
ForkchoiceState {
head_block_hash: block_hash,
safe_block_hash: block_hash,
finalized_block_hash: block_hash,
}
}
async fn insert_chain(
&mut self,
mut chain: impl Iterator<Item = &SealedBlockWithSenders> + Clone,
) {
for block in chain.clone() {
self.insert_block(block.clone()).unwrap();
// check for CanonicalBlockAdded events, we expect chain.len() blocks added
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
) => {
assert!(chain.any(|b| b.hash() == block.hash()));
}
_ => panic!("Unexpected event: {:#?}", event),
}
}
}
async fn check_canon_commit(&mut self, hash: B256) {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
) => {
assert_eq!(header.hash(), hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
}
}
#[tokio::test]
@@ -2452,35 +2536,9 @@ mod tests {
let missing_block = test_block_builder
.generate_random_block(6, test_harness.blocks.last().unwrap().block().hash());
let fcu_state = ForkchoiceState {
head_block_hash: missing_block.hash(),
safe_block_hash: B256::ZERO,
finalized_block_hash: B256::ZERO,
};
test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await;
let (tx, rx) = oneshot::channel();
test_harness.tree.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated { state: fcu_state, payload_attrs: None, tx },
));
let response = rx.await.unwrap().unwrap().await.unwrap();
assert!(response.payload_status.is_syncing());
// we receive first an EngineApiEvent::BeaconConsensus event mirroring the FCU.
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
state,
status,
)) => {
assert_eq!(state, fcu_state);
assert_eq!(status, PayloadStatusEnum::Syncing.into());
}
_ => panic!("Unexpected event: {:#?}", event),
}
// then we receive an EngineApiEvent::Download event to get the missing block.
// after FCU we receive an EngineApiEvent::Download event to get the missing block.
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
@@ -2499,53 +2557,14 @@ mod tests {
let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
test_harness = test_harness.with_blocks(base_chain.clone());
// create FCU for the tip of the base chain
let fcu_state = ForkchoiceState {
head_block_hash: base_chain.last().unwrap().block().hash(),
safe_block_hash: B256::ZERO,
finalized_block_hash: B256::ZERO,
};
let (tx, rx) = oneshot::channel();
test_harness.tree.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated { state: fcu_state, payload_attrs: None, tx },
));
let response = rx.await.unwrap().unwrap().await.unwrap();
assert!(response.payload_status.is_valid());
// check for ForkchoiceUpdated event
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
state,
status,
)) => {
assert_eq!(state, fcu_state);
assert_eq!(status, ForkchoiceStatus::Valid);
}
_ => panic!("Unexpected event: {:#?}", event),
}
test_harness
.fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
.await;
// extend main chain
let main_chain = test_harness.block_builder.create_fork(base_chain[0].block(), 3);
for (index, block) in main_chain.iter().enumerate() {
test_harness.insert_block(block.clone()).unwrap();
}
// check for CanonicalBlockAdded events, we expect main_chain.len() blocks added
for index in 0..main_chain.len() {
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
) => {
assert!(main_chain.iter().any(|b| b.hash() == block.hash()));
}
_ => panic!("Unexpected event: {:#?}", event),
}
}
test_harness.insert_chain(main_chain.iter()).await;
}
#[tokio::test]
@@ -2557,25 +2576,13 @@ mod tests {
test_harness = test_harness.with_blocks(main_chain.clone());
let fork_chain = test_harness.block_builder.create_fork(main_chain[2].block(), 3);
// add fork blocks to the tree
for (index, block) in fork_chain.iter().enumerate() {
test_harness.insert_block(block.clone()).unwrap();
}
// create FCU for the tip of the fork
let fcu_state = ForkchoiceState {
head_block_hash: fork_chain.last().unwrap().hash(),
safe_block_hash: B256::ZERO,
finalized_block_hash: B256::ZERO,
};
let (tx, rx) = oneshot::channel();
test_harness.tree.on_engine_message(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated { state: fcu_state, payload_attrs: None, tx },
));
let response = rx.await.unwrap().unwrap().await.unwrap();
assert!(response.payload_status.is_valid());
test_harness.send_fcu(fork_chain.last().unwrap().hash(), ForkchoiceStatus::Valid).await;
// check for ForkBlockAdded events, we expect fork_chain.len() blocks added
for index in 0..fork_chain.len() {
@@ -2591,28 +2598,9 @@ mod tests {
}
// check for CanonicalChainCommitted event
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
) => {
assert_eq!(header.hash(), fork_chain.last().unwrap().hash());
}
_ => panic!("Unexpected event: {:#?}", event),
}
test_harness.check_canon_commit(fork_chain.last().unwrap().hash()).await;
// check for ForkchoiceUpdated event
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
state,
status,
)) => {
assert_eq!(state, fcu_state);
assert_eq!(status, ForkchoiceStatus::Valid);
}
_ => panic!("Unexpected event: {:#?}", event),
}
test_harness.check_fcu(fork_chain.last().unwrap().hash(), ForkchoiceStatus::Valid).await;
// new head is the tip of the fork chain
assert_eq!(
@@ -2620,4 +2608,56 @@ mod tests {
fork_chain.last().unwrap().hash()
);
}
#[tokio::test]
async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
reth_tracing::init_test_tracing();
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec.clone());
let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
test_harness = test_harness.with_blocks(base_chain.clone());
test_harness
.fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
.await;
// extend main chain but don't insert the blocks
let main_chain = test_harness.block_builder.create_fork(base_chain[0].block(), 10);
let main_chain_last_hash = main_chain.last().unwrap().hash();
test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
// create event for backfill finished
let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
block_number: MIN_BLOCKS_FOR_PIPELINE_RUN + 1,
});
test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished));
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
assert_eq!(hash_set, HashSet::from([main_chain_last_hash]));
}
_ => panic!("Unexpected event: {:#?}", event),
}
test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
.last()
.unwrap()
.clone()]));
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
assert_eq!(total_blocks, (main_chain.len() - 1) as u64);
assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
}
}