test: live sync test to check make canonical (#10131)

This commit is contained in:
Federico Gimenez
2024-08-07 11:55:39 +02:00
committed by GitHub
parent 2ab17a4dec
commit e7214afe51

View File

@@ -1336,6 +1336,16 @@ where
match self.insert_block(child) {
Ok(res) => {
debug!(target: "engine", child =?child_num_hash, ?res, "connected buffered block");
if self.is_sync_target_head(child_num_hash.hash) &&
matches!(
res,
InsertPayloadOk::Inserted(BlockStatus::Valid(
BlockAttachment::Canonical
))
)
{
self.make_canonical(child_num_hash.hash);
}
}
Err(err) => {
debug!(target: "engine", ?err, "failed to connect buffered block to tree");
@@ -1973,6 +1983,7 @@ mod tests {
action_rx: Receiver<PersistenceAction>,
executor_provider: MockExecutorProvider,
block_builder: TestBlockBuilder,
provider: MockEthProvider,
}
impl TestHarness {
@@ -1997,7 +2008,7 @@ mod tests {
let payload_builder = PayloadBuilderHandle::new(to_payload_service);
let tree = EngineApiTreeHandler::new(
provider,
provider.clone(),
executor_provider.clone(),
consensus,
payload_validator,
@@ -2021,6 +2032,7 @@ mod tests {
action_rx,
executor_provider,
block_builder,
provider,
}
}
@@ -2056,7 +2068,17 @@ mod tests {
self.tree.canonical_in_memory_state =
CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None);
self.blocks = blocks;
self.blocks = blocks.clone();
self.persist_blocks(
blocks
.into_iter()
.map(|b| SealedBlockWithSenders {
block: (*b.block).clone(),
senders: b.senders.to_vec(),
})
.collect(),
);
self
}
@@ -2138,7 +2160,7 @@ mod tests {
async fn insert_chain(
&mut self,
mut chain: impl Iterator<Item = &SealedBlockWithSenders> + Clone,
chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
) {
for block in chain.clone() {
self.insert_block(block.clone()).unwrap();
@@ -2149,7 +2171,7 @@ mod tests {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
) => {
assert!(chain.any(|b| b.hash() == block.hash()));
assert!(chain.clone().into_iter().any(|b| b.hash() == block.hash()));
}
_ => panic!("Unexpected event: {:#?}", event),
}
@@ -2167,6 +2189,49 @@ mod tests {
_ => panic!("Unexpected event: {:#?}", event),
}
}
async fn check_fork_chain_insertion(
&mut self,
chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
) {
for _ in chain.clone() {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::ForkBlockAdded(block),
) => {
assert!(chain.clone().into_iter().any(|b| b.hash() == block.hash()));
}
_ => panic!("Unexpected event: {:#?}", event),
}
}
}
fn persist_blocks(&mut self, blocks: Vec<SealedBlockWithSenders>) {
self.setup_range_insertion_for_chain(blocks.clone());
let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
for block in &blocks {
let unsealed_block = block.clone().unseal();
block_data.push((block.hash(), unsealed_block.clone().block));
headers_data.push((block.hash(), unsealed_block.header.clone()));
}
self.provider.extend_blocks(block_data);
self.provider.extend_headers(headers_data);
}
fn setup_range_insertion_for_chain(&mut self, chain: Vec<SealedBlockWithSenders>) {
let mut execution_outcomes = Vec::new();
for block in &chain {
let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
self.tree.provider.add_state_root(block.state_root);
execution_outcomes.push(execution_outcome);
}
self.extend_execution_outcome(execution_outcomes);
}
}
#[tokio::test]
@@ -2563,7 +2628,7 @@ mod tests {
// extend main chain
let main_chain = test_harness.block_builder.create_fork(base_chain[0].block(), 3);
test_harness.insert_chain(main_chain.iter()).await;
test_harness.insert_chain(main_chain).await;
}
#[tokio::test]
@@ -2584,17 +2649,7 @@ mod tests {
test_harness.send_fcu(fork_chain.last().unwrap().hash(), ForkchoiceStatus::Valid).await;
// check for ForkBlockAdded events, we expect fork_chain.len() blocks added
for _ in 0..fork_chain.len() {
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
block,
)) => {
assert!(fork_chain.iter().any(|b| b.hash() == block.hash()));
}
_ => panic!("Unexpected event: {:#?}", event),
}
}
test_harness.check_fork_chain_insertion(fork_chain.clone()).await;
// check for CanonicalChainCommitted event
test_harness.check_canon_commit(fork_chain.last().unwrap().hash()).await;
@@ -2659,4 +2714,139 @@ mod tests {
_ => panic!("Unexpected event: {:#?}", event),
}
}
#[tokio::test]
async fn test_engine_tree_live_sync_transition_eventually_canonical() {
reth_tracing::init_test_tracing();
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec.clone());
// create base chain and setup test harness with it
let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
test_harness = test_harness.with_blocks(base_chain.clone());
// fcu to the tip of base chain
test_harness
.fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
.await;
// create main chain, extension of base chain, with enough blocks to
// trigger backfill sync
let main_chain = test_harness
.block_builder
.create_fork(base_chain[0].block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
let main_chain_last = main_chain.last().unwrap();
let main_chain_last_hash = main_chain_last.hash();
let main_chain_backfill_target =
main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
let main_chain_backfill_target_hash = main_chain_backfill_target.hash();
// fcu to the element of main chain that should trigger backfill sync
test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
// check download request for target
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
assert_eq!(hash_set, HashSet::from([main_chain_backfill_target_hash]));
}
_ => panic!("Unexpected event: {:#?}", event),
}
// send message to tell the engine the requested block was downloaded
test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(vec![
main_chain_backfill_target.clone(),
]));
// check that backfill is triggered
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BackfillAction(BackfillAction::Start(
reth_stages::PipelineTarget::Sync(target_hash),
)) => {
assert_eq!(target_hash, main_chain_backfill_target_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
// persist blocks of main chain, same as the backfill operation would do
let backfilled_chain: Vec<_> =
main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
test_harness.persist_blocks(backfilled_chain.clone());
// setting up execution outcomes for the chain, the blocks will be
// executed starting from the oldest, so we need to reverse.
let mut backfilled_chain_rev = backfilled_chain.clone();
backfilled_chain_rev.reverse();
test_harness.setup_range_insertion_for_chain(backfilled_chain_rev.to_vec());
// send message to mark backfill finished
test_harness.tree.on_engine_message(FromEngine::Event(
FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
block_number: main_chain_backfill_target.number,
}),
));
// send fcu to the tip of main
test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
assert_eq!(target_hash, HashSet::from([main_chain_last_hash]));
}
_ => panic!("Unexpected event: {:#?}", event),
}
// tell engine main chain tip downloaded
test_harness
.tree
.on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]));
// check download range request
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
assert_eq!(
total_blocks,
(main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
);
assert_eq!(initial_hash, main_chain_last.parent_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
let remaining: Vec<_> = main_chain
.clone()
.drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
.collect();
// setting up execution outcomes for the chain, the blocks will be
// executed starting from the oldest, so we need to reverse.
let mut remaining_rev = remaining.clone();
remaining_rev.reverse();
test_harness.setup_range_insertion_for_chain(remaining_rev.to_vec());
// tell engine block range downloaded
test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()));
test_harness.check_fork_chain_insertion(remaining).await;
// check canonical chain committed event with the hash of the latest block
let event = test_harness.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalChainCommitted(header, ..),
) => {
assert_eq!(header.hash(), main_chain_last_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
// new head is the tip of the main chain
assert_eq!(test_harness.tree.state.tree_state.canonical_head().hash, main_chain_last_hash);
}
}