diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index b79d8a89ef..7e375daba9 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -122,12 +122,17 @@ impl BlockchainProvider2 { (start, end) } - /// Fetches a range of data from both in-memory state and storage. + /// Fetches a range of data from both in-memory state and storage while a predicate is met. /// - /// - `fetch_db_range`: Retrieves a range of items from the database. - /// - `map_block_state_item`: Maps a block number to an item in memory. Stops fetching if `None` - /// is returned. - fn fetch_db_mem_range( + /// Creates a snapshot of the in-memory chain state and database provider to prevent + /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing + /// recent in-memory blocks in case of overlaps. + /// + /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the + /// user to retrieve the required items from the database using [`RangeInclusive`]. + /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory + /// state, allowing for selection or filtering for the desired data. + fn fetch_db_mem_range_while( &self, range: impl RangeBounds, fetch_db_range: F, @@ -135,34 +140,90 @@ impl BlockchainProvider2 { mut predicate: P, ) -> ProviderResult> where - F: FnOnce(RangeInclusive, &mut P) -> ProviderResult>, - G: Fn(BlockNumber, &mut P) -> Option, + F: FnOnce( + &DatabaseProviderRO, + RangeInclusive, + &mut P, + ) -> ProviderResult>, + G: Fn(Arc, &mut P) -> Option, P: FnMut(&T) -> bool, { + // Each one provides a snapshot at the time of instantiation, but its order matters. + // + // If we acquire first the database provider, it's possible that before the in-memory chain + // snapshot is instantiated, it will flush blocks to disk. This would + // mean that our database provider would not have access to the flushed blocks (since it's + // working under an older view), while the in-memory state may have deleted them + // entirely. Resulting in gaps on the range. + let mut in_memory_chain = + self.canonical_in_memory_state.canonical_chain().collect::>(); + let db_provider = self.database_provider_ro()?; + let (start, end) = self.convert_range_bounds(range, || { - self.canonical_in_memory_state.get_canonical_block_number() + // the first block is the highest one. + in_memory_chain + .first() + .map(|b| b.number()) + .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default()) }); - let mut range = start..=end; + + // Split range into storage_range and in-memory range. If the in-memory range is not + // necessary drop it early. + // + // The last block of `in_memory_chain` is the lowest block number. + let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) { + Some(lowest_memory_block) if lowest_memory_block <= end => { + let highest_memory_block = + in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed"); + + // Database will for a time overlap with in-memory-chain blocks. In + // case of a re-org, it can mean that the database blocks are of a forked chain, and + // so, we should prioritize the in-memory overlapped blocks. + let in_memory_range = + lowest_memory_block.max(start)..=end.min(highest_memory_block); + + // If requested range is in the middle of the in-memory range, remove the necessary + // lowest blocks + in_memory_chain.truncate( + in_memory_chain + .len() + .saturating_sub(start.saturating_sub(lowest_memory_block) as usize), + ); + + let storage_range = + (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1); + + (Some((in_memory_chain, in_memory_range)), storage_range) + } + _ => { + // Drop the in-memory chain so we don't hold blocks in memory. + drop(in_memory_chain); + + (None, Some(start..=end)) + } + }; + let mut items = Vec::with_capacity((end - start + 1) as usize); - // First, fetch the items from the database - let mut db_items = fetch_db_range(range.clone(), &mut predicate)?; - - if !db_items.is_empty() { + if let Some(storage_range) = storage_range { + let mut db_items = fetch_db_range(&db_provider, storage_range.clone(), &mut predicate)?; items.append(&mut db_items); - // Advance the range iterator by the number of items fetched from the database - range.nth(items.len() - 1); + // The predicate was not met, if the number of items differs from the expected. So, we + // return what we have. + if items.len() as u64 != storage_range.end() - storage_range.start() + 1 { + return Ok(items) + } } - // Fetch the remaining items from the in-memory state - for num in range { - // TODO: there might be an update between loop iterations, we - // need to handle that situation. - if let Some(item) = map_block_state_item(num, &mut predicate) { - items.push(item); - } else { - break; + if let Some((in_memory_chain, in_memory_range)) = in_memory { + for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) { + debug_assert!(num == block.number()); + if let Some(item) = map_block_state_item(block, &mut predicate) { + items.push(item); + } else { + break + } } } @@ -320,14 +381,10 @@ impl HeaderProvider for BlockchainProvider2 { } fn headers_range(&self, range: impl RangeBounds) -> ProviderResult> { - self.fetch_db_mem_range( + self.fetch_db_mem_range_while( range, - |range, _| self.database.headers_range(range), - |num, _| { - self.canonical_in_memory_state - .state_by_number(num) - .map(|block_state| block_state.block().block().header.header().clone()) - }, + |db_provider, range, _| db_provider.headers_range(range), + |block_state, _| Some(block_state.block().block().header.header().clone()), |_| true, ) } @@ -344,14 +401,10 @@ impl HeaderProvider for BlockchainProvider2 { &self, range: impl RangeBounds, ) -> ProviderResult> { - self.fetch_db_mem_range( + self.fetch_db_mem_range_while( range, - |range, _| self.database.sealed_headers_range(range), - |num, _| { - self.canonical_in_memory_state - .state_by_number(num) - .map(|block_state| block_state.block().block().header.clone()) - }, + |db_provider, range, _| db_provider.sealed_headers_range(range), + |block_state, _| Some(block_state.block().block().header.clone()), |_| true, ) } @@ -361,14 +414,11 @@ impl HeaderProvider for BlockchainProvider2 { range: impl RangeBounds, predicate: impl FnMut(&SealedHeader) -> bool, ) -> ProviderResult> { - self.fetch_db_mem_range( + self.fetch_db_mem_range_while( range, - |range, predicate| self.database.sealed_headers_while(range, predicate), - |num, predicate| { - self.canonical_in_memory_state - .state_by_number(num) - .map(|block_state| block_state.block().block().header.clone()) - .filter(|header| predicate(header)) + |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate), + |block_state, predicate| { + Some(block_state.block().block().header.clone()).filter(|header| predicate(header)) }, predicate, ) @@ -389,14 +439,13 @@ impl BlockHashReader for BlockchainProvider2 { start: BlockNumber, end: BlockNumber, ) -> ProviderResult> { - self.fetch_db_mem_range( - start..=end, - |range, _| self.database.canonical_hashes_range(*range.start(), *range.end()), - |num, _| { - self.canonical_in_memory_state - .state_by_number(num) - .map(|block_state| block_state.hash()) + self.fetch_db_mem_range_while( + start..end, + |db_provider, inclusive_range, _| { + db_provider + .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1) }, + |block_state, _| Some(block_state.hash()), |_| true, ) } @@ -588,14 +637,10 @@ impl BlockReader for BlockchainProvider2 { } fn block_range(&self, range: RangeInclusive) -> ProviderResult> { - self.fetch_db_mem_range( + self.fetch_db_mem_range_while( range, - |range, _| self.database.block_range(range), - |num, _| { - self.canonical_in_memory_state - .state_by_number(num) - .map(|block_state| block_state.block().block().clone().unseal()) - }, + |db_provider, range, _| db_provider.block_range(range), + |block_state, _| Some(block_state.block().block().clone().unseal()), |_| true, ) } @@ -604,15 +649,13 @@ impl BlockReader for BlockchainProvider2 { &self, range: RangeInclusive, ) -> ProviderResult> { - self.fetch_db_mem_range( + self.fetch_db_mem_range_while( range, - |range, _| self.database.block_with_senders_range(range), - |num, _| { - self.canonical_in_memory_state.state_by_number(num).map(|block_state| { - let block = block_state.block().block().clone(); - let senders = block_state.block().senders().clone(); - BlockWithSenders { block: block.unseal(), senders } - }) + |db_provider, range, _| db_provider.block_with_senders_range(range), + |block_state, _| { + let block = block_state.block().block().clone(); + let senders = block_state.block().senders().clone(); + Some(BlockWithSenders { block: block.unseal(), senders }) }, |_| true, ) @@ -622,15 +665,13 @@ impl BlockReader for BlockchainProvider2 { &self, range: RangeInclusive, ) -> ProviderResult> { - self.fetch_db_mem_range( + self.fetch_db_mem_range_while( range, - |range, _| self.database.sealed_block_with_senders_range(range), - |num, _| { - self.canonical_in_memory_state.state_by_number(num).map(|block_state| { - let block = block_state.block().block().clone(); - let senders = block_state.block().senders().clone(); - SealedBlockWithSenders { block, senders } - }) + |db_provider, range, _| db_provider.sealed_block_with_senders_range(range), + |block_state, _| { + let block = block_state.block().block().clone(); + let senders = block_state.block().senders().clone(); + Some(SealedBlockWithSenders { block, senders }) }, |_| true, ) @@ -2180,6 +2221,28 @@ mod tests { assert_eq!(retrieved_block, &expected_block.clone().unseal()); } + // Check for partial in-memory ranges + let blocks = provider.block_range(start_block_number + 1..=end_block_number)?; + assert_eq!(blocks.len(), in_memory_blocks.len() - 1); + for (retrieved_block, expected_block) in blocks.iter().zip(in_memory_blocks.iter().skip(1)) + { + assert_eq!(retrieved_block, &expected_block.clone().unseal()); + } + + let blocks = provider.block_range(start_block_number + 1..=end_block_number - 1)?; + assert_eq!(blocks.len(), in_memory_blocks.len() - 2); + for (retrieved_block, expected_block) in blocks.iter().zip(in_memory_blocks.iter().skip(1)) + { + assert_eq!(retrieved_block, &expected_block.clone().unseal()); + } + + let blocks = provider.block_range(start_block_number + 1..=end_block_number + 1)?; + assert_eq!(blocks.len(), in_memory_blocks.len() - 1); + for (retrieved_block, expected_block) in blocks.iter().zip(in_memory_blocks.iter().skip(1)) + { + assert_eq!(retrieved_block, &expected_block.clone().unseal()); + } + Ok(()) }