mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3950bc83f | ||
|
|
3adbfe5b3c | ||
|
|
23e7012049 | ||
|
|
a41d853a84 | ||
|
|
435e07f67b | ||
|
|
76fe19f11a | ||
|
|
d81e9c9567 | ||
|
|
f3fe4606ad | ||
|
|
cf8f79c863 |
@@ -28,7 +28,7 @@ use alloy_eips::BlockHashOrNumber;
|
||||
use alloy_primitives::{
|
||||
keccak256,
|
||||
map::{hash_map, HashMap, HashSet},
|
||||
Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
|
||||
Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use parking_lot::RwLock;
|
||||
@@ -67,7 +67,8 @@ use reth_trie::{
|
||||
changesets::storage_trie_wiped_changeset_iter,
|
||||
trie_cursor::{InMemoryTrieCursor, TrieCursor, TrieCursorIter, TrieStorageCursor},
|
||||
updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
|
||||
HashedPostStateSorted, StoredNibbles, StoredNibblesSubKey, TrieChangeSetsEntry,
|
||||
HashedPostStateSorted, HashedStorageSorted, StoredNibbles, StoredNibblesSubKey,
|
||||
TrieChangeSetsEntry,
|
||||
};
|
||||
use reth_trie_db::{ChangesetCache, DatabaseAccountTrieCursor, DatabaseStorageTrieCursor};
|
||||
use revm_database::states::{
|
||||
@@ -520,6 +521,10 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
);
|
||||
}
|
||||
|
||||
// Collect hashed state for batched k-way merge writes
|
||||
let mut hashed_states: Vec<Arc<HashedPostStateSorted>> =
|
||||
Vec::with_capacity(blocks.len());
|
||||
|
||||
for (i, block) in blocks.iter().enumerate() {
|
||||
let recovered_block = block.recovered_block();
|
||||
|
||||
@@ -546,17 +551,29 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
|
||||
let trie_data = block.trie_data();
|
||||
|
||||
// insert hashes and intermediate merkle nodes
|
||||
let start = Instant::now();
|
||||
self.write_hashed_state(&trie_data.hashed_state)?;
|
||||
timings.write_hashed_state += start.elapsed();
|
||||
|
||||
// Trie changesets must be written per-block (keyed by block number)
|
||||
let start = Instant::now();
|
||||
self.write_trie_updates_sorted(&trie_data.trie_updates)?;
|
||||
timings.write_trie_updates += start.elapsed();
|
||||
|
||||
// Collect hashed state reference for batched write (no clone)
|
||||
hashed_states.push(Arc::clone(&trie_data.hashed_state));
|
||||
}
|
||||
}
|
||||
|
||||
// Write hashed state - use k-way merge only for multiple blocks
|
||||
if !hashed_states.is_empty() {
|
||||
let start = Instant::now();
|
||||
if hashed_states.len() == 1 {
|
||||
// Single block: use sequential write (no merge overhead)
|
||||
self.write_hashed_state(&hashed_states[0])?;
|
||||
} else {
|
||||
// Multiple blocks: use k-way merge for sorted, deduped, sequential I/O
|
||||
self.write_hashed_state_merged(&hashed_states)?;
|
||||
}
|
||||
timings.write_hashed_state = start.elapsed();
|
||||
}
|
||||
|
||||
// Full mode: update history indices
|
||||
if save_mode.with_state() {
|
||||
let start = Instant::now();
|
||||
@@ -730,6 +747,356 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes hashed state from multiple blocks using k-way merge for sequential I/O.
|
||||
///
|
||||
/// This method merges sorted hashed state from multiple blocks and writes them in globally
|
||||
/// sorted key order, deduplicating keys (last block wins). This provides better I/O locality
|
||||
/// than writing each block's state separately, as it avoids cursor jumping back and forth
|
||||
/// in the B-tree.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
pub fn write_hashed_state_merged(
|
||||
&self,
|
||||
hashed_states: &[Arc<HashedPostStateSorted>],
|
||||
) -> ProviderResult<()> {
|
||||
use std::collections::BinaryHeap;
|
||||
|
||||
// === Write hashed accounts using k-way merge ===
|
||||
{
|
||||
// Entry for the min-heap: (key, block_index, value)
|
||||
// We use Reverse to make it a min-heap by key, then max by block_index for dedup
|
||||
#[derive(Eq, PartialEq)]
|
||||
struct AccountEntry {
|
||||
key: B256,
|
||||
block_idx: usize,
|
||||
value: Option<Account>,
|
||||
iter_idx: usize, // which iterator this came from
|
||||
}
|
||||
|
||||
impl Ord for AccountEntry {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
// Min-heap by key, then max by block_idx (so latest block wins on ties)
|
||||
other.key.cmp(&self.key).then_with(|| self.block_idx.cmp(&other.block_idx))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for AccountEntry {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize iterators and heap
|
||||
let mut iters: Vec<_> = hashed_states
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(block_idx, state)| (block_idx, state.accounts().iter().peekable()))
|
||||
.collect();
|
||||
|
||||
let mut heap = BinaryHeap::new();
|
||||
|
||||
// Seed heap with first element from each iterator
|
||||
for (block_idx, iter) in &mut iters {
|
||||
if let Some(&(key, value)) = iter.next() {
|
||||
heap.push(AccountEntry {
|
||||
key,
|
||||
block_idx: *block_idx,
|
||||
value,
|
||||
iter_idx: *block_idx,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut hashed_accounts_cursor =
|
||||
self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
|
||||
|
||||
let mut last_key: Option<B256> = None;
|
||||
|
||||
while let Some(entry) = heap.pop() {
|
||||
// Refill heap from this iterator
|
||||
if let Some(&(next_key, next_value)) = iters[entry.iter_idx].1.next() {
|
||||
heap.push(AccountEntry {
|
||||
key: next_key,
|
||||
block_idx: entry.block_idx,
|
||||
value: next_value,
|
||||
iter_idx: entry.iter_idx,
|
||||
});
|
||||
}
|
||||
|
||||
// Skip duplicate keys (we already processed this key from a later block)
|
||||
if last_key == Some(entry.key) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Drain any remaining entries with the same key (take latest block)
|
||||
let mut final_value = entry.value;
|
||||
let mut final_block_idx = entry.block_idx;
|
||||
while let Some(next) = heap.peek() {
|
||||
if next.key != entry.key {
|
||||
break;
|
||||
}
|
||||
let next = heap.pop().unwrap();
|
||||
// Refill heap
|
||||
if let Some(&(next_key, next_value)) = iters[next.iter_idx].1.next() {
|
||||
heap.push(AccountEntry {
|
||||
key: next_key,
|
||||
block_idx: next.block_idx,
|
||||
value: next_value,
|
||||
iter_idx: next.iter_idx,
|
||||
});
|
||||
}
|
||||
// Take value from later block
|
||||
if next.block_idx > final_block_idx {
|
||||
final_value = next.value;
|
||||
final_block_idx = next.block_idx;
|
||||
}
|
||||
}
|
||||
|
||||
last_key = Some(entry.key);
|
||||
|
||||
// Write to database
|
||||
if let Some(account) = final_value {
|
||||
hashed_accounts_cursor.upsert(entry.key, &account)?;
|
||||
} else if hashed_accounts_cursor.seek_exact(entry.key)?.is_some() {
|
||||
hashed_accounts_cursor.delete_current()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === Write hashed storage using k-way merge ===
|
||||
{
|
||||
// First, collect all (address, block_idx, storage) tuples and sort by address
|
||||
let mut all_storages: Vec<(B256, usize, &HashedStorageSorted)> = Vec::new();
|
||||
for (block_idx, state) in hashed_states.iter().enumerate() {
|
||||
for (addr, storage) in state.account_storages() {
|
||||
all_storages.push((*addr, block_idx, storage));
|
||||
}
|
||||
}
|
||||
all_storages.sort_unstable_by_key(|(addr, _, _)| *addr);
|
||||
|
||||
let mut hashed_storage_cursor =
|
||||
self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
|
||||
|
||||
// Group by address and process each address's storage with merge
|
||||
let mut i = 0;
|
||||
while i < all_storages.len() {
|
||||
let current_addr = all_storages[i].0;
|
||||
|
||||
// Collect all storages for this address
|
||||
let mut addr_storages: Vec<(usize, &HashedStorageSorted)> = Vec::new();
|
||||
while i < all_storages.len() && all_storages[i].0 == current_addr {
|
||||
addr_storages.push((all_storages[i].1, all_storages[i].2));
|
||||
i += 1;
|
||||
}
|
||||
|
||||
// Check if any block wiped this storage (latest wipe wins)
|
||||
let mut is_wiped = false;
|
||||
let mut wipe_block_idx = 0;
|
||||
for (block_idx, storage) in &addr_storages {
|
||||
if storage.is_wiped() && *block_idx >= wipe_block_idx {
|
||||
is_wiped = true;
|
||||
wipe_block_idx = *block_idx;
|
||||
}
|
||||
}
|
||||
|
||||
if is_wiped {
|
||||
if hashed_storage_cursor.seek_exact(current_addr)?.is_some() {
|
||||
hashed_storage_cursor.delete_current_duplicates()?;
|
||||
}
|
||||
// Only include slots from blocks >= wipe_block_idx
|
||||
addr_storages.retain(|(block_idx, _)| *block_idx >= wipe_block_idx);
|
||||
}
|
||||
|
||||
// K-way merge for storage slots
|
||||
#[derive(Eq, PartialEq)]
|
||||
struct SlotEntry<'a> {
|
||||
key: B256,
|
||||
value: U256,
|
||||
block_idx: usize,
|
||||
iter_idx: usize,
|
||||
remaining: &'a [(B256, U256)],
|
||||
}
|
||||
|
||||
impl Ord for SlotEntry<'_> {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
other.key.cmp(&self.key).then_with(|| self.block_idx.cmp(&other.block_idx))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for SlotEntry<'_> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
let mut heap: BinaryHeap<SlotEntry<'_>> = BinaryHeap::new();
|
||||
|
||||
// Initialize heap with first slot from each storage
|
||||
for (idx, (block_idx, storage)) in addr_storages.iter().enumerate() {
|
||||
let slots = storage.storage_slots_ref();
|
||||
if let Some((&(key, value), remaining)) = slots.split_first() {
|
||||
heap.push(SlotEntry {
|
||||
key,
|
||||
value,
|
||||
block_idx: *block_idx,
|
||||
iter_idx: idx,
|
||||
remaining,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut last_slot: Option<B256> = None;
|
||||
|
||||
while let Some(entry) = heap.pop() {
|
||||
// Refill heap
|
||||
if let Some((&(next_key, next_value), next_remaining)) =
|
||||
entry.remaining.split_first()
|
||||
{
|
||||
heap.push(SlotEntry {
|
||||
key: next_key,
|
||||
value: next_value,
|
||||
block_idx: entry.block_idx,
|
||||
iter_idx: entry.iter_idx,
|
||||
remaining: next_remaining,
|
||||
});
|
||||
}
|
||||
|
||||
// Skip if we already processed this slot
|
||||
if last_slot == Some(entry.key) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Drain duplicates, taking latest block's value
|
||||
let mut final_value = entry.value;
|
||||
let mut final_block_idx = entry.block_idx;
|
||||
while let Some(next) = heap.peek() {
|
||||
if next.key != entry.key {
|
||||
break;
|
||||
}
|
||||
let next = heap.pop().unwrap();
|
||||
if let Some((&(next_key, next_value), next_remaining)) =
|
||||
next.remaining.split_first()
|
||||
{
|
||||
heap.push(SlotEntry {
|
||||
key: next_key,
|
||||
value: next_value,
|
||||
block_idx: next.block_idx,
|
||||
iter_idx: next.iter_idx,
|
||||
remaining: next_remaining,
|
||||
});
|
||||
}
|
||||
if next.block_idx > final_block_idx {
|
||||
final_value = next.value;
|
||||
final_block_idx = next.block_idx;
|
||||
}
|
||||
}
|
||||
|
||||
last_slot = Some(entry.key);
|
||||
|
||||
// Write to database
|
||||
let storage_entry = StorageEntry { key: entry.key, value: final_value };
|
||||
|
||||
if let Some(db_entry) =
|
||||
hashed_storage_cursor.seek_by_key_subkey(current_addr, storage_entry.key)? &&
|
||||
db_entry.key == storage_entry.key
|
||||
{
|
||||
hashed_storage_cursor.delete_current()?;
|
||||
}
|
||||
|
||||
if !storage_entry.value.is_zero() {
|
||||
hashed_storage_cursor.upsert(current_addr, &storage_entry)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Write receipts for a single block's execution outcome.
|
||||
///
|
||||
/// This is used by `save_blocks` to write receipts per-block while batching other state writes.
|
||||
pub fn write_receipts(
|
||||
&self,
|
||||
execution_outcome: &ExecutionOutcome<ReceiptTy<N>>,
|
||||
config: StateWriteConfig,
|
||||
) -> ProviderResult<()> {
|
||||
if !config.write_receipts {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let first_block = execution_outcome.first_block();
|
||||
let block_count = execution_outcome.len() as u64;
|
||||
let last_block = execution_outcome.last_block();
|
||||
let block_range = first_block..=last_block;
|
||||
|
||||
let tip = self.last_block_number()?.max(last_block);
|
||||
|
||||
// Fetch the first transaction number for each block in the range
|
||||
let block_indices: Vec<_> = self
|
||||
.block_body_indices_range(block_range)?
|
||||
.into_iter()
|
||||
.map(|b| b.first_tx_num)
|
||||
.collect();
|
||||
|
||||
// Ensure all expected blocks are present.
|
||||
if block_indices.len() < block_count as usize {
|
||||
let missing_blocks = block_count - block_indices.len() as u64;
|
||||
return Err(ProviderError::BlockBodyIndicesNotFound(
|
||||
last_block.saturating_sub(missing_blocks - 1),
|
||||
));
|
||||
}
|
||||
|
||||
let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
|
||||
|
||||
let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
|
||||
let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
|
||||
|
||||
let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
|
||||
self.static_file_provider()
|
||||
.get_highest_static_file_tx(StaticFileSegment::Receipts)
|
||||
.is_none()) &&
|
||||
PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
|
||||
|
||||
let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
|
||||
for (_, addresses) in contract_log_pruner.range(..first_block) {
|
||||
allowed_addresses.extend(addresses.iter().copied());
|
||||
}
|
||||
|
||||
for (idx, (receipts, first_tx_index)) in
|
||||
execution_outcome.receipts.iter().zip(block_indices).enumerate()
|
||||
{
|
||||
let block_number = first_block + idx as u64;
|
||||
|
||||
receipts_writer.increment_block(block_number)?;
|
||||
|
||||
if prunable_receipts &&
|
||||
self.prune_modes
|
||||
.receipts
|
||||
.is_some_and(|mode| mode.should_prune(block_number, tip))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
|
||||
allowed_addresses.extend(new_addresses.iter().copied());
|
||||
}
|
||||
|
||||
for (idx, receipt) in receipts.iter().enumerate() {
|
||||
let receipt_idx = first_tx_index + idx as u64;
|
||||
if prunable_receipts &&
|
||||
has_contract_log_filter &&
|
||||
!receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
receipts_writer.append_receipt(receipt_idx, receipt)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
|
||||
|
||||
Reference in New Issue
Block a user