test(stages): add pipeline forward sync and unwind test (#21553)

Co-authored-by: Amp <amp@ampcode.com>
joshieDo
2026-01-29 21:13:07 +00:00
committed by GitHub
parent 5592c362d4
commit 2b1833576b
9 changed files with 579 additions and 14 deletions

View File

@@ -22,7 +22,7 @@ concurrency:
jobs:
test:
name: test / ${{ matrix.network }}
name: test / ${{ matrix.network }} / ${{ matrix.storage }}
if: github.event_name != 'schedule'
runs-on: depot-ubuntu-latest-4
env:
@@ -30,6 +30,10 @@ jobs:
strategy:
matrix:
network: ["ethereum", "optimism"]
storage: ["stable", "edge"]
exclude:
- network: optimism
storage: edge
timeout-minutes: 60
steps:
- uses: actions/checkout@v6
@@ -46,7 +50,7 @@ jobs:
name: Run tests
run: |
cargo nextest run \
--locked --features "asm-keccak ${{ matrix.network }}" \
--locked --features "asm-keccak ${{ matrix.network }} ${{ matrix.storage == 'edge' && 'edge' || '' }}" \
--workspace --exclude ef-tests \
-E "kind(test) and not binary(e2e_testsuite)"
- if: matrix.network == 'optimism'

Cargo.lock generated
View File

@@ -10732,6 +10732,7 @@ version = "1.10.2"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-genesis",
"alloy-primitives",
"alloy-rlp",
"assert_matches",
@@ -10751,6 +10752,7 @@ dependencies = [
"reth-consensus",
"reth-db",
"reth-db-api",
"reth-db-common",
"reth-downloaders",
"reth-era",
"reth-era-downloader",
@@ -10776,6 +10778,7 @@ dependencies = [
"reth-storage-api",
"reth-storage-errors",
"reth-testing-utils",
"reth-tracing",
"reth-trie",
"reth-trie-db",
"tempfile",

View File

@@ -83,6 +83,28 @@ impl From<&'static str> for FileClientError {
}
impl<B: FullBlock> FileClient<B> {
/// Create a new file client from a slice of sealed blocks.
pub fn from_blocks(blocks: impl IntoIterator<Item = SealedBlock<B>>) -> Self {
let blocks: Vec<_> = blocks.into_iter().collect();
let capacity = blocks.len();
let mut headers = HashMap::with_capacity(capacity);
let mut hash_to_number = HashMap::with_capacity(capacity);
let mut bodies = HashMap::with_capacity(capacity);
for block in blocks {
let number = block.number();
let hash = block.hash();
let (header, body) = block.split_sealed_header_body();
headers.insert(number, header.into_header());
hash_to_number.insert(hash, number);
bodies.insert(hash, body);
}
Self { headers, hash_to_number, bodies }
}
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(
path: P,

View File

@@ -73,7 +73,7 @@ reth-ethereum-consensus.workspace = true
reth-evm-ethereum.workspace = true
reth-consensus = { workspace = true, features = ["test-utils"] }
reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-downloaders.workspace = true
reth-downloaders = { workspace = true, features = ["file-client"] }
reth-static-file.workspace = true
reth-stages-api = { workspace = true, features = ["test-utils"] }
reth-storage-api.workspace = true
@@ -82,8 +82,11 @@ reth-trie = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-network-peers.workspace = true
alloy-genesis.workspace = true
alloy-primitives = { workspace = true, features = ["getrandom", "rand"] }
alloy-rlp.workspace = true
reth-db-common.workspace = true
reth-tracing.workspace = true
tokio = { workspace = true, features = ["rt", "sync", "macros"] }
assert_matches.workspace = true
@@ -119,6 +122,7 @@ test-utils = [
"reth-evm-ethereum/test-utils",
]
rocksdb = ["reth-provider/rocksdb"]
edge = ["reth-provider/edge", "reth-db-common/edge", "rocksdb"]
[[bench]]
name = "criterion"

View File

@@ -0,0 +1,490 @@
//! Pipeline forward sync and unwind tests.
use alloy_consensus::{constants::ETH_TO_WEI, Header, TxEip1559, TxReceipt};
use alloy_eips::eip1559::INITIAL_BASE_FEE;
use alloy_genesis::{Genesis, GenesisAccount};
use alloy_primitives::{bytes, Address, Bytes, TxKind, B256, U256};
use reth_chainspec::{ChainSpecBuilder, ChainSpecProvider, MAINNET};
use reth_config::config::StageConfig;
use reth_consensus::noop::NoopConsensus;
use reth_db_api::{cursor::DbCursorRO, models::BlockNumberAddress, transaction::DbTx};
use reth_db_common::init::init_genesis;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder, file_client::FileClient,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_ethereum_primitives::{Block, BlockBody, Transaction};
use reth_evm::{execute::Executor, ConfigureEvm};
use reth_evm_ethereum::EthEvmConfig;
use reth_network_p2p::{
bodies::downloader::BodyDownloader,
headers::downloader::{HeaderDownloader, SyncTarget},
};
use reth_primitives_traits::{
crypto::secp256k1::public_key_to_address,
proofs::{calculate_receipt_root, calculate_transaction_root},
RecoveredBlock, SealedBlock,
};
use reth_provider::{
test_utils::create_test_provider_factory_with_chain_spec, BlockNumReader, DBProvider,
DatabaseProviderFactory, HeaderProvider, OriginalValuesKnown, StageCheckpointReader,
StateWriter, StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
use reth_stages::sets::DefaultStages;
use reth_stages_api::{Pipeline, StageId};
use reth_static_file::StaticFileProducer;
use reth_storage_api::{
ChangeSetReader, StateProvider, StorageChangeSetReader, StorageSettingsCache,
};
use reth_testing_utils::generators::{self, generate_key, sign_tx_with_key_pair};
use reth_trie::{HashedPostState, KeccakKeyHasher, StateRoot};
use reth_trie_db::DatabaseStateRoot;
use std::sync::Arc;
use tokio::sync::watch;
/// Counter contract deployed bytecode compiled with Solidity 0.8.31.
/// ```solidity
/// contract Counter {
/// uint256 public count;
/// function increment() public { count += 1; }
/// }
/// ```
const COUNTER_DEPLOYED_BYTECODE: Bytes = bytes!(
"6080604052348015600e575f5ffd5b50600436106030575f3560e01c806306661abd146034578063d09de08a14604e575b5f5ffd5b603a6056565b604051604591906089565b60405180910390f35b6054605b565b005b5f5481565b60015f5f828254606a919060cd565b92505081905550565b5f819050919050565b6083816073565b82525050565b5f602082019050609a5f830184607c565b92915050565b7f4e487b71000000000000000000000000000000000000000000000000000000005f52601160045260245ffd5b5f60d5826073565b915060de836073565b925082820190508082111560f35760f260a0565b5b9291505056fea2646970667358221220576016d010ec2f4f83b992fb97d16efd1bc54110c97aa5d5cb47d20d3b39a35264736f6c634300081f0033"
);
/// `increment()` function selector: `keccak256("increment()")`[:4]
const INCREMENT_SELECTOR: [u8; 4] = [0xd0, 0x9d, 0xe0, 0x8a];
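
For reference, the hardcoded selector can be checked against the hash of the canonical signature; a standalone sketch (not part of the commit):

```rust
use alloy_primitives::keccak256;

fn main() {
    // A Solidity function selector is the first four bytes of the keccak-256
    // hash of the canonical signature string.
    let hash = keccak256("increment()");
    assert_eq!(hash[..4], [0xd0, 0x9d, 0xe0, 0x8a]);
}
```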
/// Contract address (deterministic for test)
const CONTRACT_ADDRESS: Address = Address::new([0x42; 20]);
/// Creates a `FileClient` populated with the given blocks.
fn create_file_client_from_blocks(blocks: Vec<SealedBlock<Block>>) -> Arc<FileClient<Block>> {
Arc::new(FileClient::from_blocks(blocks))
}
/// Verifies that changesets are queryable from the correct source based on storage settings.
///
/// Queries static files when changesets are configured to be stored there, otherwise queries MDBX.
fn assert_changesets_queryable(
provider_factory: &reth_provider::ProviderFactory<
reth_provider::test_utils::MockNodeTypesWithDB,
>,
block_range: std::ops::RangeInclusive<u64>,
) -> eyre::Result<()> {
let provider = provider_factory.provider()?;
let settings = provider.cached_storage_settings();
// Verify storage changesets
if settings.storage_changesets_in_static_files {
let static_file_provider = provider_factory.static_file_provider();
static_file_provider.initialize_index()?;
let storage_changesets =
static_file_provider.storage_changesets_range(block_range.clone())?;
assert!(
!storage_changesets.is_empty(),
"storage changesets should be queryable from static files for blocks {:?}",
block_range
);
} else {
let storage_changesets: Vec<_> = provider
.tx_ref()
.cursor_dup_read::<reth_db::tables::StorageChangeSets>()?
.walk_range(BlockNumberAddress::range(block_range.clone()))?
.collect::<Result<Vec<_>, _>>()?;
assert!(
!storage_changesets.is_empty(),
"storage changesets should be queryable from MDBX for blocks {:?}",
block_range
);
}
// Verify account changesets
if settings.account_changesets_in_static_files {
let static_file_provider = provider_factory.static_file_provider();
static_file_provider.initialize_index()?;
let account_changesets =
static_file_provider.account_changesets_range(block_range.clone())?;
assert!(
!account_changesets.is_empty(),
"account changesets should be queryable from static files for blocks {:?}",
block_range
);
} else {
let account_changesets: Vec<_> = provider
.tx_ref()
.cursor_read::<reth_db::tables::AccountChangeSets>()?
.walk_range(block_range.clone())?
.collect::<Result<Vec<_>, _>>()?;
assert!(
!account_changesets.is_empty(),
"account changesets should be queryable from MDBX for blocks {:?}",
block_range
);
}
Ok(())
}
/// Builds downloaders from a `FileClient`.
fn build_downloaders_from_file_client(
file_client: Arc<FileClient<Block>>,
genesis: reth_primitives_traits::SealedHeader<Header>,
stages_config: StageConfig,
consensus: Arc<NoopConsensus>,
provider_factory: reth_provider::ProviderFactory<
reth_provider::test_utils::MockNodeTypesWithDB,
>,
) -> (impl HeaderDownloader<Header = Header>, impl BodyDownloader<Block = Block>) {
let tip = file_client.tip().expect("file client should have tip");
let min_block = file_client.min_block().expect("file client should have min block");
let max_block = file_client.max_block().expect("file client should have max block");
let mut header_downloader = ReverseHeadersDownloaderBuilder::new(stages_config.headers)
.build(file_client.clone(), consensus.clone())
.into_task();
header_downloader.update_local_head(genesis);
header_downloader.update_sync_target(SyncTarget::Tip(tip));
let mut body_downloader = BodiesDownloaderBuilder::new(stages_config.bodies)
.build(file_client, consensus, provider_factory)
.into_task();
body_downloader.set_download_range(min_block..=max_block).expect("set download range");
(header_downloader, body_downloader)
}
/// Builds a pipeline with `DefaultStages`.
fn build_pipeline<H, B>(
provider_factory: reth_provider::ProviderFactory<
reth_provider::test_utils::MockNodeTypesWithDB,
>,
header_downloader: H,
body_downloader: B,
max_block: u64,
tip: B256,
) -> Pipeline<reth_provider::test_utils::MockNodeTypesWithDB>
where
H: HeaderDownloader<Header = Header> + 'static,
B: BodyDownloader<Block = Block> + 'static,
{
let consensus = NoopConsensus::arc();
let stages_config = StageConfig::default();
let evm_config = EthEvmConfig::new(provider_factory.chain_spec());
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let static_file_producer =
StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
let stages = DefaultStages::new(
provider_factory.clone(),
tip_rx,
consensus,
header_downloader,
body_downloader,
evm_config,
stages_config,
PruneModes::default(),
None,
);
let pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
.with_max_block(max_block)
.with_fail_on_unwind(true)
.add_stages(stages)
.build(provider_factory, static_file_producer);
pipeline.set_tip(tip);
pipeline
}
/// Tests the pipeline with ALL stages enabled, using both ETH transfers and contract storage changes.
///
/// This test:
/// 1. Pre-funds a signer account and deploys a Counter contract in genesis
/// 2. Each block contains two transactions:
/// - ETH transfer to a recipient (account state changes)
/// - Counter `increment()` call (storage state changes)
/// 3. Runs the full pipeline with ALL stages enabled
/// 4. Forward syncs to block 5, unwinds to block 2
///
/// This exercises both account and storage hashing/history stages.
#[tokio::test(flavor = "multi_thread")]
async fn test_pipeline() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
// Generate a keypair for signing transactions
let mut rng = generators::rng();
let key_pair = generate_key(&mut rng);
let signer_address = public_key_to_address(key_pair.public_key());
// Recipient address for ETH transfers
let recipient_address = Address::new([0x11; 20]);
// Create a chain spec with:
// - Signer pre-funded with 1000 ETH
// - Counter contract pre-deployed at CONTRACT_ADDRESS
let initial_balance = U256::from(ETH_TO_WEI) * U256::from(1000);
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(Genesis {
alloc: [
(
signer_address,
GenesisAccount { balance: initial_balance, ..Default::default() },
),
(
CONTRACT_ADDRESS,
GenesisAccount {
code: Some(COUNTER_DEPLOYED_BYTECODE),
..Default::default()
},
),
]
.into(),
..MAINNET.genesis.clone()
})
.shanghai_activated()
.build(),
);
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(&provider_factory).expect("init genesis");
let genesis = provider_factory.sealed_header(0)?.expect("genesis should exist");
let evm_config = EthEvmConfig::new(chain_spec.clone());
// Build blocks by actually executing transactions to get correct state roots
let num_blocks = 5u64;
let mut blocks: Vec<SealedBlock<Block>> = Vec::new();
let mut parent_hash = genesis.hash();
let gas_price = INITIAL_BASE_FEE as u128;
let transfer_value = U256::from(ETH_TO_WEI); // 1 ETH per block
for block_num in 1..=num_blocks {
// Each block has 2 transactions: ETH transfer + Counter increment
let base_nonce = (block_num - 1) * 2;
// Transaction 1: ETH transfer
let eth_transfer_tx = sign_tx_with_key_pair(
key_pair,
Transaction::Eip1559(TxEip1559 {
chain_id: chain_spec.chain.id(),
nonce: base_nonce,
gas_limit: 21_000,
max_fee_per_gas: gas_price,
max_priority_fee_per_gas: 0,
to: TxKind::Call(recipient_address),
value: transfer_value,
input: Bytes::new(),
..Default::default()
}),
);
// Transaction 2: Counter increment
let counter_tx = sign_tx_with_key_pair(
key_pair,
Transaction::Eip1559(TxEip1559 {
chain_id: chain_spec.chain.id(),
nonce: base_nonce + 1,
gas_limit: 100_000, // Enough gas for SSTORE operations
max_fee_per_gas: gas_price,
max_priority_fee_per_gas: 0,
to: TxKind::Call(CONTRACT_ADDRESS),
value: U256::ZERO,
input: Bytes::from(INCREMENT_SELECTOR.to_vec()),
..Default::default()
}),
);
let transactions = vec![eth_transfer_tx, counter_tx];
let tx_root = calculate_transaction_root(&transactions);
// Build a temporary header for execution
let temp_header = Header {
parent_hash,
number: block_num,
gas_limit: 30_000_000,
base_fee_per_gas: Some(INITIAL_BASE_FEE),
timestamp: block_num * 12,
..Default::default()
};
// Execute the block to get the state changes
let provider = provider_factory.database_provider_rw()?;
let block_with_senders = RecoveredBlock::new_unhashed(
Block::new(
temp_header.clone(),
BlockBody {
transactions: transactions.clone(),
ommers: Vec::new(),
withdrawals: None,
},
),
vec![signer_address, signer_address], // Both txs from same sender
);
// Execute in a scope so state_provider is dropped before we use provider for writes
let output = {
let state_provider = provider.latest();
let db = StateProviderDatabase::new(&*state_provider);
let executor = evm_config.batch_executor(db);
executor.execute(&block_with_senders)?
};
let gas_used = output.gas_used;
// Convert bundle state to hashed post state and compute state root
let hashed_state =
HashedPostState::from_bundle_state::<KeccakKeyHasher>(output.state.state());
let (state_root, _trie_updates) = StateRoot::overlay_root_with_updates(
provider.tx_ref(),
&hashed_state.clone().into_sorted(),
)?;
// Create receipts for receipt root calculation (one per transaction)
let receipts: Vec<_> = output.receipts.iter().map(|r| r.with_bloom_ref()).collect();
let receipts_root = calculate_receipt_root(&receipts);
let header = Header {
parent_hash,
number: block_num,
state_root,
transactions_root: tx_root,
receipts_root,
gas_limit: 30_000_000,
gas_used,
base_fee_per_gas: Some(INITIAL_BASE_FEE),
timestamp: block_num * 12,
..Default::default()
};
let block: SealedBlock<Block> = SealedBlock::seal_parts(
header.clone(),
BlockBody { transactions, ommers: Vec::new(), withdrawals: None },
);
// Write the plain state to database so subsequent blocks build on it
let plain_state = output.state.to_plain_state(OriginalValuesKnown::Yes);
provider.write_state_changes(plain_state)?;
provider.write_hashed_state(&hashed_state.into_sorted())?;
provider.commit()?;
parent_hash = block.hash();
blocks.push(block);
}
// Create a fresh provider factory for the pipeline (clean state from genesis)
// This is needed because we wrote state during block generation for computing state roots
let pipeline_provider_factory =
create_test_provider_factory_with_chain_spec(chain_spec.clone());
init_genesis(&pipeline_provider_factory).expect("init genesis");
let pipeline_genesis =
pipeline_provider_factory.sealed_header(0)?.expect("genesis should exist");
let pipeline_consensus = NoopConsensus::arc();
let file_client = create_file_client_from_blocks(blocks);
let max_block = file_client.max_block().unwrap();
let tip = file_client.tip().expect("tip");
let stages_config = StageConfig::default();
let (header_downloader, body_downloader) = build_downloaders_from_file_client(
file_client,
pipeline_genesis,
stages_config,
pipeline_consensus,
pipeline_provider_factory.clone(),
);
let pipeline = build_pipeline(
pipeline_provider_factory.clone(),
header_downloader,
body_downloader,
max_block,
tip,
);
let (mut pipeline, result) = pipeline.run_as_fut(None).await;
result?;
// Verify forward sync
{
let provider = pipeline_provider_factory.provider()?;
let last_block = provider.last_block_number()?;
assert_eq!(last_block, 5, "should have synced 5 blocks");
for stage_id in [
StageId::Headers,
StageId::Bodies,
StageId::SenderRecovery,
StageId::Execution,
StageId::AccountHashing,
StageId::StorageHashing,
StageId::MerkleExecute,
StageId::TransactionLookup,
StageId::IndexAccountHistory,
StageId::IndexStorageHistory,
StageId::Finish,
] {
let checkpoint = provider.get_stage_checkpoint(stage_id)?;
assert_eq!(
checkpoint.map(|c| c.block_number),
Some(5),
"{stage_id} checkpoint should be at block 5"
);
}
// Verify the counter contract's storage was updated
// After 5 blocks with 1 increment each, slot 0 should be 5
let state = provider.latest();
let counter_storage = state.storage(CONTRACT_ADDRESS, B256::ZERO)?;
assert_eq!(
counter_storage,
Some(U256::from(5)),
"Counter storage slot 0 should be 5 after 5 increments"
);
}
// Verify changesets are queryable before unwind
// This validates that the #21561 fix works - unwind needs to read changesets from the correct
// source
assert_changesets_queryable(&pipeline_provider_factory, 1..=5)?;
// Unwind to block 2
let unwind_target = 2u64;
pipeline.unwind(unwind_target, None)?;
// Verify unwind
{
let provider = pipeline_provider_factory.provider()?;
for stage_id in [
StageId::Headers,
StageId::Bodies,
StageId::SenderRecovery,
StageId::Execution,
StageId::AccountHashing,
StageId::StorageHashing,
StageId::MerkleExecute,
StageId::TransactionLookup,
StageId::IndexAccountHistory,
StageId::IndexStorageHistory,
] {
let checkpoint = provider.get_stage_checkpoint(stage_id)?;
if let Some(cp) = checkpoint {
assert!(
cp.block_number <= unwind_target,
"{stage_id} checkpoint {} should be <= {unwind_target}",
cp.block_number
);
}
}
}
Ok(())
}

View File

@@ -1,4 +1,4 @@
use crate::{BlockNumber, Compression};
use crate::{find_fixed_range, BlockNumber, Compression};
use alloc::{format, string::String, vec::Vec};
use alloy_primitives::TxNumber;
use core::{
@@ -376,6 +376,20 @@ impl SegmentHeader {
self.expected_block_range.start()
}
/// Sets the expected block start of the segment, using the file boundary end
/// from `find_fixed_range`.
///
/// This is useful for non-zero genesis blocks where the actual starting block
/// differs from the file range start determined by `find_fixed_range`.
/// For example, if `blocks_per_file` is 500 and genesis is at 502, the range
/// becomes 502..=999 (start at genesis, end at file boundary).
pub const fn set_expected_block_start(&mut self, block: BlockNumber) {
let blocks_per_file =
self.expected_block_range.end() - self.expected_block_range.start() + 1;
let file_range = find_fixed_range(block, blocks_per_file);
self.expected_block_range = SegmentRangeInclusive::new(block, file_range.end());
}
/// The expected block end of the segment.
pub const fn expected_block_end(&self) -> BlockNumber {
self.expected_block_range.end()
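
The doc comment's `502..=999` example follows directly from the floor arithmetic; a self-contained sketch, assuming `find_fixed_range` floors the block to the nearest multiple of `blocks_per_file` (the real function lives in this crate):

```rust
use std::ops::RangeInclusive;

// Assumed behavior of `find_fixed_range`: floor to the file boundary.
fn find_fixed_range(block: u64, blocks_per_file: u64) -> RangeInclusive<u64> {
    let start = (block / blocks_per_file) * blocks_per_file;
    start..=start + blocks_per_file - 1
}

fn main() {
    // With 500 blocks per file, block 502 falls in the file covering 500..=999.
    let file_range = find_fixed_range(502, 500);
    assert_eq!(file_range, 500..=999);

    // `set_expected_block_start(502)` keeps the file-boundary end but starts
    // at genesis: 502..=999.
    let adjusted = 502..=*file_range.end();
    assert_eq!(adjusted, 502..=999);
}
```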

View File

@@ -211,6 +211,26 @@ where
// Behaviour reserved only for new nodes should be set in the storage settings.
provider_rw.write_storage_settings(genesis_storage_settings)?;
// For non-zero genesis blocks, set expected_block_start BEFORE insert_genesis_state.
// When block_range is None, next_block_number() uses expected_block_start. By default,
// expected_block_start comes from find_fixed_range which returns the file range start (0),
// not the genesis block number. This would cause increment_block(N) to fail.
let static_file_provider = provider_rw.static_file_provider();
if genesis_block_number > 0 {
if genesis_storage_settings.account_changesets_in_static_files {
static_file_provider
.get_writer(genesis_block_number, StaticFileSegment::AccountChangeSets)?
.user_header_mut()
.set_expected_block_start(genesis_block_number);
}
if genesis_storage_settings.storage_changesets_in_static_files {
static_file_provider
.get_writer(genesis_block_number, StaticFileSegment::StorageChangeSets)?
.user_header_mut()
.set_expected_block_start(genesis_block_number);
}
}
insert_genesis_hashes(&provider_rw, alloc.iter())?;
insert_genesis_history(&provider_rw, alloc.iter())?;
@@ -228,16 +248,11 @@ where
provider_rw.save_stage_checkpoint(stage, checkpoint)?;
}
// Static file segments start empty, so we need to initialize the genesis block.
//
// We do not do this for changesets because they get initialized in `insert_state` /
// `write_state` / `write_state_reverts`. If the node is configured for writing changesets to
// static files they will be written there, otherwise they will be written to the DB.
// Static file segments start empty, so we need to initialize the block range.
// For genesis blocks with non-zero block numbers, we use get_writer() instead of
// latest_writer() and set_block_range() to ensure static files start at the correct block.
let static_file_provider = provider_rw.static_file_provider();
// Static file segments start empty, so we need to initialize the genesis block.
// For genesis blocks with non-zero block numbers, we need to use get_writer() instead of
// latest_writer() to ensure the genesis block is stored in the correct static file range.
static_file_provider
.get_writer(genesis_block_number, StaticFileSegment::Receipts)?
.user_header_mut()

View File

@@ -2511,7 +2511,19 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
} else {
self.take::<tables::StorageChangeSets>(storage_range)?
};
let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
let account_changeset = if let Some(_highest_block) = self
.static_file_provider
.get_highest_static_file_block(StaticFileSegment::AccountChangeSets) &&
self.cached_storage_settings().account_changesets_in_static_files
{
let changesets = self.account_changesets_range(range)?;
let mut changeset_writer =
self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
changeset_writer.prune_account_changesets(block)?;
changesets
} else {
self.take::<tables::AccountChangeSets>(range)?
};
// This does not work for blocks that are not at the tip, as plain state is not the last
// state of the end range. We should rename the functions or add support to access
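
The added branch mirrors the storage-changeset handling just above it: consult static files only when the segment actually has data and the node is configured to keep account changesets there; otherwise fall back to `take`-ing from MDBX. A condensed model of that selection (the enum and function are illustrative):

```rust
// Illustrative reduction of the source-selection logic in the diff above.
enum ChangesetSource {
    StaticFiles,
    Mdbx,
}

fn account_changeset_source(
    highest_static_file_block: Option<u64>,
    account_changesets_in_static_files: bool,
) -> ChangesetSource {
    // Both conditions must hold, matching the let-chain in the diff.
    if highest_static_file_block.is_some() && account_changesets_in_static_files {
        ChangesetSource::StaticFiles
    } else {
        ChangesetSource::Mdbx
    }
}

fn main() {
    assert!(matches!(
        account_changeset_source(Some(5), true),
        ChangesetSource::StaticFiles
    ));
    assert!(matches!(account_changeset_source(None, true), ChangesetSource::Mdbx));
}
```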

View File

@@ -58,11 +58,12 @@ pub fn create_test_provider_factory_with_node_types<N: NodeTypesForProvider>(
let (static_dir, _) = create_test_static_files_dir();
let (rocksdb_dir, _) = create_test_rocksdb_dir();
let db = create_test_rw_db();
let rocksdb_path = rocksdb_dir.keep();
ProviderFactory::new(
db,
chain_spec,
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBBuilder::new(&rocksdb_dir)
RocksDBBuilder::new(&rocksdb_path)
.with_default_tables()
.build()
.expect("failed to create test RocksDB provider"),