fix: ensure tokio rt is always dropped on separate task (#6191)

This commit is contained in:
Matthias Seitz
2024-01-23 23:01:27 +01:00
committed by GitHub
parent 3fafa8c50f
commit 733d6ced31

View File

@@ -3,7 +3,7 @@
use futures::pin_mut;
use reth_tasks::{TaskExecutor, TaskManager};
use std::future::Future;
use tracing::trace;
use tracing::{debug, error, trace};
/// Executes CLI commands.
#[derive(Clone, Debug, Default)]
@@ -26,30 +26,30 @@ impl CliRunner {
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, task_manager, tokio_runtime } = AsyncCliRunner::new()?;
let AsyncCliRunner { context, mut task_manager, tokio_runtime } = AsyncCliRunner::new()?;
// Executes the command until it finished or ctrl-c was fired
let task_manager = tokio_runtime.block_on(run_to_completion_or_panic(
task_manager,
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
run_until_ctrl_c(command(context)),
))?;
));
// after the command has finished or exit signal was received we shutdown the task manager
// which fires the shutdown signal to all tasks spawned via the task executor and
// awaiting on tasks spawned with graceful shutdown
task_manager.graceful_shutdown_with_timeout(std::time::Duration::from_secs(10));
if command_res.is_err() {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
// after the command has finished or exit signal was received we shutdown the task
// manager which fires the shutdown signal to all tasks spawned via the task
// executor and awaiting on tasks spawned with graceful shutdown
task_manager.graceful_shutdown_with_timeout(std::time::Duration::from_secs(10));
}
// drop the tokio runtime on a separate thread because drop blocks until its pools
// (including blocking pool) are shutdown. In other words `drop(tokio_runtime)` would block
// the current thread but we want to exit right away.
std::thread::spawn(move || drop(tokio_runtime));
// give all tasks that are now being shut down some time to finish before tokio leaks them
// see [Runtime::shutdown_timeout](tokio::runtime::Runtime::shutdown_timeout)
// TODO: enable this again, when pipeline/stages are not longer blocking tasks
// warn!(target: "reth::cli", "Received shutdown signal, waiting up to 30 seconds for
// tasks."); tokio_runtime.shutdown_timeout(Duration::from_secs(30));
Ok(())
command_res
}
/// Executes a regular future until completion or until external signal received.
@@ -120,8 +120,10 @@ pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread().enable_all().build()
}
/// Runs the given future to completion or until a critical task panicked
async fn run_to_completion_or_panic<F, E>(mut tasks: TaskManager, fut: F) -> Result<TaskManager, E>
/// Runs the given future to completion or until a critical task panicked.
///
/// Returns the error if a task panicked, or the given future returned an error.
async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
@@ -129,13 +131,13 @@ where
{
pin_mut!(fut);
tokio::select! {
err = &mut tasks => {
err = tasks => {
return Err(err.into())
},
res = fut => res?,
}
}
Ok(tasks)
Ok(())
}
/// Runs the future to completion or until: