Compare commits

..

1 Commit

Author SHA1 Message Date
yongkangc
082daa7fc1 fix(rocksdb): filter account/storage history to match MDBX semantics 2026-01-22 12:58:59 +00:00
2 changed files with 67 additions and 218 deletions

View File

@@ -253,21 +253,14 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max block number across all entries, including sentinel shards.
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::StoragesHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest = key.sharded_key.highest_block_number;
if highest == u64::MAX {
if let Some(max_in_shard) = value.max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -275,34 +268,31 @@ impl RocksDBProvider {
}
}
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all empty, we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
return Ok(None);
}
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
rocks_highest = max_highest_block,
checkpoint,
"StoragesHistory ahead of checkpoint, pruning excess data"
);
self.prune_storages_history_above(checkpoint)?;
return Ok(None);
} else if effective_max < checkpoint {
} else if max_highest_block < checkpoint {
// RocksDB is behind checkpoint, return highest block to signal unwind needed
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
rocks_highest = max_highest_block,
checkpoint,
"StoragesHistory behind checkpoint, unwind needed"
);
return Ok(Some(effective_max));
return Ok(Some(max_highest_block));
}
Ok(None)
@@ -319,42 +309,25 @@ impl RocksDBProvider {
/// For `StoragesHistory`, the key contains `highest_block_number`, so we can iterate
/// and delete entries where `key.sharded_key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_storages_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use reth_db_api::{models::storage_sharded_key::StorageShardedKey, BlockNumberList};
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
let mut to_delete: Vec<StorageShardedKey> = Vec::new();
let mut to_rewrite: Vec<(StorageShardedKey, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::StoragesHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest_block = key.sharded_key.highest_block_number;
if max_block == 0 {
to_delete.push(key);
} else if highest_block == u64::MAX {
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
to_delete.push(key);
}
}
let deleted = to_delete.len();
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
if deleted > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning StoragesHistory entries"
);
@@ -363,9 +336,6 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::StoragesHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::StoragesHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -404,24 +374,14 @@ impl RocksDBProvider {
return Ok(None);
}
// Find the max block number across all entries, including sentinel shards.
// For sentinel shards (highest_block_number == u64::MAX), we need to look
// at the actual block numbers stored in the shard.
// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries. Also track if we found any non-sentinel entries.
let mut max_highest_block = 0u64;
let mut max_sentinel_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::AccountsHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest = key.highest_block_number;
if highest == u64::MAX {
// Sentinel shard: check the actual max block number in the shard
if let Some(max_in_shard) = value.max() &&
max_in_shard > max_sentinel_block
{
max_sentinel_block = max_in_shard;
}
} else {
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
@@ -429,22 +389,18 @@ impl RocksDBProvider {
}
}
// Use the higher of the two maxes
let effective_max = max_highest_block.max(max_sentinel_block);
// If we only have sentinel shards and they're all within checkpoint,
// we're consistent
if !found_non_sentinel && max_sentinel_block == 0 {
// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
return Ok(None);
}
// If any entry has data > checkpoint, prune excess
if effective_max > checkpoint {
// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
max_non_sentinel = max_highest_block,
max_sentinel = max_sentinel_block,
rocks_highest = max_highest_block,
checkpoint,
"AccountsHistory ahead of checkpoint, pruning excess data"
);
@@ -453,14 +409,14 @@ impl RocksDBProvider {
}
// If RocksDB is behind the checkpoint, request an unwind to rebuild.
if effective_max < checkpoint {
if max_highest_block < checkpoint {
tracing::warn!(
target: "reth::providers::rocksdb",
rocks_highest = effective_max,
rocks_highest = max_highest_block,
checkpoint,
"AccountsHistory behind checkpoint, unwind needed"
);
return Ok(Some(effective_max));
return Ok(Some(max_highest_block));
}
Ok(None)
@@ -478,47 +434,26 @@ impl RocksDBProvider {
/// `highest_block_number`, so we can iterate and delete entries where
/// `key.highest_block_number > max_block`.
///
/// Sentinel shards (with `highest_block_number = u64::MAX`) require special handling:
/// we must read their contents and filter out block numbers > `max_block`.
///
/// TODO(<https://github.com/paradigmxyz/reth/issues/20417>): this iterates the whole table,
/// which is inefficient. Use changeset-based pruning instead.
fn prune_accounts_history_above(&self, max_block: BlockNumber) -> ProviderResult<()> {
use alloy_primitives::Address;
use reth_db_api::{models::ShardedKey, BlockNumberList};
use reth_db_api::models::ShardedKey;
let mut to_delete: Vec<ShardedKey<Address>> = Vec::new();
let mut to_rewrite: Vec<(ShardedKey<Address>, BlockNumberList)> = Vec::new();
for result in self.iter::<tables::AccountsHistory>()? {
let (key, value) = result?;
let (key, _) = result?;
let highest_block = key.highest_block_number;
if max_block == 0 {
// Clear everything
to_delete.push(key);
} else if highest_block == u64::MAX {
// Sentinel shard: filter out block numbers > max_block
let filtered: Vec<u64> = value.iter().filter(|&bn| bn <= max_block).collect();
if filtered.is_empty() {
to_delete.push(key);
} else if filtered.len() != value.len() as usize {
// Some entries were filtered out, rewrite the shard
to_rewrite.push((key, BlockNumberList::new_pre_sorted(filtered)));
}
} else if highest_block > max_block {
// Non-sentinel shard above max_block: delete
if max_block == 0 || (highest_block != u64::MAX && highest_block > max_block) {
to_delete.push(key);
}
}
let deleted = to_delete.len();
let rewritten = to_rewrite.len();
if deleted > 0 || rewritten > 0 {
if deleted > 0 {
tracing::info!(
target: "reth::providers::rocksdb",
deleted_count = deleted,
rewritten_count = rewritten,
max_block,
"Pruning AccountsHistory entries"
);
@@ -527,9 +462,6 @@ impl RocksDBProvider {
for key in to_delete {
batch.delete::<tables::AccountsHistory>(key)?;
}
for (key, value) in to_rewrite {
batch.put::<tables::AccountsHistory>(key, &value)?;
}
batch.commit()?;
}
@@ -1064,7 +996,6 @@ mod tests {
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1078,21 +1009,24 @@ mod tests {
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);
// Set a checkpoint that matches the sentinel shard data (max block = 30)
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(30))
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has sentinel entries matching the checkpoint - consistent state
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}
#[test]
@@ -1108,7 +1042,6 @@ mod tests {
// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
// Use a checkpoint that matches the sentinel shard contents (max block = 30)
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();
@@ -1122,21 +1055,24 @@ mod tests {
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
// Set a checkpoint that matches the sentinel shard data (max block = 30)
// This is a normal scenario where sentinel shards exist and are in sync with checkpoint
// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(30))
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}
let provider = factory.database_provider_ro().unwrap();
// RocksDB has sentinel entries matching the checkpoint - consistent state
// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, None, "Sentinel-only entries matching checkpoint should be consistent");
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}
#[test]
@@ -1466,101 +1402,4 @@ mod tests {
"Should require unwind to block 50 to rebuild AccountsHistory"
);
}
/// Checks that `check_consistency` heals a `StoragesHistory` sentinel shard whose contents
/// extend past the stage checkpoint: the shard must be rewritten with only the block numbers
/// at or below the checkpoint, and no unwind target must be reported.
#[test]
fn test_prune_storages_history_sentinel_shard_rewrite() {
    use reth_db_api::models::storage_sharded_key::StorageShardedKey;

    // RocksDB instance backed by a temporary directory, with only `StoragesHistory` registered.
    let db_dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(db_dir.path())
        .with_table::<tables::StoragesHistory>()
        .build()
        .unwrap();

    // A single sentinel shard (`highest_block_number == u64::MAX`) that straddles the
    // checkpoint used below: 10 and 50 are <= 100, 150 is above it.
    let shard_key = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
    let shard_blocks = BlockNumberList::new_pre_sorted([10, 50, 150]);
    db.put::<tables::StoragesHistory>(shard_key.clone(), &shard_blocks).unwrap();

    // MDBX-side test factory, configured so storage history lives in RocksDB.
    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_storages_history_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    // Record block 100 as the IndexStorageHistory checkpoint and commit it before reading.
    {
        let rw = factory.database_provider_rw().unwrap();
        let checkpoint = StageCheckpoint::new(100);
        rw.save_stage_checkpoint(StageId::IndexStorageHistory, checkpoint).unwrap();
        rw.commit().unwrap();
    }

    // The consistency check is expected to prune in place rather than request an unwind.
    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();
    assert_eq!(outcome, None, "Should heal by pruning, no unwind needed");

    // The shard must still exist and hold exactly the entries at or below the checkpoint.
    let stored = db.get::<tables::StoragesHistory>(shard_key).unwrap();
    assert!(stored.is_some(), "Sentinel shard should be rewritten, not deleted");
    let remaining: Vec<u64> = stored.unwrap().iter().collect();
    assert_eq!(remaining, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
}
/// Checks that `check_consistency` heals an `AccountsHistory` sentinel shard whose contents
/// extend past the stage checkpoint: the shard must be rewritten with only the block numbers
/// at or below the checkpoint, and no unwind target must be reported.
#[test]
fn test_prune_accounts_history_sentinel_shard_rewrite() {
    use reth_db_api::models::ShardedKey;

    // RocksDB instance backed by a temporary directory, with only `AccountsHistory` registered.
    let db_dir = TempDir::new().unwrap();
    let db = RocksDBBuilder::new(db_dir.path())
        .with_table::<tables::AccountsHistory>()
        .build()
        .unwrap();

    // A single sentinel shard (`highest_block_number == u64::MAX`) that straddles the
    // checkpoint used below: 10 and 50 are <= 100, 150 is above it.
    let shard_key = ShardedKey::new(Address::random(), u64::MAX);
    let shard_blocks = BlockNumberList::new_pre_sorted([10, 50, 150]);
    db.put::<tables::AccountsHistory>(shard_key.clone(), &shard_blocks).unwrap();

    // MDBX-side test factory, configured so account history lives in RocksDB.
    let factory = create_test_provider_factory();
    let settings = StorageSettings::legacy().with_account_history_in_rocksdb(true);
    factory.set_storage_settings_cache(settings);

    // Record block 100 as the IndexAccountHistory checkpoint and commit it before reading.
    {
        let rw = factory.database_provider_rw().unwrap();
        let checkpoint = StageCheckpoint::new(100);
        rw.save_stage_checkpoint(StageId::IndexAccountHistory, checkpoint).unwrap();
        rw.commit().unwrap();
    }

    // The consistency check is expected to prune in place rather than request an unwind.
    let ro = factory.database_provider_ro().unwrap();
    let outcome = db.check_consistency(&ro).unwrap();
    assert_eq!(outcome, None, "Should heal by pruning, no unwind needed");

    // The shard must still exist and hold exactly the entries at or below the checkpoint.
    let stored = db.get::<tables::AccountsHistory>(shard_key).unwrap();
    assert!(stored.is_some(), "Sentinel shard should be rewritten, not deleted");
    let remaining: Vec<u64> = stored.unwrap().iter().collect();
    assert_eq!(remaining, vec![10, 50], "Sentinel shard should contain only blocks <= 100");
}
}

View File

@@ -1069,8 +1069,13 @@ impl RocksDBProvider {
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().state;
for &address in bundle.state().keys() {
account_history.entry(address).or_default().push(block_number);
// Only record accounts where account-info changed OR account was destroyed.
// Skip accounts that only had storage changes - this matches MDBX semantics
// where AccountsHistory tracks account-level changes, not storage-only touches.
for (&address, account) in bundle.state() {
if account.is_info_changed() || account.was_destroyed() {
account_history.entry(address).or_default().push(block_number);
}
}
}
@@ -1095,9 +1100,14 @@ impl RocksDBProvider {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().state;
for (&address, account) in bundle.state() {
for &slot in account.storage.keys() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
// Only record storage slots that actually changed value.
// This matches MDBX semantics where StoragesHistory only tracks
// slots with value changes, not just touched slots.
for (&slot, storage_slot) in &account.storage {
if storage_slot.is_changed() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
}
}
}
}