Compare commits

...

2 Commits
t4 ... yk/drain

2 changed files with 38 additions and 2 deletions

View File

@@ -4,7 +4,7 @@ use alloy_evm::block::StateChangeSource;
use alloy_primitives::{keccak256, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use derive_more::derive::Deref;
use metrics::{Gauge, Histogram};
use metrics::{Counter, Gauge, Histogram};
use reth_metrics::Metrics;
use reth_revm::state::EvmState;
use reth_trie::{HashedPostState, HashedStorage};
@@ -189,6 +189,12 @@ pub(crate) struct MultiProofTaskMetrics {
pub into_trie_for_reuse_duration_histogram: Histogram,
/// Time spent waiting for preserved sparse trie cache to become available.
pub sparse_trie_cache_wait_duration_histogram: Histogram,
/// Number of additional updates drained from the updates channel in a single wakeup.
pub sparse_trie_update_drain_count_histogram: Histogram,
/// Time spent draining additional updates from the updates channel.
pub sparse_trie_update_drain_duration_histogram: Histogram,
/// Number of times update draining hits the configured per-tick cap.
pub sparse_trie_update_drain_cap_hit_counter: Counter,
}
/// Dispatches work items as a single unit or in chunks based on target size and worker

View File

@@ -9,7 +9,7 @@ use crate::tree::{
};
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender, TryRecvError};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::{Account, FastInstant as Instant, ParallelBridgeBuffered};
use reth_tasks::Runtime;
@@ -35,6 +35,8 @@ use tracing::{debug, debug_span, error, instrument};
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
const MAX_PENDING_UPDATES: usize = 100;
/// Maximum number of additional update messages to drain in a single event-loop tick.
const MAX_UPDATE_DRAIN_PER_TICK: usize = 64;
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparseTrie> {
@@ -252,6 +254,7 @@ where
self.on_message(update);
self.pending_updates += 1;
self.drain_pending_updates();
}
recv(self.proof_result_rx) -> message => {
let phase_end = Instant::now();
@@ -428,6 +431,33 @@ where
})
}
/// Drains additional ready update messages after receiving one update.
///
/// This reduces dispatch/wait ping-pong on small blocks while capping per-tick work to avoid
/// starving proof result handling.
fn drain_pending_updates(&mut self) {
let start = Instant::now();
let mut drained = 0usize;
while drained < MAX_UPDATE_DRAIN_PER_TICK {
match self.updates.try_recv() {
Ok(update) => {
self.on_message(update);
self.pending_updates += 1;
drained += 1;
}
Err(TryRecvError::Empty | TryRecvError::Disconnected) => break,
}
}
if drained > 0 {
self.metrics.sparse_trie_update_drain_count_histogram.record(drained as f64);
self.metrics.sparse_trie_update_drain_duration_histogram.record(start.elapsed());
if drained == MAX_UPDATE_DRAIN_PER_TICK {
self.metrics.sparse_trie_update_drain_cap_hit_counter.increment(1);
}
}
}
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",