feat(test): rewrite test_engine_tree_live_sync_transition_eventually_canonical using e2e framework (#16972)

This commit is contained in:
Federico Gimenez
2025-06-23 10:37:13 +02:00
committed by GitHub
parent 9f710adee0
commit 2ba3d134a9
4 changed files with 183 additions and 203 deletions

View File

@@ -17,7 +17,9 @@ pub mod reorg;
pub use engine_api::{ExpectedPayloadStatus, SendNewPayload, SendNewPayloads};
pub use fork::{CreateFork, ForkBase, SetForkBase, SetForkBaseFromBlockInfo, ValidateFork};
pub use node_ops::{CaptureBlockOnNode, CompareNodeChainTips, SelectActiveNode, ValidateBlockTag};
pub use node_ops::{
CaptureBlockOnNode, CompareNodeChainTips, SelectActiveNode, ValidateBlockTag, WaitForSync,
};
pub use produce_blocks::{
AssertMineBlock, BroadcastLatestForkchoice, BroadcastNextNewPayload, CheckPayloadAccepted,
ExpectFcuStatus, GenerateNextPayload, GeneratePayloadAttributes, PickNextBlockProducer,

View File

@@ -6,6 +6,8 @@ use eyre::Result;
use futures_util::future::BoxFuture;
use reth_node_api::EngineTypes;
use reth_rpc_api::clients::EthApiClient;
use std::time::Duration;
use tokio::time::{sleep, timeout};
use tracing::debug;
/// Action to select which node should be active for subsequent single-node operations.
@@ -213,3 +215,116 @@ where
})
}
}
/// Action that waits for two nodes to sync and have the same chain tip.
///
/// Polls the `latest` block of both nodes over RPC and succeeds once the two
/// block hashes are equal, failing with an error if the nodes have not
/// converged within the configured timeout.
#[derive(Debug)]
pub struct WaitForSync {
    /// First node index into the environment's node list
    pub node_a: usize,
    /// Second node index into the environment's node list
    pub node_b: usize,
    /// Maximum time to wait for sync (default: 30 seconds)
    pub timeout_secs: u64,
    /// Polling interval between chain-tip comparisons (default: 1 second)
    pub poll_interval_secs: u64,
}
impl WaitForSync {
    /// Builds a `WaitForSync` for the given node pair using the defaults:
    /// a 30-second timeout and a 1-second poll interval.
    pub const fn new(node_a: usize, node_b: usize) -> Self {
        Self { node_a, node_b, timeout_secs: 30, poll_interval_secs: 1 }
    }

    /// Overrides the interval between consecutive chain-tip checks.
    pub const fn with_poll_interval(mut self, secs: u64) -> Self {
        self.poll_interval_secs = secs;
        self
    }

    /// Overrides the maximum time to wait for the nodes to converge.
    pub const fn with_timeout(mut self, secs: u64) -> Self {
        self.timeout_secs = secs;
        self
    }
}
impl<Engine> Action<Engine> for WaitForSync
where
    Engine: EngineTypes,
{
    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            // Reject indices that do not refer to a node in the environment.
            let node_count = env.node_count();
            if self.node_a >= node_count || self.node_b >= node_count {
                return Err(eyre::eyre!("Node index out of bounds"));
            }

            let max_wait = Duration::from_secs(self.timeout_secs);
            let interval = Duration::from_secs(self.poll_interval_secs);

            debug!(
                "Waiting for nodes {} and {} to sync (timeout: {}s, poll interval: {}s)",
                self.node_a, self.node_b, self.timeout_secs, self.poll_interval_secs
            );

            // Poll both chain tips until the hashes match; runs until cancelled
            // by the surrounding timeout.
            let poll_until_synced = async {
                loop {
                    // Fetch the latest block of each node, node_a first.
                    let mut tips = Vec::with_capacity(2);
                    for idx in [self.node_a, self.node_b] {
                        let tip =
                            EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
                                &env.node_clients[idx].rpc,
                                alloy_eips::BlockNumberOrTag::Latest,
                                false,
                            )
                            .await?
                            .ok_or_else(|| {
                                eyre::eyre!("Failed to get latest block from node {}", idx)
                            })?;
                        tips.push(tip);
                    }
                    let (block_a, block_b) = (&tips[0], &tips[1]);

                    debug!(
                        "Sync check: Node {} tip: {} (block {}), Node {} tip: {} (block {})",
                        self.node_a,
                        block_a.header.hash,
                        block_a.header.number,
                        self.node_b,
                        block_b.header.hash,
                        block_b.header.number
                    );

                    if block_a.header.hash == block_b.header.hash {
                        debug!(
                            "Nodes {} and {} successfully synced to block {} (hash: {})",
                            self.node_a, self.node_b, block_a.header.number, block_a.header.hash
                        );
                        return Ok(());
                    }

                    sleep(interval).await;
                }
            };

            // Flatten the timeout result: on elapse, report which nodes failed
            // to converge and after how long.
            timeout(max_wait, poll_until_synced).await.unwrap_or_else(|_elapsed| {
                Err(eyre::eyre!(
                    "Timeout waiting for nodes {} and {} to sync after {}s",
                    self.node_a,
                    self.node_b,
                    self.timeout_secs
                ))
            })
        })
    }
}

View File

@@ -7,7 +7,7 @@ use reth_e2e_test_utils::testsuite::{
actions::{
CaptureBlock, CompareNodeChainTips, CreateFork, ExpectFcuStatus, MakeCanonical,
ProduceBlocks, ProduceBlocksLocally, ProduceInvalidBlocks, ReorgTo, SelectActiveNode,
SendNewPayloads, UpdateBlockInfo, ValidateCanonicalTag,
SendNewPayloads, UpdateBlockInfo, ValidateCanonicalTag, WaitForSync,
},
setup::{NetworkSetup, Setup},
TestBuilder,
@@ -269,3 +269,66 @@ async fn test_engine_tree_fcu_extends_canon_chain_e2e() -> Result<()> {
Ok(())
}
/// Test that verifies live sync transition where a long chain eventually becomes canonical.
///
/// This test simulates a scenario where:
/// 1. Both nodes start with the same short base chain
/// 2. Node 0 builds a long chain locally (no broadcast, becomes its canonical tip)
/// 3. Node 1 still has only the short base chain as its canonical tip
/// 4. Node 1 receives FCU pointing to Node 0's long chain tip and must sync
/// 5. Both nodes end up with the same canonical chain through real P2P sync
#[tokio::test]
async fn test_engine_tree_live_sync_transition_eventually_canonical_e2e() -> Result<()> {
    reth_tracing::init_test_tracing();

    // EPOCH_SLOTS from alloy-eips: producing more than this forces a pipeline run.
    const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = 32;

    let chain_spec = Arc::new(
        ChainSpecBuilder::default()
            .chain(MAINNET.chain)
            .genesis(
                serde_json::from_str(include_str!(
                    "../../../../e2e-test-utils/src/testsuite/assets/genesis.json"
                ))
                .unwrap(),
            )
            .cancun_activated()
            .build(),
    );

    let setup = Setup::default()
        .with_chain_spec(chain_spec)
        .with_network(NetworkSetup::multi_node(2)) // Two connected nodes
        .with_tree_config(
            TreeConfig::default().with_legacy_state_root(false).with_has_enough_parallelism(true),
        );

    let test = TestBuilder::new()
        .with_setup(setup)
        // Both nodes start with the same base chain (1 block)
        .with_action(SelectActiveNode::new(0))
        .with_action(ProduceBlocks::<EthEngineTypes>::new(1))
        .with_action(MakeCanonical::new()) // Both nodes have the same base chain
        .with_action(CaptureBlock::new("base_chain_tip"))
        // Node 0: Build a much longer chain but don't broadcast it yet
        .with_action(ProduceBlocksLocally::<EthEngineTypes>::new(MIN_BLOCKS_FOR_PIPELINE_RUN + 10))
        .with_action(MakeCanonical::with_active_node()) // Only make it canonical on Node 0
        .with_action(CaptureBlock::new("long_chain_tip"))
        // Verify Node 0's canonical tip is the long chain tip
        .with_action(ValidateCanonicalTag::new("long_chain_tip"))
        // Verify Node 1's canonical tip is still the base chain tip
        .with_action(SelectActiveNode::new(1))
        .with_action(ValidateCanonicalTag::new("base_chain_tip"))
        // Node 1: Send FCU pointing to Node 0's long chain tip; this should
        // trigger Node 1 to sync the missing blocks from Node 0
        .with_action(ReorgTo::<EthEngineTypes>::new_from_tag("long_chain_tip"))
        // Wait for Node 1 to sync with Node 0
        .with_action(WaitForSync::new(0, 1).with_timeout(60))
        // Verify both nodes end up with the same canonical chain
        .with_action(CompareNodeChainTips::expect_same(0, 1));

    test.run::<EthereumNode>().await?;

    Ok(())
}

View File

@@ -90,7 +90,6 @@ struct TestHarness {
from_tree_rx: UnboundedReceiver<EngineApiEvent>,
blocks: Vec<ExecutedBlockWithTrieUpdates>,
action_rx: Receiver<PersistenceAction>,
evm_config: MockEvmConfig,
block_builder: TestBlockBuilder,
provider: MockEthProvider,
}
@@ -147,7 +146,7 @@ impl TestHarness {
// always assume enough parallelism for tests
TreeConfig::default().with_legacy_state_root(true).with_has_enough_parallelism(true),
EngineApiKind::Ethereum,
evm_config.clone(),
evm_config,
);
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
@@ -157,7 +156,6 @@ impl TestHarness {
from_tree_rx,
blocks: vec![],
action_rx,
evm_config,
block_builder,
provider,
}
@@ -212,13 +210,6 @@ impl TestHarness {
self
}
/// Queues the given execution outcomes on the mock EVM config so that the
/// blocks executed next consume them in order.
fn extend_execution_outcome(
    &self,
    execution_outcomes: impl IntoIterator<Item = impl Into<ExecutionOutcome>>,
) {
    self.evm_config.extend(execution_outcomes);
}
async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
let fcu_status = fcu_status.into();
@@ -276,40 +267,6 @@ impl TestHarness {
}
}
/// Awaits the next event from the tree and asserts it is a
/// `CanonicalChainCommitted` event whose header hash equals `hash`.
///
/// Panics on any other event.
async fn check_canon_commit(&mut self, hash: B256) {
    let event = self.from_tree_rx.recv().await.unwrap();
    match event {
        EngineApiEvent::BeaconConsensus(
            BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
        ) => {
            assert_eq!(header.hash(), hash);
        }
        _ => panic!("Unexpected event: {event:#?}"),
    }
}
/// Asserts that a `CanonicalBlockAdded` event is emitted for every block of
/// `chain`, in iteration order.
async fn check_canon_chain_insertion(
    &mut self,
    chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
) {
    for block in chain.clone() {
        self.check_canon_block_added(block.hash()).await;
    }
}
/// Awaits the next event from the tree and asserts it is a
/// `CanonicalBlockAdded` event whose block hash equals `expected_hash`.
///
/// Panics on any other event.
async fn check_canon_block_added(&mut self, expected_hash: B256) {
    let event = self.from_tree_rx.recv().await.unwrap();
    match event {
        EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::CanonicalBlockAdded(
            executed,
            _,
        )) => {
            assert_eq!(executed.recovered_block.hash(), expected_hash);
        }
        _ => panic!("Unexpected event: {event:#?}"),
    }
}
fn persist_blocks(&self, blocks: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>) {
let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
@@ -322,41 +279,6 @@ impl TestHarness {
self.provider.extend_blocks(block_data);
self.provider.extend_headers(headers_data);
}
/// Sets up execution outcomes and state roots for a chain in which every
/// block is valid (no corrupted state root injected).
fn setup_range_insertion_for_valid_chain(
    &mut self,
    chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
) {
    self.setup_range_insertion_for_chain(chain, None)
}
/// Sets up execution outcomes and state roots for the given chain.
///
/// If `invalid_index` is `Some`, the block at that index — counted in the
/// reversed (oldest-first) execution order — gets a random, hence wrong,
/// state root so that validation fails at that block.
fn setup_range_insertion_for_chain(
    &mut self,
    chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
    invalid_index: Option<usize>,
) {
    // setting up execution outcomes for the chain, the blocks will be
    // executed starting from the oldest, so we need to reverse.
    let mut chain_rev = chain;
    chain_rev.reverse();

    let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
    for (index, block) in chain_rev.iter().enumerate() {
        let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
        // `Option == Some(index)` replaces the `is_some() && unwrap()` pair.
        let state_root = if invalid_index == Some(index) {
            B256::random()
        } else {
            block.state_root
        };
        self.tree.provider.add_state_root(state_root);
        execution_outcomes.push(execution_outcome);
    }
    self.extend_execution_outcome(execution_outcomes);
}
/// Asserts that the tree's in-memory canonical head hash equals `head_hash`.
fn check_canon_head(&self, head_hash: B256) {
    assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash);
}
}
#[test]
@@ -892,125 +814,3 @@ async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
_ => panic!("Unexpected event: {event:#?}"),
}
}
/// Drives the engine tree through a live-sync transition: an FCU deep into an
/// unknown chain triggers backfill sync, after which the remaining blocks are
/// downloaded live and the long chain eventually becomes canonical.
#[tokio::test]
async fn test_engine_tree_live_sync_transition_eventually_canonical() {
    reth_tracing::init_test_tracing();

    let chain_spec = MAINNET.clone();
    let mut test_harness = TestHarness::new(chain_spec.clone());
    test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);

    // create base chain and setup test harness with it
    let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
    test_harness = test_harness.with_blocks(base_chain.clone());

    // fcu to the tip of base chain
    test_harness
        .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
        .await;

    // create main chain, extension of base chain, with enough blocks to
    // trigger backfill sync
    let main_chain = test_harness
        .block_builder
        .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);

    let main_chain_last = main_chain.last().unwrap();
    let main_chain_last_hash = main_chain_last.hash();
    // the block MIN_BLOCKS_FOR_PIPELINE_RUN deep is far enough ahead to
    // trigger a pipeline (backfill) run rather than live sync
    let main_chain_backfill_target = main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
    let main_chain_backfill_target_hash = main_chain_backfill_target.hash();

    // fcu to the element of main chain that should trigger backfill sync
    test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
    test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;

    // check download request for target
    let event = test_harness.from_tree_rx.recv().await.unwrap();
    match event {
        EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
            assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash]));
        }
        _ => panic!("Unexpected event: {event:#?}"),
    }

    // send message to tell the engine the requested block was downloaded
    test_harness
        .tree
        .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_backfill_target.clone()]))
        .unwrap();

    // check that backfill is triggered
    let event = test_harness.from_tree_rx.recv().await.unwrap();
    match event {
        EngineApiEvent::BackfillAction(BackfillAction::Start(
            reth_stages::PipelineTarget::Sync(target_hash),
        )) => {
            assert_eq!(target_hash, main_chain_backfill_target_hash);
        }
        _ => panic!("Unexpected event: {event:#?}"),
    }

    // persist blocks of main chain, same as the backfill operation would do
    let backfilled_chain: Vec<_> =
        main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
    test_harness.persist_blocks(backfilled_chain.clone());
    test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);

    // send message to mark backfill finished
    test_harness
        .tree
        .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished(
            ControlFlow::Continue { block_number: main_chain_backfill_target.number },
        )))
        .unwrap();

    // send fcu to the tip of main
    test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

    // the tip is still unknown, so the engine should first request it as a
    // single-block download
    let event = test_harness.from_tree_rx.recv().await.unwrap();
    match event {
        EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
            assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash]));
        }
        _ => panic!("Unexpected event: {event:#?}"),
    }

    // tell engine main chain tip downloaded
    test_harness
        .tree
        .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]))
        .unwrap();

    // check download range request
    let event = test_harness.from_tree_rx.recv().await.unwrap();
    match event {
        EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
            // the gap between the backfilled portion and the tip's parent
            assert_eq!(
                total_blocks,
                (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
            );
            assert_eq!(initial_hash, main_chain_last.parent_hash);
        }
        _ => panic!("Unexpected event: {event:#?}"),
    }

    let remaining: Vec<_> = main_chain
        .clone()
        .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
        .collect();

    test_harness.setup_range_insertion_for_valid_chain(remaining.clone());

    // tell engine block range downloaded
    test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone())).unwrap();

    test_harness.check_canon_chain_insertion(remaining).await;

    // check canonical chain committed event with the hash of the latest block
    test_harness.check_canon_commit(main_chain_last_hash).await;

    // new head is the tip of the main chain
    test_harness.check_canon_head(main_chain_last_hash);
}