mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
feat(tree): add idle time metrics to SparseTrieCacheTask and hashing task (#23136)
Co-authored-by: Brian Picciano <933154+mediocregopher@users.noreply.github.com> Co-authored-by: Brian Picciano <me@mediocregopher.com> Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com> Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> Co-authored-by: Alexey Shekhirin <github@shekhirin.com> Co-authored-by: Amp <amp@ampcode.com> Co-authored-by: YK <46377366+yongkangc@users.noreply.github.com> Co-authored-by: YK <chiayongkang@hotmail.com> Co-authored-by: amp[bot] <noreply@ampcode.com>
This commit is contained in:
@@ -134,6 +134,12 @@ pub(crate) struct MultiProofTaskMetrics {
|
||||
pub into_trie_for_reuse_duration_histogram: Histogram,
|
||||
/// Time spent waiting for preserved sparse trie cache to become available.
|
||||
pub sparse_trie_cache_wait_duration_histogram: Histogram,
|
||||
/// Histogram for sparse trie task idle time in seconds (waiting for updates or proof
|
||||
/// results). Excludes the final wait after the channel is closed.
|
||||
pub sparse_trie_idle_time_seconds: Histogram,
|
||||
/// Histogram for hashing task idle time in seconds (waiting for messages from execution).
|
||||
/// Excludes the final wait after the channel is closed.
|
||||
pub hashing_task_idle_time_seconds: Histogram,
|
||||
|
||||
/// Number of account leaf updates applied without needing a new proof (cache hits).
|
||||
pub sparse_trie_account_cache_hits: Histogram,
|
||||
|
||||
@@ -128,9 +128,10 @@ where
|
||||
let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
|
||||
|
||||
let parent_span = tracing::Span::current();
|
||||
let hashing_metrics = metrics.clone();
|
||||
executor.spawn_blocking_named("trie-hashing", move || {
|
||||
let _span = debug_span!(parent: parent_span, "run_hashing_task").entered();
|
||||
Self::run_hashing_task(updates, hashed_state_tx)
|
||||
Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
|
||||
});
|
||||
|
||||
Self {
|
||||
@@ -165,8 +166,14 @@ where
|
||||
fn run_hashing_task(
|
||||
updates: CrossbeamReceiver<MultiProofMessage>,
|
||||
hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
) {
|
||||
let mut total_idle_time = std::time::Duration::ZERO;
|
||||
let mut idle_start = Instant::now();
|
||||
|
||||
while let Ok(message) = updates.recv() {
|
||||
total_idle_time += idle_start.elapsed();
|
||||
|
||||
let msg = match message {
|
||||
MultiProofMessage::PrefetchProofs(targets) => {
|
||||
SparseTrieTaskMessage::PrefetchProofs(targets)
|
||||
@@ -179,7 +186,10 @@ where
|
||||
MultiProofMessage::FinishedStateUpdates => {
|
||||
SparseTrieTaskMessage::FinishedStateUpdates
|
||||
}
|
||||
MultiProofMessage::BlockAccessList(_) => continue,
|
||||
MultiProofMessage::BlockAccessList(_) => {
|
||||
idle_start = Instant::now();
|
||||
continue;
|
||||
}
|
||||
MultiProofMessage::HashedStateUpdate(state) => {
|
||||
SparseTrieTaskMessage::HashedState(state)
|
||||
}
|
||||
@@ -187,7 +197,11 @@ where
|
||||
if hashed_state_tx.send(msg).is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
idle_start = Instant::now();
|
||||
}
|
||||
|
||||
metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
|
||||
}
|
||||
|
||||
/// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
|
||||
@@ -247,13 +261,14 @@ where
|
||||
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
|
||||
let now = Instant::now();
|
||||
|
||||
let mut total_idle_time = std::time::Duration::ZERO;
|
||||
let mut idle_start = Instant::now();
|
||||
|
||||
loop {
|
||||
let mut t = Instant::now();
|
||||
crossbeam_channel::select_biased! {
|
||||
recv(self.updates) -> message => {
|
||||
self.metrics
|
||||
.sparse_trie_channel_wait_duration_histogram
|
||||
.record(t.elapsed());
|
||||
let wake = Instant::now();
|
||||
|
||||
let update = match message {
|
||||
Ok(m) => m,
|
||||
@@ -264,11 +279,17 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
total_idle_time += wake.duration_since(idle_start);
|
||||
self.metrics
|
||||
.sparse_trie_channel_wait_duration_histogram
|
||||
.record(wake.duration_since(t));
|
||||
|
||||
self.on_message(update);
|
||||
self.pending_updates += 1;
|
||||
}
|
||||
recv(self.proof_result_rx) -> message => {
|
||||
let phase_end = Instant::now();
|
||||
total_idle_time += phase_end.duration_since(idle_start);
|
||||
self.metrics
|
||||
.sparse_trie_channel_wait_duration_histogram
|
||||
.record(phase_end.duration_since(t));
|
||||
@@ -331,8 +352,12 @@ where
|
||||
// Make sure to dispatch targets if we've accumulated a lot of them.
|
||||
self.dispatch_pending_targets();
|
||||
}
|
||||
|
||||
idle_start = Instant::now();
|
||||
}
|
||||
|
||||
self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());
|
||||
|
||||
debug!(target: "engine::root", "All proofs processed, ending calculation");
|
||||
|
||||
let start = Instant::now();
|
||||
@@ -882,6 +907,7 @@ mod tests {
|
||||
SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
|
||||
updates_rx,
|
||||
hashed_state_tx,
|
||||
MultiProofTaskMetrics::default(),
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user