refactor: slight cleanup

- Introduced `WorkerType` enum for better logging of worker types.
- Updated `ProofTaskManager` to spawn account workers alongside storage workers.
- Implemented `account_worker_loop` to handle account multiproof operations and collect storage proofs.
- Improved documentation on worker lifecycle and transaction reuse.
- Refactored task management to streamline the spawning of worker pools.
This commit is contained in:
Yong Kang
2025-10-08 08:17:38 +00:00
parent 2f10f99a83
commit f5538b25a8

View File

@@ -105,16 +105,17 @@ impl StorageWorkerJob {
///
/// # Architecture
///
/// This manager handles two distinct execution paths:
/// This manager handles three distinct execution paths:
///
/// 1. **Storage Worker Pool** (for storage trie operations):
/// 1. **Worker Pools** (for storage and account operations):
/// - Pre-spawned workers with dedicated long-lived transactions
/// - Handles `StorageProof` and `BlindedStorageNode` requests
/// - Tasks queued via crossbeam unbounded channel
/// - **Storage pool**: Handles `StorageProof` and `BlindedStorageNode` requests
/// - **Account pool**: Handles `AccountMultiproof` requests, delegates to storage pool
/// - Tasks queued via crossbeam unbounded channels
/// - Workers continuously process without transaction overhead
/// - Unbounded queue ensures all storage proofs benefit from transaction reuse
/// - Returns error if worker pool is unavailable (all workers panicked)
///
/// 2. **On-Demand Execution** (for account trie operations):
/// 2. **On-Demand Execution** (for blinded account node operations):
/// - Lazy transaction creation for `BlindedAccountNode` requests
/// - Transactions returned to pool after use for reuse
///
@@ -134,6 +135,12 @@ pub struct ProofTaskManager<Factory: DatabaseProviderFactory> {
/// May be less than requested if concurrency limits reduce the worker budget.
storage_worker_count: usize,
/// Sender for account worker jobs to worker pool.
account_work_tx: CrossbeamSender<AccountMultiproofJob>,
/// Number of account workers successfully spawned.
account_worker_count: usize,
/// Max number of database transactions to create for on-demand account trie operations.
max_concurrency: usize,
@@ -321,10 +328,9 @@ where
{
/// Creates a new [`ProofTaskManager`] with pre-spawned storage proof workers.
///
/// The `storage_worker_count` determines how many storage workers to spawn, and
/// The `storage_worker_count` determines how many storage workers to spawn,
/// `account_worker_count` determines how many account workers to spawn, and
/// `max_concurrency` determines the limit for on-demand operations (blinded account nodes).
/// These are now independent - storage workers are spawned as requested, and on-demand
/// operations use a separate concurrency pool for blinded account nodes.
/// Returns an error if the underlying provider fails to create the transactions required for
/// spawning workers.
pub fn new(
@@ -333,6 +339,7 @@ where
task_ctx: ProofTaskCtx,
max_concurrency: usize,
storage_worker_count: usize,
account_worker_count: usize,
) -> ProviderResult<Self> {
let (tx_sender, proof_task_rx) = channel();
@@ -852,9 +859,7 @@ impl AccountMultiproofJob {
/// Returns `Ok(())` if the error was sent successfully, or `Err(())` if the receiver was
/// dropped.
fn send_worker_unavailable_error(&self) -> Result<(), ()> {
let error = ParallelStateRootError::Other(
"Account worker pool unavailable".to_string(),
);
let error = ParallelStateRootError::Other("Account worker pool unavailable".to_string());
self.result_sender.send(Err(error)).map_err(|_| ())
}
}