mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 07:17:56 -05:00
refactor: decouple max proof task concurrency from inflight proof limits (#19171)
This commit is contained in:
@@ -6,9 +6,6 @@ pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;
|
||||
/// How close to the canonical head we persist blocks.
|
||||
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;
|
||||
|
||||
/// Default maximum concurrency for on-demand proof tasks (blinded nodes)
|
||||
pub const DEFAULT_MAX_PROOF_TASK_CONCURRENCY: u64 = 256;
|
||||
|
||||
/// Minimum number of workers we allow configuring explicitly.
|
||||
pub const MIN_WORKER_COUNT: usize = 32;
|
||||
|
||||
@@ -102,8 +99,6 @@ pub struct TreeConfig {
|
||||
cross_block_cache_size: u64,
|
||||
/// Whether the host has enough parallelism to run state root task.
|
||||
has_enough_parallelism: bool,
|
||||
/// Maximum number of concurrent proof tasks
|
||||
max_proof_task_concurrency: u64,
|
||||
/// Whether multiproof task should chunk proof targets.
|
||||
multiproof_chunking_enabled: bool,
|
||||
/// Multiproof task chunk size for proof targets.
|
||||
@@ -153,7 +148,6 @@ impl Default for TreeConfig {
|
||||
state_provider_metrics: false,
|
||||
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE,
|
||||
has_enough_parallelism: has_enough_parallelism(),
|
||||
max_proof_task_concurrency: DEFAULT_MAX_PROOF_TASK_CONCURRENCY,
|
||||
multiproof_chunking_enabled: true,
|
||||
multiproof_chunk_size: DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
|
||||
reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
|
||||
@@ -184,7 +178,6 @@ impl TreeConfig {
|
||||
state_provider_metrics: bool,
|
||||
cross_block_cache_size: u64,
|
||||
has_enough_parallelism: bool,
|
||||
max_proof_task_concurrency: u64,
|
||||
multiproof_chunking_enabled: bool,
|
||||
multiproof_chunk_size: usize,
|
||||
reserved_cpu_cores: usize,
|
||||
@@ -196,7 +189,6 @@ impl TreeConfig {
|
||||
storage_worker_count: usize,
|
||||
account_worker_count: usize,
|
||||
) -> Self {
|
||||
assert!(max_proof_task_concurrency > 0, "max_proof_task_concurrency must be at least 1");
|
||||
Self {
|
||||
persistence_threshold,
|
||||
memory_block_buffer_target,
|
||||
@@ -210,7 +202,6 @@ impl TreeConfig {
|
||||
state_provider_metrics,
|
||||
cross_block_cache_size,
|
||||
has_enough_parallelism,
|
||||
max_proof_task_concurrency,
|
||||
multiproof_chunking_enabled,
|
||||
multiproof_chunk_size,
|
||||
reserved_cpu_cores,
|
||||
@@ -249,11 +240,6 @@ impl TreeConfig {
|
||||
self.max_execute_block_batch_size
|
||||
}
|
||||
|
||||
/// Return the maximum proof task concurrency.
|
||||
pub const fn max_proof_task_concurrency(&self) -> u64 {
|
||||
self.max_proof_task_concurrency
|
||||
}
|
||||
|
||||
/// Return whether the multiproof task chunking is enabled.
|
||||
pub const fn multiproof_chunking_enabled(&self) -> bool {
|
||||
self.multiproof_chunking_enabled
|
||||
@@ -420,16 +406,6 @@ impl TreeConfig {
|
||||
self
|
||||
}
|
||||
|
||||
/// Setter for maximum number of concurrent proof tasks.
|
||||
pub const fn with_max_proof_task_concurrency(
|
||||
mut self,
|
||||
max_proof_task_concurrency: u64,
|
||||
) -> Self {
|
||||
assert!(max_proof_task_concurrency > 0, "max_proof_task_concurrency must be at least 1");
|
||||
self.max_proof_task_concurrency = max_proof_task_concurrency;
|
||||
self
|
||||
}
|
||||
|
||||
/// Setter for whether multiproof task should chunk proof targets.
|
||||
pub const fn with_multiproof_chunking_enabled(
|
||||
mut self,
|
||||
|
||||
@@ -207,7 +207,6 @@ where
|
||||
);
|
||||
let storage_worker_count = config.storage_worker_count();
|
||||
let account_worker_count = config.account_worker_count();
|
||||
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
|
||||
let proof_handle = ProofWorkerHandle::new(
|
||||
self.executor.handle().clone(),
|
||||
consistent_view,
|
||||
@@ -216,15 +215,11 @@ where
|
||||
account_worker_count,
|
||||
);
|
||||
|
||||
// We set it to half of the proof task concurrency, because often for each multiproof we
|
||||
// spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
|
||||
let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
|
||||
let multi_proof_task = MultiProofTask::new(
|
||||
state_root_config,
|
||||
self.executor.clone(),
|
||||
proof_handle.clone(),
|
||||
to_sparse_trie,
|
||||
max_multi_proof_task_concurrency,
|
||||
config.multiproof_chunking_enabled().then_some(config.multiproof_chunk_size()),
|
||||
);
|
||||
|
||||
|
||||
@@ -34,6 +34,10 @@ use std::{
|
||||
};
|
||||
use tracing::{debug, error, instrument, trace};
|
||||
|
||||
/// Default upper bound for inflight multiproof calculations. These would be sitting in the queue
|
||||
/// waiting to be processed.
|
||||
const DEFAULT_MULTIPROOF_INFLIGHT_LIMIT: usize = 128;
|
||||
|
||||
/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
|
||||
/// state.
|
||||
#[derive(Default, Debug)]
|
||||
@@ -338,8 +342,8 @@ impl MultiproofInput {
|
||||
/// availability has been signaled.
|
||||
#[derive(Debug)]
|
||||
pub struct MultiproofManager {
|
||||
/// Maximum number of concurrent calculations.
|
||||
max_concurrent: usize,
|
||||
/// Maximum number of proof calculations allowed to be inflight at once.
|
||||
inflight_limit: usize,
|
||||
/// Currently running calculations.
|
||||
inflight: usize,
|
||||
/// Queued calculations.
|
||||
@@ -370,11 +374,10 @@ impl MultiproofManager {
|
||||
executor: WorkloadExecutor,
|
||||
metrics: MultiProofTaskMetrics,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
max_concurrent: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
pending: VecDeque::with_capacity(max_concurrent),
|
||||
max_concurrent,
|
||||
pending: VecDeque::with_capacity(DEFAULT_MULTIPROOF_INFLIGHT_LIMIT),
|
||||
inflight_limit: DEFAULT_MULTIPROOF_INFLIGHT_LIMIT,
|
||||
executor,
|
||||
inflight: 0,
|
||||
metrics,
|
||||
@@ -384,11 +387,10 @@ impl MultiproofManager {
|
||||
}
|
||||
|
||||
const fn is_full(&self) -> bool {
|
||||
self.inflight >= self.max_concurrent
|
||||
self.inflight >= self.inflight_limit
|
||||
}
|
||||
|
||||
/// Spawns a new multiproof calculation or enqueues it for later if
|
||||
/// `max_concurrent` are already inflight.
|
||||
/// Spawns a new multiproof calculation or enqueues it if the inflight limit is reached.
|
||||
fn spawn_or_queue(&mut self, input: PendingMultiproofTask) {
|
||||
// If there are no proof targets, we can just send an empty multiproof back immediately
|
||||
if input.proof_targets_is_empty() {
|
||||
@@ -685,7 +687,6 @@ impl MultiProofTask {
|
||||
executor: WorkloadExecutor,
|
||||
proof_worker_handle: ProofWorkerHandle,
|
||||
to_sparse_trie: Sender<SparseTrieUpdate>,
|
||||
max_concurrency: usize,
|
||||
chunk_size: Option<usize>,
|
||||
) -> Self {
|
||||
let (tx, rx) = channel();
|
||||
@@ -704,7 +705,6 @@ impl MultiProofTask {
|
||||
executor,
|
||||
metrics.clone(),
|
||||
proof_worker_handle,
|
||||
max_concurrency,
|
||||
),
|
||||
metrics,
|
||||
}
|
||||
@@ -1239,7 +1239,7 @@ mod tests {
|
||||
ProofWorkerHandle::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1);
|
||||
let channel = channel();
|
||||
|
||||
MultiProofTask::new(config, executor, proof_handle, channel.0, 1, None)
|
||||
MultiProofTask::new(config, executor, proof_handle, channel.0, Some(1))
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -4,8 +4,8 @@ use clap::Args;
|
||||
use reth_engine_primitives::{TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE};
|
||||
|
||||
use crate::node_config::{
|
||||
DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB, DEFAULT_MAX_PROOF_TASK_CONCURRENCY,
|
||||
DEFAULT_MEMORY_BLOCK_BUFFER_TARGET, DEFAULT_PERSISTENCE_THRESHOLD, DEFAULT_RESERVED_CPU_CORES,
|
||||
DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB, DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
|
||||
DEFAULT_PERSISTENCE_THRESHOLD, DEFAULT_RESERVED_CPU_CORES,
|
||||
};
|
||||
|
||||
/// Parameters for configuring the engine driver.
|
||||
@@ -63,10 +63,6 @@ pub struct EngineArgs {
|
||||
#[arg(long = "engine.accept-execution-requests-hash")]
|
||||
pub accept_execution_requests_hash: bool,
|
||||
|
||||
/// Configure the maximum number of concurrent proof tasks
|
||||
#[arg(long = "engine.max-proof-task-concurrency", default_value_t = DEFAULT_MAX_PROOF_TASK_CONCURRENCY)]
|
||||
pub max_proof_task_concurrency: u64,
|
||||
|
||||
/// Whether multiproof task should chunk proof targets.
|
||||
#[arg(long = "engine.multiproof-chunking", default_value = "true")]
|
||||
pub multiproof_chunking_enabled: bool,
|
||||
@@ -135,7 +131,6 @@ impl Default for EngineArgs {
|
||||
state_provider_metrics: false,
|
||||
cross_block_cache_size: DEFAULT_CROSS_BLOCK_CACHE_SIZE_MB,
|
||||
accept_execution_requests_hash: false,
|
||||
max_proof_task_concurrency: DEFAULT_MAX_PROOF_TASK_CONCURRENCY,
|
||||
multiproof_chunking_enabled: true,
|
||||
multiproof_chunk_size: DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE,
|
||||
reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
|
||||
@@ -162,7 +157,6 @@ impl EngineArgs {
|
||||
.with_state_provider_metrics(self.state_provider_metrics)
|
||||
.with_always_compare_trie_updates(self.state_root_task_compare_updates)
|
||||
.with_cross_block_cache_size(self.cross_block_cache_size * 1024 * 1024)
|
||||
.with_max_proof_task_concurrency(self.max_proof_task_concurrency)
|
||||
.with_multiproof_chunking_enabled(self.multiproof_chunking_enabled)
|
||||
.with_multiproof_chunk_size(self.multiproof_chunk_size)
|
||||
.with_reserved_cpu_cores(self.reserved_cpu_cores)
|
||||
|
||||
@@ -34,8 +34,7 @@ use tracing::*;
|
||||
|
||||
use crate::args::{EraArgs, MetricArgs};
|
||||
pub use reth_engine_primitives::{
|
||||
DEFAULT_MAX_PROOF_TASK_CONCURRENCY, DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
|
||||
DEFAULT_PERSISTENCE_THRESHOLD, DEFAULT_RESERVED_CPU_CORES,
|
||||
DEFAULT_MEMORY_BLOCK_BUFFER_TARGET, DEFAULT_PERSISTENCE_THRESHOLD, DEFAULT_RESERVED_CPU_CORES,
|
||||
};
|
||||
|
||||
/// Default size of cross-block cache in megabytes.
|
||||
|
||||
@@ -269,11 +269,6 @@ fn preprocess_help(s: &str) -> Cow<'_, str> {
|
||||
r"(rpc.max-tracing-requests <COUNT>\n.*\n.*\n.*\n.*\n.*)\[default: \d+\]",
|
||||
r"$1[default: <NUM CPU CORES-2>]",
|
||||
),
|
||||
// Handle engine.max-proof-task-concurrency dynamic default
|
||||
(
|
||||
r"(engine\.max-proof-task-concurrency.*)\[default: \d+\]",
|
||||
r"$1[default: <DYNAMIC: CPU cores * 8>]",
|
||||
),
|
||||
// Handle engine.reserved-cpu-cores dynamic default
|
||||
(
|
||||
r"(engine\.reserved-cpu-cores.*)\[default: \d+\]",
|
||||
|
||||
@@ -840,11 +840,6 @@ Engine:
|
||||
--engine.accept-execution-requests-hash
|
||||
Enables accepting requests hash instead of an array of requests in `engine_newPayloadV4`
|
||||
|
||||
--engine.max-proof-task-concurrency <MAX_PROOF_TASK_CONCURRENCY>
|
||||
Configure the maximum number of concurrent proof tasks
|
||||
|
||||
[default: 256]
|
||||
|
||||
--engine.multiproof-chunking
|
||||
Whether multiproof task should chunk proof targets
|
||||
|
||||
|
||||
Reference in New Issue
Block a user