mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
perf: batch finalized/safe block commits with SaveBlocks (#21663)
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -40,6 +40,12 @@ where
|
||||
metrics: PersistenceMetrics,
|
||||
/// Sender for sync metrics - we only submit sync metrics for persisted blocks
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
/// Pending finalized block number to be committed with the next block save.
|
||||
/// This avoids triggering a separate fsync for each finalized block update.
|
||||
pending_finalized_block: Option<u64>,
|
||||
/// Pending safe block number to be committed with the next block save.
|
||||
/// This avoids triggering a separate fsync for each safe block update.
|
||||
pending_safe_block: Option<u64>,
|
||||
}
|
||||
|
||||
impl<N> PersistenceService<N>
|
||||
@@ -53,7 +59,15 @@ where
|
||||
pruner: PrunerWithFactory<ProviderFactory<N>>,
|
||||
sync_metrics_tx: MetricEventsSender,
|
||||
) -> Self {
|
||||
Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
|
||||
Self {
|
||||
provider,
|
||||
incoming,
|
||||
pruner,
|
||||
metrics: PersistenceMetrics::default(),
|
||||
sync_metrics_tx,
|
||||
pending_finalized_block: None,
|
||||
pending_safe_block: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Prunes block data before the given block number according to the configured prune
|
||||
@@ -106,14 +120,10 @@ where
|
||||
}
|
||||
}
|
||||
PersistenceAction::SaveFinalizedBlock(finalized_block) => {
|
||||
let provider = self.provider.database_provider_rw()?;
|
||||
provider.save_finalized_block_number(finalized_block)?;
|
||||
provider.commit()?;
|
||||
self.pending_finalized_block = Some(finalized_block);
|
||||
}
|
||||
PersistenceAction::SaveSafeBlock(safe_block) => {
|
||||
let provider = self.provider.database_provider_rw()?;
|
||||
provider.save_safe_block_number(safe_block)?;
|
||||
provider.commit()?;
|
||||
self.pending_safe_block = Some(safe_block);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -138,20 +148,33 @@ where
|
||||
}
|
||||
|
||||
fn on_save_blocks(
|
||||
&self,
|
||||
&mut self,
|
||||
blocks: Vec<ExecutedBlock<N::Primitives>>,
|
||||
) -> Result<Option<BlockNumHash>, PersistenceError> {
|
||||
let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
|
||||
let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
|
||||
let block_count = blocks.len();
|
||||
|
||||
// Take any pending finalized/safe block updates to commit together
|
||||
let pending_finalized = self.pending_finalized_block.take();
|
||||
let pending_safe = self.pending_safe_block.take();
|
||||
|
||||
debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks");
|
||||
|
||||
let start_time = Instant::now();
|
||||
|
||||
if last_block.is_some() {
|
||||
let provider_rw = self.provider.database_provider_rw()?;
|
||||
|
||||
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
|
||||
|
||||
// Commit pending finalized/safe block updates in the same transaction
|
||||
if let Some(finalized) = pending_finalized {
|
||||
provider_rw.save_finalized_block_number(finalized)?;
|
||||
}
|
||||
if let Some(safe) = pending_safe {
|
||||
provider_rw.save_safe_block_number(safe)?;
|
||||
}
|
||||
|
||||
provider_rw.commit()?;
|
||||
}
|
||||
|
||||
@@ -267,7 +290,10 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
|
||||
}
|
||||
|
||||
/// Persists the finalized block number on disk.
|
||||
/// Queues the finalized block number to be persisted on disk.
|
||||
///
|
||||
/// The update is deferred and will be committed together with the next [`Self::save_blocks`]
|
||||
/// call to avoid triggering a separate fsync for each update.
|
||||
pub fn save_finalized_block_number(
|
||||
&self,
|
||||
finalized_block: u64,
|
||||
@@ -275,7 +301,10 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
|
||||
self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
|
||||
}
|
||||
|
||||
/// Persists the safe block number on disk.
|
||||
/// Queues the safe block number to be persisted on disk.
|
||||
///
|
||||
/// The update is deferred and will be committed together with the next [`Self::save_blocks`]
|
||||
/// call to avoid triggering a separate fsync for each update.
|
||||
pub fn save_safe_block_number(
|
||||
&self,
|
||||
safe_block: u64,
|
||||
|
||||
Reference in New Issue
Block a user