From 733d6ced317dc38ebf802ca23eb1694a70593f62 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 23 Jan 2024 23:01:27 +0100 Subject: [PATCH] fix: ensure tokio rt is always dropped on separate task (#6191) --- bin/reth/src/runner.rs | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/bin/reth/src/runner.rs b/bin/reth/src/runner.rs index 963cb364d1..551dc50178 100644 --- a/bin/reth/src/runner.rs +++ b/bin/reth/src/runner.rs @@ -3,7 +3,7 @@ use futures::pin_mut; use reth_tasks::{TaskExecutor, TaskManager}; use std::future::Future; -use tracing::trace; +use tracing::{debug, error, trace}; /// Executes CLI commands. #[derive(Clone, Debug, Default)] @@ -26,30 +26,30 @@ impl CliRunner { F: Future>, E: Send + Sync + From + From + 'static, { - let AsyncCliRunner { context, task_manager, tokio_runtime } = AsyncCliRunner::new()?; + let AsyncCliRunner { context, mut task_manager, tokio_runtime } = AsyncCliRunner::new()?; // Executes the command until it finished or ctrl-c was fired - let task_manager = tokio_runtime.block_on(run_to_completion_or_panic( - task_manager, + let command_res = tokio_runtime.block_on(run_to_completion_or_panic( + &mut task_manager, run_until_ctrl_c(command(context)), - ))?; + )); - // after the command has finished or exit signal was received we shutdown the task manager - // which fires the shutdown signal to all tasks spawned via the task executor and - // awaiting on tasks spawned with graceful shutdown - task_manager.graceful_shutdown_with_timeout(std::time::Duration::from_secs(10)); + if command_res.is_err() { + error!(target: "reth::cli", "shutting down due to error"); + } else { + debug!(target: "reth::cli", "shutting down gracefully"); + // after the command has finished or exit signal was received we shutdown the task + // manager which fires the shutdown signal to all tasks spawned via the task + // executor and awaiting on tasks spawned with graceful shutdown + task_manager.graceful_shutdown_with_timeout(std::time::Duration::from_secs(10)); + } // drop the tokio runtime on a separate thread because drop blocks until its pools // (including blocking pool) are shutdown. In other words `drop(tokio_runtime)` would block // the current thread but we want to exit right away. std::thread::spawn(move || drop(tokio_runtime)); - // give all tasks that are now being shut down some time to finish before tokio leaks them - // see [Runtime::shutdown_timeout](tokio::runtime::Runtime::shutdown_timeout) - // TODO: enable this again, when pipeline/stages are not longer blocking tasks - // warn!(target: "reth::cli", "Received shutdown signal, waiting up to 30 seconds for - // tasks."); tokio_runtime.shutdown_timeout(Duration::from_secs(30)); - Ok(()) + command_res } /// Executes a regular future until completion or until external signal received. @@ -120,8 +120,10 @@ pub fn tokio_runtime() -> Result { tokio::runtime::Builder::new_multi_thread().enable_all().build() } -/// Runs the given future to completion or until a critical task panicked -async fn run_to_completion_or_panic(mut tasks: TaskManager, fut: F) -> Result +/// Runs the given future to completion or until a critical task panicked. +/// +/// Returns the error if a task panicked, or the given future returned an error. +async fn run_to_completion_or_panic(tasks: &mut TaskManager, fut: F) -> Result<(), E> where F: Future>, E: Send + Sync + From + 'static, @@ -129,13 +131,13 @@ where { pin_mut!(fut); tokio::select! { - err = &mut tasks => { + err = tasks => { return Err(err.into()) }, res = fut => res?, } } - Ok(tasks) + Ok(()) } /// Runs the future to completion or until: