mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
fix: properly save history indices in pipeline (#21222)
This commit is contained in:
@@ -193,7 +193,7 @@ where
|
||||
P: Copy + Default + Eq,
|
||||
{
|
||||
let mut write_cursor = provider.tx_ref().cursor_write::<H>()?;
|
||||
let mut current_partial = P::default();
|
||||
let mut current_partial = None;
|
||||
let mut current_list = Vec::<u64>::new();
|
||||
|
||||
// observability
|
||||
@@ -213,26 +213,28 @@ where
|
||||
// StorageHistory: `Address.StorageKey`.
|
||||
let partial_key = get_partial(sharded_key);
|
||||
|
||||
if current_partial != partial_key {
|
||||
if current_partial != Some(partial_key) {
|
||||
// We have reached the end of this subset of keys so
|
||||
// we need to flush its last indice shard.
|
||||
load_indices(
|
||||
&mut write_cursor,
|
||||
current_partial,
|
||||
&mut current_list,
|
||||
&sharded_key_factory,
|
||||
append_only,
|
||||
LoadMode::Flush,
|
||||
)?;
|
||||
if let Some(current) = current_partial {
|
||||
load_indices(
|
||||
&mut write_cursor,
|
||||
current,
|
||||
&mut current_list,
|
||||
&sharded_key_factory,
|
||||
append_only,
|
||||
LoadMode::Flush,
|
||||
)?;
|
||||
}
|
||||
|
||||
current_partial = partial_key;
|
||||
current_partial = Some(partial_key);
|
||||
current_list.clear();
|
||||
|
||||
// If it's not the first sync, there might an existing shard already, so we need to
|
||||
// merge it with the one coming from the collector
|
||||
if !append_only &&
|
||||
let Some((_, last_database_shard)) =
|
||||
write_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))?
|
||||
write_cursor.seek_exact(sharded_key_factory(partial_key, u64::MAX))?
|
||||
{
|
||||
current_list.extend(last_database_shard.iter());
|
||||
}
|
||||
@@ -241,7 +243,7 @@ where
|
||||
current_list.extend(new_list.iter());
|
||||
load_indices(
|
||||
&mut write_cursor,
|
||||
current_partial,
|
||||
partial_key,
|
||||
&mut current_list,
|
||||
&sharded_key_factory,
|
||||
append_only,
|
||||
@@ -250,14 +252,16 @@ where
|
||||
}
|
||||
|
||||
// There will be one remaining shard that needs to be flushed to DB.
|
||||
load_indices(
|
||||
&mut write_cursor,
|
||||
current_partial,
|
||||
&mut current_list,
|
||||
&sharded_key_factory,
|
||||
append_only,
|
||||
LoadMode::Flush,
|
||||
)?;
|
||||
if let Some(current) = current_partial {
|
||||
load_indices(
|
||||
&mut write_cursor,
|
||||
current,
|
||||
&mut current_list,
|
||||
&sharded_key_factory,
|
||||
append_only,
|
||||
LoadMode::Flush,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user