From 97f6db61aa52688a24f59e5198c859617421228e Mon Sep 17 00:00:00 2001 From: ligt Date: Wed, 24 Dec 2025 19:40:23 +0700 Subject: [PATCH] perf(persistence): optimize append_history_index with upsert (#19825) Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com> --- .../src/providers/database/provider.rs | 67 +++++++++---------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 9b4b68a606..6e04cc9c18 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -830,30 +830,11 @@ impl DatabaseProvider { } impl DatabaseProvider { - /// Load shard and remove it. If list is empty, last shard was full or - /// there are no shards at all. - fn take_shard( - &self, - cursor: &mut ::CursorMut, - key: T::Key, - ) -> ProviderResult> - where - T: Table, - { - if let Some((_, list)) = cursor.seek_exact(key)? { - // delete old shard so new one can be inserted. - cursor.delete_current()?; - let list = list.iter().collect::>(); - return Ok(list) - } - Ok(Vec::new()) - } - /// Insert history index to the database. /// - /// For each updated partial key, this function removes the last shard from - /// the database (if any), appends the new indices to it, chunks the resulting integer list and - /// inserts the new shards back into the database. + /// For each updated partial key, this function retrieves the last shard from the database + /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts + /// the shards back into the database. /// /// This function is used by history indexing stages. fn append_history_index( @@ -865,26 +846,44 @@ impl DatabaseProvider { P: Copy, T: Table, { + // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables + // will append duplicate entries instead of updating existing ones, causing data corruption. + assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables"); + let mut cursor = self.tx.cursor_write::()?; + for (partial_key, indices) in index_updates { - let mut last_shard = - self.take_shard::(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?; - last_shard.extend(indices); - // Chunk indices and insert them in shards of N size. - let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable(); - while let Some(list) = chunks.next() { - let highest_block_number = if chunks.peek().is_some() { - *list.last().expect("`chunks` does not return empty list") + let last_key = sharded_key_factory(partial_key, u64::MAX); + let mut last_shard = cursor + .seek_exact(last_key.clone())? + .map(|(_, list)| list) + .unwrap_or_else(BlockNumberList::empty); + + last_shard.append(indices).map_err(ProviderError::other)?; + + // fast path: all indices fit in one shard + if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 { + cursor.upsert(last_key, &last_shard)?; + continue; + } + + // slow path: rechunk into multiple shards + let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD); + let mut chunks_peekable = chunks.into_iter().peekable(); + + while let Some(chunk) = chunks_peekable.next() { + let shard = BlockNumberList::new_pre_sorted(chunk); + let highest_block_number = if chunks_peekable.peek().is_some() { + shard.iter().next_back().expect("`chunks` does not return empty list") } else { // Insert last list with `u64::MAX`. u64::MAX }; - cursor.insert( - sharded_key_factory(partial_key, highest_block_number), - &BlockNumberList::new_pre_sorted(list.iter().copied()), - )?; + + cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?; } } + Ok(()) } }