mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
push
...
fix/rocksd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aeb4862c23 |
@@ -9,8 +9,8 @@ use reth_chain_state::ExecutedBlock;
|
||||
use reth_db_api::{
|
||||
database_metrics::DatabaseMetrics,
|
||||
models::{
|
||||
sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, ShardedKey,
|
||||
StorageSettings,
|
||||
integer_list::IntegerListError, sharded_key::NUM_OF_INDICES_IN_SHARD,
|
||||
storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings,
|
||||
},
|
||||
table::{Compress, Decode, Decompress, Encode, Table},
|
||||
tables, BlockNumberList, DatabaseError,
|
||||
@@ -35,7 +35,7 @@ use std::{
|
||||
thread,
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::instrument;
|
||||
use tracing::{debug, instrument};
|
||||
|
||||
/// Pending `RocksDB` batches type alias.
|
||||
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
|
||||
@@ -1000,6 +1000,71 @@ impl RocksDBProvider {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Returns the highest block number from closed history shards for the given key.
|
||||
///
|
||||
/// Closed shards are keyed by their max block number, so we can find the max via
|
||||
/// reverse seek without reading shard values. Returns `None` if no closed shards exist.
|
||||
///
|
||||
/// This is O(1) - a single reverse iteration step.
|
||||
///
|
||||
/// # Arguments
|
||||
/// - `seek_key`: The key to seek from (should have `highest_block_number = u64::MAX - 1`)
|
||||
/// - `matches_query`: Predicate to check if the decoded key matches the original query
|
||||
fn last_history_shard_block_number<T: Table>(
|
||||
&self,
|
||||
seek_key: T::Key,
|
||||
matches_query: impl FnOnce(&T::Key) -> Option<BlockNumber>,
|
||||
) -> ProviderResult<Option<BlockNumber>>
|
||||
where
|
||||
T::Key: Encode + Decode,
|
||||
{
|
||||
let cf = self.get_cf_handle::<T>()?;
|
||||
let seek_bytes = seek_key.encode();
|
||||
|
||||
let mut iter = self
|
||||
.0
|
||||
.iterator_cf(cf, IteratorMode::From(seek_bytes.as_ref(), rocksdb::Direction::Reverse));
|
||||
|
||||
if let Some(Ok((key_bytes, _))) = iter.next() {
|
||||
let key = T::Key::decode(&key_bytes)
|
||||
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
|
||||
return Ok(matches_query(&key));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Returns the highest block number from closed account history shards for the given address.
|
||||
pub fn last_account_history_shard_key(
|
||||
&self,
|
||||
address: Address,
|
||||
) -> ProviderResult<Option<BlockNumber>> {
|
||||
self.last_history_shard_block_number::<tables::AccountsHistory>(
|
||||
ShardedKey::new(address, u64::MAX - 1),
|
||||
|key| {
|
||||
(key.key == address && key.highest_block_number != u64::MAX)
|
||||
.then_some(key.highest_block_number)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns the highest block number from closed storage history shards.
|
||||
pub fn last_storage_history_shard_key(
|
||||
&self,
|
||||
address: Address,
|
||||
storage_key: B256,
|
||||
) -> ProviderResult<Option<BlockNumber>> {
|
||||
self.last_history_shard_block_number::<tables::StoragesHistory>(
|
||||
StorageShardedKey::new(address, storage_key, u64::MAX - 1),
|
||||
|key| {
|
||||
(key.address == address &&
|
||||
key.sharded_key.key == storage_key &&
|
||||
key.sharded_key.highest_block_number != u64::MAX)
|
||||
.then_some(key.sharded_key.highest_block_number)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns all storage history shards for the given `(address, storage_key)` pair.
|
||||
///
|
||||
/// Iterates through all shards in ascending `highest_block_number` order until
|
||||
@@ -1433,8 +1498,11 @@ impl<'a> RocksDBBatch<'a> {
|
||||
|
||||
/// Appends indices to an account history shard with proper shard management.
|
||||
///
|
||||
/// Loads the existing shard (if any), appends new indices, and rechunks into
|
||||
/// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
|
||||
/// Uses a hybrid-lazy strategy for performance:
|
||||
/// 1. Read only the sentinel shard (fast, usually cached)
|
||||
/// 2. Filter using sentinel's last element if overlap detected
|
||||
/// 3. Optimistic append
|
||||
/// 4. On `UnsortedInput` error only: do full reverse seek + retry once
|
||||
///
|
||||
/// # Requirements
|
||||
///
|
||||
@@ -1459,33 +1527,97 @@ impl<'a> RocksDBBatch<'a> {
|
||||
indices
|
||||
);
|
||||
|
||||
let last_key = ShardedKey::new(address, u64::MAX);
|
||||
let last_shard_opt = self.provider.get::<tables::AccountsHistory>(last_key.clone())?;
|
||||
let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
|
||||
let sentinel_key = ShardedKey::new(address, u64::MAX);
|
||||
|
||||
last_shard.append(indices).map_err(ProviderError::other)?;
|
||||
// Fast path: read sentinel shard only (usually in block cache)
|
||||
let mut shard = self
|
||||
.provider
|
||||
.get::<tables::AccountsHistory>(sentinel_key.clone())?
|
||||
.unwrap_or_else(BlockNumberList::empty);
|
||||
|
||||
let sentinel_max = shard.iter().next_back();
|
||||
|
||||
// If sentinel has data, try fast-path filtering and append
|
||||
if let Some(cutoff) = sentinel_max {
|
||||
let start_idx =
|
||||
if cutoff >= indices[0] { indices.partition_point(|&x| x <= cutoff) } else { 0 };
|
||||
|
||||
if start_idx == indices.len() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match shard.append(indices[start_idx..].iter().copied()) {
|
||||
Ok(_) => return self.write_account_history_shards(address, sentinel_key, shard),
|
||||
Err(IntegerListError::UnsortedInput) => {
|
||||
// Fall through to slow path
|
||||
}
|
||||
Err(e) => return Err(ProviderError::other(e)),
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path: sentinel empty/missing OR append failed - check closed shards
|
||||
let closed_shard_max = self.provider.last_account_history_shard_key(address)?;
|
||||
|
||||
// Only log when we actually have closed shards (not just fresh DB)
|
||||
if closed_shard_max.is_some() {
|
||||
debug!(
|
||||
target: "rocksdb::history",
|
||||
?address,
|
||||
?sentinel_max,
|
||||
?closed_shard_max,
|
||||
"Closed shards exist, using full seek path"
|
||||
);
|
||||
}
|
||||
|
||||
let existing_max = sentinel_max.max(closed_shard_max);
|
||||
let start_idx = existing_max.map_or(0, |cutoff| indices.partition_point(|&x| x <= cutoff));
|
||||
|
||||
if start_idx == indices.len() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Re-read sentinel only if append failed (shard was corrupted by partial append).
|
||||
// If sentinel_max was None, we can reuse the empty shard we already have.
|
||||
let mut final_shard = if sentinel_max.is_some() {
|
||||
self.provider
|
||||
.get::<tables::AccountsHistory>(sentinel_key.clone())?
|
||||
.unwrap_or_else(BlockNumberList::empty)
|
||||
} else {
|
||||
shard
|
||||
};
|
||||
final_shard.append(indices[start_idx..].iter().copied()).map_err(ProviderError::other)?;
|
||||
|
||||
self.write_account_history_shards(address, sentinel_key, final_shard)
|
||||
}
|
||||
|
||||
/// Writes account history shards, rechunking if needed.
|
||||
fn write_account_history_shards(
|
||||
&mut self,
|
||||
address: Address,
|
||||
sentinel_key: ShardedKey<Address>,
|
||||
shard: BlockNumberList,
|
||||
) -> ProviderResult<()> {
|
||||
// Fast path: all indices fit in one shard
|
||||
if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
|
||||
self.put::<tables::AccountsHistory>(last_key, &last_shard)?;
|
||||
if shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
|
||||
self.put::<tables::AccountsHistory>(sentinel_key, &shard)?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Slow path: rechunk into multiple shards
|
||||
let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
|
||||
let chunks = shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
|
||||
let mut chunks_peekable = chunks.into_iter().peekable();
|
||||
|
||||
while let Some(chunk) = chunks_peekable.next() {
|
||||
let shard = BlockNumberList::new_pre_sorted(chunk);
|
||||
let chunk_shard = BlockNumberList::new_pre_sorted(chunk);
|
||||
let highest_block_number = if chunks_peekable.peek().is_some() {
|
||||
shard.iter().next_back().expect("`chunks` does not return empty list")
|
||||
chunk_shard.iter().next_back().expect("`chunks` does not return empty list")
|
||||
} else {
|
||||
u64::MAX
|
||||
};
|
||||
|
||||
self.put::<tables::AccountsHistory>(
|
||||
ShardedKey::new(address, highest_block_number),
|
||||
&shard,
|
||||
&chunk_shard,
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -1494,8 +1626,11 @@ impl<'a> RocksDBBatch<'a> {
|
||||
|
||||
/// Appends indices to a storage history shard with proper shard management.
|
||||
///
|
||||
/// Loads the existing shard (if any), appends new indices, and rechunks into
|
||||
/// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit).
|
||||
/// Uses a hybrid-lazy strategy for performance:
|
||||
/// 1. Read only the sentinel shard (fast, usually cached)
|
||||
/// 2. Filter using sentinel's last element if overlap detected
|
||||
/// 3. Optimistic append
|
||||
/// 4. On `UnsortedInput` error only: do full reverse seek + retry once
|
||||
///
|
||||
/// # Requirements
|
||||
///
|
||||
@@ -1521,33 +1656,107 @@ impl<'a> RocksDBBatch<'a> {
|
||||
indices
|
||||
);
|
||||
|
||||
let last_key = StorageShardedKey::last(address, storage_key);
|
||||
let last_shard_opt = self.provider.get::<tables::StoragesHistory>(last_key.clone())?;
|
||||
let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty);
|
||||
let sentinel_key = StorageShardedKey::last(address, storage_key);
|
||||
|
||||
last_shard.append(indices).map_err(ProviderError::other)?;
|
||||
// Fast path: read sentinel shard only (usually in block cache)
|
||||
let mut shard = self
|
||||
.provider
|
||||
.get::<tables::StoragesHistory>(sentinel_key.clone())?
|
||||
.unwrap_or_else(BlockNumberList::empty);
|
||||
|
||||
let sentinel_max = shard.iter().next_back();
|
||||
|
||||
// If sentinel has data, try fast-path filtering and append
|
||||
if let Some(cutoff) = sentinel_max {
|
||||
let start_idx =
|
||||
if cutoff >= indices[0] { indices.partition_point(|&x| x <= cutoff) } else { 0 };
|
||||
|
||||
if start_idx == indices.len() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match shard.append(indices[start_idx..].iter().copied()) {
|
||||
Ok(_) => {
|
||||
return self.write_storage_history_shards(
|
||||
address,
|
||||
storage_key,
|
||||
sentinel_key,
|
||||
shard,
|
||||
);
|
||||
}
|
||||
Err(IntegerListError::UnsortedInput) => {
|
||||
// Fall through to slow path
|
||||
}
|
||||
Err(e) => return Err(ProviderError::other(e)),
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path: sentinel empty/missing OR append failed - check closed shards
|
||||
let closed_shard_max =
|
||||
self.provider.last_storage_history_shard_key(address, storage_key)?;
|
||||
|
||||
// Only log when we actually have closed shards (not just fresh DB)
|
||||
if closed_shard_max.is_some() {
|
||||
debug!(
|
||||
target: "rocksdb::history",
|
||||
?address,
|
||||
?storage_key,
|
||||
?sentinel_max,
|
||||
?closed_shard_max,
|
||||
"Closed shards exist, using full seek path"
|
||||
);
|
||||
}
|
||||
|
||||
let existing_max = sentinel_max.max(closed_shard_max);
|
||||
let start_idx = existing_max.map_or(0, |cutoff| indices.partition_point(|&x| x <= cutoff));
|
||||
|
||||
if start_idx == indices.len() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Re-read sentinel only if append failed (shard was corrupted by partial append).
|
||||
// If sentinel_max was None, we can reuse the empty shard we already have.
|
||||
let mut final_shard = if sentinel_max.is_some() {
|
||||
self.provider
|
||||
.get::<tables::StoragesHistory>(sentinel_key.clone())?
|
||||
.unwrap_or_else(BlockNumberList::empty)
|
||||
} else {
|
||||
shard
|
||||
};
|
||||
final_shard.append(indices[start_idx..].iter().copied()).map_err(ProviderError::other)?;
|
||||
|
||||
self.write_storage_history_shards(address, storage_key, sentinel_key, final_shard)
|
||||
}
|
||||
|
||||
/// Writes storage history shards, rechunking if needed.
|
||||
fn write_storage_history_shards(
|
||||
&mut self,
|
||||
address: Address,
|
||||
storage_key: B256,
|
||||
sentinel_key: StorageShardedKey,
|
||||
shard: BlockNumberList,
|
||||
) -> ProviderResult<()> {
|
||||
// Fast path: all indices fit in one shard
|
||||
if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
|
||||
self.put::<tables::StoragesHistory>(last_key, &last_shard)?;
|
||||
if shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 {
|
||||
self.put::<tables::StoragesHistory>(sentinel_key, &shard)?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Slow path: rechunk into multiple shards
|
||||
let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
|
||||
let chunks = shard.iter().chunks(NUM_OF_INDICES_IN_SHARD);
|
||||
let mut chunks_peekable = chunks.into_iter().peekable();
|
||||
|
||||
while let Some(chunk) = chunks_peekable.next() {
|
||||
let shard = BlockNumberList::new_pre_sorted(chunk);
|
||||
let chunk_shard = BlockNumberList::new_pre_sorted(chunk);
|
||||
let highest_block_number = if chunks_peekable.peek().is_some() {
|
||||
shard.iter().next_back().expect("`chunks` does not return empty list")
|
||||
chunk_shard.iter().next_back().expect("`chunks` does not return empty list")
|
||||
} else {
|
||||
u64::MAX
|
||||
};
|
||||
|
||||
self.put::<tables::StoragesHistory>(
|
||||
StorageShardedKey::new(address, storage_key, highest_block_number),
|
||||
&shard,
|
||||
&chunk_shard,
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -3013,6 +3222,195 @@ mod tests {
|
||||
assert!(shards.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append_history_filters_overlapping_indices_with_sentinel() {
|
||||
// Test the fast path: sentinel shard exists with overlap
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
|
||||
|
||||
let address = Address::from([0xAA; 20]);
|
||||
|
||||
// Create sentinel shard with some existing indices
|
||||
let existing = BlockNumberList::new_pre_sorted([10, 20, 30]);
|
||||
let mut batch = provider.batch();
|
||||
batch
|
||||
.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &existing)
|
||||
.unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
// Append includes overlap and older values; should only append > 30
|
||||
let mut batch = provider.batch();
|
||||
batch.append_account_history_shard(address, vec![20, 30, 31, 40]).unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
let shards = provider.account_history_shards(address).unwrap();
|
||||
assert_eq!(shards.len(), 1);
|
||||
assert_eq!(shards[0].0.highest_block_number, u64::MAX);
|
||||
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), vec![10, 20, 30, 31, 40]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append_history_filters_overlapping_indices_with_closed_shards() {
|
||||
// Test the slow path: no sentinel, closed shard exists with higher block numbers
|
||||
// This triggers UnsortedInput -> fallback to full seek
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
|
||||
|
||||
let address = Address::from([0x42; 20]);
|
||||
|
||||
// First, manually write a "closed" shard with block number 1000 as the key
|
||||
// This simulates archive data that already exists (NO sentinel shard)
|
||||
let closed_shard = BlockNumberList::new_pre_sorted([500, 600, 700, 800, 900, 1000]);
|
||||
let mut batch = provider.batch();
|
||||
batch
|
||||
.put::<tables::AccountsHistory>(ShardedKey::new(address, 1000), &closed_shard)
|
||||
.unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
// Now try to append indices that are LOWER than the existing max (1000)
|
||||
// This will trigger the fallback path since sentinel is empty but closed shard exists
|
||||
let mut batch = provider.batch();
|
||||
batch.append_account_history_shard(address, [100, 200, 300]).unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
// Verify: only the closed shard should exist, no new data added
|
||||
let shards = provider.account_history_shards(address).unwrap();
|
||||
assert_eq!(shards.len(), 1);
|
||||
assert_eq!(shards[0].0.highest_block_number, 1000);
|
||||
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), vec![500, 600, 700, 800, 900, 1000]);
|
||||
|
||||
// Now append indices that are HIGHER than existing max - these should be added
|
||||
let mut batch = provider.batch();
|
||||
batch.append_account_history_shard(address, [1100, 1200]).unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
let shards = provider.account_history_shards(address).unwrap();
|
||||
// Should now have 2 shards: original closed shard + new MAX shard
|
||||
assert_eq!(shards.len(), 2);
|
||||
assert_eq!(shards[0].0.highest_block_number, 1000);
|
||||
assert_eq!(shards[1].0.highest_block_number, u64::MAX);
|
||||
assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), vec![1100, 1200]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append_storage_history_filters_overlapping() {
|
||||
// Test storage history with overlap filtering
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
|
||||
|
||||
let address = Address::from([0xBB; 20]);
|
||||
let slot = B256::from([0xCC; 32]);
|
||||
let sentinel_key = StorageShardedKey::new(address, slot, u64::MAX);
|
||||
|
||||
let existing = BlockNumberList::new_pre_sorted([5, 7, 9]);
|
||||
let mut batch = provider.batch();
|
||||
batch.put::<tables::StoragesHistory>(sentinel_key.clone(), &existing).unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
let mut batch = provider.batch();
|
||||
batch.append_storage_history_shard(address, slot, vec![1, 9, 10, 11]).unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
let updated = provider.get::<tables::StoragesHistory>(sentinel_key).unwrap().unwrap();
|
||||
assert_eq!(updated.iter().collect::<Vec<_>>(), vec![5, 7, 9, 10, 11]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append_history_all_filtered_is_noop() {
|
||||
// When all indices are <= existing max, should be a no-op
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
|
||||
|
||||
let address = Address::from([0xDD; 20]);
|
||||
|
||||
let existing = BlockNumberList::new_pre_sorted([100, 200, 300]);
|
||||
let mut batch = provider.batch();
|
||||
batch
|
||||
.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &existing)
|
||||
.unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
// Try to append only old indices
|
||||
let mut batch = provider.batch();
|
||||
batch.append_account_history_shard(address, [50, 100, 200]).unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
// Should be unchanged
|
||||
let shards = provider.account_history_shards(address).unwrap();
|
||||
assert_eq!(shards.len(), 1);
|
||||
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), vec![100, 200, 300]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append_history_empty_sentinel_with_closed_shards() {
|
||||
// Test: sentinel exists but is empty, closed shard has data
|
||||
// This exercises the slow path where sentinel_max is None but closed_shard_max is Some
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
|
||||
|
||||
let address = Address::from([0xEE; 20]);
|
||||
|
||||
// Create a closed shard with max block 500
|
||||
let closed_shard = BlockNumberList::new_pre_sorted([100, 200, 300, 400, 500]);
|
||||
let mut batch = provider.batch();
|
||||
batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 500), &closed_shard).unwrap();
|
||||
// Create an empty sentinel shard
|
||||
batch
|
||||
.put::<tables::AccountsHistory>(
|
||||
ShardedKey::new(address, u64::MAX),
|
||||
&BlockNumberList::empty(),
|
||||
)
|
||||
.unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
// Try to append indices - some below closed shard max, some above
|
||||
let mut batch = provider.batch();
|
||||
batch.append_account_history_shard(address, [250, 500, 600, 700]).unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
// Only indices > 500 should be appended to sentinel
|
||||
let shards = provider.account_history_shards(address).unwrap();
|
||||
assert_eq!(shards.len(), 2);
|
||||
assert_eq!(shards[0].0.highest_block_number, 500);
|
||||
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), vec![100, 200, 300, 400, 500]);
|
||||
assert_eq!(shards[1].0.highest_block_number, u64::MAX);
|
||||
assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), vec![600, 700]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append_storage_history_closed_shards_only() {
|
||||
// Test storage history slow path: no sentinel, only closed shards
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
|
||||
|
||||
let address = Address::from([0xFF; 20]);
|
||||
let slot = B256::from([0x11; 32]);
|
||||
|
||||
// Create only a closed shard (no sentinel)
|
||||
let closed_shard = BlockNumberList::new_pre_sorted([10, 20, 30]);
|
||||
let mut batch = provider.batch();
|
||||
batch
|
||||
.put::<tables::StoragesHistory>(
|
||||
StorageShardedKey::new(address, slot, 30),
|
||||
&closed_shard,
|
||||
)
|
||||
.unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
// Append with some overlapping indices
|
||||
let mut batch = provider.batch();
|
||||
batch.append_storage_history_shard(address, slot, vec![25, 30, 40, 50]).unwrap();
|
||||
batch.commit().unwrap();
|
||||
|
||||
// Verify: closed shard unchanged, new sentinel with indices > 30
|
||||
let shards = provider.storage_history_shards(address, slot).unwrap();
|
||||
assert_eq!(shards.len(), 2);
|
||||
assert_eq!(shards[0].0.sharded_key.highest_block_number, 30);
|
||||
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), vec![10, 20, 30]);
|
||||
assert_eq!(shards[1].0.sharded_key.highest_block_number, u64::MAX);
|
||||
assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), vec![40, 50]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clear_account_history() {
|
||||
let temp_dir = TempDir::new().unwrap();
|
||||
|
||||
Reference in New Issue
Block a user