mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
chore: revert "test(stages): add pipeline forward sync and unwind test" (#21601)
This commit is contained in:
8
.github/workflows/integration.yml
vendored
8
.github/workflows/integration.yml
vendored
@@ -22,7 +22,7 @@ concurrency:
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: test / ${{ matrix.network }} / ${{ matrix.storage }}
|
||||
name: test / ${{ matrix.network }}
|
||||
if: github.event_name != 'schedule'
|
||||
runs-on: depot-ubuntu-latest-4
|
||||
env:
|
||||
@@ -30,10 +30,6 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
network: ["ethereum", "optimism"]
|
||||
storage: ["stable", "edge"]
|
||||
exclude:
|
||||
- network: optimism
|
||||
storage: edge
|
||||
timeout-minutes: 60
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
@@ -50,7 +46,7 @@ jobs:
|
||||
name: Run tests
|
||||
run: |
|
||||
cargo nextest run \
|
||||
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
|
||||
--locked --features "asm-keccak ${{ matrix.network }}" \
|
||||
--workspace --exclude ef-tests \
|
||||
-E "kind(test) and not binary(e2e_testsuite)"
|
||||
- if: matrix.network == 'optimism'
|
||||
|
||||
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -10732,7 +10732,6 @@ version = "1.10.2"
|
||||
dependencies = [
|
||||
"alloy-consensus",
|
||||
"alloy-eips",
|
||||
"alloy-genesis",
|
||||
"alloy-primitives",
|
||||
"alloy-rlp",
|
||||
"assert_matches",
|
||||
@@ -10752,7 +10751,6 @@ dependencies = [
|
||||
"reth-consensus",
|
||||
"reth-db",
|
||||
"reth-db-api",
|
||||
"reth-db-common",
|
||||
"reth-downloaders",
|
||||
"reth-era",
|
||||
"reth-era-downloader",
|
||||
@@ -10778,7 +10776,6 @@ dependencies = [
|
||||
"reth-storage-api",
|
||||
"reth-storage-errors",
|
||||
"reth-testing-utils",
|
||||
"reth-tracing",
|
||||
"reth-trie",
|
||||
"reth-trie-db",
|
||||
"tempfile",
|
||||
|
||||
@@ -83,28 +83,6 @@ impl From<&'static str> for FileClientError {
|
||||
}
|
||||
|
||||
impl<B: FullBlock> FileClient<B> {
|
||||
/// Create a new file client from a slice of sealed blocks.
|
||||
pub fn from_blocks(blocks: impl IntoIterator<Item = SealedBlock<B>>) -> Self {
|
||||
let blocks: Vec<_> = blocks.into_iter().collect();
|
||||
let capacity = blocks.len();
|
||||
|
||||
let mut headers = HashMap::with_capacity(capacity);
|
||||
let mut hash_to_number = HashMap::with_capacity(capacity);
|
||||
let mut bodies = HashMap::with_capacity(capacity);
|
||||
|
||||
for block in blocks {
|
||||
let number = block.number();
|
||||
let hash = block.hash();
|
||||
let (header, body) = block.split_sealed_header_body();
|
||||
|
||||
headers.insert(number, header.into_header());
|
||||
hash_to_number.insert(hash, number);
|
||||
bodies.insert(hash, body);
|
||||
}
|
||||
|
||||
Self { headers, hash_to_number, bodies }
|
||||
}
|
||||
|
||||
/// Create a new file client from a file path.
|
||||
pub async fn new<P: AsRef<Path>>(
|
||||
path: P,
|
||||
|
||||
@@ -73,7 +73,7 @@ reth-ethereum-consensus.workspace = true
|
||||
reth-evm-ethereum.workspace = true
|
||||
reth-consensus = { workspace = true, features = ["test-utils"] }
|
||||
reth-network-p2p = { workspace = true, features = ["test-utils"] }
|
||||
reth-downloaders = { workspace = true, features = ["file-client"] }
|
||||
reth-downloaders.workspace = true
|
||||
reth-static-file.workspace = true
|
||||
reth-stages-api = { workspace = true, features = ["test-utils"] }
|
||||
reth-storage-api.workspace = true
|
||||
@@ -82,11 +82,8 @@ reth-trie = { workspace = true, features = ["test-utils"] }
|
||||
reth-provider = { workspace = true, features = ["test-utils"] }
|
||||
reth-network-peers.workspace = true
|
||||
|
||||
alloy-genesis.workspace = true
|
||||
alloy-primitives = { workspace = true, features = ["getrandom", "rand"] }
|
||||
alloy-rlp.workspace = true
|
||||
reth-db-common.workspace = true
|
||||
reth-tracing.workspace = true
|
||||
|
||||
tokio = { workspace = true, features = ["rt", "sync", "macros"] }
|
||||
assert_matches.workspace = true
|
||||
@@ -122,7 +119,6 @@ test-utils = [
|
||||
"reth-evm-ethereum/test-utils",
|
||||
]
|
||||
rocksdb = ["reth-provider/rocksdb"]
|
||||
edge = ["reth-provider/edge", "reth-db-common/edge", "rocksdb"]
|
||||
|
||||
[[bench]]
|
||||
name = "criterion"
|
||||
|
||||
@@ -1,490 +0,0 @@
|
||||
//! Pipeline forward sync and unwind tests.
|
||||
|
||||
use alloy_consensus::{constants::ETH_TO_WEI, Header, TxEip1559, TxReceipt};
|
||||
use alloy_eips::eip1559::INITIAL_BASE_FEE;
|
||||
use alloy_genesis::{Genesis, GenesisAccount};
|
||||
use alloy_primitives::{bytes, Address, Bytes, TxKind, B256, U256};
|
||||
use reth_chainspec::{ChainSpecBuilder, ChainSpecProvider, MAINNET};
|
||||
use reth_config::config::StageConfig;
|
||||
use reth_consensus::noop::NoopConsensus;
|
||||
use reth_db_api::{cursor::DbCursorRO, models::BlockNumberAddress, transaction::DbTx};
|
||||
use reth_db_common::init::init_genesis;
|
||||
use reth_downloaders::{
|
||||
bodies::bodies::BodiesDownloaderBuilder, file_client::FileClient,
|
||||
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
|
||||
};
|
||||
use reth_ethereum_primitives::{Block, BlockBody, Transaction};
|
||||
use reth_evm::{execute::Executor, ConfigureEvm};
|
||||
use reth_evm_ethereum::EthEvmConfig;
|
||||
use reth_network_p2p::{
|
||||
bodies::downloader::BodyDownloader,
|
||||
headers::downloader::{HeaderDownloader, SyncTarget},
|
||||
};
|
||||
use reth_primitives_traits::{
|
||||
crypto::secp256k1::public_key_to_address,
|
||||
proofs::{calculate_receipt_root, calculate_transaction_root},
|
||||
RecoveredBlock, SealedBlock,
|
||||
};
|
||||
use reth_provider::{
|
||||
test_utils::create_test_provider_factory_with_chain_spec, BlockNumReader, DBProvider,
|
||||
DatabaseProviderFactory, HeaderProvider, OriginalValuesKnown, StageCheckpointReader,
|
||||
StateWriter, StaticFileProviderFactory,
|
||||
};
|
||||
use reth_prune_types::PruneModes;
|
||||
use reth_revm::database::StateProviderDatabase;
|
||||
use reth_stages::sets::DefaultStages;
|
||||
use reth_stages_api::{Pipeline, StageId};
|
||||
use reth_static_file::StaticFileProducer;
|
||||
use reth_storage_api::{
|
||||
ChangeSetReader, StateProvider, StorageChangeSetReader, StorageSettingsCache,
|
||||
};
|
||||
use reth_testing_utils::generators::{self, generate_key, sign_tx_with_key_pair};
|
||||
use reth_trie::{HashedPostState, KeccakKeyHasher, StateRoot};
|
||||
use reth_trie_db::DatabaseStateRoot;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::watch;
|
||||
|
||||
/// Counter contract deployed bytecode compiled with Solidity 0.8.31.
|
||||
/// ```solidity
|
||||
/// contract Counter {
|
||||
/// uint256 public count;
|
||||
/// function increment() public { count += 1; }
|
||||
/// }
|
||||
/// ```
|
||||
const COUNTER_DEPLOYED_BYTECODE: Bytes = bytes!(
|
||||
"6080604052348015600e575f5ffd5b50600436106030575f3560e01c806306661abd146034578063d09de08a14604e575b5f5ffd5b603a6056565b604051604591906089565b60405180910390f35b6054605b565b005b5f5481565b60015f5f828254606a919060cd565b92505081905550565b5f819050919050565b6083816073565b82525050565b5f602082019050609a5f830184607c565b92915050565b7f4e487b71000000000000000000000000000000000000000000000000000000005f52601160045260245ffd5b5f60d5826073565b915060de836073565b925082820190508082111560f35760f260a0565b5b9291505056fea2646970667358221220576016d010ec2f4f83b992fb97d16efd1bc54110c97aa5d5cb47d20d3b39a35264736f6c634300081f0033"
|
||||
);
|
||||
|
||||
/// `increment()` function selector: `keccak256("increment()")`[:4]
|
||||
const INCREMENT_SELECTOR: [u8; 4] = [0xd0, 0x9d, 0xe0, 0x8a];
|
||||
|
||||
/// Contract address (deterministic for test)
|
||||
const CONTRACT_ADDRESS: Address = Address::new([0x42; 20]);
|
||||
|
||||
/// Creates a `FileClient` populated with the given blocks.
|
||||
fn create_file_client_from_blocks(blocks: Vec<SealedBlock<Block>>) -> Arc<FileClient<Block>> {
|
||||
Arc::new(FileClient::from_blocks(blocks))
|
||||
}
|
||||
|
||||
/// Verifies that changesets are queryable from the correct source based on storage settings.
|
||||
///
|
||||
/// Queries static files when changesets are configured to be stored there, otherwise queries MDBX.
|
||||
fn assert_changesets_queryable(
|
||||
provider_factory: &reth_provider::ProviderFactory<
|
||||
reth_provider::test_utils::MockNodeTypesWithDB,
|
||||
>,
|
||||
block_range: std::ops::RangeInclusive<u64>,
|
||||
) -> eyre::Result<()> {
|
||||
let provider = provider_factory.provider()?;
|
||||
let settings = provider.cached_storage_settings();
|
||||
|
||||
// Verify storage changesets
|
||||
if settings.storage_changesets_in_static_files {
|
||||
let static_file_provider = provider_factory.static_file_provider();
|
||||
static_file_provider.initialize_index()?;
|
||||
let storage_changesets =
|
||||
static_file_provider.storage_changesets_range(block_range.clone())?;
|
||||
assert!(
|
||||
!storage_changesets.is_empty(),
|
||||
"storage changesets should be queryable from static files for blocks {:?}",
|
||||
block_range
|
||||
);
|
||||
} else {
|
||||
let storage_changesets: Vec<_> = provider
|
||||
.tx_ref()
|
||||
.cursor_dup_read::<reth_db::tables::StorageChangeSets>()?
|
||||
.walk_range(BlockNumberAddress::range(block_range.clone()))?
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
assert!(
|
||||
!storage_changesets.is_empty(),
|
||||
"storage changesets should be queryable from MDBX for blocks {:?}",
|
||||
block_range
|
||||
);
|
||||
}
|
||||
|
||||
// Verify account changesets
|
||||
if settings.account_changesets_in_static_files {
|
||||
let static_file_provider = provider_factory.static_file_provider();
|
||||
static_file_provider.initialize_index()?;
|
||||
let account_changesets =
|
||||
static_file_provider.account_changesets_range(block_range.clone())?;
|
||||
assert!(
|
||||
!account_changesets.is_empty(),
|
||||
"account changesets should be queryable from static files for blocks {:?}",
|
||||
block_range
|
||||
);
|
||||
} else {
|
||||
let account_changesets: Vec<_> = provider
|
||||
.tx_ref()
|
||||
.cursor_read::<reth_db::tables::AccountChangeSets>()?
|
||||
.walk_range(block_range.clone())?
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
assert!(
|
||||
!account_changesets.is_empty(),
|
||||
"account changesets should be queryable from MDBX for blocks {:?}",
|
||||
block_range
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Builds downloaders from a `FileClient`.
|
||||
fn build_downloaders_from_file_client(
|
||||
file_client: Arc<FileClient<Block>>,
|
||||
genesis: reth_primitives_traits::SealedHeader<Header>,
|
||||
stages_config: StageConfig,
|
||||
consensus: Arc<NoopConsensus>,
|
||||
provider_factory: reth_provider::ProviderFactory<
|
||||
reth_provider::test_utils::MockNodeTypesWithDB,
|
||||
>,
|
||||
) -> (impl HeaderDownloader<Header = Header>, impl BodyDownloader<Block = Block>) {
|
||||
let tip = file_client.tip().expect("file client should have tip");
|
||||
let min_block = file_client.min_block().expect("file client should have min block");
|
||||
let max_block = file_client.max_block().expect("file client should have max block");
|
||||
|
||||
let mut header_downloader = ReverseHeadersDownloaderBuilder::new(stages_config.headers)
|
||||
.build(file_client.clone(), consensus.clone())
|
||||
.into_task();
|
||||
header_downloader.update_local_head(genesis);
|
||||
header_downloader.update_sync_target(SyncTarget::Tip(tip));
|
||||
|
||||
let mut body_downloader = BodiesDownloaderBuilder::new(stages_config.bodies)
|
||||
.build(file_client, consensus, provider_factory)
|
||||
.into_task();
|
||||
body_downloader.set_download_range(min_block..=max_block).expect("set download range");
|
||||
|
||||
(header_downloader, body_downloader)
|
||||
}
|
||||
|
||||
/// Builds a pipeline with `DefaultStages`.
|
||||
fn build_pipeline<H, B>(
|
||||
provider_factory: reth_provider::ProviderFactory<
|
||||
reth_provider::test_utils::MockNodeTypesWithDB,
|
||||
>,
|
||||
header_downloader: H,
|
||||
body_downloader: B,
|
||||
max_block: u64,
|
||||
tip: B256,
|
||||
) -> Pipeline<reth_provider::test_utils::MockNodeTypesWithDB>
|
||||
where
|
||||
H: HeaderDownloader<Header = Header> + 'static,
|
||||
B: BodyDownloader<Block = Block> + 'static,
|
||||
{
|
||||
let consensus = NoopConsensus::arc();
|
||||
let stages_config = StageConfig::default();
|
||||
let evm_config = EthEvmConfig::new(provider_factory.chain_spec());
|
||||
|
||||
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
|
||||
let static_file_producer =
|
||||
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
|
||||
|
||||
let stages = DefaultStages::new(
|
||||
provider_factory.clone(),
|
||||
tip_rx,
|
||||
consensus,
|
||||
header_downloader,
|
||||
body_downloader,
|
||||
evm_config,
|
||||
stages_config,
|
||||
PruneModes::default(),
|
||||
None,
|
||||
);
|
||||
|
||||
let pipeline = Pipeline::builder()
|
||||
.with_tip_sender(tip_tx)
|
||||
.with_max_block(max_block)
|
||||
.with_fail_on_unwind(true)
|
||||
.add_stages(stages)
|
||||
.build(provider_factory, static_file_producer);
|
||||
pipeline.set_tip(tip);
|
||||
pipeline
|
||||
}
|
||||
|
||||
/// Tests pipeline with ALL stages enabled using both ETH transfers and contract storage changes.
|
||||
///
|
||||
/// This test:
|
||||
/// 1. Pre-funds a signer account and deploys a Counter contract in genesis
|
||||
/// 2. Each block contains two transactions:
|
||||
/// - ETH transfer to a recipient (account state changes)
|
||||
/// - Counter `increment()` call (storage state changes)
|
||||
/// 3. Runs the full pipeline with ALL stages enabled
|
||||
/// 4. Forward syncs to block 5, unwinds to block 2
|
||||
///
|
||||
/// This exercises both account and storage hashing/history stages.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_pipeline() -> eyre::Result<()> {
|
||||
reth_tracing::init_test_tracing();
|
||||
|
||||
// Generate a keypair for signing transactions
|
||||
let mut rng = generators::rng();
|
||||
let key_pair = generate_key(&mut rng);
|
||||
let signer_address = public_key_to_address(key_pair.public_key());
|
||||
|
||||
// Recipient address for ETH transfers
|
||||
let recipient_address = Address::new([0x11; 20]);
|
||||
|
||||
// Create a chain spec with:
|
||||
// - Signer pre-funded with 1000 ETH
|
||||
// - Counter contract pre-deployed at CONTRACT_ADDRESS
|
||||
let initial_balance = U256::from(ETH_TO_WEI) * U256::from(1000);
|
||||
let chain_spec = Arc::new(
|
||||
ChainSpecBuilder::default()
|
||||
.chain(MAINNET.chain)
|
||||
.genesis(Genesis {
|
||||
alloc: [
|
||||
(
|
||||
signer_address,
|
||||
GenesisAccount { balance: initial_balance, ..Default::default() },
|
||||
),
|
||||
(
|
||||
CONTRACT_ADDRESS,
|
||||
GenesisAccount {
|
||||
code: Some(COUNTER_DEPLOYED_BYTECODE),
|
||||
..Default::default()
|
||||
},
|
||||
),
|
||||
]
|
||||
.into(),
|
||||
..MAINNET.genesis.clone()
|
||||
})
|
||||
.shanghai_activated()
|
||||
.build(),
|
||||
);
|
||||
|
||||
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
|
||||
init_genesis(&provider_factory).expect("init genesis");
|
||||
|
||||
let genesis = provider_factory.sealed_header(0)?.expect("genesis should exist");
|
||||
let evm_config = EthEvmConfig::new(chain_spec.clone());
|
||||
|
||||
// Build blocks by actually executing transactions to get correct state roots
|
||||
let num_blocks = 5u64;
|
||||
let mut blocks: Vec<SealedBlock<Block>> = Vec::new();
|
||||
let mut parent_hash = genesis.hash();
|
||||
|
||||
let gas_price = INITIAL_BASE_FEE as u128;
|
||||
let transfer_value = U256::from(ETH_TO_WEI); // 1 ETH per block
|
||||
|
||||
for block_num in 1..=num_blocks {
|
||||
// Each block has 2 transactions: ETH transfer + Counter increment
|
||||
let base_nonce = (block_num - 1) * 2;
|
||||
|
||||
// Transaction 1: ETH transfer
|
||||
let eth_transfer_tx = sign_tx_with_key_pair(
|
||||
key_pair,
|
||||
Transaction::Eip1559(TxEip1559 {
|
||||
chain_id: chain_spec.chain.id(),
|
||||
nonce: base_nonce,
|
||||
gas_limit: 21_000,
|
||||
max_fee_per_gas: gas_price,
|
||||
max_priority_fee_per_gas: 0,
|
||||
to: TxKind::Call(recipient_address),
|
||||
value: transfer_value,
|
||||
input: Bytes::new(),
|
||||
..Default::default()
|
||||
}),
|
||||
);
|
||||
|
||||
// Transaction 2: Counter increment
|
||||
let counter_tx = sign_tx_with_key_pair(
|
||||
key_pair,
|
||||
Transaction::Eip1559(TxEip1559 {
|
||||
chain_id: chain_spec.chain.id(),
|
||||
nonce: base_nonce + 1,
|
||||
gas_limit: 100_000, // Enough gas for SSTORE operations
|
||||
max_fee_per_gas: gas_price,
|
||||
max_priority_fee_per_gas: 0,
|
||||
to: TxKind::Call(CONTRACT_ADDRESS),
|
||||
value: U256::ZERO,
|
||||
input: Bytes::from(INCREMENT_SELECTOR.to_vec()),
|
||||
..Default::default()
|
||||
}),
|
||||
);
|
||||
|
||||
let transactions = vec![eth_transfer_tx, counter_tx];
|
||||
let tx_root = calculate_transaction_root(&transactions);
|
||||
|
||||
// Build a temporary header for execution
|
||||
let temp_header = Header {
|
||||
parent_hash,
|
||||
number: block_num,
|
||||
gas_limit: 30_000_000,
|
||||
base_fee_per_gas: Some(INITIAL_BASE_FEE),
|
||||
timestamp: block_num * 12,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// Execute the block to get the state changes
|
||||
let provider = provider_factory.database_provider_rw()?;
|
||||
|
||||
let block_with_senders = RecoveredBlock::new_unhashed(
|
||||
Block::new(
|
||||
temp_header.clone(),
|
||||
BlockBody {
|
||||
transactions: transactions.clone(),
|
||||
ommers: Vec::new(),
|
||||
withdrawals: None,
|
||||
},
|
||||
),
|
||||
vec![signer_address, signer_address], // Both txs from same sender
|
||||
);
|
||||
|
||||
// Execute in a scope so state_provider is dropped before we use provider for writes
|
||||
let output = {
|
||||
let state_provider = provider.latest();
|
||||
let db = StateProviderDatabase::new(&*state_provider);
|
||||
let executor = evm_config.batch_executor(db);
|
||||
executor.execute(&block_with_senders)?
|
||||
};
|
||||
|
||||
let gas_used = output.gas_used;
|
||||
|
||||
// Convert bundle state to hashed post state and compute state root
|
||||
let hashed_state =
|
||||
HashedPostState::from_bundle_state::<KeccakKeyHasher>(output.state.state());
|
||||
let (state_root, _trie_updates) = StateRoot::overlay_root_with_updates(
|
||||
provider.tx_ref(),
|
||||
&hashed_state.clone().into_sorted(),
|
||||
)?;
|
||||
|
||||
// Create receipts for receipt root calculation (one per transaction)
|
||||
let receipts: Vec<_> = output.receipts.iter().map(|r| r.with_bloom_ref()).collect();
|
||||
let receipts_root = calculate_receipt_root(&receipts);
|
||||
|
||||
let header = Header {
|
||||
parent_hash,
|
||||
number: block_num,
|
||||
state_root,
|
||||
transactions_root: tx_root,
|
||||
receipts_root,
|
||||
gas_limit: 30_000_000,
|
||||
gas_used,
|
||||
base_fee_per_gas: Some(INITIAL_BASE_FEE),
|
||||
timestamp: block_num * 12,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let block: SealedBlock<Block> = SealedBlock::seal_parts(
|
||||
header.clone(),
|
||||
BlockBody { transactions, ommers: Vec::new(), withdrawals: None },
|
||||
);
|
||||
|
||||
// Write the plain state to database so subsequent blocks build on it
|
||||
let plain_state = output.state.to_plain_state(OriginalValuesKnown::Yes);
|
||||
provider.write_state_changes(plain_state)?;
|
||||
provider.write_hashed_state(&hashed_state.into_sorted())?;
|
||||
provider.commit()?;
|
||||
|
||||
parent_hash = block.hash();
|
||||
blocks.push(block);
|
||||
}
|
||||
|
||||
// Create a fresh provider factory for the pipeline (clean state from genesis)
|
||||
// This is needed because we wrote state during block generation for computing state roots
|
||||
let pipeline_provider_factory =
|
||||
create_test_provider_factory_with_chain_spec(chain_spec.clone());
|
||||
init_genesis(&pipeline_provider_factory).expect("init genesis");
|
||||
let pipeline_genesis =
|
||||
pipeline_provider_factory.sealed_header(0)?.expect("genesis should exist");
|
||||
let pipeline_consensus = NoopConsensus::arc();
|
||||
|
||||
let file_client = create_file_client_from_blocks(blocks);
|
||||
let max_block = file_client.max_block().unwrap();
|
||||
let tip = file_client.tip().expect("tip");
|
||||
|
||||
let stages_config = StageConfig::default();
|
||||
let (header_downloader, body_downloader) = build_downloaders_from_file_client(
|
||||
file_client,
|
||||
pipeline_genesis,
|
||||
stages_config,
|
||||
pipeline_consensus,
|
||||
pipeline_provider_factory.clone(),
|
||||
);
|
||||
|
||||
let pipeline = build_pipeline(
|
||||
pipeline_provider_factory.clone(),
|
||||
header_downloader,
|
||||
body_downloader,
|
||||
max_block,
|
||||
tip,
|
||||
);
|
||||
|
||||
let (mut pipeline, result) = pipeline.run_as_fut(None).await;
|
||||
result?;
|
||||
|
||||
// Verify forward sync
|
||||
{
|
||||
let provider = pipeline_provider_factory.provider()?;
|
||||
let last_block = provider.last_block_number()?;
|
||||
assert_eq!(last_block, 5, "should have synced 5 blocks");
|
||||
|
||||
for stage_id in [
|
||||
StageId::Headers,
|
||||
StageId::Bodies,
|
||||
StageId::SenderRecovery,
|
||||
StageId::Execution,
|
||||
StageId::AccountHashing,
|
||||
StageId::StorageHashing,
|
||||
StageId::MerkleExecute,
|
||||
StageId::TransactionLookup,
|
||||
StageId::IndexAccountHistory,
|
||||
StageId::IndexStorageHistory,
|
||||
StageId::Finish,
|
||||
] {
|
||||
let checkpoint = provider.get_stage_checkpoint(stage_id)?;
|
||||
assert_eq!(
|
||||
checkpoint.map(|c| c.block_number),
|
||||
Some(5),
|
||||
"{stage_id} checkpoint should be at block 5"
|
||||
);
|
||||
}
|
||||
|
||||
// Verify the counter contract's storage was updated
|
||||
// After 5 blocks with 1 increment each, slot 0 should be 5
|
||||
let state = provider.latest();
|
||||
let counter_storage = state.storage(CONTRACT_ADDRESS, B256::ZERO)?;
|
||||
assert_eq!(
|
||||
counter_storage,
|
||||
Some(U256::from(5)),
|
||||
"Counter storage slot 0 should be 5 after 5 increments"
|
||||
);
|
||||
}
|
||||
|
||||
// Verify changesets are queryable before unwind
|
||||
// This validates that the #21561 fix works - unwind needs to read changesets from the correct
|
||||
// source
|
||||
assert_changesets_queryable(&pipeline_provider_factory, 1..=5)?;
|
||||
|
||||
// Unwind to block 2
|
||||
let unwind_target = 2u64;
|
||||
pipeline.unwind(unwind_target, None)?;
|
||||
|
||||
// Verify unwind
|
||||
{
|
||||
let provider = pipeline_provider_factory.provider()?;
|
||||
for stage_id in [
|
||||
StageId::Headers,
|
||||
StageId::Bodies,
|
||||
StageId::SenderRecovery,
|
||||
StageId::Execution,
|
||||
StageId::AccountHashing,
|
||||
StageId::StorageHashing,
|
||||
StageId::MerkleExecute,
|
||||
StageId::TransactionLookup,
|
||||
StageId::IndexAccountHistory,
|
||||
StageId::IndexStorageHistory,
|
||||
] {
|
||||
let checkpoint = provider.get_stage_checkpoint(stage_id)?;
|
||||
if let Some(cp) = checkpoint {
|
||||
assert!(
|
||||
cp.block_number <= unwind_target,
|
||||
"{stage_id} checkpoint {} should be <= {unwind_target}",
|
||||
cp.block_number
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{find_fixed_range, BlockNumber, Compression};
|
||||
use crate::{BlockNumber, Compression};
|
||||
use alloc::{format, string::String, vec::Vec};
|
||||
use alloy_primitives::TxNumber;
|
||||
use core::{
|
||||
@@ -376,20 +376,6 @@ impl SegmentHeader {
|
||||
self.expected_block_range.start()
|
||||
}
|
||||
|
||||
/// Sets the expected block start of the segment, using the file boundary end
|
||||
/// from `find_fixed_range`.
|
||||
///
|
||||
/// This is useful for non-zero genesis blocks where the actual starting block
|
||||
/// differs from the file range start determined by `find_fixed_range`.
|
||||
/// For example, if `blocks_per_file` is 500 and genesis is at 502, the range
|
||||
/// becomes 502..=999 (start at genesis, end at file boundary).
|
||||
pub const fn set_expected_block_start(&mut self, block: BlockNumber) {
|
||||
let blocks_per_file =
|
||||
self.expected_block_range.end() - self.expected_block_range.start() + 1;
|
||||
let file_range = find_fixed_range(block, blocks_per_file);
|
||||
self.expected_block_range = SegmentRangeInclusive::new(block, file_range.end());
|
||||
}
|
||||
|
||||
/// The expected block end of the segment.
|
||||
pub const fn expected_block_end(&self) -> BlockNumber {
|
||||
self.expected_block_range.end()
|
||||
|
||||
@@ -211,26 +211,6 @@ where
|
||||
// Behaviour reserved only for new nodes should be set in the storage settings.
|
||||
provider_rw.write_storage_settings(genesis_storage_settings)?;
|
||||
|
||||
// For non-zero genesis blocks, set expected_block_start BEFORE insert_genesis_state.
|
||||
// When block_range is None, next_block_number() uses expected_block_start. By default,
|
||||
// expected_block_start comes from find_fixed_range which returns the file range start (0),
|
||||
// not the genesis block number. This would cause increment_block(N) to fail.
|
||||
let static_file_provider = provider_rw.static_file_provider();
|
||||
if genesis_block_number > 0 {
|
||||
if genesis_storage_settings.account_changesets_in_static_files {
|
||||
static_file_provider
|
||||
.get_writer(genesis_block_number, StaticFileSegment::AccountChangeSets)?
|
||||
.user_header_mut()
|
||||
.set_expected_block_start(genesis_block_number);
|
||||
}
|
||||
if genesis_storage_settings.storage_changesets_in_static_files {
|
||||
static_file_provider
|
||||
.get_writer(genesis_block_number, StaticFileSegment::StorageChangeSets)?
|
||||
.user_header_mut()
|
||||
.set_expected_block_start(genesis_block_number);
|
||||
}
|
||||
}
|
||||
|
||||
insert_genesis_hashes(&provider_rw, alloc.iter())?;
|
||||
insert_genesis_history(&provider_rw, alloc.iter())?;
|
||||
|
||||
@@ -248,11 +228,16 @@ where
|
||||
provider_rw.save_stage_checkpoint(stage, checkpoint)?;
|
||||
}
|
||||
|
||||
// Static file segments start empty, so we need to initialize the block range.
|
||||
// For genesis blocks with non-zero block numbers, we use get_writer() instead of
|
||||
// latest_writer() and set_block_range() to ensure static files start at the correct block.
|
||||
// Static file segments start empty, so we need to initialize the genesis block.
|
||||
//
|
||||
// We do not do this for changesets because they get initialized in `insert_state` /
|
||||
// `write_state` / `write_state_reverts`. If the node is configured for writing changesets to
|
||||
// static files they will be written there, otherwise they will be written to the DB.
|
||||
let static_file_provider = provider_rw.static_file_provider();
|
||||
|
||||
// Static file segments start empty, so we need to initialize the genesis block.
|
||||
// For genesis blocks with non-zero block numbers, we need to use get_writer() instead of
|
||||
// latest_writer() to ensure the genesis block is stored in the correct static file range.
|
||||
static_file_provider
|
||||
.get_writer(genesis_block_number, StaticFileSegment::Receipts)?
|
||||
.user_header_mut()
|
||||
|
||||
@@ -2511,19 +2511,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
|
||||
} else {
|
||||
self.take::<tables::StorageChangeSets>(storage_range)?
|
||||
};
|
||||
let account_changeset = if let Some(_highest_block) = self
|
||||
.static_file_provider
|
||||
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets) &&
|
||||
self.cached_storage_settings().account_changesets_in_static_files
|
||||
{
|
||||
let changesets = self.account_changesets_range(range)?;
|
||||
let mut changeset_writer =
|
||||
self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
|
||||
changeset_writer.prune_account_changesets(block)?;
|
||||
changesets
|
||||
} else {
|
||||
self.take::<tables::AccountChangeSets>(range)?
|
||||
};
|
||||
let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
|
||||
|
||||
// This is not working for blocks that are not at tip. as plain state is not the last
|
||||
// state of end range. We should rename the functions or add support to access
|
||||
|
||||
@@ -58,12 +58,11 @@ pub fn create_test_provider_factory_with_node_types<N: NodeTypesForProvider>(
|
||||
let (static_dir, _) = create_test_static_files_dir();
|
||||
let (rocksdb_dir, _) = create_test_rocksdb_dir();
|
||||
let db = create_test_rw_db();
|
||||
let rocksdb_path = rocksdb_dir.keep();
|
||||
ProviderFactory::new(
|
||||
db,
|
||||
chain_spec,
|
||||
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
|
||||
RocksDBBuilder::new(&rocksdb_path)
|
||||
RocksDBBuilder::new(&rocksdb_dir)
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.expect("failed to create test RocksDB provider"),
|
||||
|
||||
Reference in New Issue
Block a user