Compare commits

...

1 Commits

Author SHA1 Message Date
yongkangc
aeb4862c23 fix(rocksdb): handle out-of-order history index appends
**Problem**

When RocksDB already has history indices up to block N (e.g., from a snapshot
or previous sync), persisting history for block M ≤ N fails with
`UnsortedInput` because `IntegerList::append()` requires strictly increasing
values.

This occurs during archive replay, reorgs, or benchmark scenarios.

**Solution**

Filter out already-indexed block numbers before appending:

1. **Fast path**: Read sentinel shard, filter indices > sentinel.last(), append
2. **Slow path** (on error or empty sentinel): Reverse-seek for closed shard
   max, re-filter, re-read sentinel, append

Key details:
- `partition_point(|&x| x <= cutoff)` for O(log n) zero-alloc filtering
- Re-read sentinel after failed append (RoaringTreemap mutates before erroring)
- Skip re-read when sentinel was empty (no corruption possible)

Closes RETH-301

Amp-Thread-ID: https://ampcode.com/threads/T-019c1e5c-d0c7-7645-99ae-8ec1c552d407
2026-02-02 12:56:30 +00:00

View File

@@ -9,8 +9,8 @@ use reth_chain_state::ExecutedBlock;
use reth_db_api::{
database_metrics::DatabaseMetrics,
models::{
sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
StorageSettings,
integer_list::IntegerListError, sharded_key::NUM_OF_INDICES_IN_SHARD,
storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings,
},
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
@@ -35,7 +35,7 @@ use std::{
thread,
time::Instant,
};
use tracing::instrument;
use tracing::{debug, instrument};
/// Pending `RocksDB` batches type alias.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
@@ -1000,6 +1000,71 @@ impl RocksDBProvider {
Ok(result)
}
/// Returns the highest block number from closed history shards for the given key.
///
/// Closed shards are keyed by their max block number, so we can find the max via
/// reverse seek without reading shard values. Returns `None` if no closed shards exist.
///
/// This is O(1) - a single reverse iteration step.
///
/// # Arguments
/// - `seek_key`: The key to seek from (should have `highest_block_number = u64::MAX - 1`)
/// - `matches_query`: Predicate to check if the decoded key matches the original query
fn last_history_shard_block_number<T: Table>(
&self,
seek_key: T::Key,
matches_query: impl FnOnce(&T::Key) -> Option<BlockNumber>,
) -> ProviderResult<Option<BlockNumber>>
where
T::Key: Encode + Decode,
{
let cf = self.get_cf_handle::<T>()?;
let seek_bytes = seek_key.encode();
let mut iter = self
.0
.iterator_cf(cf, IteratorMode::From(seek_bytes.as_ref(), rocksdb::Direction::Reverse));
if let Some(Ok((key_bytes, _))) = iter.next() {
let key = T::Key::decode(&key_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
return Ok(matches_query(&key));
}
Ok(None)
}
/// Returns the highest block number from closed account history shards for the given address.
pub fn last_account_history_shard_key(
&self,
address: Address,
) -> ProviderResult<Option<BlockNumber>> {
self.last_history_shard_block_number::<tables::AccountsHistory>(
ShardedKey::new(address, u64::MAX - 1),
|key| {
(key.key == address && key.highest_block_number != u64::MAX)
.then_some(key.highest_block_number)
},
)
}
/// Returns the highest block number from closed storage history shards.
pub fn last_storage_history_shard_key(
&self,
address: Address,
storage_key: B256,
) -> ProviderResult<Option<BlockNumber>> {
self.last_history_shard_block_number::<tables::StoragesHistory>(
StorageShardedKey::new(address, storage_key, u64::MAX - 1),
|key| {
(key.address == address &&
key.sharded_key.key == storage_key &&
key.sharded_key.highest_block_number != u64::MAX)
.then_some(key.sharded_key.highest_block_number)
},
)
}
/// Returns all storage history shards for the given `(address, storage_key)` pair.
///
/// Iterates through all shards in ascending `highest_block_number` order until
@@ -1433,8 +1498,11 @@ impl<'a> RocksDBBatch<'a> {
/// Appends indices to an account history shard with proper shard management.
///
/// Loads the existing shard (if any), appends new indices, and rechunks into
/// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
/// Uses a hybrid-lazy strategy for performance:
/// 1. Read only the sentinel shard (fast, usually cached)
/// 2. Filter using sentinel's last element if overlap detected
/// 3. Optimistic append
/// 4. On `UnsortedInput` error only: do full reverse seek + retry once
///
/// # Requirements
///
@@ -1459,33 +1527,97 @@ impl<'a> RocksDBBatch<'a> {
indices
);
let last_key = ShardedKey::new(address, u64::MAX);
let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
let sentinel_key = ShardedKey::new(address, u64::MAX);
last_shard.append(indices).map_err(ProviderError::other)?;
// Fast path: read sentinel shard only (usually in block cache)
let mut shard = self
.provider
.get::<tables::AccountsHistory>(sentinel_key.clone())?
.unwrap_or_else(BlockNumberList::empty);
let sentinel_max = shard.iter().next_back();
// If sentinel has data, try fast-path filtering and append
if let Some(cutoff) = sentinel_max {
let start_idx =
if cutoff >= indices[0] { indices.partition_point(|&x| x <= cutoff) } else { 0 };
if start_idx == indices.len() {
return Ok(());
}
match shard.append(indices[start_idx..].iter().copied()) {
Ok(_) => return self.write_account_history_shards(address, sentinel_key, shard),
Err(IntegerListError::UnsortedInput) => {
// Fall through to slow path
}
Err(e) => return Err(ProviderError::other(e)),
}
}
// Slow path: sentinel empty/missing OR append failed - check closed shards
let closed_shard_max = self.provider.last_account_history_shard_key(address)?;
// Only log when we actually have closed shards (not just fresh DB)
if closed_shard_max.is_some() {
debug!(
target: "rocksdb::history",
?address,
?sentinel_max,
?closed_shard_max,
"Closed shards exist, using full seek path"
);
}
let existing_max = sentinel_max.max(closed_shard_max);
let start_idx = existing_max.map_or(0, |cutoff| indices.partition_point(|&x| x <= cutoff));
if start_idx == indices.len() {
return Ok(());
}
// Re-read sentinel only if append failed (shard was corrupted by partial append).
// If sentinel_max was None, we can reuse the empty shard we already have.
let mut final_shard = if sentinel_max.is_some() {
self.provider
.get::<tables::AccountsHistory>(sentinel_key.clone())?
.unwrap_or_else(BlockNumberList::empty)
} else {
shard
};
final_shard.append(indices[start_idx..].iter().copied()).map_err(ProviderError::other)?;
self.write_account_history_shards(address, sentinel_key, final_shard)
}
/// Writes account history shards, rechunking if needed.
fn write_account_history_shards(
&mut self,
address: Address,
sentinel_key: ShardedKey<Address>,
shard: BlockNumberList,
) -> ProviderResult<()> {
// Fast path: all indices fit in one shard
if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
if shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
self.put::<tables::AccountsHistory>(sentinel_key, &shard)?;
return Ok(());
}
// Slow path: rechunk into multiple shards
let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
let chunks = shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
let mut chunks_peekable = chunks.into_iter().peekable();
while let Some(chunk) = chunks_peekable.next() {
let shard = BlockNumberList::new_pre_sorted(chunk);
let chunk_shard = BlockNumberList::new_pre_sorted(chunk);
let highest_block_number = if chunks_peekable.peek().is_some() {
shard.iter().next_back().expect("`chunks` does not return empty list")
chunk_shard.iter().next_back().expect("`chunks` does not return empty list")
} else {
u64::MAX
};
self.put::<tables::AccountsHistory>(
ShardedKey::new(address, highest_block_number),
&shard,
&chunk_shard,
)?;
}
@@ -1494,8 +1626,11 @@ impl<'a> RocksDBBatch<'a> {
/// Appends indices to a storage history shard with proper shard management.
///
/// Loads the existing shard (if any), appends new indices, and rechunks into
/// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
/// Uses a hybrid-lazy strategy for performance:
/// 1. Read only the sentinel shard (fast, usually cached)
/// 2. Filter using sentinel's last element if overlap detected
/// 3. Optimistic append
/// 4. On `UnsortedInput` error only: do full reverse seek + retry once
///
/// # Requirements
///
@@ -1521,33 +1656,107 @@ impl<'a> RocksDBBatch<'a> {
indices
);
let last_key = StorageShardedKey::last(address, storage_key);
let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
let sentinel_key = StorageShardedKey::last(address, storage_key);
last_shard.append(indices).map_err(ProviderError::other)?;
// Fast path: read sentinel shard only (usually in block cache)
let mut shard = self
.provider
.get::<tables::StoragesHistory>(sentinel_key.clone())?
.unwrap_or_else(BlockNumberList::empty);
let sentinel_max = shard.iter().next_back();
// If sentinel has data, try fast-path filtering and append
if let Some(cutoff) = sentinel_max {
let start_idx =
if cutoff >= indices[0] { indices.partition_point(|&x| x <= cutoff) } else { 0 };
if start_idx == indices.len() {
return Ok(());
}
match shard.append(indices[start_idx..].iter().copied()) {
Ok(_) => {
return self.write_storage_history_shards(
address,
storage_key,
sentinel_key,
shard,
);
}
Err(IntegerListError::UnsortedInput) => {
// Fall through to slow path
}
Err(e) => return Err(ProviderError::other(e)),
}
}
// Slow path: sentinel empty/missing OR append failed - check closed shards
let closed_shard_max =
self.provider.last_storage_history_shard_key(address, storage_key)?;
// Only log when we actually have closed shards (not just fresh DB)
if closed_shard_max.is_some() {
debug!(
target: "rocksdb::history",
?address,
?storage_key,
?sentinel_max,
?closed_shard_max,
"Closed shards exist, using full seek path"
);
}
let existing_max = sentinel_max.max(closed_shard_max);
let start_idx = existing_max.map_or(0, |cutoff| indices.partition_point(|&x| x <= cutoff));
if start_idx == indices.len() {
return Ok(());
}
// Re-read sentinel only if append failed (shard was corrupted by partial append).
// If sentinel_max was None, we can reuse the empty shard we already have.
let mut final_shard = if sentinel_max.is_some() {
self.provider
.get::<tables::StoragesHistory>(sentinel_key.clone())?
.unwrap_or_else(BlockNumberList::empty)
} else {
shard
};
final_shard.append(indices[start_idx..].iter().copied()).map_err(ProviderError::other)?;
self.write_storage_history_shards(address, storage_key, sentinel_key, final_shard)
}
/// Writes storage history shards, rechunking if needed.
fn write_storage_history_shards(
&mut self,
address: Address,
storage_key: B256,
sentinel_key: StorageShardedKey,
shard: BlockNumberList,
) -> ProviderResult<()> {
// Fast path: all indices fit in one shard
if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
if shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
self.put::<tables::StoragesHistory>(sentinel_key, &shard)?;
return Ok(());
}
// Slow path: rechunk into multiple shards
let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
let chunks = shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
let mut chunks_peekable = chunks.into_iter().peekable();
while let Some(chunk) = chunks_peekable.next() {
let shard = BlockNumberList::new_pre_sorted(chunk);
let chunk_shard = BlockNumberList::new_pre_sorted(chunk);
let highest_block_number = if chunks_peekable.peek().is_some() {
shard.iter().next_back().expect("`chunks` does not return empty list")
chunk_shard.iter().next_back().expect("`chunks` does not return empty list")
} else {
u64::MAX
};
self.put::<tables::StoragesHistory>(
StorageShardedKey::new(address, storage_key, highest_block_number),
&shard,
&chunk_shard,
)?;
}
@@ -3013,6 +3222,195 @@ mod tests {
assert!(shards.is_empty());
}
#[test]
fn test_append_history_filters_overlapping_indices_with_sentinel() {
// Test the fast path: sentinel shard exists with overlap
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0xAA; 20]);
// Create sentinel shard with some existing indices
let existing = BlockNumberList::new_pre_sorted([10, 20, 30]);
let mut batch = provider.batch();
batch
.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &existing)
.unwrap();
batch.commit().unwrap();
// Append includes overlap and older values; should only append > 30
let mut batch = provider.batch();
batch.append_account_history_shard(address, vec![20, 30, 31, 40]).unwrap();
batch.commit().unwrap();
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(shards.len(), 1);
assert_eq!(shards[0].0.highest_block_number, u64::MAX);
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), vec![10, 20, 30, 31, 40]);
}
#[test]
fn test_append_history_filters_overlapping_indices_with_closed_shards() {
// Test the slow path: no sentinel, closed shard exists with higher block numbers
// This triggers UnsortedInput -> fallback to full seek
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// First, manually write a "closed" shard with block number 1000 as the key
// This simulates archive data that already exists (NO sentinel shard)
let closed_shard = BlockNumberList::new_pre_sorted([500, 600, 700, 800, 900, 1000]);
let mut batch = provider.batch();
batch
.put::<tables::AccountsHistory>(ShardedKey::new(address, 1000), &closed_shard)
.unwrap();
batch.commit().unwrap();
// Now try to append indices that are LOWER than the existing max (1000)
// This will trigger the fallback path since sentinel is empty but closed shard exists
let mut batch = provider.batch();
batch.append_account_history_shard(address, [100, 200, 300]).unwrap();
batch.commit().unwrap();
// Verify: only the closed shard should exist, no new data added
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(shards.len(), 1);
assert_eq!(shards[0].0.highest_block_number, 1000);
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), vec![500, 600, 700, 800, 900, 1000]);
// Now append indices that are HIGHER than existing max - these should be added
let mut batch = provider.batch();
batch.append_account_history_shard(address, [1100, 1200]).unwrap();
batch.commit().unwrap();
let shards = provider.account_history_shards(address).unwrap();
// Should now have 2 shards: original closed shard + new MAX shard
assert_eq!(shards.len(), 2);
assert_eq!(shards[0].0.highest_block_number, 1000);
assert_eq!(shards[1].0.highest_block_number, u64::MAX);
assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), vec![1100, 1200]);
}
#[test]
fn test_append_storage_history_filters_overlapping() {
// Test storage history with overlap filtering
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0xBB; 20]);
let slot = B256::from([0xCC; 32]);
let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
let existing = BlockNumberList::new_pre_sorted([5, 7, 9]);
let mut batch = provider.batch();
batch.put::<tables::StoragesHistory>(sentinel_key.clone(), &existing).unwrap();
batch.commit().unwrap();
let mut batch = provider.batch();
batch.append_storage_history_shard(address, slot, vec![1, 9, 10, 11]).unwrap();
batch.commit().unwrap();
let updated = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().unwrap();
assert_eq!(updated.iter().collect::<Vec<_>>(), vec![5, 7, 9, 10, 11]);
}
#[test]
fn test_append_history_all_filtered_is_noop() {
// When all indices are <= existing max, should be a no-op
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0xDD; 20]);
let existing = BlockNumberList::new_pre_sorted([100, 200, 300]);
let mut batch = provider.batch();
batch
.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &existing)
.unwrap();
batch.commit().unwrap();
// Try to append only old indices
let mut batch = provider.batch();
batch.append_account_history_shard(address, [50, 100, 200]).unwrap();
batch.commit().unwrap();
// Should be unchanged
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(shards.len(), 1);
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), vec![100, 200, 300]);
}
#[test]
fn test_append_history_empty_sentinel_with_closed_shards() {
// Test: sentinel exists but is empty, closed shard has data
// This exercises the slow path where sentinel_max is None but closed_shard_max is Some
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0xEE; 20]);
// Create a closed shard with max block 500
let closed_shard = BlockNumberList::new_pre_sorted([100, 200, 300, 400, 500]);
let mut batch = provider.batch();
batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 500), &closed_shard).unwrap();
// Create an empty sentinel shard
batch
.put::<tables::AccountsHistory>(
ShardedKey::new(address, u64::MAX),
&BlockNumberList::empty(),
)
.unwrap();
batch.commit().unwrap();
// Try to append indices - some below closed shard max, some above
let mut batch = provider.batch();
batch.append_account_history_shard(address, [250, 500, 600, 700]).unwrap();
batch.commit().unwrap();
// Only indices > 500 should be appended to sentinel
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(shards.len(), 2);
assert_eq!(shards[0].0.highest_block_number, 500);
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), vec![100, 200, 300, 400, 500]);
assert_eq!(shards[1].0.highest_block_number, u64::MAX);
assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), vec![600, 700]);
}
#[test]
fn test_append_storage_history_closed_shards_only() {
// Test storage history slow path: no sentinel, only closed shards
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0xFF; 20]);
let slot = B256::from([0x11; 32]);
// Create only a closed shard (no sentinel)
let closed_shard = BlockNumberList::new_pre_sorted([10, 20, 30]);
let mut batch = provider.batch();
batch
.put::<tables::StoragesHistory>(
StorageShardedKey::new(address, slot, 30),
&closed_shard,
)
.unwrap();
batch.commit().unwrap();
// Append with some overlapping indices
let mut batch = provider.batch();
batch.append_storage_history_shard(address, slot, vec![25, 30, 40, 50]).unwrap();
batch.commit().unwrap();
// Verify: closed shard unchanged, new sentinel with indices > 30
let shards = provider.storage_history_shards(address, slot).unwrap();
assert_eq!(shards.len(), 2);
assert_eq!(shards[0].0.sharded_key.highest_block_number, 30);
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), vec![10, 20, 30]);
assert_eq!(shards[1].0.sharded_key.highest_block_number, u64::MAX);
assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), vec![40, 50]);
}
#[test]
fn test_clear_account_history() {
let temp_dir = TempDir::new().unwrap();