addressed sync overhead

- Removed lock-free atomic counters from ProofTaskMetrics and replaced them with direct method calls for recording blinded node counts.
- Updated storage and account worker loops to utilize the new metrics recording methods, enhancing clarity and maintainability.
- Simplified the ProofTaskManagerHandle by removing unnecessary metrics fields, streamlining the overall structure.
This commit is contained in:
Yong Kang
2025-10-13 09:30:35 +00:00
parent 19cc9543f2
commit b84b064c2e
2 changed files with 39 additions and 56 deletions

View File

@@ -117,6 +117,7 @@ fn storage_worker_loop<Tx>(
proof_tx: ProofTaskTx<Tx>,
work_rx: CrossbeamReceiver<StorageWorkerJob>,
worker_id: usize,
#[cfg(feature = "metrics")] metrics: ProofTaskMetrics,
) where
Tx: DbTx,
{
@@ -230,6 +231,9 @@ fn storage_worker_loop<Tx>(
storage_nodes_processed,
"Storage worker shutting down"
);
#[cfg(feature = "metrics")]
metrics.record_storage_nodes(storage_nodes_processed as usize);
}
/// Worker loop for account trie operations.
@@ -260,6 +264,7 @@ fn account_worker_loop<Tx>(
work_rx: CrossbeamReceiver<AccountWorkerJob>,
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
worker_id: usize,
#[cfg(feature = "metrics")] metrics: ProofTaskMetrics,
) where
Tx: DbTx,
{
@@ -405,6 +410,9 @@ fn account_worker_loop<Tx>(
account_nodes_processed,
"Account worker shutting down"
);
#[cfg(feature = "metrics")]
metrics.record_account_nodes(account_nodes_processed as usize);
}
/// Builds an account multiproof by consuming storage proof receivers lazily during trie walk.
@@ -839,9 +847,6 @@ pub struct ProofTaskManagerHandle {
account_work_tx: CrossbeamSender<AccountWorkerJob>,
/// Active handle reference count for auto-termination
active_handles: Arc<AtomicUsize>,
/// Metrics tracking (lock-free)
#[cfg(feature = "metrics")]
metrics: Arc<ProofTaskMetrics>,
}
impl ProofTaskManagerHandle {
@@ -884,7 +889,16 @@ impl ProofTaskManagerHandle {
let work_rx_clone = storage_work_rx.clone();
executor.spawn_blocking(move || {
storage_worker_loop(proof_task_tx, work_rx_clone, worker_id)
#[cfg(feature = "metrics")]
let metrics = ProofTaskMetrics::default();
storage_worker_loop(
proof_task_tx,
work_rx_clone,
worker_id,
#[cfg(feature = "metrics")]
metrics,
)
});
tracing::debug!(
@@ -903,7 +917,17 @@ impl ProofTaskManagerHandle {
let storage_work_tx_clone = storage_work_tx.clone();
executor.spawn_blocking(move || {
account_worker_loop(proof_task_tx, work_rx_clone, storage_work_tx_clone, worker_id)
#[cfg(feature = "metrics")]
let metrics = ProofTaskMetrics::default();
account_worker_loop(
proof_task_tx,
work_rx_clone,
storage_work_tx_clone,
worker_id,
#[cfg(feature = "metrics")]
metrics,
)
});
tracing::debug!(
@@ -913,13 +937,7 @@ impl ProofTaskManagerHandle {
);
}
Ok(Self::new_handle(
storage_work_tx,
account_work_tx,
Arc::new(AtomicUsize::new(0)),
#[cfg(feature = "metrics")]
Arc::new(ProofTaskMetrics::default()),
))
Ok(Self::new_handle(storage_work_tx, account_work_tx, Arc::new(AtomicUsize::new(0))))
}
/// Creates a new [`ProofTaskManagerHandle`] with direct access to worker pools.
@@ -929,16 +947,9 @@ impl ProofTaskManagerHandle {
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
account_work_tx: CrossbeamSender<AccountWorkerJob>,
active_handles: Arc<AtomicUsize>,
#[cfg(feature = "metrics")] metrics: Arc<ProofTaskMetrics>,
) -> Self {
active_handles.fetch_add(1, Ordering::SeqCst);
Self {
storage_work_tx,
account_work_tx,
active_handles,
#[cfg(feature = "metrics")]
metrics,
}
Self { storage_work_tx, account_work_tx, active_handles }
}
/// Queue a storage proof computation
@@ -953,9 +964,6 @@ impl ProofTaskManagerHandle {
ProviderError::other(std::io::Error::other("storage workers unavailable"))
})?;
#[cfg(feature = "metrics")]
self.metrics.storage_proofs.fetch_add(1, Ordering::Relaxed);
Ok(rx)
}
@@ -971,9 +979,6 @@ impl ProofTaskManagerHandle {
ProviderError::other(std::io::Error::other("account workers unavailable"))
})?;
#[cfg(feature = "metrics")]
self.metrics.account_proofs.fetch_add(1, Ordering::Relaxed);
Ok(rx)
}
@@ -990,9 +995,6 @@ impl ProofTaskManagerHandle {
ProviderError::other(std::io::Error::other("storage workers unavailable"))
})?;
#[cfg(feature = "metrics")]
self.metrics.storage_nodes.fetch_add(1, Ordering::Relaxed);
Ok(rx)
}
@@ -1008,9 +1010,6 @@ impl ProofTaskManagerHandle {
ProviderError::other(std::io::Error::other("account workers unavailable"))
})?;
#[cfg(feature = "metrics")]
self.metrics.account_nodes.fetch_add(1, Ordering::Relaxed);
Ok(rx)
}
}
@@ -1021,8 +1020,6 @@ impl Clone for ProofTaskManagerHandle {
self.storage_work_tx.clone(),
self.account_work_tx.clone(),
self.active_handles.clone(),
#[cfg(feature = "metrics")]
self.metrics.clone(),
)
}
}
@@ -1039,12 +1036,6 @@ impl Drop for ProofTaskManagerHandle {
"active_handles underflow in ProofTaskManagerHandle::drop (previous={})",
previous_handles
);
#[cfg(feature = "metrics")]
if previous_handles == 1 {
// Flush metrics before exit.
self.metrics.record();
}
}
}

View File

@@ -1,29 +1,21 @@
use reth_metrics::{metrics::Histogram, Metrics};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
/// Metrics for blinded node fetching by proof workers.
#[derive(Clone, Debug, Default)]
pub struct ProofTaskMetrics {
/// The actual metrics for blinded nodes.
pub task_metrics: ProofTaskTrieMetrics,
/// Count of storage proof requests (lock-free).
pub storage_proofs: Arc<AtomicU64>,
/// Count of account proof requests (lock-free).
pub account_proofs: Arc<AtomicU64>,
/// Count of blinded account node requests (lock-free).
pub account_nodes: Arc<AtomicU64>,
/// Count of blinded storage node requests (lock-free).
pub storage_nodes: Arc<AtomicU64>,
}
impl ProofTaskMetrics {
/// Record the blinded node counts into the histograms.
pub fn record(&self) {
self.task_metrics.record_account_nodes(self.account_nodes.load(Ordering::Relaxed) as usize);
self.task_metrics.record_storage_nodes(self.storage_nodes.load(Ordering::Relaxed) as usize);
/// Record the blinded account node count into the histogram.
pub fn record_account_nodes(&self, count: usize) {
self.task_metrics.record_account_nodes(count);
}
/// Record the blinded storage node count into the histogram.
pub fn record_storage_nodes(&self, count: usize) {
self.task_metrics.record_storage_nodes(count);
}
}