mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
3 Commits
push
...
feat/simpl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ac03f8ebce | ||
|
|
f19420f6f6 | ||
|
|
f0b0c2753e |
@@ -48,7 +48,7 @@ use std::{
|
||||
collections::BTreeMap,
|
||||
ops::Not,
|
||||
sync::{
|
||||
atomic::AtomicBool,
|
||||
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||
mpsc::{self, channel},
|
||||
Arc,
|
||||
},
|
||||
@@ -139,6 +139,11 @@ where
|
||||
disable_parallel_sparse_trie: bool,
|
||||
/// Maximum concurrency for prewarm task.
|
||||
prewarm_max_concurrency: usize,
|
||||
/// Dynamic account worker count based on previous block's queue pressure.
|
||||
/// Starts at config value and adjusts based on observed pressure.
|
||||
account_worker_count: Arc<AtomicUsize>,
|
||||
/// Maximum account workers allowed (capped at 64 or available parallelism).
|
||||
account_worker_max: usize,
|
||||
}
|
||||
|
||||
impl<N, Evm> PayloadProcessor<Evm>
|
||||
@@ -158,6 +163,9 @@ where
|
||||
config: &TreeConfig,
|
||||
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
|
||||
) -> Self {
|
||||
let account_worker_max =
|
||||
std::thread::available_parallelism().map(|p| p.get().min(64)).unwrap_or(64);
|
||||
|
||||
Self {
|
||||
executor,
|
||||
execution_cache: Default::default(),
|
||||
@@ -171,6 +179,8 @@ where
|
||||
sparse_state_trie: Arc::default(),
|
||||
disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
|
||||
prewarm_max_concurrency: config.prewarm_max_concurrency(),
|
||||
account_worker_count: Arc::new(AtomicUsize::new(config.account_worker_count())),
|
||||
account_worker_max,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -274,7 +284,9 @@ where
|
||||
// Create and spawn the storage proof task
|
||||
let task_ctx = ProofTaskCtx::new(multiproof_provider_factory);
|
||||
let storage_worker_count = config.storage_worker_count();
|
||||
let account_worker_count = config.account_worker_count();
|
||||
let account_worker_count = self.account_worker_count.load(Ordering::Relaxed);
|
||||
let account_worker_count_arc = self.account_worker_count.clone();
|
||||
let account_worker_max = self.account_worker_max;
|
||||
let v2_proofs_enabled = config.enable_proof_v2();
|
||||
let proof_handle = ProofWorkerHandle::new(
|
||||
self.executor.handle().clone(),
|
||||
@@ -295,6 +307,7 @@ where
|
||||
// spawn multi-proof task
|
||||
let parent_span = span.clone();
|
||||
let saved_cache = prewarm_handle.saved_cache.clone();
|
||||
let proof_handle_for_scaling = proof_handle.clone();
|
||||
self.executor.spawn_blocking(move || {
|
||||
let _enter = parent_span.entered();
|
||||
// Build a state provider for the multiproof task
|
||||
@@ -307,6 +320,29 @@ where
|
||||
Box::new(provider)
|
||||
};
|
||||
multi_proof_task.run(provider);
|
||||
|
||||
// After block processing, check if we experienced queue pressure and adjust
|
||||
// worker count for next block. Pressure = max_pending_tasks / workers.
|
||||
// If pressure was high (>= 8), scale up by 25% for next block.
|
||||
let max_pending = proof_handle_for_scaling.max_pending_account_tasks();
|
||||
let workers = proof_handle_for_scaling.total_account_workers();
|
||||
let pressure = max_pending / workers.max(1);
|
||||
|
||||
const PRESSURE_THRESHOLD: usize = 2;
|
||||
const SCALE_PERCENT: usize = 25;
|
||||
|
||||
if pressure >= PRESSURE_THRESHOLD {
|
||||
let new_count = ((workers * (100 + SCALE_PERCENT)) / 100).min(account_worker_max);
|
||||
account_worker_count_arc.store(new_count, Ordering::Relaxed);
|
||||
debug!(
|
||||
target: "engine::payload",
|
||||
workers,
|
||||
max_pending,
|
||||
pressure,
|
||||
new_count,
|
||||
"Scaling up account workers for next block due to queue pressure"
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
// wire the sparse trie to the state root response receiver
|
||||
|
||||
@@ -97,6 +97,9 @@ pub struct ProofWorkerHandle {
|
||||
/// Counter tracking available account workers. Workers decrement when starting work,
|
||||
/// increment when finishing. Used to determine whether to chunk multiproofs.
|
||||
account_available_workers: Arc<AtomicUsize>,
|
||||
/// Tracks the maximum number of pending account tasks seen during execution.
|
||||
/// Used for dynamic worker scaling decisions.
|
||||
max_pending_account_tasks: Arc<AtomicUsize>,
|
||||
/// Total number of storage workers spawned
|
||||
storage_worker_count: usize,
|
||||
/// Total number of account workers spawned
|
||||
@@ -238,6 +241,7 @@ impl ProofWorkerHandle {
|
||||
account_work_tx,
|
||||
storage_available_workers,
|
||||
account_available_workers,
|
||||
max_pending_account_tasks: Arc::new(AtomicUsize::new(0)),
|
||||
storage_worker_count,
|
||||
account_worker_count,
|
||||
v2_proofs_enabled,
|
||||
@@ -269,6 +273,12 @@ impl ProofWorkerHandle {
|
||||
self.account_work_tx.len()
|
||||
}
|
||||
|
||||
/// Returns the maximum number of pending account tasks observed during execution.
|
||||
/// This is used for dynamic worker scaling decisions.
|
||||
pub fn max_pending_account_tasks(&self) -> usize {
|
||||
self.max_pending_account_tasks.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Returns the total number of storage workers in the pool.
|
||||
pub const fn total_storage_workers(&self) -> usize {
|
||||
self.storage_worker_count
|
||||
@@ -326,34 +336,39 @@ impl ProofWorkerHandle {
|
||||
&self,
|
||||
input: AccountMultiproofInput,
|
||||
) -> Result<(), ProviderError> {
|
||||
self.account_work_tx
|
||||
.send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) })
|
||||
.map_err(|err| {
|
||||
let error =
|
||||
ProviderError::other(std::io::Error::other("account workers unavailable"));
|
||||
let result = self
|
||||
.account_work_tx
|
||||
.send(AccountWorkerJob::AccountMultiproof { input: Box::new(input) });
|
||||
|
||||
if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
|
||||
let AccountMultiproofInput {
|
||||
proof_result_sender:
|
||||
ProofResultContext {
|
||||
sender: result_tx,
|
||||
sequence_number: seq,
|
||||
state,
|
||||
start_time: start,
|
||||
},
|
||||
..
|
||||
} = *input;
|
||||
// Track peak pending for scaling decisions (do this after send to include this task)
|
||||
let current_pending = self.account_work_tx.len();
|
||||
self.max_pending_account_tasks.fetch_max(current_pending, Ordering::Relaxed);
|
||||
|
||||
let _ = result_tx.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result: Err(ParallelStateRootError::Provider(error.clone())),
|
||||
elapsed: start.elapsed(),
|
||||
state,
|
||||
});
|
||||
}
|
||||
result.map_err(|err| {
|
||||
let error = ProviderError::other(std::io::Error::other("account workers unavailable"));
|
||||
|
||||
error
|
||||
})
|
||||
if let AccountWorkerJob::AccountMultiproof { input } = err.0 {
|
||||
let AccountMultiproofInput {
|
||||
proof_result_sender:
|
||||
ProofResultContext {
|
||||
sender: result_tx,
|
||||
sequence_number: seq,
|
||||
state,
|
||||
start_time: start,
|
||||
},
|
||||
..
|
||||
} = *input;
|
||||
|
||||
let _ = result_tx.send(ProofResultMessage {
|
||||
sequence_number: seq,
|
||||
result: Err(ParallelStateRootError::Provider(error.clone())),
|
||||
elapsed: start.elapsed(),
|
||||
state,
|
||||
});
|
||||
}
|
||||
|
||||
error
|
||||
})
|
||||
}
|
||||
|
||||
/// Dispatch blinded storage node request to storage worker pool
|
||||
|
||||
Reference in New Issue
Block a user