Compare commits

...

3 Commits

Author SHA1 Message Date
yongkangc
ac03f8ebce test: lower pressure threshold to 2 for testing scaling trigger 2026-01-19 14:44:03 +00:00
yongkangc
f19420f6f6 fix(engine): track peak queue pressure for worker scaling
The scaling check was running after multi_proof_task.run() completed,
when the queue was drained and pending_account_tasks() was always ~0.

Fix: Add max_pending_account_tasks tracking in ProofWorkerHandle that
records peak queue depth during dispatch_account_multiproof(). Use this
peak value for scaling decisions instead of post-execution pending count.
2026-01-19 14:04:39 +00:00
yongkangc
f0b0c2753e feat(engine): dynamic account worker scaling based on queue pressure
Add adaptive scaling for account proof workers based on observed queue
pressure from the previous block:

- Track account_worker_count in PayloadProcessor (starts at config value)
- After each block, check queue pressure (pending_tasks / workers)
- If pressure >= 8, scale up by 25% for next block (capped at 64 or
  available_parallelism)

This allows the system to automatically adapt to varying workloads without
requiring manual tuning of worker counts.
2026-01-17 22:44:21 +00:00
2 changed files with 78 additions and 27 deletions

View File

@@ -48,7 +48,7 @@ use std::{
collections::BTreeMap,
ops::Not,
sync::{
atomic::AtomicBool,
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::{self, channel},
Arc,
},
@@ -139,6 +139,11 @@ where
disable_parallel_sparse_trie: bool,
/// Maximum concurrency for prewarm task.
prewarm_max_concurrency: usize,
/// Dynamic account worker count based on previous block's queue pressure.
/// Starts at config value and adjusts based on observed pressure.
account_worker_count: Arc<AtomicUsize>,
/// Maximum account workers allowed (capped at 64 or available parallelism).
account_worker_max: usize,
}
impl<N, Evm> PayloadProcessor<Evm>
@@ -158,6 +163,9 @@ where
config: &TreeConfig,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
) -> Self {
let account_worker_max =
std::thread::available_parallelism().map(|p| p.get().min(64)).unwrap_or(64);
Self {
executor,
execution_cache: Default::default(),
@@ -171,6 +179,8 @@ where
sparse_state_trie: Arc::default(),
disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
prewarm_max_concurrency: config.prewarm_max_concurrency(),
account_worker_count: Arc::new(AtomicUsize::new(config.account_worker_count())),
account_worker_max,
}
}
}
@@ -274,7 +284,9 @@ where
// Create and spawn the storage proof task
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
let storage_worker_count = config.storage_worker_count();
let account_worker_count = config.account_worker_count();
let account_worker_count = self.account_worker_count.load(Ordering::Relaxed);
let account_worker_count_arc = self.account_worker_count.clone();
let account_worker_max = self.account_worker_max;
let v2_proofs_enabled = config.enable_proof_v2();
let proof_handle = ProofWorkerHandle::new(
self.executor.handle().clone(),
@@ -295,6 +307,7 @@ where
// spawn multi-proof task
let parent_span = span.clone();
let saved_cache = prewarm_handle.saved_cache.clone();
let proof_handle_for_scaling = proof_handle.clone();
self.executor.spawn_blocking(move || {
let _enter = parent_span.entered();
// Build a state provider for the multiproof task
@@ -307,6 +320,29 @@ where
Box::new(provider)
};
multi_proof_task.run(provider);
// After block processing, check if we experienced queue pressure and adjust
// worker count for next block. Pressure = max_pending_tasks / workers.
// If pressure was high (>= 8), scale up by 25% for next block.
let max_pending = proof_handle_for_scaling.max_pending_account_tasks();
let workers = proof_handle_for_scaling.total_account_workers();
let pressure = max_pending / workers.max(1);
const PRESSURE_THRESHOLD: usize = 2;
const SCALE_PERCENT: usize = 25;
if pressure >= PRESSURE_THRESHOLD {
let new_count = ((workers * (100 + SCALE_PERCENT)) / 100).min(account_worker_max);
account_worker_count_arc.store(new_count, Ordering::Relaxed);
debug!(
target: "engine::payload",
workers,
max_pending,
pressure,
new_count,
"Scaling up account workers for next block due to queue pressure"
);
}
});
// wire the sparse trie to the state root response receiver

View File

@@ -97,6 +97,9 @@ pub struct ProofWorkerHandle {
/// Counter tracking available account workers. Workers decrement when starting work,
/// increment when finishing. Used to determine whether to chunk multiproofs.
account_available_workers: Arc<AtomicUsize>,
/// Tracks the maximum number of pending account tasks seen during execution.
/// Used for dynamic worker scaling decisions.
max_pending_account_tasks: Arc<AtomicUsize>,
/// Total number of storage workers spawned
storage_worker_count: usize,
/// Total number of account workers spawned
@@ -238,6 +241,7 @@ impl ProofWorkerHandle {
account_work_tx,
storage_available_workers,
account_available_workers,
max_pending_account_tasks: Arc::new(AtomicUsize::new(0)),
storage_worker_count,
account_worker_count,
v2_proofs_enabled,
@@ -269,6 +273,12 @@ impl ProofWorkerHandle {
self.account_work_tx.len()
}
/// Returns the maximum number of pending account tasks observed during execution.
/// This is used for dynamic worker scaling decisions.
pub fn max_pending_account_tasks(&self) -> usize {
self.max_pending_account_tasks.load(Ordering::Relaxed)
}
/// Returns the total number of storage workers in the pool.
pub const fn total_storage_workers(&self) -> usize {
self.storage_worker_count
@@ -326,34 +336,39 @@ impl ProofWorkerHandle {
&self,
input: AccountMultiproofInput,
) -> Result<(), ProviderError> {
self.account_work_tx
.send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
.map_err(|err| {
let error =
ProviderError::other(std::io::Error::other("account workers unavailable"));
let result = self
.account_work_tx
.send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) });
if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
let AccountMultiproofInput {
proof_result_sender:
ProofResultContext {
sender: result_tx,
sequence_number: seq,
state,
start_time: start,
},
..
} = *input;
// Track peak pending for scaling decisions (do this after send to include this task)
let current_pending = self.account_work_tx.len();
self.max_pending_account_tasks.fetch_max(current_pending, Ordering::Relaxed);
let _ = result_tx.send(ProofResultMessage {
sequence_number: seq,
result: Err(ParallelStateRootError::Provider(error.clone())),
elapsed: start.elapsed(),
state,
});
}
result.map_err(|err| {
let error = ProviderError::other(std::io::Error::other("account workers unavailable"));
error
})
if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
let AccountMultiproofInput {
proof_result_sender:
ProofResultContext {
sender: result_tx,
sequence_number: seq,
state,
start_time: start,
},
..
} = *input;
let _ = result_tx.send(ProofResultMessage {
sequence_number: seq,
result: Err(ParallelStateRootError::Provider(error.clone())),
elapsed: start.elapsed(),
state,
});
}
error
})
}
/// Dispatch blinded storage node request to storage worker pool