mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-26 23:58:46 -05:00
fix(provider): use canonical_chain on range lookups (#11332)
This commit is contained in:
@@ -122,12 +122,17 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
|
||||
(start, end)
|
||||
}
|
||||
|
||||
/// Fetches a range of data from both in-memory state and storage.
|
||||
/// Fetches a range of data from both in-memory state and storage while a predicate is met.
|
||||
///
|
||||
/// - `fetch_db_range`: Retrieves a range of items from the database.
|
||||
/// - `map_block_state_item`: Maps a block number to an item in memory. Stops fetching if `None`
|
||||
/// is returned.
|
||||
fn fetch_db_mem_range<T, F, G, P>(
|
||||
/// Creates a snapshot of the in-memory chain state and database provider to prevent
|
||||
/// inconsistencies. Splits the range into in-memory and storage sections, prioritizing
|
||||
/// recent in-memory blocks in case of overlaps.
|
||||
///
|
||||
/// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
|
||||
/// user to retrieve the required items from the database using [`RangeInclusive`].
|
||||
/// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
|
||||
/// state, allowing for selection or filtering for the desired data.
|
||||
fn fetch_db_mem_range_while<T, F, G, P>(
|
||||
&self,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
fetch_db_range: F,
|
||||
@@ -135,34 +140,90 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
|
||||
mut predicate: P,
|
||||
) -> ProviderResult<Vec<T>>
|
||||
where
|
||||
F: FnOnce(RangeInclusive<BlockNumber>, &mut P) -> ProviderResult<Vec<T>>,
|
||||
G: Fn(BlockNumber, &mut P) -> Option<T>,
|
||||
F: FnOnce(
|
||||
&DatabaseProviderRO<N::DB, N::ChainSpec>,
|
||||
RangeInclusive<BlockNumber>,
|
||||
&mut P,
|
||||
) -> ProviderResult<Vec<T>>,
|
||||
G: Fn(Arc<BlockState>, &mut P) -> Option<T>,
|
||||
P: FnMut(&T) -> bool,
|
||||
{
|
||||
// Each one provides a snapshot at the time of instantiation, but their order matters.
|
||||
//
|
||||
// If we acquire the database provider first, it's possible that before the in-memory chain
|
||||
// snapshot is instantiated, it will flush blocks to disk. This would
|
||||
// mean that our database provider would not have access to the flushed blocks (since it's
|
||||
// working under an older view), while the in-memory state may have deleted them
|
||||
// entirely, resulting in gaps in the range.
|
||||
let mut in_memory_chain =
|
||||
self.canonical_in_memory_state.canonical_chain().collect::<Vec<_>>();
|
||||
let db_provider = self.database_provider_ro()?;
|
||||
|
||||
let (start, end) = self.convert_range_bounds(range, || {
|
||||
self.canonical_in_memory_state.get_canonical_block_number()
|
||||
// the first block is the highest one.
|
||||
in_memory_chain
|
||||
.first()
|
||||
.map(|b| b.number())
|
||||
.unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
|
||||
});
|
||||
let mut range = start..=end;
|
||||
|
||||
// Split range into storage_range and in-memory range. If the in-memory range is not
|
||||
// necessary, drop it early.
|
||||
//
|
||||
// The last block of `in_memory_chain` is the lowest block number.
|
||||
let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
|
||||
Some(lowest_memory_block) if lowest_memory_block <= end => {
|
||||
let highest_memory_block =
|
||||
in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
|
||||
|
||||
// The database will, for a time, overlap with in-memory-chain blocks. In
|
||||
// case of a re-org, it can mean that the database blocks are of a forked chain, and
|
||||
// so, we should prioritize the in-memory overlapped blocks.
|
||||
let in_memory_range =
|
||||
lowest_memory_block.max(start)..=end.min(highest_memory_block);
|
||||
|
||||
// If the requested range is in the middle of the in-memory range, remove the necessary
|
||||
// lowest blocks
|
||||
in_memory_chain.truncate(
|
||||
in_memory_chain
|
||||
.len()
|
||||
.saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
|
||||
);
|
||||
|
||||
let storage_range =
|
||||
(lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
|
||||
|
||||
(Some((in_memory_chain, in_memory_range)), storage_range)
|
||||
}
|
||||
_ => {
|
||||
// Drop the in-memory chain so we don't hold blocks in memory.
|
||||
drop(in_memory_chain);
|
||||
|
||||
(None, Some(start..=end))
|
||||
}
|
||||
};
|
||||
|
||||
let mut items = Vec::with_capacity((end - start + 1) as usize);
|
||||
|
||||
// First, fetch the items from the database
|
||||
let mut db_items = fetch_db_range(range.clone(), &mut predicate)?;
|
||||
|
||||
if !db_items.is_empty() {
|
||||
if let Some(storage_range) = storage_range {
|
||||
let mut db_items = fetch_db_range(&db_provider, storage_range.clone(), &mut predicate)?;
|
||||
items.append(&mut db_items);
|
||||
|
||||
// Advance the range iterator by the number of items fetched from the database
|
||||
range.nth(items.len() - 1);
|
||||
// The predicate was not met, if the number of items differs from the expected. So, we
|
||||
// return what we have.
|
||||
if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
|
||||
return Ok(items)
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch the remaining items from the in-memory state
|
||||
for num in range {
|
||||
// TODO: there might be an update between loop iterations, we
|
||||
// need to handle that situation.
|
||||
if let Some(item) = map_block_state_item(num, &mut predicate) {
|
||||
items.push(item);
|
||||
} else {
|
||||
break;
|
||||
if let Some((in_memory_chain, in_memory_range)) = in_memory {
|
||||
for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
|
||||
debug_assert!(num == block.number());
|
||||
if let Some(item) = map_block_state_item(block, &mut predicate) {
|
||||
items.push(item);
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,14 +381,10 @@ impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider2<N> {
|
||||
}
|
||||
|
||||
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> ProviderResult<Vec<Header>> {
|
||||
self.fetch_db_mem_range(
|
||||
self.fetch_db_mem_range_while(
|
||||
range,
|
||||
|range, _| self.database.headers_range(range),
|
||||
|num, _| {
|
||||
self.canonical_in_memory_state
|
||||
.state_by_number(num)
|
||||
.map(|block_state| block_state.block().block().header.header().clone())
|
||||
},
|
||||
|db_provider, range, _| db_provider.headers_range(range),
|
||||
|block_state, _| Some(block_state.block().block().header.header().clone()),
|
||||
|_| true,
|
||||
)
|
||||
}
|
||||
@@ -344,14 +401,10 @@ impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider2<N> {
|
||||
&self,
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
) -> ProviderResult<Vec<SealedHeader>> {
|
||||
self.fetch_db_mem_range(
|
||||
self.fetch_db_mem_range_while(
|
||||
range,
|
||||
|range, _| self.database.sealed_headers_range(range),
|
||||
|num, _| {
|
||||
self.canonical_in_memory_state
|
||||
.state_by_number(num)
|
||||
.map(|block_state| block_state.block().block().header.clone())
|
||||
},
|
||||
|db_provider, range, _| db_provider.sealed_headers_range(range),
|
||||
|block_state, _| Some(block_state.block().block().header.clone()),
|
||||
|_| true,
|
||||
)
|
||||
}
|
||||
@@ -361,14 +414,11 @@ impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider2<N> {
|
||||
range: impl RangeBounds<BlockNumber>,
|
||||
predicate: impl FnMut(&SealedHeader) -> bool,
|
||||
) -> ProviderResult<Vec<SealedHeader>> {
|
||||
self.fetch_db_mem_range(
|
||||
self.fetch_db_mem_range_while(
|
||||
range,
|
||||
|range, predicate| self.database.sealed_headers_while(range, predicate),
|
||||
|num, predicate| {
|
||||
self.canonical_in_memory_state
|
||||
.state_by_number(num)
|
||||
.map(|block_state| block_state.block().block().header.clone())
|
||||
.filter(|header| predicate(header))
|
||||
|db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
|
||||
|block_state, predicate| {
|
||||
Some(block_state.block().block().header.clone()).filter(|header| predicate(header))
|
||||
},
|
||||
predicate,
|
||||
)
|
||||
@@ -389,14 +439,13 @@ impl<N: ProviderNodeTypes> BlockHashReader for BlockchainProvider2<N> {
|
||||
start: BlockNumber,
|
||||
end: BlockNumber,
|
||||
) -> ProviderResult<Vec<B256>> {
|
||||
self.fetch_db_mem_range(
|
||||
start..=end,
|
||||
|range, _| self.database.canonical_hashes_range(*range.start(), *range.end()),
|
||||
|num, _| {
|
||||
self.canonical_in_memory_state
|
||||
.state_by_number(num)
|
||||
.map(|block_state| block_state.hash())
|
||||
self.fetch_db_mem_range_while(
|
||||
start..end,
|
||||
|db_provider, inclusive_range, _| {
|
||||
db_provider
|
||||
.canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
|
||||
},
|
||||
|block_state, _| Some(block_state.hash()),
|
||||
|_| true,
|
||||
)
|
||||
}
|
||||
@@ -588,14 +637,10 @@ impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider2<N> {
|
||||
}
|
||||
|
||||
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
|
||||
self.fetch_db_mem_range(
|
||||
self.fetch_db_mem_range_while(
|
||||
range,
|
||||
|range, _| self.database.block_range(range),
|
||||
|num, _| {
|
||||
self.canonical_in_memory_state
|
||||
.state_by_number(num)
|
||||
.map(|block_state| block_state.block().block().clone().unseal())
|
||||
},
|
||||
|db_provider, range, _| db_provider.block_range(range),
|
||||
|block_state, _| Some(block_state.block().block().clone().unseal()),
|
||||
|_| true,
|
||||
)
|
||||
}
|
||||
@@ -604,15 +649,13 @@ impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider2<N> {
|
||||
&self,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
) -> ProviderResult<Vec<BlockWithSenders>> {
|
||||
self.fetch_db_mem_range(
|
||||
self.fetch_db_mem_range_while(
|
||||
range,
|
||||
|range, _| self.database.block_with_senders_range(range),
|
||||
|num, _| {
|
||||
self.canonical_in_memory_state.state_by_number(num).map(|block_state| {
|
||||
let block = block_state.block().block().clone();
|
||||
let senders = block_state.block().senders().clone();
|
||||
BlockWithSenders { block: block.unseal(), senders }
|
||||
})
|
||||
|db_provider, range, _| db_provider.block_with_senders_range(range),
|
||||
|block_state, _| {
|
||||
let block = block_state.block().block().clone();
|
||||
let senders = block_state.block().senders().clone();
|
||||
Some(BlockWithSenders { block: block.unseal(), senders })
|
||||
},
|
||||
|_| true,
|
||||
)
|
||||
@@ -622,15 +665,13 @@ impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider2<N> {
|
||||
&self,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
|
||||
self.fetch_db_mem_range(
|
||||
self.fetch_db_mem_range_while(
|
||||
range,
|
||||
|range, _| self.database.sealed_block_with_senders_range(range),
|
||||
|num, _| {
|
||||
self.canonical_in_memory_state.state_by_number(num).map(|block_state| {
|
||||
let block = block_state.block().block().clone();
|
||||
let senders = block_state.block().senders().clone();
|
||||
SealedBlockWithSenders { block, senders }
|
||||
})
|
||||
|db_provider, range, _| db_provider.sealed_block_with_senders_range(range),
|
||||
|block_state, _| {
|
||||
let block = block_state.block().block().clone();
|
||||
let senders = block_state.block().senders().clone();
|
||||
Some(SealedBlockWithSenders { block, senders })
|
||||
},
|
||||
|_| true,
|
||||
)
|
||||
@@ -2180,6 +2221,28 @@ mod tests {
|
||||
assert_eq!(retrieved_block, &expected_block.clone().unseal());
|
||||
}
|
||||
|
||||
// Check for partial in-memory ranges
|
||||
let blocks = provider.block_range(start_block_number + 1..=end_block_number)?;
|
||||
assert_eq!(blocks.len(), in_memory_blocks.len() - 1);
|
||||
for (retrieved_block, expected_block) in blocks.iter().zip(in_memory_blocks.iter().skip(1))
|
||||
{
|
||||
assert_eq!(retrieved_block, &expected_block.clone().unseal());
|
||||
}
|
||||
|
||||
let blocks = provider.block_range(start_block_number + 1..=end_block_number - 1)?;
|
||||
assert_eq!(blocks.len(), in_memory_blocks.len() - 2);
|
||||
for (retrieved_block, expected_block) in blocks.iter().zip(in_memory_blocks.iter().skip(1))
|
||||
{
|
||||
assert_eq!(retrieved_block, &expected_block.clone().unseal());
|
||||
}
|
||||
|
||||
let blocks = provider.block_range(start_block_number + 1..=end_block_number + 1)?;
|
||||
assert_eq!(blocks.len(), in_memory_blocks.len() - 1);
|
||||
for (retrieved_block, expected_block) in blocks.iter().zip(in_memory_blocks.iter().skip(1))
|
||||
{
|
||||
assert_eq!(retrieved_block, &expected_block.clone().unseal());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user