Compare commits

...

2 Commits

Author SHA1 Message Date
Georgios Konstantopoulos
2d49ba19f5 fix(tree): make backpressure functions const
Fix clippy warning for missing_const_for_fn by making pending_persistence_blocks
and should_apply_backpressure const functions. Also use current_canonical_head.number
directly instead of canonical_block_number() to enable const evaluation.
2026-01-21 10:02:57 +00:00
Georgios Konstantopoulos
d9a6db0e6c feat(tree): add persistence backpressure to prevent OOM
When validation is faster than persistence (e.g., during reth-bench or
replay-payloads), blocks can accumulate in memory faster than they can
be written to disk, eventually causing OOM.

This adds a simple backpressure mechanism:
- New config option: max_pending_persistence_blocks (default: 128)
- When the limit is reached, validation blocks until persistence catches up
- Added metrics to monitor backpressure events and pending blocks

The implementation:
- Adds pending_persistence_blocks() to calculate blocks awaiting persistence
- Adds should_apply_backpressure() to check if limit is exceeded
- Adds wait_for_persistence_if_needed() that blocks on the persistence channel
- Calls wait_for_persistence_if_needed() before try_insert_payload()

This prevents OOM while allowing normal operation with reasonable buffer.

Co-authored-by: Tempo AI <ai@tempo.xyz>
2026-01-21 09:45:10 +00:00
3 changed files with 113 additions and 0 deletions

View File

@@ -8,6 +8,11 @@ pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
/// Default maximum number of blocks that may be pending persistence before backpressure is
/// applied.
///
/// This is the value `TreeConfig::default()` assigns to `max_pending_persistence_blocks`.
/// Once the number of pending blocks reaches this limit (the comparison is inclusive), new
/// block validation blocks until persistence catches up. This prevents OOM when validation is
/// faster than persistence (e.g., during benchmarks).
pub const DEFAULT_MAX_PENDING_PERSISTENCE_BLOCKS: u64 = 128;
/// Minimum number of workers we allow configuring explicitly.
pub const MIN_WORKER_COUNT: usize = 32;
@@ -139,6 +144,10 @@ pub struct TreeConfig {
enable_proof_v2: bool,
/// Whether to disable cache metrics recording (can be expensive with large cached state).
disable_cache_metrics: bool,
/// Maximum number of blocks that can be pending persistence before backpressure is applied.
/// When this limit is reached, new block validation will block until persistence catches up.
/// This prevents OOM when validation is faster than persistence (e.g., during benchmarks).
max_pending_persistence_blocks: u64,
}
impl Default for TreeConfig {
@@ -169,6 +178,7 @@ impl Default for TreeConfig {
account_worker_count: default_account_worker_count(),
enable_proof_v2: false,
disable_cache_metrics: false,
max_pending_persistence_blocks: DEFAULT_MAX_PENDING_PERSISTENCE_BLOCKS,
}
}
}
@@ -202,6 +212,7 @@ impl TreeConfig {
account_worker_count: usize,
enable_proof_v2: bool,
disable_cache_metrics: bool,
max_pending_persistence_blocks: u64,
) -> Self {
Self {
persistence_threshold,
@@ -229,6 +240,7 @@ impl TreeConfig {
account_worker_count,
enable_proof_v2,
disable_cache_metrics,
max_pending_persistence_blocks,
}
}
@@ -532,4 +544,19 @@ impl TreeConfig {
self.disable_cache_metrics = disable_cache_metrics;
self
}
/// Returns the configured maximum number of blocks that may be pending persistence.
///
/// Once the number of pending blocks reaches this limit, validation blocks until persistence
/// catches up.
pub const fn max_pending_persistence_blocks(&self) -> u64 {
self.max_pending_persistence_blocks
}
/// Setter for the maximum number of blocks that may be pending persistence (the backpressure
/// limit).
///
/// Consumes the config, overwrites the limit, and returns the updated config so calls can be
/// chained builder-style.
pub const fn with_max_pending_persistence_blocks(
mut self,
max_pending_persistence_blocks: u64,
) -> Self {
self.max_pending_persistence_blocks = max_pending_persistence_blocks;
self
}
}

View File

@@ -204,6 +204,10 @@ pub(crate) struct EngineMetrics {
pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
/// block insert duration
pub(crate) block_insert_total_duration: Histogram,
/// Number of blocks currently pending persistence (for backpressure monitoring)
pub(crate) pending_persistence_blocks: Gauge,
/// Number of times backpressure was applied (validation blocked waiting for persistence)
pub(crate) backpressure_events: Counter,
}
/// Metrics for engine forkchoiceUpdated responses.

View File

@@ -642,6 +642,13 @@ where
&mut self,
payload: T::ExecutionData,
) -> Result<PayloadStatus, InsertBlockFatalError> {
// Apply backpressure if too many blocks are pending persistence.
// This blocks until persistence catches up to prevent OOM.
if let Err(err) = self.wait_for_persistence_if_needed() {
error!(target: "engine::tree", %err, "Backpressure wait failed");
return Err(InsertBlockFatalError::Provider(ProviderError::other(err)))
}
let block_hash = payload.block_hash();
let num_hash = payload.num_hash();
let parent_hash = payload.parent_hash();
@@ -1777,6 +1784,81 @@ where
self.config.persistence_threshold()
}
/// Returns how many blocks are still waiting to be persisted.
///
/// Computed as the current canonical head number minus the number of the last persisted
/// block. The subtraction saturates, so the result is `0` if the last persisted block is at
/// or above the canonical head.
pub const fn pending_persistence_blocks(&self) -> u64 {
    let canonical_head_number = self.state.tree_state.current_canonical_head.number;
    let last_persisted_number = self.persistence_state.last_persisted_block.number;
    canonical_head_number.saturating_sub(last_persisted_number)
}
/// Returns `true` when the number of blocks pending persistence has reached the configured
/// limit.
///
/// The comparison is inclusive: backpressure applies as soon as the pending count equals
/// `max_pending_persistence_blocks`. When this returns `true`, validation should wait for
/// persistence to catch up before processing more blocks.
pub const fn should_apply_backpressure(&self) -> bool {
    let pending = self.pending_persistence_blocks();
    let limit = self.config.max_pending_persistence_blocks();
    pending >= limit
}
/// Blocks the caller until enough blocks have been persisted to lift backpressure.
///
/// The pending-blocks gauge is refreshed on entry. If the backpressure limit has not been
/// reached the function returns immediately; otherwise one backpressure event is recorded and
/// the function loops:
///
/// - If a persistence task is in flight, it blocks on that task's channel, hands the result
///   to `on_persistence_complete`, refreshes the gauge and calls `advance_persistence` to
///   schedule follow-up work.
/// - If nothing is in flight, it calls `advance_persistence`; when that does not start a
///   task, the loop exits even though the limit is still exceeded, so it cannot spin forever.
///
/// Used to prevent OOM when validation is faster than persistence.
///
/// # Errors
///
/// Returns [`AdvancePersistenceError::ChannelClosed`] if receiving from the persistence
/// channel fails, and propagates any error from `on_persistence_complete` or
/// `advance_persistence`.
fn wait_for_persistence_if_needed(&mut self) -> Result<(), AdvancePersistenceError> {
    // Publish the current backlog before deciding whether to wait.
    self.metrics.engine.pending_persistence_blocks.set(self.pending_persistence_blocks() as f64);

    // Fast path: below the limit, nothing to wait for.
    if !self.should_apply_backpressure() {
        return Ok(());
    }

    // Counted once per call, regardless of how many persistence rounds follow.
    self.metrics.engine.backpressure_events.increment(1);

    while self.should_apply_backpressure() {
        let Some((rx, start_time, _action)) = self.persistence_state.rx.take() else {
            // Nothing is in flight although the limit is exceeded: try to start a task.
            self.advance_persistence()?;

            if self.persistence_state.in_progress() {
                // A task was started; wait for it on the next iteration.
                continue;
            }

            // Nothing could be scheduled, so waiting would never finish.
            break;
        };

        trace!(
            target: "engine::tree",
            pending = self.pending_persistence_blocks(),
            max = self.config.max_pending_persistence_blocks(),
            "Backpressure: waiting for persistence to complete"
        );

        // Blocking receive: the calling thread parks until the in-flight task reports back.
        let result = rx.recv().map_err(|_| AdvancePersistenceError::ChannelClosed)?;
        self.on_persistence_complete(result, start_time)?;

        // Reflect the freshly persisted blocks in the gauge.
        self.metrics
            .engine
            .pending_persistence_blocks
            .set(self.pending_persistence_blocks() as f64);

        // Schedule the next persistence round if more work is pending.
        self.advance_persistence()?;
    }

    Ok(())
}
/// Returns a batch of consecutive canonical blocks to persist in the range
/// `(last_persisted_number .. target]`. The expected order is oldest -> newest.
fn get_canonical_blocks_to_persist(