mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
2 Commits
snapv2
...
feat/persi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2d49ba19f5 | ||
|
|
d9a6db0e6c |
@@ -8,6 +8,11 @@ pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
|
||||
/// How close to the canonical head we persist blocks.
|
||||
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
|
||||
|
||||
/// Maximum number of blocks that can be pending persistence before backpressure is applied.
|
||||
/// When this limit is reached, new block validation will block until persistence catches up.
|
||||
/// This prevents OOM when validation is faster than persistence (e.g., during benchmarks).
|
||||
pub const DEFAULT_MAX_PENDING_PERSISTENCE_BLOCKS: u64 = 128;
|
||||
|
||||
/// Minimum number of workers we allow configuring explicitly.
|
||||
pub const MIN_WORKER_COUNT: usize = 32;
|
||||
|
||||
@@ -139,6 +144,10 @@ pub struct TreeConfig {
|
||||
enable_proof_v2: bool,
|
||||
/// Whether to disable cache metrics recording (can be expensive with large cached state).
|
||||
disable_cache_metrics: bool,
|
||||
/// Maximum number of blocks that can be pending persistence before backpressure is applied.
|
||||
/// When this limit is reached, new block validation will block until persistence catches up.
|
||||
/// This prevents OOM when validation is faster than persistence (e.g., during benchmarks).
|
||||
max_pending_persistence_blocks: u64,
|
||||
}
|
||||
|
||||
impl Default for TreeConfig {
|
||||
@@ -169,6 +178,7 @@ impl Default for TreeConfig {
|
||||
account_worker_count: default_account_worker_count(),
|
||||
enable_proof_v2: false,
|
||||
disable_cache_metrics: false,
|
||||
max_pending_persistence_blocks: DEFAULT_MAX_PENDING_PERSISTENCE_BLOCKS,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -202,6 +212,7 @@ impl TreeConfig {
|
||||
account_worker_count: usize,
|
||||
enable_proof_v2: bool,
|
||||
disable_cache_metrics: bool,
|
||||
max_pending_persistence_blocks: u64,
|
||||
) -> Self {
|
||||
Self {
|
||||
persistence_threshold,
|
||||
@@ -229,6 +240,7 @@ impl TreeConfig {
|
||||
account_worker_count,
|
||||
enable_proof_v2,
|
||||
disable_cache_metrics,
|
||||
max_pending_persistence_blocks,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -532,4 +544,19 @@ impl TreeConfig {
|
||||
self.disable_cache_metrics = disable_cache_metrics;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the maximum number of blocks that can be pending persistence.
|
||||
/// When this limit is reached, validation will block until persistence catches up.
|
||||
pub const fn max_pending_persistence_blocks(&self) -> u64 {
|
||||
self.max_pending_persistence_blocks
|
||||
}
|
||||
|
||||
/// Setter for maximum pending persistence blocks (backpressure limit).
|
||||
pub const fn with_max_pending_persistence_blocks(
|
||||
mut self,
|
||||
max_pending_persistence_blocks: u64,
|
||||
) -> Self {
|
||||
self.max_pending_persistence_blocks = max_pending_persistence_blocks;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,6 +204,10 @@ pub(crate) struct EngineMetrics {
|
||||
pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
|
||||
/// block insert duration
|
||||
pub(crate) block_insert_total_duration: Histogram,
|
||||
/// Number of blocks currently pending persistence (for backpressure monitoring)
|
||||
pub(crate) pending_persistence_blocks: Gauge,
|
||||
/// Number of times backpressure was applied (validation blocked waiting for persistence)
|
||||
pub(crate) backpressure_events: Counter,
|
||||
}
|
||||
|
||||
/// Metrics for engine forkchoiceUpdated responses.
|
||||
|
||||
@@ -642,6 +642,13 @@ where
|
||||
&mut self,
|
||||
payload: T::ExecutionData,
|
||||
) -> Result<PayloadStatus, InsertBlockFatalError> {
|
||||
// Apply backpressure if too many blocks are pending persistence.
|
||||
// This blocks until persistence catches up to prevent OOM.
|
||||
if let Err(err) = self.wait_for_persistence_if_needed() {
|
||||
error!(target: "engine::tree", %err, "Backpressure wait failed");
|
||||
return Err(InsertBlockFatalError::Provider(ProviderError::other(err)))
|
||||
}
|
||||
|
||||
let block_hash = payload.block_hash();
|
||||
let num_hash = payload.num_hash();
|
||||
let parent_hash = payload.parent_hash();
|
||||
@@ -1777,6 +1784,81 @@ where
|
||||
self.config.persistence_threshold()
|
||||
}
|
||||
|
||||
/// Returns the number of blocks that are pending persistence.
|
||||
/// This is the number of canonical blocks in memory that are above the last persisted block.
|
||||
pub const fn pending_persistence_blocks(&self) -> u64 {
|
||||
self.state
|
||||
.tree_state
|
||||
.current_canonical_head
|
||||
.number
|
||||
.saturating_sub(self.persistence_state.last_persisted_block.number)
|
||||
}
|
||||
|
||||
/// Returns true if backpressure should be applied due to too many blocks pending persistence.
|
||||
/// When true, validation should wait for persistence to catch up before processing more blocks.
|
||||
pub const fn should_apply_backpressure(&self) -> bool {
|
||||
self.pending_persistence_blocks() >= self.config.max_pending_persistence_blocks()
|
||||
}
|
||||
|
||||
/// Waits for persistence to complete if backpressure is needed.
|
||||
///
|
||||
/// This blocks until the number of pending persistence blocks drops below the limit.
|
||||
/// Used to prevent OOM when validation is faster than persistence.
|
||||
fn wait_for_persistence_if_needed(&mut self) -> Result<(), AdvancePersistenceError> {
|
||||
// Update pending blocks metric
|
||||
self.metrics
|
||||
.engine
|
||||
.pending_persistence_blocks
|
||||
.set(self.pending_persistence_blocks() as f64);
|
||||
|
||||
if !self.should_apply_backpressure() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Record backpressure event
|
||||
self.metrics.engine.backpressure_events.increment(1);
|
||||
|
||||
while self.should_apply_backpressure() {
|
||||
if let Some((rx, start_time, _action)) = self.persistence_state.rx.take() {
|
||||
trace!(
|
||||
target: "engine::tree",
|
||||
pending = self.pending_persistence_blocks(),
|
||||
max = self.config.max_pending_persistence_blocks(),
|
||||
"Backpressure: waiting for persistence to complete"
|
||||
);
|
||||
|
||||
match rx.recv() {
|
||||
Ok(result) => {
|
||||
self.on_persistence_complete(result, start_time)?;
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(AdvancePersistenceError::ChannelClosed);
|
||||
}
|
||||
}
|
||||
|
||||
// Update pending blocks metric after persistence completes
|
||||
self.metrics
|
||||
.engine
|
||||
.pending_persistence_blocks
|
||||
.set(self.pending_persistence_blocks() as f64);
|
||||
|
||||
// Try to trigger more persistence if needed
|
||||
self.advance_persistence()?;
|
||||
} else {
|
||||
// No persistence in progress but we still have too many blocks
|
||||
// Trigger persistence and wait
|
||||
self.advance_persistence()?;
|
||||
|
||||
// If no persistence was triggered, break to avoid infinite loop
|
||||
if !self.persistence_state.in_progress() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a batch of consecutive canonical blocks to persist in the range
|
||||
/// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
|
||||
fn get_canonical_blocks_to_persist(
|
||||
|
||||
Reference in New Issue
Block a user