Compare commits

...

4 Commits

Author SHA1 Message Date
Brian Picciano
e9cdd1fbf8 perf(engine): restore persistence defaults from ac6106
Co-Authored-By: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
2026-04-16 09:24:19 +00:00
Brian Picciano
4ebc6a18c6 perf(engine): set memory_block_buffer_target to 20
Buffer of 40 caused MDBX read transaction timeouts (300s limit)
in the deferred trie task. 20 is a safe middle ground.

- memory_block_buffer_target: 40 → 20
- persistence_threshold: 42 → 22 (triggers at gap 23, persists 3)
- backpressure_threshold: 164 → 84 (2 × (22 + 20))

Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d90cb-dc82-73be-8bbc-1a27d8705156
Co-authored-by: Amp <amp@ampcode.com>
2026-04-15 21:05:17 +00:00
Brian Picciano
7fa5608b45 perf(engine): increase memory_block_buffer_target to 40
Raises buffer from 10 to 40 blocks to filter more overwritten
hashed state and trie writes from MDBX persistence.

- memory_block_buffer_target: 10 → 40
- persistence_threshold: 12 → 42 (triggers at gap 43, persists 3)
- backpressure_threshold: 44 → 164 (2 × (42 + 40))

Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d90cb-dc82-73be-8bbc-1a27d8705156
Co-authored-by: Amp <amp@ampcode.com>
2026-04-15 14:57:47 +00:00
Brian Picciano
ac6106e6f3 perf(engine): pass buffer blocks to save_blocks and filter overwritten hashed state/trie updates
Introduces SaveBlocksBatch to carry both blocks-to-persist and in-memory
buffer blocks through the persistence pipeline. Buffer blocks' hashed
state keys and trie node paths are collected into skip sets so that MDBX
writes which will be immediately overwritten on the next persistence
cycle are elided.

Raises default persistence thresholds:
- memory_block_buffer_target: 0 → 10
- persistence_threshold: 2 → 12 (triggers at gap 13, persists 3)
- backpressure_threshold: 16 → 44 (2 × (12 + 10))

Co-Authored-By: Brian Picciano <933154+mediocregopher@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d90cb-dc82-73be-8bbc-1a27d8705156
Co-authored-by: Amp <amp@ampcode.com>
2026-04-15 11:37:28 +00:00
8 changed files with 237 additions and 69 deletions

View File

@@ -4,13 +4,13 @@ use alloy_eips::merge::EPOCH_SLOTS;
use core::time::Duration;
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 12;
/// Maximum canonical-minus-persisted gap before engine API processing is stalled.
pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 = 16;
pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 = 44;
/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 10;
/// The size of proof targets chunk to spawn in one multiproof calculation.
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 5;

View File

@@ -102,8 +102,8 @@ where
self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
let _ = sender.send(PersistenceResult { last_block, commit_duration: None });
}
PersistenceAction::SaveBlocks(blocks, sender) => {
let result = self.on_save_blocks(blocks)?;
PersistenceAction::SaveBlocks(batch, sender) => {
let result = self.on_save_blocks(batch)?;
let result_number = result.last_block.map(|b| b.number);
let _ = sender.send(result);
@@ -144,14 +144,18 @@ where
Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
}
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_count = blocks.len()))]
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_count = batch.blocks.len()))]
fn on_save_blocks(
&mut self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
batch: SaveBlocksBatch<N::Primitives>,
) -> Result<PersistenceResult, PersistenceError> {
let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
let block_count = blocks.len();
let SaveBlocksBatch { blocks, persist_count } = batch;
let (blocks_to_persist, buffer_blocks) = blocks.split_at(persist_count);
let blocks_to_persist = blocks_to_persist.to_vec();
let first_block = blocks_to_persist.first().map(|b| b.recovered_block.num_hash());
let last_block = blocks_to_persist.last().map(|b| b.recovered_block.num_hash());
let block_count = blocks_to_persist.len();
let pending_finalized = self.pending_finalized_block.take();
let pending_safe = self.pending_safe_block.take();
@@ -162,7 +166,7 @@ where
if let Some(last) = last_block {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
provider_rw.save_blocks(blocks_to_persist, SaveBlocksMode::Full, buffer_blocks)?;
if let Some(finalized) = pending_finalized {
provider_rw.save_finalized_block_number(finalized.min(last.number))?;
@@ -216,6 +220,28 @@ pub enum PersistenceError {
ProviderError(#[from] ProviderError),
}
/// A batch of blocks passed to the persistence service.
///
/// Contains all canonical blocks collected during a persistence cycle. Only the first
/// `persist_count` blocks are written to disk; the remaining blocks are included for
/// downstream consumers but are not persisted yet.
#[derive(Debug, Clone, PartialEq)]
pub struct SaveBlocksBatch<N: NodePrimitives = EthPrimitives> {
/// All blocks in the batch, ordered oldest to newest.
pub blocks: Vec<ExecutedBlock<N>>,
/// Number of leading blocks to actually persist. The blocks at indices
/// `[persist_count..]` are passed through but not written to storage.
pub persist_count: usize,
}
impl<N: NodePrimitives> SaveBlocksBatch<N> {
/// Creates a new batch where all blocks should be persisted.
pub fn persist_all(blocks: Vec<ExecutedBlock<N>>) -> Self {
let persist_count = blocks.len();
Self { blocks, persist_count }
}
}
/// A signal to the persistence service that part of the tree state can be persisted.
#[derive(Debug)]
pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
@@ -224,7 +250,7 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
///
/// First, header, transaction, and receipt-related data should be written to static files.
/// Then the execution history-related data will be written to the database.
SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<PersistenceResult>),
SaveBlocks(SaveBlocksBatch<N>, CrossbeamSender<PersistenceResult>),
/// Removes block data above the given block number from the database.
///
@@ -308,10 +334,10 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
/// If there are no blocks to persist, then `None` is sent in the sender.
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock<T>>,
batch: SaveBlocksBatch<T>,
tx: CrossbeamSender<PersistenceResult>,
) -> Result<(), SendError<PersistenceAction<T>>> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
self.send_action(PersistenceAction::SaveBlocks(batch, tx))
}
/// Queues the finalized block number to be persisted on disk.
@@ -407,7 +433,7 @@ mod tests {
let blocks = vec![];
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
handle.save_blocks(SaveBlocksBatch::persist_all(blocks), tx).unwrap();
let result = rx.recv().unwrap();
assert!(result.last_block.is_none());
@@ -426,7 +452,7 @@ mod tests {
let blocks = vec![executed];
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
handle.save_blocks(SaveBlocksBatch::persist_all(blocks), tx).unwrap();
let result = rx.recv_timeout(std::time::Duration::from_secs(10)).expect("test timed out");
@@ -443,7 +469,7 @@ mod tests {
let last_hash = blocks.last().unwrap().recovered_block().hash();
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
handle.save_blocks(SaveBlocksBatch::persist_all(blocks), tx).unwrap();
let result = rx.recv().unwrap();
assert_eq!(last_hash, result.last_block.unwrap().hash);
}
@@ -460,7 +486,7 @@ mod tests {
let last_hash = blocks.last().unwrap().recovered_block().hash();
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
handle.save_blocks(SaveBlocksBatch::persist_all(blocks), tx).unwrap();
let result = rx.recv().unwrap();
assert_eq!(last_hash, result.last_block.unwrap().hash);
@@ -555,7 +581,7 @@ mod tests {
{
let provider_rw = provider_factory.database_provider_rw().unwrap();
provider_rw.save_blocks(blocks_a, SaveBlocksMode::Full).unwrap();
provider_rw.save_blocks(blocks_a, SaveBlocksMode::Full, &[]).unwrap();
provider_rw.commit().unwrap();
}
@@ -612,7 +638,7 @@ mod tests {
provider_rw.commit().unwrap();
let provider_rw = pf.database_provider_rw().unwrap();
provider_rw.save_blocks(vec![block_b2], SaveBlocksMode::Full).unwrap();
provider_rw.save_blocks(vec![block_b2], SaveBlocksMode::Full, &[]).unwrap();
provider_rw.commit().unwrap();
});

View File

@@ -2,7 +2,7 @@ use crate::{
backfill::{BackfillAction, BackfillSyncState},
chain::FromOrchestrator,
engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
persistence::PersistenceHandle,
persistence::{PersistenceHandle, SaveBlocksBatch},
tree::{error::InsertPayloadError, payload_validator::TreeCtx},
};
use alloy_consensus::BlockHeader;
@@ -1349,22 +1349,25 @@ where
/// Helper method to save blocks and set the persistence state. This ensures we keep track of
/// the current persistence action while we're saving blocks.
fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
if blocks_to_persist.is_empty() {
fn persist_blocks(&mut self, batch: SaveBlocksBatch<N>) {
if batch.blocks.is_empty() {
debug!(target: "engine::tree", "Returned empty set of blocks to persist");
return
}
// NOTE: checked non-empty above
let highest_num_hash = blocks_to_persist
// The highest block to be persisted determines the persistence state tracking.
// Use persist_count to find the highest block that will actually be written to disk.
let highest_num_hash = batch
.blocks
.iter()
.take(batch.persist_count)
.max_by_key(|block| block.recovered_block().number())
.map(|b| b.recovered_block().num_hash())
.expect("Checked non-empty persisting blocks");
debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
debug!(target: "engine::tree", count=batch.blocks.len(), persist_count=batch.persist_count, blocks = ?batch.blocks.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
let (tx, rx) = crossbeam_channel::bounded(1);
let _ = self.persistence.save_blocks(blocks_to_persist, tx);
let _ = self.persistence.save_blocks(batch, tx);
self.persistence_state.start_save(highest_num_hash, rx);
}
@@ -1378,9 +1381,8 @@ where
if let Some(new_tip_num) = self.find_disk_reorg()? {
self.remove_blocks(new_tip_num)
} else if self.should_persist() {
let blocks_to_persist =
self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
self.persist_blocks(blocks_to_persist);
let batch = self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
self.persist_blocks(batch);
}
}
@@ -1411,15 +1413,15 @@ where
self.on_persistence_complete(result, start_time)?;
}
let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
let batch = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
if blocks_to_persist.is_empty() {
if batch.blocks.is_empty() {
debug!(target: "engine::tree", "persistence complete, signaling termination");
return Ok(())
}
debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
self.persist_blocks(blocks_to_persist);
debug!(target: "engine::tree", count = batch.blocks.len(), "persisting remaining blocks before shutdown");
self.persist_blocks(batch);
}
}
@@ -2018,16 +2020,20 @@ where
}
/// Returns a batch of consecutive canonical blocks to persist in the range
/// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
/// `(last_persisted_number .. canonical_head]`. The expected order is oldest -> newest.
///
/// All blocks above `last_persisted_number` are included in the batch, but only
/// those up to the persistence target (determined by [`PersistTarget`]) are marked
/// for actual persistence via [`SaveBlocksBatch::persist_count`].
fn get_canonical_blocks_to_persist(
&self,
target: PersistTarget,
) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
) -> Result<SaveBlocksBatch<N>, AdvancePersistenceError> {
// We will calculate the state root using the database, so we need to be sure there are no
// changes
debug_assert!(!self.persistence_state.in_progress());
let mut blocks_to_persist = Vec::new();
let mut all_blocks = Vec::new();
let mut current_hash = self.state.tree_state.canonical_block_hash();
let last_persisted_number = self.persistence_state.last_persisted_block.number;
let canonical_head_number = self.state.tree_state.canonical_block_number();
@@ -2052,17 +2058,18 @@ where
break;
}
if block.recovered_block().number() <= target_number {
blocks_to_persist.push(block.clone());
}
all_blocks.push(block.clone());
current_hash = block.recovered_block().parent_hash();
}
// Reverse the order so that the oldest block comes first
blocks_to_persist.reverse();
all_blocks.reverse();
Ok(blocks_to_persist)
// Only blocks up to target_number are persisted; the rest are buffer blocks
let persist_count =
all_blocks.iter().filter(|b| b.recovered_block().number() <= target_number).count();
Ok(SaveBlocksBatch { blocks: all_blocks, persist_count })
}
/// This clears the blocks from the in-memory tree state that have been persisted to the

View File

@@ -1,6 +1,6 @@
use super::*;
use crate::{
persistence::PersistenceAction,
persistence::{PersistenceAction, SaveBlocksBatch},
tree::{
payload_validator::{BasicEngineValidator, TreeCtx, ValidationOutcome},
persistence_state::CurrentPersistenceAction,
@@ -548,12 +548,14 @@ async fn test_tree_persist_blocks() {
let received_action =
test_harness.action_rx.recv().expect("Failed to receive save blocks action");
if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
if let PersistenceAction::SaveBlocks(batch, _) = received_action {
// all blocks are included in the batch
assert_eq!(batch.blocks.len(), blocks.len());
assert_eq!(batch.blocks, blocks);
// only blocks.len() - tree_config.memory_block_buffer_target() will be
// persisted
let expected_persist_len = blocks.len() - tree_config.memory_block_buffer_target() as usize;
assert_eq!(saved_blocks.len(), expected_persist_len);
assert_eq!(saved_blocks, blocks[..expected_persist_len]);
assert_eq!(batch.persist_count, expected_persist_len);
} else {
panic!("unexpected action received {received_action:?}");
}
@@ -818,10 +820,12 @@ async fn test_tree_state_on_new_head_reorg() {
// get rid of the prev action
let received_action = test_harness.action_rx.recv().unwrap();
let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else {
let PersistenceAction::SaveBlocks(batch, sender) = received_action else {
panic!("received wrong action");
};
assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]);
// all blocks above last_persisted are included, persist_count covers those up to the target
assert_eq!(batch.persist_count, 2);
assert_eq!(&batch.blocks[..batch.persist_count], &[blocks[0].clone(), blocks[1].clone()]);
// send the response so we can advance again
sender
@@ -979,8 +983,10 @@ async fn test_get_canonical_blocks_to_persist() {
.try_into()
.unwrap();
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
for (i, item) in blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length) {
assert_eq!(blocks_to_persist.persist_count, expected_blocks_to_persist_length);
for (i, item) in
blocks_to_persist.blocks.iter().enumerate().take(expected_blocks_to_persist_length)
{
assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1);
}
@@ -993,13 +999,16 @@ async fn test_get_canonical_blocks_to_persist() {
let blocks_to_persist =
test_harness.tree.get_canonical_blocks_to_persist(PersistTarget::Threshold).unwrap();
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
assert_eq!(blocks_to_persist.persist_count, expected_blocks_to_persist_length);
// check that the fork block is not included in the blocks to persist
assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash));
assert!(!blocks_to_persist
.blocks
.iter()
.any(|b| b.recovered_block().hash() == fork_block_hash));
// check that the original block 4 is still included
assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 &&
assert!(blocks_to_persist.blocks.iter().any(|b| b.recovered_block().number == 4 &&
b.recovered_block().hash() == blocks[4].recovered_block().hash()));
// check that if we advance persistence, the persistence action is the correct value
@@ -1007,7 +1016,11 @@ async fn test_get_canonical_blocks_to_persist() {
assert_eq!(
test_harness.tree.persistence_state.current_action().cloned(),
Some(CurrentPersistenceAction::SavingBlocks {
highest: blocks_to_persist.last().unwrap().recovered_block().num_hash()
highest: blocks_to_persist.blocks[..blocks_to_persist.persist_count]
.last()
.unwrap()
.recovered_block()
.num_hash()
})
);
}
@@ -2106,15 +2119,16 @@ mod forkchoice_updated_tests {
break;
}
if let Ok(PersistenceAction::SaveBlocks(saved_blocks, sender)) =
if let Ok(PersistenceAction::SaveBlocks(batch, sender)) =
action_rx.recv_timeout(std::time::Duration::from_millis(100))
{
if let Some(last) = saved_blocks.last() {
let persisted = &batch.blocks[..batch.persist_count];
if let Some(last) = persisted.last() {
last_persisted_number = last.recovered_block().number;
}
sender
.send(PersistenceResult {
last_block: saved_blocks.last().map(|b| b.recovered_block().num_hash()),
last_block: persisted.last().map(|b| b.recovered_block().num_hash()),
commit_duration: Some(Duration::ZERO),
})
.unwrap();

View File

@@ -1007,7 +1007,9 @@ mod tests {
// Push to disk
let provider_rw = hook_provider.database_provider_rw().unwrap();
provider_rw.save_blocks(vec![lowest_memory_block], SaveBlocksMode::Full).unwrap();
provider_rw
.save_blocks(vec![lowest_memory_block], SaveBlocksMode::Full, &[])
.unwrap();
provider_rw.commit().unwrap();
// Remove from memory

View File

@@ -27,7 +27,7 @@ use alloy_consensus::{
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{
keccak256,
map::{hash_map, AddressSet, B256Map, HashMap},
map::{hash_map, AddressSet, B256Map, B256Set, HashMap, HashSet},
Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
};
use itertools::Itertools;
@@ -67,7 +67,7 @@ use reth_storage_api::{
use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
use reth_trie::{
updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
HashedPostStateSorted,
HashedPostStateSorted, Nibbles,
};
use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
use revm_database::states::{
@@ -559,6 +559,10 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
/// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
/// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
///
/// When `buffer_blocks` is non-empty, hashed state and trie updates for keys that also
/// appear in the buffer blocks are skipped. Those keys will be overwritten when the buffer
/// blocks are persisted in a future cycle, so writing them now is wasted I/O.
///
/// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
/// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
#[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
@@ -566,6 +570,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
&self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
save_mode: SaveBlocksMode,
buffer_blocks: &[ExecutedBlock<N::Primitives>],
) -> ProviderResult<()> {
if blocks.is_empty() {
debug!(target: "providers::db", "Attempted to write empty block range");
@@ -710,19 +715,42 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
// Write all hashed state and trie updates in single batches.
// This reduces cursor open/close overhead from N calls to 1.
if save_mode.with_state() {
// Collect overwrite keys from buffer blocks so we can skip writing
// hashed state / trie entries that will be overwritten when those
// blocks are persisted in a future cycle.
let (skip_accounts, skip_storages, skip_account_nodes, skip_storage_nodes) =
Self::collect_buffer_overwrite_keys(buffer_blocks);
// Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
let start = Instant::now();
let merged_hashed_state = HashedPostStateSorted::merge_batch(
let merged: Arc<HashedPostStateSorted> = HashedPostStateSorted::merge_batch(
blocks.iter().rev().map(|b| b.trie_data().hashed_state),
);
let mut merged_hashed_state =
Arc::try_unwrap(merged).unwrap_or_else(|arc| (*arc).clone());
if !skip_accounts.is_empty() || !skip_storages.is_empty() {
Self::filter_hashed_state(
&mut merged_hashed_state,
&skip_accounts,
&skip_storages,
);
}
if !merged_hashed_state.is_empty() {
self.write_hashed_state(&merged_hashed_state)?;
}
timings.write_hashed_state += start.elapsed();
let start = Instant::now();
let merged_trie =
let merged: Arc<TrieUpdatesSorted> =
TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
let mut merged_trie = Arc::try_unwrap(merged).unwrap_or_else(|arc| (*arc).clone());
if !skip_account_nodes.is_empty() || !skip_storage_nodes.is_empty() {
Self::filter_trie_updates(
&mut merged_trie,
&skip_account_nodes,
&skip_storage_nodes,
);
}
if !merged_trie.is_empty() {
self.write_trie_updates_sorted(&merged_trie)?;
}
@@ -765,6 +793,87 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
Ok(())
}
/// Collects the set of hashed-state and trie-update keys touched by `buffer_blocks`.
///
/// Returns `(skip_accounts, skip_storages, skip_account_nodes, skip_storage_nodes)`:
/// - `skip_accounts`: hashed addresses whose account entry will be overwritten
/// - `skip_storages`: per-address set of hashed slots that will be overwritten
/// - `skip_account_nodes`: account-trie nibble paths that will be overwritten
/// - `skip_storage_nodes`: per-address set of storage-trie nibble paths that will be
/// overwritten
fn collect_buffer_overwrite_keys(
buffer_blocks: &[ExecutedBlock<N::Primitives>],
) -> (B256Set, B256Map<B256Set>, HashSet<Nibbles>, B256Map<HashSet<Nibbles>>) {
let mut skip_accounts = B256Set::default();
let mut skip_storages: B256Map<B256Set> = B256Map::default();
let mut skip_account_nodes: HashSet<Nibbles> = HashSet::default();
let mut skip_storage_nodes: B256Map<HashSet<Nibbles>> = B256Map::default();
for block in buffer_blocks {
let hashed_state = block.hashed_state();
for (hashed_address, _) in hashed_state.accounts() {
skip_accounts.insert(*hashed_address);
}
for (hashed_address, storage) in hashed_state.account_storages() {
let slots = skip_storages.entry(*hashed_address).or_default();
for (hashed_slot, _) in storage.storage_slots_ref() {
slots.insert(*hashed_slot);
}
}
let trie = block.trie_updates();
for (nibbles, _) in trie.account_nodes_ref() {
skip_account_nodes.insert(nibbles.clone());
}
for (hashed_address, storage_trie) in trie.storage_tries_ref() {
let nodes = skip_storage_nodes.entry(*hashed_address).or_default();
for (nibbles, _) in storage_trie.storage_nodes_ref() {
nodes.insert(nibbles.clone());
}
}
}
(skip_accounts, skip_storages, skip_account_nodes, skip_storage_nodes)
}
/// Removes entries from `hashed_state` whose keys appear in the skip sets.
fn filter_hashed_state(
hashed_state: &mut HashedPostStateSorted,
skip_accounts: &B256Set,
skip_storages: &B256Map<B256Set>,
) {
hashed_state.accounts.retain(|(addr, _)| !skip_accounts.contains(addr));
for (addr, skip_slots) in skip_storages {
if let Some(storage) = hashed_state.storages.get_mut(addr) {
storage.storage_slots.retain(|(slot, _)| !skip_slots.contains(slot));
}
}
// Remove empty storage entries left after filtering.
hashed_state.storages.retain(|_, storage| !storage.is_empty() || storage.is_wiped());
}
/// Removes entries from `trie_updates` whose paths appear in the skip sets.
fn filter_trie_updates(
trie_updates: &mut TrieUpdatesSorted,
skip_account_nodes: &HashSet<Nibbles>,
skip_storage_nodes: &B256Map<HashSet<Nibbles>>,
) {
trie_updates
.account_nodes_mut()
.retain(|(nibbles, _)| !skip_account_nodes.contains(nibbles));
for (addr, skip_nodes) in skip_storage_nodes {
if let Some(storage_trie) = trie_updates.storage_tries_mut().get_mut(addr) {
storage_trie.storage_nodes.retain(|(nibbles, _)| !skip_nodes.contains(nibbles));
}
}
// Remove empty storage tries left after filtering.
trie_updates
.storage_tries_mut()
.retain(|_, storage_trie| !storage_trie.is_empty() || storage_trie.is_deleted());
}
/// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
///
/// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
@@ -3490,7 +3599,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
);
// Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly, &[])?;
// Return the body indices
self.block_body_indices(block_number)?
@@ -4997,7 +5106,7 @@ mod tests {
ComputedTrieData::default(),
);
let provider_rw = factory.provider_rw().unwrap();
provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full, &[]).unwrap();
provider_rw.commit().unwrap();
let mut blocks: Vec<ExecutedBlock> = Vec::new();
@@ -5069,7 +5178,7 @@ mod tests {
}
let provider_rw = factory.provider_rw().unwrap();
provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
provider_rw.save_blocks(blocks, SaveBlocksMode::Full, &[]).unwrap();
provider_rw.commit().unwrap();
let provider = factory.provider().unwrap();

View File

@@ -589,11 +589,21 @@ impl TrieUpdatesSorted {
&self.account_nodes
}
/// Returns mutable reference to updated account nodes.
pub fn account_nodes_mut(&mut self) -> &mut Vec<(Nibbles, Option<BranchNodeCompact>)> {
&mut self.account_nodes
}
/// Returns reference to updated storage tries.
pub const fn storage_tries_ref(&self) -> &B256Map<StorageTrieUpdatesSorted> {
&self.storage_tries
}
/// Returns mutable reference to updated storage tries.
pub fn storage_tries_mut(&mut self) -> &mut B256Map<StorageTrieUpdatesSorted> {
&mut self.storage_tries
}
/// Returns the total number of updates including account nodes and all storage updates.
pub fn total_len(&self) -> usize {
self.account_nodes.len() +

View File

@@ -926,19 +926,19 @@ Engine:
To persist blocks as fast as the node receives them, set this value to zero. This will cause more frequent DB writes.
[default: 2]
[default: 12]
--engine.persistence-backpressure-threshold <PERSISTENCE_BACKPRESSURE_THRESHOLD>
Configure the maximum canonical-minus-persisted gap before engine API processing stalls.
This value must be greater than `--engine.persistence-threshold`.
[default: 16]
[default: 44]
--engine.memory-block-buffer-target <MEMORY_BLOCK_BUFFER_TARGET>
Configure the target number of blocks to keep in memory
[default: 0]
[default: 10]
--engine.legacy-state-root
Enable legacy state root