From bc9722d9e2b880bda36eba367a05e6abc7f8cc9e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com> Date: Mon, 12 May 2025 12:21:48 +0100 Subject: [PATCH] feat(engine): set keep alive for Tokio threads (#16162) --- .../src/tree/payload_processor/executor.rs | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/executor.rs b/crates/engine/tree/src/tree/payload_processor/executor.rs index c7fb37f8a9..3013c5e1c7 100644 --- a/crates/engine/tree/src/tree/payload_processor/executor.rs +++ b/crates/engine/tree/src/tree/payload_processor/executor.rs @@ -1,9 +1,12 @@ //! Executor for mixed I/O and CPU workloads. use rayon::ThreadPool as RayonPool; -use std::sync::{Arc, OnceLock}; +use std::{ + sync::{Arc, OnceLock}, + time::Duration, +}; use tokio::{ - runtime::{Handle, Runtime}, + runtime::{Builder, Handle, Runtime}, task::JoinHandle, }; @@ -66,10 +69,22 @@ impl WorkloadExecutorInner { fn new(rayon_pool: rayon::ThreadPool) -> Self { fn get_runtime_handle() -> Handle { Handle::try_current().unwrap_or_else(|_| { - // Create a new runtime if now runtime is available + // Create a new runtime if no runtime is available static RT: OnceLock = OnceLock::new(); - let rt = RT.get_or_init(|| Runtime::new().unwrap()); + let rt = RT.get_or_init(|| { + Builder::new_multi_thread() + .enable_all() + // Keep the threads alive for at least the block time, which is 12 seconds + // at the time of writing, plus a little extra. + // + // This is to prevent the costly process of spawning new threads on every + // new block, and instead reuse the existing + // threads. + .thread_keep_alive(Duration::from_secs(15)) + .build() + .unwrap() + }); rt.handle().clone() })