mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 07:17:56 -05:00
perf(persistence): optimize append_history_index with upsert (#19825)
Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
This commit is contained in:
@@ -830,30 +830,11 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
/// Load shard and remove it. If list is empty, last shard was full or
|
||||
/// there are no shards at all.
|
||||
fn take_shard<T>(
|
||||
&self,
|
||||
cursor: &mut <TX as DbTxMut>::CursorMut<T>,
|
||||
key: T::Key,
|
||||
) -> ProviderResult<Vec<u64>>
|
||||
where
|
||||
T: Table<Value = BlockNumberList>,
|
||||
{
|
||||
if let Some((_, list)) = cursor.seek_exact(key)? {
|
||||
// delete old shard so new one can be inserted.
|
||||
cursor.delete_current()?;
|
||||
let list = list.iter().collect::<Vec<_>>();
|
||||
return Ok(list)
|
||||
}
|
||||
Ok(Vec::new())
|
||||
}
|
||||
|
||||
/// Insert history index to the database.
|
||||
///
|
||||
/// For each updated partial key, this function removes the last shard from
|
||||
/// the database (if any), appends the new indices to it, chunks the resulting integer list and
|
||||
/// inserts the new shards back into the database.
|
||||
/// For each updated partial key, this function retrieves the last shard from the database
|
||||
/// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
|
||||
/// the shards back into the database.
|
||||
///
|
||||
/// This function is used by history indexing stages.
|
||||
fn append_history_index<P, T>(
|
||||
@@ -865,26 +846,44 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
P: Copy,
|
||||
T: Table<Value = BlockNumberList>,
|
||||
{
|
||||
// This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
|
||||
// will append duplicate entries instead of updating existing ones, causing data corruption.
|
||||
assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
|
||||
|
||||
let mut cursor = self.tx.cursor_write::<T>()?;
|
||||
|
||||
for (partial_key, indices) in index_updates {
|
||||
let mut last_shard =
|
||||
self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
|
||||
last_shard.extend(indices);
|
||||
// Chunk indices and insert them in shards of N size.
|
||||
let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
|
||||
while let Some(list) = chunks.next() {
|
||||
let highest_block_number = if chunks.peek().is_some() {
|
||||
*list.last().expect("`chunks` does not return empty list")
|
||||
let last_key = sharded_key_factory(partial_key, u64::MAX);
|
||||
let mut last_shard = cursor
|
||||
.seek_exact(last_key.clone())?
|
||||
.map(|(_, list)| list)
|
||||
.unwrap_or_else(BlockNumberList::empty);
|
||||
|
||||
last_shard.append(indices).map_err(ProviderError::other)?;
|
||||
|
||||
// fast path: all indices fit in one shard
|
||||
if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
|
||||
cursor.upsert(last_key, &last_shard)?;
|
||||
continue;
|
||||
}
|
||||
|
||||
// slow path: rechunk into multiple shards
|
||||
let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
|
||||
let mut chunks_peekable = chunks.into_iter().peekable();
|
||||
|
||||
while let Some(chunk) = chunks_peekable.next() {
|
||||
let shard = BlockNumberList::new_pre_sorted(chunk);
|
||||
let highest_block_number = if chunks_peekable.peek().is_some() {
|
||||
shard.iter().next_back().expect("`chunks` does not return empty list")
|
||||
} else {
|
||||
// Insert last list with `u64::MAX`.
|
||||
u64::MAX
|
||||
};
|
||||
cursor.insert(
|
||||
sharded_key_factory(partial_key, highest_block_number),
|
||||
&BlockNumberList::new_pre_sorted(list.iter().copied()),
|
||||
)?;
|
||||
|
||||
cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user