mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
4 Commits
devnet4
...
mediocrego
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9cdd1fbf8 | ||
|
|
4ebc6a18c6 | ||
|
|
7fa5608b45 | ||
|
|
ac6106e6f3 |
@@ -4,13 +4,13 @@ use alloy_eips::merge::EPOCH_SLOTS;
|
||||
use core::time::Duration;
|
||||
|
||||
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
|
||||
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
|
||||
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 12;
|
||||
|
||||
/// Maximum canonical-minus-persisted gap before engine API processing is stalled.
|
||||
pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 = 16;
|
||||
pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 = 44;
|
||||
|
||||
/// How close to the canonical head we persist blocks.
|
||||
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
|
||||
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 10;
|
||||
|
||||
/// The size of proof targets chunk to spawn in one multiproof calculation.
|
||||
pub const DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE: usize = 5;
|
||||
|
||||
@@ -102,8 +102,8 @@ where
|
||||
self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
|
||||
let _ = sender.send(PersistenceResult { last_block, commit_duration: None });
|
||||
}
|
||||
PersistenceAction::SaveBlocks(blocks, sender) => {
|
||||
let result = self.on_save_blocks(blocks)?;
|
||||
PersistenceAction::SaveBlocks(batch, sender) => {
|
||||
let result = self.on_save_blocks(batch)?;
|
||||
let result_number = result.last_block.map(|b| b.number);
|
||||
|
||||
let _ = sender.send(result);
|
||||
@@ -144,14 +144,18 @@ where
|
||||
Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_count = blocks.len()))]
|
||||
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_count = batch.blocks.len()))]
|
||||
fn on_save_blocks(
|
||||
&mut self,
|
||||
blocks: Vec<ExecutedBlock<N::Primitives>>,
|
||||
batch: SaveBlocksBatch<N::Primitives>,
|
||||
) -> Result<PersistenceResult, PersistenceError> {
|
||||
let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
|
||||
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
|
||||
let block_count = blocks.len();
|
||||
let SaveBlocksBatch { blocks, persist_count } = batch;
|
||||
let (blocks_to_persist, buffer_blocks) = blocks.split_at(persist_count);
|
||||
let blocks_to_persist = blocks_to_persist.to_vec();
|
||||
|
||||
let first_block = blocks_to_persist.first().map(|b| b.recovered_block.num_hash());
|
||||
let last_block = blocks_to_persist.last().map(|b| b.recovered_block.num_hash());
|
||||
let block_count = blocks_to_persist.len();
|
||||
|
||||
let pending_finalized = self.pending_finalized_block.take();
|
||||
let pending_safe = self.pending_safe_block.take();
|
||||
@@ -162,7 +166,7 @@ where
|
||||
|
||||
if let Some(last) = last_block {
|
||||
let provider_rw = self.provider.database_provider_rw()?;
|
||||
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
|
||||
provider_rw.save_blocks(blocks_to_persist, SaveBlocksMode::Full, buffer_blocks)?;
|
||||
|
||||
if let Some(finalized) = pending_finalized {
|
||||
provider_rw.save_finalized_block_number(finalized.min(last.number))?;
|
||||
@@ -216,6 +220,28 @@ pub enum PersistenceError {
|
||||
ProviderError(#[from] ProviderError),
|
||||
}
|
||||
|
||||
/// A batch of blocks passed to the persistence service.
|
||||
///
|
||||
/// Contains all canonical blocks collected during a persistence cycle. Only the first
|
||||
/// `persist_count` blocks are written to disk; the remaining blocks are included for
|
||||
/// downstream consumers but are not persisted yet.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct SaveBlocksBatch<N: NodePrimitives = EthPrimitives> {
|
||||
/// All blocks in the batch, ordered oldest to newest.
|
||||
pub blocks: Vec<ExecutedBlock<N>>,
|
||||
/// Number of leading blocks to actually persist. The blocks at indices
|
||||
/// `[persist_count..]` are passed through but not written to storage.
|
||||
pub persist_count: usize,
|
||||
}
|
||||
|
||||
impl<N: NodePrimitives> SaveBlocksBatch<N> {
|
||||
/// Creates a new batch where all blocks should be persisted.
|
||||
pub fn persist_all(blocks: Vec<ExecutedBlock<N>>) -> Self {
|
||||
let persist_count = blocks.len();
|
||||
Self { blocks, persist_count }
|
||||
}
|
||||
}
|
||||
|
||||
/// A signal to the persistence service that part of the tree state can be persisted.
|
||||
#[derive(Debug)]
|
||||
pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
|
||||
@@ -224,7 +250,7 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
|
||||
///
|
||||
/// First, header, transaction, and receipt-related data should be written to static files.
|
||||
/// Then the execution history-related data will be written to the database.
|
||||
SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<PersistenceResult>),
|
||||
SaveBlocks(SaveBlocksBatch<N>, CrossbeamSender<PersistenceResult>),
|
||||
|
||||
/// Removes block data above the given block number from the database.
|
||||
///
|
||||
@@ -308,10 +334,10 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
/// If there are no blocks to persist, then `None` is sent in the sender.
|
||||
pub fn save_blocks(
|
||||
&self,
|
||||
blocks: Vec<ExecutedBlock<T>>,
|
||||
batch: SaveBlocksBatch<T>,
|
||||
tx: CrossbeamSender<PersistenceResult>,
|
||||
) -> Result<(), SendError<PersistenceAction<T>>> {
|
||||
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
|
||||
self.send_action(PersistenceAction::SaveBlocks(batch, tx))
|
||||
}
|
||||
|
||||
/// Queues the finalized block number to be persisted on disk.
|
||||
@@ -407,7 +433,7 @@ mod tests {
|
||||
let blocks = vec![];
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
handle.save_blocks(blocks, tx).unwrap();
|
||||
handle.save_blocks(SaveBlocksBatch::persist_all(blocks), tx).unwrap();
|
||||
|
||||
let result = rx.recv().unwrap();
|
||||
assert!(result.last_block.is_none());
|
||||
@@ -426,7 +452,7 @@ mod tests {
|
||||
let blocks = vec![executed];
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
handle.save_blocks(blocks, tx).unwrap();
|
||||
handle.save_blocks(SaveBlocksBatch::persist_all(blocks), tx).unwrap();
|
||||
|
||||
let result = rx.recv_timeout(std::time::Duration::from_secs(10)).expect("test timed out");
|
||||
|
||||
@@ -443,7 +469,7 @@ mod tests {
|
||||
let last_hash = blocks.last().unwrap().recovered_block().hash();
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
handle.save_blocks(blocks, tx).unwrap();
|
||||
handle.save_blocks(SaveBlocksBatch::persist_all(blocks), tx).unwrap();
|
||||
let result = rx.recv().unwrap();
|
||||
assert_eq!(last_hash, result.last_block.unwrap().hash);
|
||||
}
|
||||
@@ -460,7 +486,7 @@ mod tests {
|
||||
let last_hash = blocks.last().unwrap().recovered_block().hash();
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
|
||||
handle.save_blocks(blocks, tx).unwrap();
|
||||
handle.save_blocks(SaveBlocksBatch::persist_all(blocks), tx).unwrap();
|
||||
|
||||
let result = rx.recv().unwrap();
|
||||
assert_eq!(last_hash, result.last_block.unwrap().hash);
|
||||
@@ -555,7 +581,7 @@ mod tests {
|
||||
|
||||
{
|
||||
let provider_rw = provider_factory.database_provider_rw().unwrap();
|
||||
provider_rw.save_blocks(blocks_a, SaveBlocksMode::Full).unwrap();
|
||||
provider_rw.save_blocks(blocks_a, SaveBlocksMode::Full, &[]).unwrap();
|
||||
provider_rw.commit().unwrap();
|
||||
}
|
||||
|
||||
@@ -612,7 +638,7 @@ mod tests {
|
||||
provider_rw.commit().unwrap();
|
||||
|
||||
let provider_rw = pf.database_provider_rw().unwrap();
|
||||
provider_rw.save_blocks(vec![block_b2], SaveBlocksMode::Full).unwrap();
|
||||
provider_rw.save_blocks(vec![block_b2], SaveBlocksMode::Full, &[]).unwrap();
|
||||
provider_rw.commit().unwrap();
|
||||
});
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::{
|
||||
backfill::{BackfillAction, BackfillSyncState},
|
||||
chain::FromOrchestrator,
|
||||
engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
|
||||
persistence::PersistenceHandle,
|
||||
persistence::{PersistenceHandle, SaveBlocksBatch},
|
||||
tree::{error::InsertPayloadError, payload_validator::TreeCtx},
|
||||
};
|
||||
use alloy_consensus::BlockHeader;
|
||||
@@ -1349,22 +1349,25 @@ where
|
||||
|
||||
/// Helper method to save blocks and set the persistence state. This ensures we keep track of
|
||||
/// the current persistence action while we're saving blocks.
|
||||
fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlock<N>>) {
|
||||
if blocks_to_persist.is_empty() {
|
||||
fn persist_blocks(&mut self, batch: SaveBlocksBatch<N>) {
|
||||
if batch.blocks.is_empty() {
|
||||
debug!(target: "engine::tree", "Returned empty set of blocks to persist");
|
||||
return
|
||||
}
|
||||
|
||||
// NOTE: checked non-empty above
|
||||
let highest_num_hash = blocks_to_persist
|
||||
// The highest block to be persisted determines the persistence state tracking.
|
||||
// Use persist_count to find the highest block that will actually be written to disk.
|
||||
let highest_num_hash = batch
|
||||
.blocks
|
||||
.iter()
|
||||
.take(batch.persist_count)
|
||||
.max_by_key(|block| block.recovered_block().number())
|
||||
.map(|b| b.recovered_block().num_hash())
|
||||
.expect("Checked non-empty persisting blocks");
|
||||
|
||||
debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
|
||||
debug!(target: "engine::tree", count=batch.blocks.len(), persist_count=batch.persist_count, blocks = ?batch.blocks.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
|
||||
let (tx, rx) = crossbeam_channel::bounded(1);
|
||||
let _ = self.persistence.save_blocks(blocks_to_persist, tx);
|
||||
let _ = self.persistence.save_blocks(batch, tx);
|
||||
|
||||
self.persistence_state.start_save(highest_num_hash, rx);
|
||||
}
|
||||
@@ -1378,9 +1381,8 @@ where
|
||||
if let Some(new_tip_num) = self.find_disk_reorg()? {
|
||||
self.remove_blocks(new_tip_num)
|
||||
} else if self.should_persist() {
|
||||
let blocks_to_persist =
|
||||
self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
|
||||
self.persist_blocks(blocks_to_persist);
|
||||
let batch = self.get_canonical_blocks_to_persist(PersistTarget::Threshold)?;
|
||||
self.persist_blocks(batch);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1411,15 +1413,15 @@ where
|
||||
self.on_persistence_complete(result, start_time)?;
|
||||
}
|
||||
|
||||
let blocks_to_persist = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
|
||||
let batch = self.get_canonical_blocks_to_persist(PersistTarget::Head)?;
|
||||
|
||||
if blocks_to_persist.is_empty() {
|
||||
if batch.blocks.is_empty() {
|
||||
debug!(target: "engine::tree", "persistence complete, signaling termination");
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
debug!(target: "engine::tree", count = blocks_to_persist.len(), "persisting remaining blocks before shutdown");
|
||||
self.persist_blocks(blocks_to_persist);
|
||||
debug!(target: "engine::tree", count = batch.blocks.len(), "persisting remaining blocks before shutdown");
|
||||
self.persist_blocks(batch);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2018,16 +2020,20 @@ where
|
||||
}
|
||||
|
||||
/// Returns a batch of consecutive canonical blocks to persist in the range
|
||||
/// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
|
||||
/// `(last_persisted_number .. canonical_head]`. The expected order is oldest -> newest.
|
||||
///
|
||||
/// All blocks above `last_persisted_number` are included in the batch, but only
|
||||
/// those up to the persistence target (determined by [`PersistTarget`]) are marked
|
||||
/// for actual persistence via [`SaveBlocksBatch::persist_count`].
|
||||
fn get_canonical_blocks_to_persist(
|
||||
&self,
|
||||
target: PersistTarget,
|
||||
) -> Result<Vec<ExecutedBlock<N>>, AdvancePersistenceError> {
|
||||
) -> Result<SaveBlocksBatch<N>, AdvancePersistenceError> {
|
||||
// We will calculate the state root using the database, so we need to be sure there are no
|
||||
// changes
|
||||
debug_assert!(!self.persistence_state.in_progress());
|
||||
|
||||
let mut blocks_to_persist = Vec::new();
|
||||
let mut all_blocks = Vec::new();
|
||||
let mut current_hash = self.state.tree_state.canonical_block_hash();
|
||||
let last_persisted_number = self.persistence_state.last_persisted_block.number;
|
||||
let canonical_head_number = self.state.tree_state.canonical_block_number();
|
||||
@@ -2052,17 +2058,18 @@ where
|
||||
break;
|
||||
}
|
||||
|
||||
if block.recovered_block().number() <= target_number {
|
||||
blocks_to_persist.push(block.clone());
|
||||
}
|
||||
|
||||
all_blocks.push(block.clone());
|
||||
current_hash = block.recovered_block().parent_hash();
|
||||
}
|
||||
|
||||
// Reverse the order so that the oldest block comes first
|
||||
blocks_to_persist.reverse();
|
||||
all_blocks.reverse();
|
||||
|
||||
Ok(blocks_to_persist)
|
||||
// Only blocks up to target_number are persisted; the rest are buffer blocks
|
||||
let persist_count =
|
||||
all_blocks.iter().filter(|b| b.recovered_block().number() <= target_number).count();
|
||||
|
||||
Ok(SaveBlocksBatch { blocks: all_blocks, persist_count })
|
||||
}
|
||||
|
||||
/// This clears the blocks from the in-memory tree state that have been persisted to the
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use super::*;
|
||||
use crate::{
|
||||
persistence::PersistenceAction,
|
||||
persistence::{PersistenceAction, SaveBlocksBatch},
|
||||
tree::{
|
||||
payload_validator::{BasicEngineValidator, TreeCtx, ValidationOutcome},
|
||||
persistence_state::CurrentPersistenceAction,
|
||||
@@ -548,12 +548,14 @@ async fn test_tree_persist_blocks() {
|
||||
|
||||
let received_action =
|
||||
test_harness.action_rx.recv().expect("Failed to receive save blocks action");
|
||||
if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
|
||||
if let PersistenceAction::SaveBlocks(batch, _) = received_action {
|
||||
// all blocks are included in the batch
|
||||
assert_eq!(batch.blocks.len(), blocks.len());
|
||||
assert_eq!(batch.blocks, blocks);
|
||||
// only blocks.len() - tree_config.memory_block_buffer_target() will be
|
||||
// persisted
|
||||
let expected_persist_len = blocks.len() - tree_config.memory_block_buffer_target() as usize;
|
||||
assert_eq!(saved_blocks.len(), expected_persist_len);
|
||||
assert_eq!(saved_blocks, blocks[..expected_persist_len]);
|
||||
assert_eq!(batch.persist_count, expected_persist_len);
|
||||
} else {
|
||||
panic!("unexpected action received {received_action:?}");
|
||||
}
|
||||
@@ -818,10 +820,12 @@ async fn test_tree_state_on_new_head_reorg() {
|
||||
|
||||
// get rid of the prev action
|
||||
let received_action = test_harness.action_rx.recv().unwrap();
|
||||
let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else {
|
||||
let PersistenceAction::SaveBlocks(batch, sender) = received_action else {
|
||||
panic!("received wrong action");
|
||||
};
|
||||
assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]);
|
||||
// all blocks above last_persisted are included, persist_count covers those up to the target
|
||||
assert_eq!(batch.persist_count, 2);
|
||||
assert_eq!(&batch.blocks[..batch.persist_count], &[blocks[0].clone(), blocks[1].clone()]);
|
||||
|
||||
// send the response so we can advance again
|
||||
sender
|
||||
@@ -979,8 +983,10 @@ async fn test_get_canonical_blocks_to_persist() {
|
||||
.try_into()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
|
||||
for (i, item) in blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length) {
|
||||
assert_eq!(blocks_to_persist.persist_count, expected_blocks_to_persist_length);
|
||||
for (i, item) in
|
||||
blocks_to_persist.blocks.iter().enumerate().take(expected_blocks_to_persist_length)
|
||||
{
|
||||
assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1);
|
||||
}
|
||||
|
||||
@@ -993,13 +999,16 @@ async fn test_get_canonical_blocks_to_persist() {
|
||||
|
||||
let blocks_to_persist =
|
||||
test_harness.tree.get_canonical_blocks_to_persist(PersistTarget::Threshold).unwrap();
|
||||
assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
|
||||
assert_eq!(blocks_to_persist.persist_count, expected_blocks_to_persist_length);
|
||||
|
||||
// check that the fork block is not included in the blocks to persist
|
||||
assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash));
|
||||
assert!(!blocks_to_persist
|
||||
.blocks
|
||||
.iter()
|
||||
.any(|b| b.recovered_block().hash() == fork_block_hash));
|
||||
|
||||
// check that the original block 4 is still included
|
||||
assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 &&
|
||||
assert!(blocks_to_persist.blocks.iter().any(|b| b.recovered_block().number == 4 &&
|
||||
b.recovered_block().hash() == blocks[4].recovered_block().hash()));
|
||||
|
||||
// check that if we advance persistence, the persistence action is the correct value
|
||||
@@ -1007,7 +1016,11 @@ async fn test_get_canonical_blocks_to_persist() {
|
||||
assert_eq!(
|
||||
test_harness.tree.persistence_state.current_action().cloned(),
|
||||
Some(CurrentPersistenceAction::SavingBlocks {
|
||||
highest: blocks_to_persist.last().unwrap().recovered_block().num_hash()
|
||||
highest: blocks_to_persist.blocks[..blocks_to_persist.persist_count]
|
||||
.last()
|
||||
.unwrap()
|
||||
.recovered_block()
|
||||
.num_hash()
|
||||
})
|
||||
);
|
||||
}
|
||||
@@ -2106,15 +2119,16 @@ mod forkchoice_updated_tests {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Ok(PersistenceAction::SaveBlocks(saved_blocks, sender)) =
|
||||
if let Ok(PersistenceAction::SaveBlocks(batch, sender)) =
|
||||
action_rx.recv_timeout(std::time::Duration::from_millis(100))
|
||||
{
|
||||
if let Some(last) = saved_blocks.last() {
|
||||
let persisted = &batch.blocks[..batch.persist_count];
|
||||
if let Some(last) = persisted.last() {
|
||||
last_persisted_number = last.recovered_block().number;
|
||||
}
|
||||
sender
|
||||
.send(PersistenceResult {
|
||||
last_block: saved_blocks.last().map(|b| b.recovered_block().num_hash()),
|
||||
last_block: persisted.last().map(|b| b.recovered_block().num_hash()),
|
||||
commit_duration: Some(Duration::ZERO),
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
@@ -1007,7 +1007,9 @@ mod tests {
|
||||
|
||||
// Push to disk
|
||||
let provider_rw = hook_provider.database_provider_rw().unwrap();
|
||||
provider_rw.save_blocks(vec![lowest_memory_block], SaveBlocksMode::Full).unwrap();
|
||||
provider_rw
|
||||
.save_blocks(vec![lowest_memory_block], SaveBlocksMode::Full, &[])
|
||||
.unwrap();
|
||||
provider_rw.commit().unwrap();
|
||||
|
||||
// Remove from memory
|
||||
|
||||
@@ -27,7 +27,7 @@ use alloy_consensus::{
|
||||
use alloy_eips::BlockHashOrNumber;
|
||||
use alloy_primitives::{
|
||||
keccak256,
|
||||
map::{hash_map, AddressSet, B256Map, HashMap},
|
||||
map::{hash_map, AddressSet, B256Map, B256Set, HashMap, HashSet},
|
||||
Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
@@ -67,7 +67,7 @@ use reth_storage_api::{
|
||||
use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
|
||||
use reth_trie::{
|
||||
updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
|
||||
HashedPostStateSorted,
|
||||
HashedPostStateSorted, Nibbles,
|
||||
};
|
||||
use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
|
||||
use revm_database::states::{
|
||||
@@ -559,6 +559,10 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
/// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
|
||||
/// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
|
||||
///
|
||||
/// When `buffer_blocks` is non-empty, hashed state and trie updates for keys that also
|
||||
/// appear in the buffer blocks are skipped. Those keys will be overwritten when the buffer
|
||||
/// blocks are persisted in a future cycle, so writing them now is wasted I/O.
|
||||
///
|
||||
/// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
|
||||
/// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
|
||||
@@ -566,6 +570,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
&self,
|
||||
blocks: Vec<ExecutedBlock<N::Primitives>>,
|
||||
save_mode: SaveBlocksMode,
|
||||
buffer_blocks: &[ExecutedBlock<N::Primitives>],
|
||||
) -> ProviderResult<()> {
|
||||
if blocks.is_empty() {
|
||||
debug!(target: "providers::db", "Attempted to write empty block range");
|
||||
@@ -710,19 +715,42 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
// Write all hashed state and trie updates in single batches.
|
||||
// This reduces cursor open/close overhead from N calls to 1.
|
||||
if save_mode.with_state() {
|
||||
// Collect overwrite keys from buffer blocks so we can skip writing
|
||||
// hashed state / trie entries that will be overwritten when those
|
||||
// blocks are persisted in a future cycle.
|
||||
let (skip_accounts, skip_storages, skip_account_nodes, skip_storage_nodes) =
|
||||
Self::collect_buffer_overwrite_keys(buffer_blocks);
|
||||
|
||||
// Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
|
||||
let start = Instant::now();
|
||||
let merged_hashed_state = HashedPostStateSorted::merge_batch(
|
||||
let merged: Arc<HashedPostStateSorted> = HashedPostStateSorted::merge_batch(
|
||||
blocks.iter().rev().map(|b| b.trie_data().hashed_state),
|
||||
);
|
||||
let mut merged_hashed_state =
|
||||
Arc::try_unwrap(merged).unwrap_or_else(|arc| (*arc).clone());
|
||||
if !skip_accounts.is_empty() || !skip_storages.is_empty() {
|
||||
Self::filter_hashed_state(
|
||||
&mut merged_hashed_state,
|
||||
&skip_accounts,
|
||||
&skip_storages,
|
||||
);
|
||||
}
|
||||
if !merged_hashed_state.is_empty() {
|
||||
self.write_hashed_state(&merged_hashed_state)?;
|
||||
}
|
||||
timings.write_hashed_state += start.elapsed();
|
||||
|
||||
let start = Instant::now();
|
||||
let merged_trie =
|
||||
let merged: Arc<TrieUpdatesSorted> =
|
||||
TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
|
||||
let mut merged_trie = Arc::try_unwrap(merged).unwrap_or_else(|arc| (*arc).clone());
|
||||
if !skip_account_nodes.is_empty() || !skip_storage_nodes.is_empty() {
|
||||
Self::filter_trie_updates(
|
||||
&mut merged_trie,
|
||||
&skip_account_nodes,
|
||||
&skip_storage_nodes,
|
||||
);
|
||||
}
|
||||
if !merged_trie.is_empty() {
|
||||
self.write_trie_updates_sorted(&merged_trie)?;
|
||||
}
|
||||
@@ -765,6 +793,87 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Collects the set of hashed-state and trie-update keys touched by `buffer_blocks`.
|
||||
///
|
||||
/// Returns `(skip_accounts, skip_storages, skip_account_nodes, skip_storage_nodes)`:
|
||||
/// - `skip_accounts`: hashed addresses whose account entry will be overwritten
|
||||
/// - `skip_storages`: per-address set of hashed slots that will be overwritten
|
||||
/// - `skip_account_nodes`: account-trie nibble paths that will be overwritten
|
||||
/// - `skip_storage_nodes`: per-address set of storage-trie nibble paths that will be
|
||||
/// overwritten
|
||||
fn collect_buffer_overwrite_keys(
|
||||
buffer_blocks: &[ExecutedBlock<N::Primitives>],
|
||||
) -> (B256Set, B256Map<B256Set>, HashSet<Nibbles>, B256Map<HashSet<Nibbles>>) {
|
||||
let mut skip_accounts = B256Set::default();
|
||||
let mut skip_storages: B256Map<B256Set> = B256Map::default();
|
||||
let mut skip_account_nodes: HashSet<Nibbles> = HashSet::default();
|
||||
let mut skip_storage_nodes: B256Map<HashSet<Nibbles>> = B256Map::default();
|
||||
|
||||
for block in buffer_blocks {
|
||||
let hashed_state = block.hashed_state();
|
||||
for (hashed_address, _) in hashed_state.accounts() {
|
||||
skip_accounts.insert(*hashed_address);
|
||||
}
|
||||
for (hashed_address, storage) in hashed_state.account_storages() {
|
||||
let slots = skip_storages.entry(*hashed_address).or_default();
|
||||
for (hashed_slot, _) in storage.storage_slots_ref() {
|
||||
slots.insert(*hashed_slot);
|
||||
}
|
||||
}
|
||||
|
||||
let trie = block.trie_updates();
|
||||
for (nibbles, _) in trie.account_nodes_ref() {
|
||||
skip_account_nodes.insert(nibbles.clone());
|
||||
}
|
||||
for (hashed_address, storage_trie) in trie.storage_tries_ref() {
|
||||
let nodes = skip_storage_nodes.entry(*hashed_address).or_default();
|
||||
for (nibbles, _) in storage_trie.storage_nodes_ref() {
|
||||
nodes.insert(nibbles.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(skip_accounts, skip_storages, skip_account_nodes, skip_storage_nodes)
|
||||
}
|
||||
|
||||
/// Removes entries from `hashed_state` whose keys appear in the skip sets.
|
||||
fn filter_hashed_state(
|
||||
hashed_state: &mut HashedPostStateSorted,
|
||||
skip_accounts: &B256Set,
|
||||
skip_storages: &B256Map<B256Set>,
|
||||
) {
|
||||
hashed_state.accounts.retain(|(addr, _)| !skip_accounts.contains(addr));
|
||||
|
||||
for (addr, skip_slots) in skip_storages {
|
||||
if let Some(storage) = hashed_state.storages.get_mut(addr) {
|
||||
storage.storage_slots.retain(|(slot, _)| !skip_slots.contains(slot));
|
||||
}
|
||||
}
|
||||
// Remove empty storage entries left after filtering.
|
||||
hashed_state.storages.retain(|_, storage| !storage.is_empty() || storage.is_wiped());
|
||||
}
|
||||
|
||||
/// Removes entries from `trie_updates` whose paths appear in the skip sets.
|
||||
fn filter_trie_updates(
|
||||
trie_updates: &mut TrieUpdatesSorted,
|
||||
skip_account_nodes: &HashSet<Nibbles>,
|
||||
skip_storage_nodes: &B256Map<HashSet<Nibbles>>,
|
||||
) {
|
||||
trie_updates
|
||||
.account_nodes_mut()
|
||||
.retain(|(nibbles, _)| !skip_account_nodes.contains(nibbles));
|
||||
|
||||
for (addr, skip_nodes) in skip_storage_nodes {
|
||||
if let Some(storage_trie) = trie_updates.storage_tries_mut().get_mut(addr) {
|
||||
storage_trie.storage_nodes.retain(|(nibbles, _)| !skip_nodes.contains(nibbles));
|
||||
}
|
||||
}
|
||||
// Remove empty storage tries left after filtering.
|
||||
trie_updates
|
||||
.storage_tries_mut()
|
||||
.retain(|_, storage_trie| !storage_trie.is_empty() || storage_trie.is_deleted());
|
||||
}
|
||||
|
||||
/// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
|
||||
///
|
||||
/// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
|
||||
@@ -3490,7 +3599,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
|
||||
);
|
||||
|
||||
// Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
|
||||
self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
|
||||
self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly, &[])?;
|
||||
|
||||
// Return the body indices
|
||||
self.block_body_indices(block_number)?
|
||||
@@ -4997,7 +5106,7 @@ mod tests {
|
||||
ComputedTrieData::default(),
|
||||
);
|
||||
let provider_rw = factory.provider_rw().unwrap();
|
||||
provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
|
||||
provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full, &[]).unwrap();
|
||||
provider_rw.commit().unwrap();
|
||||
|
||||
let mut blocks: Vec<ExecutedBlock> = Vec::new();
|
||||
@@ -5069,7 +5178,7 @@ mod tests {
|
||||
}
|
||||
|
||||
let provider_rw = factory.provider_rw().unwrap();
|
||||
provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
|
||||
provider_rw.save_blocks(blocks, SaveBlocksMode::Full, &[]).unwrap();
|
||||
provider_rw.commit().unwrap();
|
||||
|
||||
let provider = factory.provider().unwrap();
|
||||
|
||||
@@ -589,11 +589,21 @@ impl TrieUpdatesSorted {
|
||||
&self.account_nodes
|
||||
}
|
||||
|
||||
/// Returns mutable reference to updated account nodes.
|
||||
pub fn account_nodes_mut(&mut self) -> &mut Vec<(Nibbles, Option<BranchNodeCompact>)> {
|
||||
&mut self.account_nodes
|
||||
}
|
||||
|
||||
/// Returns reference to updated storage tries.
|
||||
pub const fn storage_tries_ref(&self) -> &B256Map<StorageTrieUpdatesSorted> {
|
||||
&self.storage_tries
|
||||
}
|
||||
|
||||
/// Returns mutable reference to updated storage tries.
|
||||
pub fn storage_tries_mut(&mut self) -> &mut B256Map<StorageTrieUpdatesSorted> {
|
||||
&mut self.storage_tries
|
||||
}
|
||||
|
||||
/// Returns the total number of updates including account nodes and all storage updates.
|
||||
pub fn total_len(&self) -> usize {
|
||||
self.account_nodes.len() +
|
||||
|
||||
@@ -926,19 +926,19 @@ Engine:
|
||||
|
||||
To persist blocks as fast as the node receives them, set this value to zero. This will cause more frequent DB writes.
|
||||
|
||||
[default: 2]
|
||||
[default: 12]
|
||||
|
||||
--engine.persistence-backpressure-threshold <PERSISTENCE_BACKPRESSURE_THRESHOLD>
|
||||
Configure the maximum canonical-minus-persisted gap before engine API processing stalls.
|
||||
|
||||
This value must be greater than `--engine.persistence-threshold`.
|
||||
|
||||
[default: 16]
|
||||
[default: 44]
|
||||
|
||||
--engine.memory-block-buffer-target <MEMORY_BLOCK_BUFFER_TARGET>
|
||||
Configure the target number of blocks to keep in memory
|
||||
|
||||
[default: 0]
|
||||
[default: 10]
|
||||
|
||||
--engine.legacy-state-root
|
||||
Enable legacy state root
|
||||
|
||||
Reference in New Issue
Block a user