mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 23:38:10 -05:00
feat: configuration of the tokio runtime (#14837)
This commit is contained in:
@@ -30,7 +30,7 @@ fn main() {
|
||||
}
|
||||
|
||||
// Run until either exit or sigint or sigterm
|
||||
let runner = CliRunner::default();
|
||||
let runner = CliRunner::try_default_runtime().unwrap();
|
||||
runner
|
||||
.run_command_until_exit(|ctx| {
|
||||
let command = BenchmarkCommand::parse();
|
||||
|
||||
@@ -98,6 +98,8 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>, Ext: clap::Args + fmt::Debug> Cl
|
||||
/// This accepts a closure that is used to launch the node via the
|
||||
/// [`NodeCommand`](node::NodeCommand).
|
||||
///
|
||||
/// This command will be run on the [default tokio runtime](reth_cli_runner::tokio_runtime).
|
||||
///
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
@@ -131,11 +133,43 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>, Ext: clap::Args + fmt::Debug> Cl
|
||||
/// Cli::<EthereumChainSpecParser, MyArgs>::parse()
|
||||
/// .run(async move |builder, my_args: MyArgs|
|
||||
/// // launch the node
|
||||
///
|
||||
/// Ok(()))
|
||||
/// .unwrap();
|
||||
/// ````
|
||||
pub fn run<L, Fut>(mut self, launcher: L) -> eyre::Result<()>
|
||||
pub fn run<L, Fut>(self, launcher: L) -> eyre::Result<()>
|
||||
where
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
|
||||
Fut: Future<Output = eyre::Result<()>>,
|
||||
{
|
||||
self.with_runner(CliRunner::try_default_runtime()?, launcher)
|
||||
}
|
||||
|
||||
/// Execute the configured cli command with the provided [`CliRunner`].
|
||||
///
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
/// use reth::cli::Cli;
|
||||
/// use reth_cli_runner::CliRunner;
|
||||
/// use reth_node_ethereum::EthereumNode;
|
||||
///
|
||||
/// let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
/// .worker_threads(4)
|
||||
/// .max_blocking_threads(256)
|
||||
/// .enable_all()
|
||||
/// .build()
|
||||
/// .unwrap();
|
||||
/// let runner = CliRunner::from_runtime(runtime);
|
||||
///
|
||||
/// Cli::parse_args()
|
||||
/// .with_runner(runner, |builder, _| async move {
|
||||
/// let handle = builder.launch_node(EthereumNode::default()).await?;
|
||||
/// handle.wait_for_node_exit().await
|
||||
/// })
|
||||
/// .unwrap();
|
||||
/// ```
|
||||
pub fn with_runner<L, Fut>(mut self, runner: CliRunner, launcher: L) -> eyre::Result<()>
|
||||
where
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
|
||||
Fut: Future<Output = eyre::Result<()>>,
|
||||
@@ -150,7 +184,6 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>, Ext: clap::Args + fmt::Debug> Cl
|
||||
// Install the prometheus recorder to be sure to record all metrics
|
||||
let _ = install_prometheus_recorder();
|
||||
|
||||
let runner = CliRunner::default();
|
||||
let components = |spec: Arc<C::ChainSpec>| {
|
||||
(EthExecutorProvider::ethereum(spec.clone()), EthBeaconConsensus::new(spec))
|
||||
};
|
||||
|
||||
@@ -52,12 +52,10 @@ pub trait RethCli: Sized {
|
||||
}
|
||||
|
||||
/// Executes a command.
|
||||
fn with_runner<F, R>(self, f: F) -> R
|
||||
fn with_runner<F, R>(self, f: F, runner: CliRunner) -> R
|
||||
where
|
||||
F: FnOnce(Self, CliRunner) -> R,
|
||||
{
|
||||
let runner = CliRunner::default();
|
||||
|
||||
f(self, runner)
|
||||
}
|
||||
|
||||
@@ -68,8 +66,8 @@ pub trait RethCli: Sized {
|
||||
F: FnOnce(Self, CliRunner) -> R,
|
||||
{
|
||||
let cli = Self::parse_args()?;
|
||||
|
||||
Ok(cli.with_runner(f))
|
||||
let runner = CliRunner::try_default_runtime()?;
|
||||
Ok(cli.with_runner(f, runner))
|
||||
}
|
||||
|
||||
/// The client version of the node.
|
||||
|
||||
@@ -17,9 +17,26 @@ use tracing::{debug, error, trace};
|
||||
/// Executes CLI commands.
|
||||
///
|
||||
/// Provides utilities for running a cli command to completion.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
#[derive(Debug)]
|
||||
#[non_exhaustive]
|
||||
pub struct CliRunner;
|
||||
pub struct CliRunner {
|
||||
tokio_runtime: tokio::runtime::Runtime,
|
||||
}
|
||||
|
||||
impl CliRunner {
|
||||
/// Attempts to create a new [`CliRunner`] using the default tokio
|
||||
/// [`Runtime`](tokio::runtime::Runtime).
|
||||
///
|
||||
/// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
|
||||
pub fn try_default_runtime() -> Result<Self, std::io::Error> {
|
||||
Ok(Self { tokio_runtime: tokio_runtime()? })
|
||||
}
|
||||
|
||||
/// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
|
||||
pub fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
|
||||
Self { tokio_runtime }
|
||||
}
|
||||
}
|
||||
|
||||
// === impl CliRunner ===
|
||||
|
||||
@@ -37,7 +54,8 @@ impl CliRunner {
|
||||
F: Future<Output = Result<(), E>>,
|
||||
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
|
||||
{
|
||||
let AsyncCliRunner { context, mut task_manager, tokio_runtime } = AsyncCliRunner::new()?;
|
||||
let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
|
||||
AsyncCliRunner::new(self.tokio_runtime);
|
||||
|
||||
// Executes the command until it finished or ctrl-c was fired
|
||||
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
|
||||
@@ -123,13 +141,12 @@ struct AsyncCliRunner {
|
||||
// === impl AsyncCliRunner ===
|
||||
|
||||
impl AsyncCliRunner {
|
||||
/// Attempts to create a tokio Runtime and additional context required to execute commands
|
||||
/// asynchronously.
|
||||
fn new() -> Result<Self, std::io::Error> {
|
||||
let tokio_runtime = tokio_runtime()?;
|
||||
/// Given a tokio [`Runtime`](tokio::runtime::Runtime), creates additional context required to
|
||||
/// execute commands asynchronously.
|
||||
fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
|
||||
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
|
||||
let task_executor = task_manager.executor();
|
||||
Ok(Self { context: CliContext { task_executor }, task_manager, tokio_runtime })
|
||||
Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -129,7 +129,16 @@ where
|
||||
///
|
||||
/// This accepts a closure that is used to launch the node via the
|
||||
/// [`NodeCommand`](reth_cli_commands::node::NodeCommand).
|
||||
pub fn run<L, Fut>(mut self, launcher: L) -> eyre::Result<()>
|
||||
pub fn run<L, Fut>(self, launcher: L) -> eyre::Result<()>
|
||||
where
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
|
||||
Fut: Future<Output = eyre::Result<()>>,
|
||||
{
|
||||
self.with_runner(CliRunner::try_default_runtime()?, launcher)
|
||||
}
|
||||
|
||||
/// Execute the configured cli command with the provided [`CliRunner`].
|
||||
pub fn with_runner<L, Fut>(mut self, runner: CliRunner, launcher: L) -> eyre::Result<()>
|
||||
where
|
||||
L: FnOnce(WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>, Ext) -> Fut,
|
||||
Fut: Future<Output = eyre::Result<()>>,
|
||||
@@ -144,7 +153,6 @@ where
|
||||
// Install the prometheus recorder to be sure to record all metrics
|
||||
let _ = install_prometheus_recorder();
|
||||
|
||||
let runner = CliRunner::default();
|
||||
match self.command {
|
||||
Commands::Node(mut command) => {
|
||||
// TODO: remove when we're ready to roll out State Root Task on OP-Reth
|
||||
|
||||
Reference in New Issue
Block a user