diff --git a/Cargo.lock b/Cargo.lock index aad5ca73d9..7eed912ed9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4075,6 +4075,7 @@ dependencies = [ "reth-rpc-builder", "reth-staged-sync", "reth-stages", + "reth-tasks", "reth-tracing", "reth-transaction-pool", "serde", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index c9338ea5ff..d4a91404d4 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -26,6 +26,7 @@ reth-network = {path = "../../crates/net/network", features = ["serde"] } reth-network-api = {path = "../../crates/net/network-api" } reth-downloaders = {path = "../../crates/net/downloaders", features = ["test-utils"] } reth-tracing = { path = "../../crates/tracing" } +reth-tasks = { path = "../../crates/tasks" } reth-net-nat = { path = "../../crates/net/nat" } reth-discv4 = { path = "../../crates/net/discv4" } diff --git a/bin/reth/src/cli.rs b/bin/reth/src/cli.rs index 944849dc81..ed9d0f9d5b 100644 --- a/bin/reth/src/cli.rs +++ b/bin/reth/src/cli.rs @@ -4,7 +4,9 @@ use std::str::FromStr; use crate::{ chain, db, dirs::{LogsDir, PlatformPath}, - node, p2p, stage, test_eth_chain, test_vectors, + node, p2p, + runner::CliRunner, + stage, test_eth_chain, test_vectors, }; use clap::{ArgAction, Args, Parser, Subcommand}; use reth_tracing::{ @@ -14,21 +16,23 @@ use reth_tracing::{ }; /// Parse CLI options, set up logging and run the chosen command. -pub async fn run() -> eyre::Result<()> { +pub fn run() -> eyre::Result<()> { let opt = Cli::parse(); let (layer, _guard) = opt.logs.layer(); reth_tracing::init(vec![layer, reth_tracing::stdout(opt.verbosity.directive())]); + let runner = CliRunner::default(); + match opt.command { - Commands::Node(command) => command.execute().await, - Commands::Init(command) => command.execute().await, - Commands::Import(command) => command.execute().await, - Commands::Db(command) => command.execute().await, - Commands::Stage(command) => command.execute().await, - Commands::P2P(command) => command.execute().await, - Commands::TestVectors(command) => command.execute().await, - Commands::TestEthChain(command) => command.execute().await, + Commands::Node(command) => runner.run_command_until_exit(|ctx| command.execute(ctx)), + Commands::Init(command) => runner.run_until_ctrl_c(command.execute()), + Commands::Import(command) => runner.run_until_ctrl_c(command.execute()), + Commands::Db(command) => runner.run_until_ctrl_c(command.execute()), + Commands::Stage(command) => runner.run_until_ctrl_c(command.execute()), + Commands::P2P(command) => runner.run_until_ctrl_c(command.execute()), + Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()), + Commands::TestEthChain(command) => runner.run_until_ctrl_c(command.execute()), } } diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 7bcd3a499c..244d5ef62f 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -15,6 +15,7 @@ pub mod dirs; pub mod node; pub mod p2p; pub mod prometheus_exporter; +pub mod runner; pub mod stage; pub mod test_eth_chain; pub mod test_vectors; diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index 11ac01de69..87198347e4 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -1,6 +1,5 @@ -#[tokio::main] -async fn main() { - if let Err(err) = reth::cli::run().await { +fn main() { + if let Err(err) = reth::cli::run() { eprintln!("Error: {err:?}"); std::process::exit(1); } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index d6533ef64b..271acb8656 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -5,6 +5,7 @@ use crate::{ args::{NetworkArgs, RpcServerArgs}, dirs::{ConfigPath, DbPath, PlatformPath}, prometheus_exporter, + runner::CliContext, }; use clap::{crate_version, Parser}; use eyre::Context; @@ -105,7 +106,7 @@ pub struct Command { impl Command { /// Execute `node` command // TODO: RPC - pub async fn execute(self) -> eyre::Result<()> { + pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { info!(target: "reth::cli", "reth {} starting", crate_version!()); // Raise the fd limit of the process. diff --git a/bin/reth/src/runner.rs b/bin/reth/src/runner.rs new file mode 100644 index 0000000000..8e6fe7ae1e --- /dev/null +++ b/bin/reth/src/runner.rs @@ -0,0 +1,106 @@ +//! Entrypoint for running commands. + +use futures::pin_mut; +use reth_tasks::{TaskExecutor, TaskManager}; +use std::{future::Future, time::Duration}; +use tracing::trace; + +/// Used to execute cli commands +#[derive(Default, Debug)] +#[non_exhaustive] +pub struct CliRunner; + +// === impl CliRunner === + +impl CliRunner { + /// Executes the given _async_ command on the tokio runtime until the command future resolves or + /// until the process receives a `SIGINT` or `SIGTERM` signal. + /// + /// Tasks spawned by the command via the [TaskExecutor] are shut down and an attempt is made to + /// drive their shutdown to completion after the command has finished. + pub fn run_command_until_exit( + self, + command: impl FnOnce(CliContext) -> F, + ) -> Result<(), E> + where + F: Future>, + E: Send + Sync + From + 'static, + { + let AsyncCliRunner { context, task_manager, tokio_runtime } = AsyncCliRunner::new()?; + + // Executes the command until it finished or ctrl-c was fired + tokio_runtime.block_on(run_until_ctrl_c(command(context)))?; + // after the command has finished or exit signal was received we drop the task manager which + // fires the shutdown signal to all tasks spawned via the task executor + drop(task_manager); + + // give all tasks that are now being shut down some time to finish before tokio leaks them + // see [Runtime::shutdown_timeout](tokio::runtime::Runtime::shutdown_timeout) + tokio_runtime.shutdown_timeout(Duration::from_secs(30)); + + Ok(()) + } + + /// Executes a regular future until completion or until external signal received. + pub fn run_until_ctrl_c(self, fut: F) -> Result<(), E> + where + F: Future>, + E: Send + Sync + From + 'static, + { + let tokio_runtime = tokio_runtime()?; + tokio_runtime.block_on(run_until_ctrl_c(fut))?; + Ok(()) + } +} + +/// [CliRunner] configuration when executing commands asynchronously +struct AsyncCliRunner { + context: CliContext, + task_manager: TaskManager, + tokio_runtime: tokio::runtime::Runtime, +} + +// === impl AsyncCliRunner === + +impl AsyncCliRunner { + /// Attempts to create a tokio Runtime and additional context required to execute commands + /// asynchronously. + fn new() -> Result { + let tokio_runtime = tokio_runtime()?; + let task_manager = TaskManager::new(tokio_runtime.handle().clone()); + let task_executor = task_manager.executor(); + Ok(Self { context: CliContext { task_executor }, task_manager, tokio_runtime }) + } +} + +/// Additional context provided by the [CliRunner] when executing commands +pub struct CliContext { + /// Used to execute/spawn tasks + pub task_executor: TaskExecutor, +} + +/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features +/// enabled +pub fn tokio_runtime() -> Result { + tokio::runtime::Builder::new_multi_thread().enable_all().build() +} + +/// Runs the future to completion or until a `ctrl-c` was received. +async fn run_until_ctrl_c(fut: F) -> Result<(), E> +where + F: Future>, + E: Send + Sync + 'static, +{ + let ctrl_c = tokio::signal::ctrl_c(); + + pin_mut!(ctrl_c, fut); + + tokio::select! { + _ = ctrl_c => { + trace!(target: "reth::cli", "Received ctrl-c"); + }, + res = fut => res?, + } + + Ok(()) +}