test: multi-node support in e2e testsuite (#16725)

This commit is contained in:
Federico Gimenez
2025-06-09 10:14:25 +02:00
committed by GitHub
parent e1a5ecd3bf
commit 4760b3286e
8 changed files with 562 additions and 142 deletions

View File

@@ -61,8 +61,8 @@ where
// resolve the fork base and execute the appropriate sequence
match &self.fork_base {
ForkBase::Number(block_number) => {
// store the fork base for later validation
env.current_fork_base = Some(*block_number);
// store the fork base for later validation on the active node
env.active_node_state_mut()?.current_fork_base = Some(*block_number);
let mut sequence = Sequence::new(vec![
Box::new(SetForkBase::new(*block_number)),
@@ -71,13 +71,13 @@ where
sequence.execute(env).await
}
ForkBase::Tag(tag) => {
let block_info =
let (block_info, _node_idx) =
env.block_registry.get(tag).copied().ok_or_else(|| {
eyre::eyre!("Block tag '{}' not found in registry", tag)
})?;
// store the fork base for later validation
env.current_fork_base = Some(block_info.number);
// store the fork base for later validation on the active node
env.active_node_state_mut()?.current_fork_base = Some(block_info.number);
let mut sequence = Sequence::new(vec![
Box::new(SetForkBaseFromBlockInfo::new(block_info)),
@@ -139,17 +139,18 @@ where
.await?
.ok_or_else(|| eyre::eyre!("Fork base block {} not found", self.fork_base_block))?;
// update environment to point to the fork base block
env.current_block_info = Some(BlockInfo {
// update active node state to point to the fork base block
let active_node_state = env.active_node_state_mut()?;
active_node_state.current_block_info = Some(BlockInfo {
hash: fork_base_block.header.hash,
number: fork_base_block.header.number,
timestamp: fork_base_block.header.timestamp,
});
env.latest_header_time = fork_base_block.header.timestamp;
active_node_state.latest_header_time = fork_base_block.header.timestamp;
// update fork choice state to the fork base
env.latest_fork_choice_state = ForkchoiceState {
active_node_state.latest_fork_choice_state = ForkchoiceState {
head_block_hash: fork_base_block.header.hash,
safe_block_hash: fork_base_block.header.hash,
finalized_block_hash: fork_base_block.header.hash,
@@ -178,12 +179,13 @@ where
block_info.number, block_info.hash
);
// update environment to point to the fork base block
env.current_block_info = Some(block_info);
env.latest_header_time = block_info.timestamp;
// update active node state to point to the fork base block
let active_node_state = env.active_node_state_mut()?;
active_node_state.current_block_info = Some(block_info);
active_node_state.latest_header_time = block_info.timestamp;
// update fork choice state to the fork base
env.latest_fork_choice_state = ForkchoiceState {
active_node_state.latest_fork_choice_state = ForkchoiceState {
head_block_hash: block_info.hash,
safe_block_hash: block_info.hash,
finalized_block_hash: block_info.hash,
@@ -217,8 +219,7 @@ where
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let current_block_info = env
.current_block_info
.as_ref()
.current_block_info()
.ok_or_else(|| eyre::eyre!("No current block information available"))?;
// verify that the current tip is at or ahead of the fork base
@@ -232,7 +233,8 @@ where
// get the fork base hash from the environment's fork choice state
// we assume the fork choice state was set correctly by SetForkBase
let fork_base_hash = env.latest_fork_choice_state.finalized_block_hash;
let fork_base_hash =
env.active_node_state()?.latest_fork_choice_state.finalized_block_hash;
// trace back from current tip to verify it's a descendant of the fork base
let rpc_client = &env.node_clients[0].rpc;

View File

@@ -9,10 +9,12 @@ use std::future::Future;
use tracing::debug;
pub mod fork;
pub mod node_ops;
pub mod produce_blocks;
pub mod reorg;
pub use fork::{CreateFork, ForkBase, SetForkBase, SetForkBaseFromBlockInfo, ValidateFork};
pub use node_ops::{CaptureBlockOnNode, CompareNodeChainTips, SelectActiveNode, ValidateBlockTag};
pub use produce_blocks::{
AssertMineBlock, BroadcastLatestForkchoice, BroadcastNextNewPayload, CheckPayloadAccepted,
ExpectFcuStatus, GenerateNextPayload, GeneratePayloadAttributes, PickNextBlockProducer,
@@ -124,11 +126,13 @@ where
];
// if we're on a fork, validate it now that it's canonical
if let Some(fork_base) = env.current_fork_base {
debug!("MakeCanonical: Adding fork validation from base block {}", fork_base);
actions.push(Box::new(ValidateFork::new(fork_base)));
// clear the fork base since we're now canonical
env.current_fork_base = None;
if let Ok(active_state) = env.active_node_state() {
if let Some(fork_base) = active_state.current_fork_base {
debug!("MakeCanonical: Adding fork validation from base block {}", fork_base);
actions.push(Box::new(ValidateFork::new(fork_base)));
// clear the fork base since we're now canonical
env.active_node_state_mut()?.current_fork_base = None;
}
}
let mut sequence = Sequence::new(actions);
@@ -158,15 +162,14 @@ where
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let current_block = env
.current_block_info
.as_ref()
.current_block_info()
.ok_or_else(|| eyre::eyre!("No current block information available"))?;
env.block_registry.insert(self.tag.clone(), *current_block);
env.block_registry.insert(self.tag.clone(), (current_block, env.active_node_idx));
debug!(
"Captured block {} (hash: {}) with tag '{}'",
current_block.number, current_block.hash, self.tag
"Captured block {} (hash: {}) from active node {} with tag '{}'",
current_block.number, current_block.hash, env.active_node_idx, self.tag
);
Ok(())

View File

@@ -0,0 +1,215 @@
//! Node-specific operations for multi-node testing.
use crate::testsuite::{Action, Environment};
use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction};
use eyre::Result;
use futures_util::future::BoxFuture;
use reth_node_api::EngineTypes;
use reth_rpc_api::clients::EthApiClient;
use tracing::debug;
/// Action to select which node should be active for subsequent single-node operations.
#[derive(Debug)]
pub struct SelectActiveNode {
/// Node index to set as active
pub node_idx: usize,
}
impl SelectActiveNode {
/// Create a new `SelectActiveNode` action
pub const fn new(node_idx: usize) -> Self {
Self { node_idx }
}
}
impl<Engine> Action<Engine> for SelectActiveNode
where
Engine: EngineTypes,
{
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
env.set_active_node(self.node_idx)?;
debug!("Set active node to {}", self.node_idx);
Ok(())
})
}
}
/// Action to compare chain tips between two nodes.
#[derive(Debug)]
pub struct CompareNodeChainTips {
/// First node index
pub node_a: usize,
/// Second node index
pub node_b: usize,
/// Whether tips should be the same or different
pub should_be_equal: bool,
}
impl CompareNodeChainTips {
/// Create a new action expecting nodes to have the same chain tip
pub const fn expect_same(node_a: usize, node_b: usize) -> Self {
Self { node_a, node_b, should_be_equal: true }
}
/// Create a new action expecting nodes to have different chain tips
pub const fn expect_different(node_a: usize, node_b: usize) -> Self {
Self { node_a, node_b, should_be_equal: false }
}
}
impl<Engine> Action<Engine> for CompareNodeChainTips
where
Engine: EngineTypes,
{
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
if self.node_a >= env.node_count() || self.node_b >= env.node_count() {
return Err(eyre::eyre!("Node index out of bounds"));
}
let node_a_client = &env.node_clients[self.node_a];
let node_b_client = &env.node_clients[self.node_b];
// Get latest block from each node
let block_a = EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
&node_a_client.rpc,
alloy_eips::BlockNumberOrTag::Latest,
false,
)
.await?
.ok_or_else(|| eyre::eyre!("Failed to get latest block from node {}", self.node_a))?;
let block_b = EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
&node_b_client.rpc,
alloy_eips::BlockNumberOrTag::Latest,
false,
)
.await?
.ok_or_else(|| eyre::eyre!("Failed to get latest block from node {}", self.node_b))?;
let tips_equal = block_a.header.hash == block_b.header.hash;
debug!(
"Node {} chain tip: {} (block {}), Node {} chain tip: {} (block {})",
self.node_a,
block_a.header.hash,
block_a.header.number,
self.node_b,
block_b.header.hash,
block_b.header.number
);
if self.should_be_equal && !tips_equal {
return Err(eyre::eyre!(
"Expected nodes {} and {} to have the same chain tip, but node {} has {} and node {} has {}",
self.node_a, self.node_b, self.node_a, block_a.header.hash, self.node_b, block_b.header.hash
));
}
if !self.should_be_equal && tips_equal {
return Err(eyre::eyre!(
"Expected nodes {} and {} to have different chain tips, but both have {}",
self.node_a,
self.node_b,
block_a.header.hash
));
}
Ok(())
})
}
}
/// Action to capture a block with a tag, associating it with a specific node.
#[derive(Debug)]
pub struct CaptureBlockOnNode {
/// Tag name to associate with the block
pub tag: String,
/// Node index to capture the block from
pub node_idx: usize,
}
impl CaptureBlockOnNode {
/// Create a new `CaptureBlockOnNode` action
pub fn new(tag: impl Into<String>, node_idx: usize) -> Self {
Self { tag: tag.into(), node_idx }
}
}
impl<Engine> Action<Engine> for CaptureBlockOnNode
where
Engine: EngineTypes,
{
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let node_state = env.node_state(self.node_idx)?;
let current_block = node_state.current_block_info.ok_or_else(|| {
eyre::eyre!("No current block information available for node {}", self.node_idx)
})?;
env.block_registry.insert(self.tag.clone(), (current_block, self.node_idx));
debug!(
"Captured block {} (hash: {}) from node {} with tag '{}'",
current_block.number, current_block.hash, self.node_idx, self.tag
);
Ok(())
})
}
}
/// Action to get a block by tag and verify which node it came from.
#[derive(Debug)]
pub struct ValidateBlockTag {
/// Tag to look up
pub tag: String,
/// Expected node index (optional)
pub expected_node_idx: Option<usize>,
}
impl ValidateBlockTag {
/// Create a new action to validate a block tag exists
pub fn exists(tag: impl Into<String>) -> Self {
Self { tag: tag.into(), expected_node_idx: None }
}
/// Create a new action to validate a block tag came from a specific node
pub fn from_node(tag: impl Into<String>, node_idx: usize) -> Self {
Self { tag: tag.into(), expected_node_idx: Some(node_idx) }
}
}
impl<Engine> Action<Engine> for ValidateBlockTag
where
Engine: EngineTypes,
{
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let (block_info, node_idx) = env
.block_registry
.get(&self.tag)
.copied()
.ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", self.tag))?;
if let Some(expected_node) = self.expected_node_idx {
if node_idx != expected_node {
return Err(eyre::eyre!(
"Block tag '{}' came from node {} but expected node {}",
self.tag,
node_idx,
expected_node
));
}
}
debug!(
"Validated block tag '{}': block {} (hash: {}) from node {}",
self.tag, block_info.number, block_info.hash, node_idx
);
Ok(())
})
}
}

View File

@@ -146,8 +146,7 @@ where
}
let latest_info = env
.current_block_info
.as_ref()
.current_block_info()
.ok_or_else(|| eyre::eyre!("No latest block information available"))?;
// simple round-robin selection based on next block number
@@ -177,11 +176,11 @@ where
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let latest_block = env
.current_block_info
.as_ref()
.current_block_info()
.ok_or_else(|| eyre::eyre!("No latest block information available"))?;
let block_number = latest_block.number;
let timestamp = env.latest_header_time + env.block_timestamp_increment;
let timestamp =
env.active_node_state()?.latest_header_time + env.block_timestamp_increment;
let payload_attributes = PayloadAttributes {
timestamp,
prev_randao: B256::random(),
@@ -190,7 +189,9 @@ where
parent_beacon_block_root: Some(B256::ZERO),
};
env.payload_attributes.insert(latest_block.number + 1, payload_attributes);
env.active_node_state_mut()?
.payload_attributes
.insert(latest_block.number + 1, payload_attributes);
debug!("Stored payload attributes for block {}", block_number + 1);
Ok(())
})
@@ -209,8 +210,7 @@ where
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let latest_block = env
.current_block_info
.as_ref()
.current_block_info()
.ok_or_else(|| eyre::eyre!("No latest block information available"))?;
let parent_hash = latest_block.hash;
@@ -223,6 +223,7 @@ where
};
let payload_attributes = env
.active_node_state()?
.payload_attributes
.get(&(latest_block.number + 1))
.cloned()
@@ -250,7 +251,8 @@ where
debug!("No payload ID returned, generating fresh payload attributes for forking");
let fresh_payload_attributes = PayloadAttributes {
timestamp: env.latest_header_time + env.block_timestamp_increment,
timestamp: env.active_node_state()?.latest_header_time +
env.block_timestamp_increment,
prev_randao: B256::random(),
suggested_fee_recipient: alloy_primitives::Address::random(),
withdrawals: Some(vec![]),
@@ -277,7 +279,7 @@ where
}
};
env.next_payload_id = Some(payload_id);
env.active_node_state_mut()?.next_payload_id = Some(payload_id);
sleep(Duration::from_secs(1)).await;
@@ -289,9 +291,11 @@ where
// Store the payload attributes that were used to generate this payload
let built_payload = payload_attributes.clone();
env.payload_id_history.insert(latest_block.number + 1, payload_id);
env.latest_payload_built = Some(built_payload);
env.latest_payload_envelope = Some(built_payload_envelope);
env.active_node_state_mut()?
.payload_id_history
.insert(latest_block.number + 1, payload_id);
env.active_node_state_mut()?.latest_payload_built = Some(built_payload);
env.active_node_state_mut()?.latest_payload_envelope = Some(built_payload_envelope);
Ok(())
})
@@ -315,7 +319,9 @@ where
}
// use the hash of the newly executed payload if available
let head_hash = if let Some(payload_envelope) = &env.latest_payload_envelope {
let head_hash = if let Some(payload_envelope) =
&env.active_node_state()?.latest_payload_envelope
{
let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
payload_envelope.clone().into();
let new_block_hash = execution_payload_envelope
@@ -408,14 +414,15 @@ where
.ok_or_else(|| eyre::eyre!("No latest block found from RPC"))?;
// update environment with the new block information
env.current_block_info = Some(BlockInfo {
env.set_current_block_info(BlockInfo {
hash: latest_block.header.hash,
number: latest_block.header.number,
timestamp: latest_block.header.timestamp,
});
})?;
env.latest_header_time = latest_block.header.timestamp;
env.latest_fork_choice_state.head_block_hash = latest_block.header.hash;
env.active_node_state_mut()?.latest_header_time = latest_block.header.timestamp;
env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
latest_block.header.hash;
debug!(
"Updated environment to block {} (hash: {})",
@@ -443,6 +450,7 @@ where
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let payload_envelope = env
.active_node_state()?
.latest_payload_envelope
.as_ref()
.ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?;
@@ -456,14 +464,14 @@ where
let block_timestamp = execution_payload.payload_inner.payload_inner.timestamp;
// update environment with the new block information from the payload
env.current_block_info = Some(BlockInfo {
env.set_current_block_info(BlockInfo {
hash: block_hash,
number: block_number,
timestamp: block_timestamp,
});
})?;
env.latest_header_time = block_timestamp;
env.latest_fork_choice_state.head_block_hash = block_hash;
env.active_node_state_mut()?.latest_header_time = block_timestamp;
env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash = block_hash;
debug!(
"Updated environment to newly produced block {} (hash: {})",
@@ -488,17 +496,18 @@ where
Box::pin(async move {
let mut accepted_check: bool = false;
let latest_block = env
.current_block_info
.as_mut()
let mut latest_block = env
.current_block_info()
.ok_or_else(|| eyre::eyre!("No latest block information available"))?;
let payload_id = *env
.active_node_state()?
.payload_id_history
.get(&(latest_block.number + 1))
.ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
for (idx, client) in env.node_clients.iter().enumerate() {
let node_clients = env.node_clients.clone();
for (idx, client) in node_clients.iter().enumerate() {
let rpc_client = &client.rpc;
// get the last header by number using latest_head_number
@@ -512,6 +521,7 @@ where
// perform several checks
let next_new_payload = env
.active_node_state()?
.latest_payload_built
.as_ref()
.ok_or_else(|| eyre::eyre!("No next built payload found"))?;
@@ -563,10 +573,11 @@ where
if !accepted_check {
accepted_check = true;
// save the header in Env
env.latest_header_time = next_new_payload.timestamp;
env.active_node_state_mut()?.latest_header_time = next_new_payload.timestamp;
// add it to header history
env.latest_fork_choice_state.head_block_hash = rpc_latest_header.hash;
env.active_node_state_mut()?.latest_fork_choice_state.head_block_hash =
rpc_latest_header.hash;
latest_block.hash = rpc_latest_header.hash;
latest_block.number = rpc_latest_header.inner.number;
}
@@ -595,26 +606,30 @@ where
Box::pin(async move {
// Get the next new payload to broadcast
let next_new_payload = env
.active_node_state()?
.latest_payload_built
.as_ref()
.ok_or_else(|| eyre::eyre!("No next built payload found"))?;
.ok_or_else(|| eyre::eyre!("No next built payload found"))?
.clone();
let parent_beacon_block_root = next_new_payload
.parent_beacon_block_root
.ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
let payload_envelope = env
.active_node_state()?
.latest_payload_envelope
.as_ref()
.ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?;
.ok_or_else(|| eyre::eyre!("No execution payload envelope available"))?
.clone();
let execution_payload_envelope: ExecutionPayloadEnvelopeV3 =
payload_envelope.clone().into();
let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = payload_envelope.into();
let execution_payload = execution_payload_envelope.execution_payload;
// Loop through all clients and broadcast the next new payload
let mut successful_broadcast: bool = false;
let mut broadcast_results = Vec::new();
let mut first_valid_seen = false;
for client in &env.node_clients {
for (idx, client) in env.node_clients.iter().enumerate() {
let engine = client.engine.http_client();
// Broadcast the execution payload
@@ -626,25 +641,34 @@ where
)
.await?;
// Check if broadcast was successful
if result.status == PayloadStatusEnum::Valid {
successful_broadcast = true;
// We don't need to update the latest payload built since it should be the same.
// env.latest_payload_built = Some(next_new_payload.clone());
env.latest_payload_executed = Some(next_new_payload.clone());
break;
broadcast_results.push((idx, result.status.clone()));
debug!("Node {}: new_payload broadcast status: {:?}", idx, result.status);
// Check if this node accepted the payload
if result.status == PayloadStatusEnum::Valid && !first_valid_seen {
first_valid_seen = true;
} else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
debug!(
"Invalid payload status returned from broadcast: {:?}",
validation_error
"Node {}: Invalid payload status returned from broadcast: {:?}",
idx, validation_error
);
}
}
if !successful_broadcast {
// Update the executed payload state after broadcasting to all nodes
if first_valid_seen {
env.active_node_state_mut()?.latest_payload_executed = Some(next_new_payload);
}
// Check if at least one node accepted the payload
let any_valid =
broadcast_results.iter().any(|(_, status)| *status == PayloadStatusEnum::Valid);
if !any_valid {
return Err(eyre::eyre!("Failed to successfully broadcast payload to any client"));
}
debug!("Broadcast complete. Results: {:?}", broadcast_results);
Ok(())
})
}
@@ -721,7 +745,7 @@ where
fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
// get the target block from the registry
let target_block = env
let (target_block, _node_idx) = env
.block_registry
.get(&self.tag)
.copied()
@@ -893,10 +917,10 @@ where
sequence.execute(env).await?;
// get the latest payload and corrupt it
let latest_envelope = env
.latest_payload_envelope
.as_ref()
.ok_or_else(|| eyre::eyre!("No payload envelope available to corrupt"))?;
let latest_envelope =
env.active_node_state()?.latest_payload_envelope.as_ref().ok_or_else(
|| eyre::eyre!("No payload envelope available to corrupt"),
)?;
let envelope_v3: ExecutionPayloadEnvelopeV3 = latest_envelope.clone().into();
let mut corrupted_payload = envelope_v3.execution_payload;
@@ -942,11 +966,11 @@ where
}
// update block info with the corrupted block (for potential future reference)
env.current_block_info = Some(BlockInfo {
env.set_current_block_info(BlockInfo {
hash: corrupted_payload.payload_inner.payload_inner.block_hash,
number: corrupted_payload.payload_inner.payload_inner.block_number,
timestamp: corrupted_payload.timestamp(),
});
})?;
} else {
debug!("Producing valid block at index {}", block_index);

View File

@@ -58,11 +58,13 @@ where
"Direct hash reorgs are not supported. Use CaptureBlock to tag the target block first, then use ReorgTo::new_from_tag()"
));
}
ReorgTarget::Tag(tag) => env
.block_registry
.get(tag)
.copied()
.ok_or_else(|| eyre::eyre!("Block tag '{}' not found in registry", tag))?,
ReorgTarget::Tag(tag) => {
let (block_info, _node_idx) =
env.block_registry.get(tag).copied().ok_or_else(|| {
eyre::eyre!("Block tag '{}' not found in registry", tag)
})?;
block_info
}
};
let mut sequence = Sequence::new(vec![
@@ -102,12 +104,13 @@ where
block_info.number, block_info.hash
);
// update environment to point to the target block
env.current_block_info = Some(block_info);
env.latest_header_time = block_info.timestamp;
// update active node state to point to the target block
let active_node_state = env.active_node_state_mut()?;
active_node_state.current_block_info = Some(block_info);
active_node_state.latest_header_time = block_info.timestamp;
// update fork choice state to make the target block canonical
env.latest_fork_choice_state = ForkchoiceState {
active_node_state.latest_fork_choice_state = ForkchoiceState {
head_block_hash: block_info.hash,
safe_block_hash: block_info.hash,
finalized_block_hash: block_info.hash,

View File

@@ -1,7 +1,10 @@
//! Example tests using the test suite framework.
use crate::testsuite::{
actions::{AssertMineBlock, CaptureBlock, CreateFork, MakeCanonical, ProduceBlocks, ReorgTo},
actions::{
AssertMineBlock, CaptureBlock, CaptureBlockOnNode, CompareNodeChainTips, CreateFork,
MakeCanonical, ProduceBlocks, ReorgTo, SelectActiveNode,
},
setup::{NetworkSetup, Setup},
TestBuilder,
};
@@ -162,3 +165,49 @@ async fn test_testsuite_deep_reorg() -> Result<()> {
Ok(())
}
/// Multi-node test demonstrating block creation and coordination across multiple nodes.
///
/// This test demonstrates the working multi-node framework:
/// - Multiple nodes start from the same genesis
/// - Nodes can be selected for specific operations
/// - Block production can happen on different nodes
/// - Chain tips can be compared between nodes
/// - Node-specific state is properly tracked
#[tokio::test]
async fn test_testsuite_multinode_block_production() -> Result<()> {
reth_tracing::init_test_tracing();
let setup = Setup::default()
.with_chain_spec(Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("assets/genesis.json")).unwrap())
.cancun_activated()
.build(),
))
.with_network(NetworkSetup::multi_node(2)) // Create 2 nodes
.with_tree_config(TreeConfig::default().with_state_root_fallback(true));
let test = TestBuilder::new()
.with_setup(setup)
// both nodes start from genesis
.with_action(CaptureBlock::new("genesis"))
.with_action(CompareNodeChainTips::expect_same(0, 1))
// build main chain (blocks 1-3)
.with_action(SelectActiveNode::new(0))
.with_action(ProduceBlocks::<EthEngineTypes>::new(3))
.with_action(MakeCanonical::new())
.with_action(CaptureBlockOnNode::new("node0_tip", 0))
.with_action(CompareNodeChainTips::expect_same(0, 1))
// node 0 already has the state and can continue producing blocks
.with_action(ProduceBlocks::<EthEngineTypes>::new(2))
.with_action(MakeCanonical::new())
.with_action(CaptureBlockOnNode::new("node0_tip_2", 0))
// verify both nodes remain in sync
.with_action(CompareNodeChainTips::expect_same(0, 1));
test.run::<EthereumNode>().await?;
Ok(())
}

View File

@@ -21,7 +21,7 @@ use reth_rpc_builder::auth::AuthServerHandle;
mod examples;
/// Client handles for both regular RPC and Engine API endpoints
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NodeClient {
/// Regular JSON-RPC client
pub rpc: HttpClient,
@@ -46,6 +46,75 @@ pub struct BlockInfo {
/// Timestamp of the block
pub timestamp: u64,
}
/// Per-node state tracking for multi-node environments
#[derive(Clone)]
pub struct NodeState<I>
where
I: EngineTypes,
{
/// Current block information for this node
pub current_block_info: Option<BlockInfo>,
/// Stores payload attributes indexed by block number for this node
pub payload_attributes: HashMap<u64, PayloadAttributes>,
/// Tracks the latest block header timestamp for this node
pub latest_header_time: u64,
/// Stores payload IDs returned by this node, indexed by block number
pub payload_id_history: HashMap<u64, PayloadId>,
/// Stores the next expected payload ID for this node
pub next_payload_id: Option<PayloadId>,
/// Stores the latest fork choice state for this node
pub latest_fork_choice_state: ForkchoiceState,
/// Stores the most recent built execution payload for this node
pub latest_payload_built: Option<PayloadAttributes>,
/// Stores the most recent executed payload for this node
pub latest_payload_executed: Option<PayloadAttributes>,
/// Stores the most recent built execution payload envelope for this node
pub latest_payload_envelope: Option<I::ExecutionPayloadEnvelopeV3>,
/// Fork base block number for validation (if this node is currently on a fork)
pub current_fork_base: Option<u64>,
}
impl<I> Default for NodeState<I>
where
I: EngineTypes,
{
fn default() -> Self {
Self {
current_block_info: None,
payload_attributes: HashMap::new(),
latest_header_time: 0,
payload_id_history: HashMap::new(),
next_payload_id: None,
latest_fork_choice_state: ForkchoiceState::default(),
latest_payload_built: None,
latest_payload_executed: None,
latest_payload_envelope: None,
current_fork_base: None,
}
}
}
impl<I> std::fmt::Debug for NodeState<I>
where
I: EngineTypes,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NodeState")
.field("current_block_info", &self.current_block_info)
.field("payload_attributes", &self.payload_attributes)
.field("latest_header_time", &self.latest_header_time)
.field("payload_id_history", &self.payload_id_history)
.field("next_payload_id", &self.next_payload_id)
.field("latest_fork_choice_state", &self.latest_fork_choice_state)
.field("latest_payload_built", &self.latest_payload_built)
.field("latest_payload_executed", &self.latest_payload_executed)
.field("latest_payload_envelope", &"<ExecutionPayloadEnvelopeV3>")
.field("current_fork_base", &self.current_fork_base)
.finish()
}
}
/// Represents a test environment.
#[derive(Debug)]
pub struct Environment<I>
@@ -54,38 +123,22 @@ where
{
/// Combined clients with both RPC and Engine API endpoints
pub node_clients: Vec<NodeClient>,
/// Per-node state tracking
pub node_states: Vec<NodeState<I>>,
/// Tracks instance generic.
_phantom: PhantomData<I>,
/// Current block information
pub current_block_info: Option<BlockInfo>,
/// Last producer index
pub last_producer_idx: Option<usize>,
/// Stores payload attributes indexed by block number
pub payload_attributes: HashMap<u64, PayloadAttributes>,
/// Tracks the latest block header timestamp
pub latest_header_time: u64,
/// Defines the increment for block timestamps (default: 2 seconds)
pub block_timestamp_increment: u64,
/// Stores payload IDs returned by block producers, indexed by block number
pub payload_id_history: HashMap<u64, PayloadId>,
/// Stores the next expected payload ID
pub next_payload_id: Option<PayloadId>,
/// Stores the latest fork choice state
pub latest_fork_choice_state: ForkchoiceState,
/// Stores the most recent built execution payload
pub latest_payload_built: Option<PayloadAttributes>,
/// Stores the most recent executed payload
pub latest_payload_executed: Option<PayloadAttributes>,
/// Stores the most recent built execution payload envelope
pub latest_payload_envelope: Option<I::ExecutionPayloadEnvelopeV3>,
/// Number of slots until a block is considered safe
pub slots_to_safe: u64,
/// Number of slots until a block is considered finalized
pub slots_to_finalized: u64,
/// Registry for tagged blocks, mapping tag names to complete block info
pub block_registry: HashMap<String, BlockInfo>,
/// Fork base block number for validation (if we're currently on a fork)
pub current_fork_base: Option<u64>,
/// Registry for tagged blocks, mapping tag names to block info and node index
pub block_registry: HashMap<String, (BlockInfo, usize)>,
/// Currently active node index for backward compatibility with single-node actions
pub active_node_idx: usize,
}
impl<I> Default for Environment<I>
@@ -95,26 +148,83 @@ where
fn default() -> Self {
Self {
node_clients: vec![],
node_states: vec![],
_phantom: Default::default(),
current_block_info: None,
last_producer_idx: None,
payload_attributes: Default::default(),
latest_header_time: 0,
block_timestamp_increment: 2,
payload_id_history: HashMap::new(),
next_payload_id: None,
latest_fork_choice_state: ForkchoiceState::default(),
latest_payload_built: None,
latest_payload_executed: None,
latest_payload_envelope: None,
slots_to_safe: 0,
slots_to_finalized: 0,
block_registry: HashMap::new(),
current_fork_base: None,
active_node_idx: 0,
}
}
}
impl<I> Environment<I>
where
I: EngineTypes,
{
/// Get the number of nodes in the environment
pub fn node_count(&self) -> usize {
self.node_clients.len()
}
/// Get mutable reference to a specific node's state
pub fn node_state_mut(&mut self, node_idx: usize) -> Result<&mut NodeState<I>, eyre::Error> {
let node_count = self.node_count();
self.node_states.get_mut(node_idx).ok_or_else(|| {
eyre::eyre!("Node index {} out of bounds (have {} nodes)", node_idx, node_count)
})
}
/// Get immutable reference to a specific node's state
pub fn node_state(&self, node_idx: usize) -> Result<&NodeState<I>, eyre::Error> {
self.node_states.get(node_idx).ok_or_else(|| {
eyre::eyre!("Node index {} out of bounds (have {} nodes)", node_idx, self.node_count())
})
}
/// Get the currently active node's state
pub fn active_node_state(&self) -> Result<&NodeState<I>, eyre::Error> {
self.node_state(self.active_node_idx)
}
/// Get mutable reference to the currently active node's state
pub fn active_node_state_mut(&mut self) -> Result<&mut NodeState<I>, eyre::Error> {
let idx = self.active_node_idx;
self.node_state_mut(idx)
}
/// Set the active node index
pub fn set_active_node(&mut self, node_idx: usize) -> Result<(), eyre::Error> {
if node_idx >= self.node_count() {
return Err(eyre::eyre!(
"Node index {} out of bounds (have {} nodes)",
node_idx,
self.node_count()
));
}
self.active_node_idx = node_idx;
Ok(())
}
/// Initialize node states when nodes are created
pub fn initialize_node_states(&mut self, node_count: usize) {
self.node_states = (0..node_count).map(|_| NodeState::default()).collect();
}
/// Get current block info from active node
pub fn current_block_info(&self) -> Option<BlockInfo> {
self.active_node_state().ok()?.current_block_info
}
/// Set current block info on active node
pub fn set_current_block_info(&mut self, block_info: BlockInfo) -> Result<(), eyre::Error> {
self.active_node_state_mut()?.current_block_info = Some(block_info);
Ok(())
}
}
/// Builder for creating test scenarios
#[expect(missing_debug_implementations)]
pub struct TestBuilder<I>

View File

@@ -234,33 +234,47 @@ where
env.node_clients = node_clients;
// Initialize the environment with genesis block information
let first_client = &env.node_clients[0];
let genesis_block =
EthApiClient::<Transaction, RpcBlock, Receipt, Header>::block_by_number(
&first_client.rpc,
BlockNumberOrTag::Number(0),
false,
)
.await?
.ok_or_else(|| eyre!("Genesis block not found"))?;
// Initialize per-node states for all nodes
env.initialize_node_states(node_count);
env.current_block_info = Some(crate::testsuite::BlockInfo {
hash: genesis_block.header.hash,
number: genesis_block.header.number,
timestamp: genesis_block.header.timestamp,
});
// Initialize each node's state with genesis block information
let genesis_block_info = {
let first_client = &env.node_clients[0];
let genesis_block =
EthApiClient::<Transaction, RpcBlock, Receipt, Header>::block_by_number(
&first_client.rpc,
BlockNumberOrTag::Number(0),
false,
)
.await?
.ok_or_else(|| eyre!("Genesis block not found"))?;
env.latest_header_time = genesis_block.header.timestamp;
env.latest_fork_choice_state = ForkchoiceState {
head_block_hash: genesis_block.header.hash,
safe_block_hash: genesis_block.header.hash,
finalized_block_hash: genesis_block.header.hash,
crate::testsuite::BlockInfo {
hash: genesis_block.header.hash,
number: genesis_block.header.number,
timestamp: genesis_block.header.timestamp,
}
};
// Initialize all node states with the same genesis block
for (node_idx, node_state) in env.node_states.iter_mut().enumerate() {
node_state.current_block_info = Some(genesis_block_info);
node_state.latest_header_time = genesis_block_info.timestamp;
node_state.latest_fork_choice_state = ForkchoiceState {
head_block_hash: genesis_block_info.hash,
safe_block_hash: genesis_block_info.hash,
finalized_block_hash: genesis_block_info.hash,
};
debug!(
"Node {} initialized with genesis block {} (hash: {})",
node_idx, genesis_block_info.number, genesis_block_info.hash
);
}
debug!(
"Environment initialized with genesis block {} (hash: {})",
genesis_block.header.number, genesis_block.header.hash
"Environment initialized with {} nodes, all starting from genesis block {} (hash: {})",
node_count, genesis_block_info.number, genesis_block_info.hash
);
// TODO: For each block in self.blocks, replay it on the node