From 3fafa8c50f19ed19401dc7ee681670fde3427118 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Tue, 23 Jan 2024 14:49:24 -0500 Subject: [PATCH] chore: node-core cleanup (#6189) --- Cargo.lock | 33 ++- Cargo.toml | 1 + bin/reth/Cargo.toml | 9 +- bin/reth/build.rs | 15 -- bin/reth/src/cli/mod.rs | 2 - bin/reth/src/commands/debug_cmd/mod.rs | 2 - .../src/commands/debug_cmd/replay_engine.rs | 118 +-------- bin/reth/src/commands/node/mod.rs | 12 +- bin/reth/src/lib.rs | 19 +- bin/reth/src/utils.rs | 178 +++++++++++++ crates/node-core/Cargo.toml | 43 ++- .../node-core/src}/cl_events.rs | 0 .../node-core}/src/cli/db_type.rs | 0 crates/node-core/src/cli/mod.rs | 1 + crates/node-core/src/engine_api_store.rs | 130 +++++++++ .../node => crates/node-core/src}/events.rs | 4 +- crates/node-core/src/lib.rs | 6 +- .../node-core/src/node_config.rs | 29 +- .../node-core}/src/prometheus_exporter.rs | 6 +- crates/node-core/src/utils.rs | 248 +++--------------- 20 files changed, 469 insertions(+), 387 deletions(-) delete mode 100644 bin/reth/build.rs create mode 100644 bin/reth/src/utils.rs rename {bin/reth/src/commands/node => crates/node-core/src}/cl_events.rs (100%) rename {bin/reth => crates/node-core}/src/cli/db_type.rs (100%) create mode 100644 crates/node-core/src/engine_api_store.rs rename {bin/reth/src/commands/node => crates/node-core/src}/events.rs (99%) rename bin/reth/src/builder/mod.rs => crates/node-core/src/node_config.rs (99%) rename {bin/reth => crates/node-core}/src/prometheus_exporter.rs (98%) diff --git a/Cargo.lock b/Cargo.lock index d56bf5ec50..ead9099870 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5762,6 +5762,7 @@ dependencies = [ "aquamarine", "assert_matches", "backon", + "boyer-moore-magiclen", "clap", "comfy-table", "confy", @@ -5783,7 +5784,6 @@ dependencies = [ "once_cell", "pin-project", "pretty_assertions", - "procfs", "proptest", "rand 0.8.5", "ratatui", @@ -5833,7 +5833,6 @@ dependencies = [ "tokio", "toml 0.8.8", "tracing", - "vergen", ] [[package]] @@ -6447,42 +6446,70 @@ dependencies = [ name = "reth-node-core" version = "0.1.0-alpha.16" dependencies = [ + "alloy-chains", "alloy-rlp", - "boyer-moore-magiclen", + "assert_matches", "clap", + "confy", "const-str", "dirs-next", "eyre", + "fdlimit", "futures", "humantime", + "hyper", + "jsonrpsee", + "metrics", + "metrics-exporter-prometheus", + "metrics-process", + "metrics-util", + "once_cell", + "pin-project", + "procfs", "proptest", "rand 0.8.5", + "reth-auto-seal-consensus", "reth-basic-payload-builder", + "reth-beacon-consensus", + "reth-blockchain-tree", "reth-config", "reth-consensus-common", "reth-db", "reth-discv4", + "reth-downloaders", + "reth-ethereum-payload-builder", "reth-interfaces", + "reth-metrics", "reth-net-nat", "reth-network", "reth-network-api", "reth-node-api", + "reth-node-builder", + "reth-optimism-payload-builder", "reth-payload-builder", "reth-primitives", "reth-provider", + "reth-prune", + "reth-revm", "reth-rpc", "reth-rpc-api", "reth-rpc-builder", "reth-rpc-engine-api", "reth-rpc-types", "reth-rpc-types-compat", + "reth-snapshot", + "reth-stages", "reth-tasks", "reth-tracing", "reth-transaction-pool", + "revm-inspectors", "secp256k1 0.27.0", + "serde", "serde_json", "shellexpand", + "tempfile", "thiserror", + "tokio", "tracing", "vergen", ] diff --git a/Cargo.toml b/Cargo.toml index 88fe7bb7d2..069479ff3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,6 +123,7 @@ reth-ecies = { path = "crates/net/ecies" } reth-eth-wire = { path = "crates/net/eth-wire" } reth-ethereum-forks = { path = "crates/ethereum-forks" } reth-ethereum-payload-builder = { path = "crates/payload/ethereum" } +reth-optimism-payload-builder = { path = "crates/payload/optimism" } reth-interfaces = { path = "crates/interfaces" } reth-ipc = { path = "crates/rpc/ipc" } reth-libmdbx = { path = "crates/storage/libmdbx-rs" } diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index b692fbc9fc..c17a6407df 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -43,7 +43,7 @@ reth-network-api.workspace = true reth-downloaders.workspace = true reth-tracing.workspace = true reth-tasks.workspace = true -reth-optimism-payload-builder = { path = "../../crates/payload/optimism", optional = true } +reth-optimism-payload-builder = { workspace = true, optional = true } reth-ethereum-payload-builder.workspace = true reth-payload-builder.workspace = true reth-payload-validator.workspace = true @@ -109,14 +109,12 @@ pretty_assertions = "1.3.0" humantime = "2.1.0" itertools.workspace = true rayon.workspace = true +boyer-moore-magiclen = "0.2.16" [target.'cfg(not(windows))'.dependencies] jemallocator = { version = "0.5.0", optional = true } jemalloc-ctl = { version = "0.5.0", optional = true } -[target.'cfg(target_os = "linux")'.dependencies] -procfs = { version = "0.16.0" } - [dev-dependencies] jsonrpsee.workspace = true assert_matches = "1.5.0" @@ -159,9 +157,6 @@ optimism = [ # no-op feature flag for switching between the `optimism` and default functionality in CI matrices ethereum = [] -[build-dependencies] -vergen = { version = "8.0.0", features = ["build", "cargo", "git", "git2"] } - [[bin]] name = "reth" path = "src/main.rs" diff --git a/bin/reth/build.rs b/bin/reth/build.rs deleted file mode 100644 index f24f9b22db..0000000000 --- a/bin/reth/build.rs +++ /dev/null @@ -1,15 +0,0 @@ -#![allow(missing_docs)] - -use std::error::Error; -use vergen::EmitBuilder; - -fn main() -> Result<(), Box> { - // Emit the instructions - EmitBuilder::builder() - .git_sha(true) - .build_timestamp() - .cargo_features() - .cargo_target_triple() - .emit()?; - Ok(()) -} diff --git a/bin/reth/src/cli/mod.rs b/bin/reth/src/cli/mod.rs index 64f013addc..659e36bfea 100644 --- a/bin/reth/src/cli/mod.rs +++ b/bin/reth/src/cli/mod.rs @@ -24,8 +24,6 @@ use std::sync::Arc; /// change. pub use crate::core::cli::*; -pub mod db_type; - /// The main reth cli interface. /// /// This is the entrypoint to the executable. diff --git a/bin/reth/src/commands/debug_cmd/mod.rs b/bin/reth/src/commands/debug_cmd/mod.rs index 2ea23b6093..688b4d693d 100644 --- a/bin/reth/src/commands/debug_cmd/mod.rs +++ b/bin/reth/src/commands/debug_cmd/mod.rs @@ -8,9 +8,7 @@ mod build_block; mod execution; mod in_memory_merkle; mod merkle; - mod replay_engine; -pub(crate) use replay_engine::EngineApiStore; /// `reth debug` command #[derive(Debug, Parser)] diff --git a/bin/reth/src/commands/debug_cmd/replay_engine.rs b/bin/reth/src/commands/debug_cmd/replay_engine.rs index 49fbb892a0..4a5d9978ba 100644 --- a/bin/reth/src/commands/debug_cmd/replay_engine.rs +++ b/bin/reth/src/commands/debug_cmd/replay_engine.rs @@ -4,15 +4,14 @@ use crate::{ utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS}, DatabaseArgs, NetworkArgs, }, + core::engine_api_store::{EngineApiStore, StoredEngineApiMessage}, dirs::{DataDirPath, MaybePlatformPath}, runner::CliContext, }; use clap::Parser; use eyre::Context; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; -use reth_beacon_consensus::{ - hooks::EngineHooks, BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage, -}; +use reth_beacon_consensus::{hooks::EngineHooks, BeaconConsensus, BeaconConsensusEngine}; use reth_blockchain_tree::{ BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals, }; @@ -21,37 +20,24 @@ use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv}; use reth_interfaces::consensus::Consensus; use reth_network::NetworkHandle; use reth_network_api::NetworkInfo; -use reth_node_api::EngineTypes; #[cfg(not(feature = "optimism"))] use reth_node_builder::EthEngineTypes; #[cfg(feature = "optimism")] use reth_node_builder::OptimismEngineTypes; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; -use reth_primitives::{ - fs::{self}, - ChainSpec, -}; +use reth_primitives::{fs, ChainSpec}; use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory}; use reth_revm::EvmProcessorFactory; -use reth_rpc_types::{ - engine::{CancunPayloadFields, ForkchoiceState}, - ExecutionPayload, -}; use reth_stages::Pipeline; use reth_tasks::TaskExecutor; use reth_transaction_pool::noop::NoopTransactionPool; -use serde::{Deserialize, Serialize}; use std::{ - collections::BTreeMap, net::{SocketAddr, SocketAddrV4}, path::PathBuf, sync::Arc, - time::{Duration, SystemTime}, -}; -use tokio::sync::{ - mpsc::{self, UnboundedReceiver, UnboundedSender}, - oneshot, + time::Duration, }; +use tokio::sync::{mpsc, oneshot}; use tracing::*; /// `reth debug replay-engine` command @@ -257,97 +243,3 @@ impl Command { Ok(()) } } - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -enum StoredEngineApiMessage { - ForkchoiceUpdated { state: ForkchoiceState, payload_attrs: Option }, - NewPayload { payload: ExecutionPayload, cancun_fields: Option }, -} - -#[derive(Debug)] -pub(crate) struct EngineApiStore { - path: PathBuf, -} - -impl EngineApiStore { - pub(crate) fn new(path: PathBuf) -> Self { - Self { path } - } - - fn on_message( - &self, - msg: &BeaconEngineMessage, - received_at: SystemTime, - ) -> eyre::Result<()> - where - Engine: EngineTypes, - { - fs::create_dir_all(&self.path)?; // ensure that store path had been created - let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis(); - match msg { - BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx: _tx } => { - let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash); - fs::write( - self.path.join(filename), - serde_json::to_vec(&StoredEngineApiMessage::ForkchoiceUpdated { - state: *state, - payload_attrs: payload_attrs.clone(), - })?, - )?; - } - BeaconEngineMessage::NewPayload { payload, cancun_fields, tx: _tx } => { - let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash()); - fs::write( - self.path.join(filename), - serde_json::to_vec( - &StoredEngineApiMessage::::NewPayload { - payload: payload.clone(), - cancun_fields: cancun_fields.clone(), - }, - )?, - )?; - } - // noop - BeaconEngineMessage::TransitionConfigurationExchanged | - BeaconEngineMessage::EventListener(_) => (), - }; - Ok(()) - } - - pub(crate) fn engine_messages_iter(&self) -> eyre::Result> { - let mut filenames_by_ts = BTreeMap::>::default(); - for entry in fs::read_dir(&self.path)? { - let entry = entry?; - let filename = entry.file_name(); - if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) { - if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::()) { - filenames_by_ts.entry(timestamp).or_default().push(entry.path()); - tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message"); - } else { - tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename") - } - } else { - tracing::warn!(target: "engine::store", ?filename, "Skipping non json file"); - } - } - Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths)) - } - - pub(crate) async fn intercept( - self, - mut rx: UnboundedReceiver>, - to_engine: UnboundedSender>, - ) where - Engine: EngineTypes, - BeaconEngineMessage: std::fmt::Debug, - { - loop { - let Some(msg) = rx.recv().await else { break }; - if let Err(error) = self.on_message(&msg, SystemTime::now()) { - error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message"); - } - let _ = to_engine.send(msg); - } - } -} diff --git a/bin/reth/src/commands/node/mod.rs b/bin/reth/src/commands/node/mod.rs index e9fc8bdeba..ed48766ae9 100644 --- a/bin/reth/src/commands/node/mod.rs +++ b/bin/reth/src/commands/node/mod.rs @@ -9,8 +9,16 @@ use reth_interfaces::consensus::Consensus; use reth_primitives::ChainSpec; use std::{net::SocketAddr, path::PathBuf, sync::Arc}; -pub mod cl_events; -pub mod events; +/// Re-export from `reth_node_core` for backwards compatibility. +pub mod events { + pub use crate::core::events::*; +} + +/// Re-export from `reth_node_core` for backwards compatibility. +pub mod cl_events { + pub use crate::core::cl_events::*; +} + use crate::{ args::{ utils::{chain_help, genesis_value_parser, parse_socket_address, SUPPORTED_CHAINS}, diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index b76a014ad9..3c2ea7f2e2 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -26,11 +26,10 @@ )] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -pub mod builder; pub mod cli; pub mod commands; -pub mod prometheus_exporter; pub mod runner; +pub mod utils; /// Re-exported payload related types pub mod payload { @@ -43,6 +42,16 @@ pub mod core { pub use reth_node_core::*; } +/// Re-exported from `reth_node_core`. +pub mod builder { + pub use reth_node_core::node_config::*; +} + +/// Re-exported from `reth_node_core`. +pub mod prometheus_exporter { + pub use reth_node_core::prometheus_exporter::*; +} + /// Re-export of the `reth_node_core` types specifically in the `args` module. /// /// This is re-exported because the types in `reth_node_core::args` originally existed in @@ -58,12 +67,6 @@ pub mod version { pub use reth_node_core::version::*; } -/// Re-exported from `reth_node_core`, also to prevent a breaking change. See the comment on -/// the `reth_node_core::args` re-export for more details. -pub mod utils { - pub use reth_node_core::utils::*; -} - /// Re-exported from `reth_node_core`, also to prevent a breaking change. See the comment on /// the `reth_node_core::args` re-export for more details. pub mod init { diff --git a/bin/reth/src/utils.rs b/bin/reth/src/utils.rs new file mode 100644 index 0000000000..1e95f5a53b --- /dev/null +++ b/bin/reth/src/utils.rs @@ -0,0 +1,178 @@ +//! Common CLI utility functions. + +use boyer_moore_magiclen::BMByte; +use eyre::Result; +use reth_db::{ + cursor::{DbCursorRO, DbDupCursorRO}, + database::Database, + table::{Decode, Decompress, DupSort, Table, TableRow}, + transaction::{DbTx, DbTxMut}, + DatabaseError, RawTable, TableRawRow, +}; +use reth_primitives::{fs, ChainSpec}; +use std::{path::Path, rc::Rc, sync::Arc}; +use tracing::info; + +/// Exposing `open_db_read_only` function +pub mod db { + pub use reth_db::open_db_read_only; +} + +/// Re-exported from `reth_node_core`, also to prevent a breaking change. See the comment on +/// the `reth_node_core::args` re-export for more details. +pub use reth_node_core::utils::*; + +/// Wrapper over DB that implements many useful DB queries. +#[derive(Debug)] +pub struct DbTool<'a, DB: Database> { + /// The database that the db tool will use. + pub db: &'a DB, + /// The [ChainSpec] that the db tool will use. + pub chain: Arc, +} + +impl<'a, DB: Database> DbTool<'a, DB> { + /// Takes a DB where the tables have already been created. + pub fn new(db: &'a DB, chain: Arc) -> eyre::Result { + Ok(Self { db, chain }) + } + + /// Grabs the contents of the table within a certain index range and places the + /// entries into a [`HashMap`][std::collections::HashMap]. + /// + /// [`ListFilter`] can be used to further + /// filter down the desired results. (eg. List only rows which include `0xd3adbeef`) + pub fn list(&self, filter: &ListFilter) -> Result<(Vec>, usize)> { + let bmb = Rc::new(BMByte::from(&filter.search)); + if bmb.is_none() && filter.has_search() { + eyre::bail!("Invalid search.") + } + + let mut hits = 0; + + let data = self.db.view(|tx| { + let mut cursor = + tx.cursor_read::>().expect("Was not able to obtain a cursor."); + + let map_filter = |row: Result, _>| { + if let Ok((k, v)) = row { + let (key, value) = (k.into_key(), v.into_value()); + + if key.len() + value.len() < filter.min_row_size { + return None + } + if key.len() < filter.min_key_size { + return None + } + if value.len() < filter.min_value_size { + return None + } + + let result = || { + if filter.only_count { + return None + } + Some(( + ::Key::decode(&key).unwrap(), + ::Value::decompress(&value).unwrap(), + )) + }; + + match &*bmb { + Some(searcher) => { + if searcher.find_first_in(&value).is_some() || + searcher.find_first_in(&key).is_some() + { + hits += 1; + return result() + } + } + None => { + hits += 1; + return result() + } + } + } + None + }; + + if filter.reverse { + Ok(cursor + .walk_back(None)? + .skip(filter.skip) + .filter_map(map_filter) + .take(filter.len) + .collect::>()) + } else { + Ok(cursor + .walk(None)? + .skip(filter.skip) + .filter_map(map_filter) + .take(filter.len) + .collect::>()) + } + })?; + + Ok((data.map_err(|e: DatabaseError| eyre::eyre!(e))?, hits)) + } + + /// Grabs the content of the table for the given key + pub fn get(&self, key: T::Key) -> Result> { + self.db.view(|tx| tx.get::(key))?.map_err(|e| eyre::eyre!(e)) + } + + /// Grabs the content of the DupSort table for the given key and subkey + pub fn get_dup(&self, key: T::Key, subkey: T::SubKey) -> Result> { + self.db + .view(|tx| tx.cursor_dup_read::()?.seek_by_key_subkey(key, subkey))? + .map_err(|e| eyre::eyre!(e)) + } + + /// Drops the database at the given path. + pub fn drop(&mut self, path: impl AsRef) -> Result<()> { + let path = path.as_ref(); + info!(target: "reth::cli", "Dropping database at {:?}", path); + fs::remove_dir_all(path)?; + Ok(()) + } + + /// Drops the provided table from the database. + pub fn drop_table(&mut self) -> Result<()> { + self.db.update(|tx| tx.clear::())??; + Ok(()) + } +} + +/// Filters the results coming from the database. +#[derive(Debug)] +pub struct ListFilter { + /// Skip first N entries. + pub skip: usize, + /// Take N entries. + pub len: usize, + /// Sequence of bytes that will be searched on values and keys from the database. + pub search: Vec, + /// Minimum row size. + pub min_row_size: usize, + /// Minimum key size. + pub min_key_size: usize, + /// Minimum value size. + pub min_value_size: usize, + /// Reverse order of entries. + pub reverse: bool, + /// Only counts the number of filtered entries without decoding and returning them. + pub only_count: bool, +} + +impl ListFilter { + /// If `search` has a list of bytes, then filter for rows that have this sequence. + pub fn has_search(&self) -> bool { + !self.search.is_empty() + } + + /// Updates the page with new `skip` and `len` values. + pub fn update_page(&mut self, skip: usize, len: usize) { + self.skip = skip; + self.len = len; + } +} diff --git a/crates/node-core/Cargo.toml b/crates/node-core/Cargo.toml index b9153c96db..12779b66f3 100644 --- a/crates/node-core/Cargo.toml +++ b/crates/node-core/Cargo.toml @@ -36,37 +36,70 @@ reth-tasks.workspace = true reth-payload-builder.workspace = true reth-basic-payload-builder.workspace = true reth-consensus-common.workspace = true +reth-auto-seal-consensus.workspace = true +reth-beacon-consensus.workspace = true +reth-downloaders.workspace = true +reth-node-builder.workspace = true +reth-revm.workspace = true +reth-stages.workspace = true +reth-prune.workspace = true +reth-blockchain-tree.workspace = true +revm-inspectors.workspace = true +reth-snapshot.workspace = true +reth-ethereum-payload-builder.workspace = true +reth-optimism-payload-builder = { workspace = true, optional = true } +# async +tokio.workspace = true + +# metrics +metrics-exporter-prometheus = "0.12.1" +once_cell.workspace = true +metrics-util = "0.15.0" +metrics-process = "=1.0.14" +metrics.workspace = true +reth-metrics.workspace = true # misc -boyer-moore-magiclen = "0.2.16" eyre.workspace = true clap = { workspace = true, features = ["derive"] } humantime = "2.1.0" thiserror.workspace = true const-str = "0.5.6" - rand.workspace = true +fdlimit = "0.3.0" +pin-project.workspace = true # io dirs-next = "2.0.0" shellexpand = "3.0.0" +confy.workspace = true +serde.workspace = true serde_json.workspace = true +# http/rpc +hyper = "0.14.25" + # tracing tracing.workspace = true # crypto alloy-rlp.workspace = true +alloy-chains.workspace = true secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] } # async futures.workspace = true +[target.'cfg(target_os = "linux")'.dependencies] +procfs = { version = "0.16.0" } + [dev-dependencies] # test vectors generation proptest.workspace = true - +tempfile.workspace = true +jsonrpsee.workspace = true +assert_matches.version = "1.5.0" [features] optimism = [ @@ -79,10 +112,10 @@ optimism = [ "reth-network/optimism", "reth-network-api/optimism", "reth-payload-builder/optimism", + "reth-optimism-payload-builder/optimism", + "reth-ethereum-payload-builder/optimism", "reth-node-api/optimism", ] - - [build-dependencies] vergen = { version = "8.0.0", features = ["build", "cargo", "git", "git2"] } diff --git a/bin/reth/src/commands/node/cl_events.rs b/crates/node-core/src/cl_events.rs similarity index 100% rename from bin/reth/src/commands/node/cl_events.rs rename to crates/node-core/src/cl_events.rs diff --git a/bin/reth/src/cli/db_type.rs b/crates/node-core/src/cli/db_type.rs similarity index 100% rename from bin/reth/src/cli/db_type.rs rename to crates/node-core/src/cli/db_type.rs diff --git a/crates/node-core/src/cli/mod.rs b/crates/node-core/src/cli/mod.rs index 9970fd1fc3..7425ec6409 100644 --- a/crates/node-core/src/cli/mod.rs +++ b/crates/node-core/src/cli/mod.rs @@ -2,4 +2,5 @@ pub mod components; pub mod config; +pub mod db_type; pub mod ext; diff --git a/crates/node-core/src/engine_api_store.rs b/crates/node-core/src/engine_api_store.rs new file mode 100644 index 0000000000..35774d240c --- /dev/null +++ b/crates/node-core/src/engine_api_store.rs @@ -0,0 +1,130 @@ +//! Stores engine API messages to disk for later inspection and replay. + +use reth_beacon_consensus::BeaconEngineMessage; +use reth_node_api::EngineTypes; +use reth_primitives::fs::{self}; +use reth_rpc_types::{ + engine::{CancunPayloadFields, ForkchoiceState}, + ExecutionPayload, +}; +use serde::{Deserialize, Serialize}; +use std::{collections::BTreeMap, path::PathBuf, time::SystemTime}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tracing::*; + +/// A message from the engine API that has been stored to disk. +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum StoredEngineApiMessage { + /// The on-disk representation of an `engine_forkchoiceUpdated` method call. + ForkchoiceUpdated { + /// The [ForkchoiceState] sent in the persisted call. + state: ForkchoiceState, + /// The payload attributes sent in the persisted call, if any. + payload_attrs: Option, + }, + /// The on-disk representation of an `engine_newPayload` method call. + NewPayload { + /// The [ExecutionPayload] sent in the persisted call. + payload: ExecutionPayload, + /// The Cancun-specific fields sent in the persisted call, if any. + cancun_fields: Option, + }, +} + +/// This can read and write engine API messages in a specific directory. +#[derive(Debug)] +pub struct EngineApiStore { + /// The path to the directory that stores the engine API messages. + path: PathBuf, +} + +impl EngineApiStore { + /// Creates a new [EngineApiStore] at the given path. + /// + /// The path is expected to be a directory, where individual message JSON files will be stored. + pub fn new(path: PathBuf) -> Self { + Self { path } + } + + /// Stores the received [BeaconEngineMessage] to disk, appending the `received_at` time to the + /// path. + pub fn on_message( + &self, + msg: &BeaconEngineMessage, + received_at: SystemTime, + ) -> eyre::Result<()> + where + Engine: EngineTypes, + { + fs::create_dir_all(&self.path)?; // ensure that store path had been created + let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis(); + match msg { + BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx: _tx } => { + let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash); + fs::write( + self.path.join(filename), + serde_json::to_vec(&StoredEngineApiMessage::ForkchoiceUpdated { + state: *state, + payload_attrs: payload_attrs.clone(), + })?, + )?; + } + BeaconEngineMessage::NewPayload { payload, cancun_fields, tx: _tx } => { + let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash()); + fs::write( + self.path.join(filename), + serde_json::to_vec( + &StoredEngineApiMessage::::NewPayload { + payload: payload.clone(), + cancun_fields: cancun_fields.clone(), + }, + )?, + )?; + } + // noop + BeaconEngineMessage::TransitionConfigurationExchanged | + BeaconEngineMessage::EventListener(_) => (), + }; + Ok(()) + } + + /// Finds and iterates through any stored engine API message files, ordered by timestamp. + pub fn engine_messages_iter(&self) -> eyre::Result> { + let mut filenames_by_ts = BTreeMap::>::default(); + for entry in fs::read_dir(&self.path)? { + let entry = entry?; + let filename = entry.file_name(); + if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) { + if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::()) { + filenames_by_ts.entry(timestamp).or_default().push(entry.path()); + tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message"); + } else { + tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename") + } + } else { + tracing::warn!(target: "engine::store", ?filename, "Skipping non json file"); + } + } + Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths)) + } + + /// Intercepts an incoming engine API message, storing it to disk and forwarding it to the + /// engine channel. + pub async fn intercept( + self, + mut rx: UnboundedReceiver>, + to_engine: UnboundedSender>, + ) where + Engine: EngineTypes, + BeaconEngineMessage: std::fmt::Debug, + { + loop { + let Some(msg) = rx.recv().await else { break }; + if let Err(error) = self.on_message(&msg, SystemTime::now()) { + error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message"); + } + let _ = to_engine.send(msg); + } + } +} diff --git a/bin/reth/src/commands/node/events.rs b/crates/node-core/src/events.rs similarity index 99% rename from bin/reth/src/commands/node/events.rs rename to crates/node-core/src/events.rs index e34bbf1e6a..4a8048700f 100644 --- a/bin/reth/src/commands/node/events.rs +++ b/crates/node-core/src/events.rs @@ -1,6 +1,6 @@ //! Support for handling events emitted by node components. -use crate::commands::node::cl_events::ConsensusLayerHealthEvent; +use crate::cl_events::ConsensusLayerHealthEvent; use futures::Stream; use reth_beacon_consensus::BeaconConsensusEngineEvent; use reth_db::{database::Database, database_metrics::DatabaseMetadata}; @@ -466,7 +466,7 @@ impl Display for Eta { #[cfg(test)] mod tests { - use crate::commands::node::events::Eta; + use crate::events::Eta; use std::time::{Duration, Instant}; #[test] diff --git a/crates/node-core/src/lib.rs b/crates/node-core/src/lib.rs index 335da6d636..105167f191 100644 --- a/crates/node-core/src/lib.rs +++ b/crates/node-core/src/lib.rs @@ -6,12 +6,16 @@ issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" )] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -#![warn(unused_crate_dependencies)] pub mod args; +pub mod cl_events; pub mod cli; pub mod dirs; +pub mod engine_api_store; +pub mod events; pub mod init; +pub mod node_config; +pub mod prometheus_exporter; pub mod utils; pub mod version; diff --git a/bin/reth/src/builder/mod.rs b/crates/node-core/src/node_config.rs similarity index 99% rename from bin/reth/src/builder/mod.rs rename to crates/node-core/src/node_config.rs index 260a2fd809..5857288dc5 100644 --- a/bin/reth/src/builder/mod.rs +++ b/crates/node-core/src/node_config.rs @@ -1,22 +1,20 @@ //! Support for customizing the node -use super::cli::{components::RethRpcServerHandles, ext::DefaultRethNodeCommandConfig}; use crate::{ args::{ get_secret_key, DatabaseArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs, PruningArgs, RpcServerArgs, TxPoolArgs, }, + cl_events::ConsensusLayerHealthEvents, cli::{ - components::RethNodeComponentsImpl, + components::{RethNodeComponentsImpl, RethRpcServerHandles}, config::{RethRpcConfig, RethTransactionPoolConfig}, db_type::{DatabaseBuilder, DatabaseInstance}, - ext::{RethCliExt, RethNodeCommandConfig}, - }, - commands::{ - debug_cmd::EngineApiStore, - node::{cl_events::ConsensusLayerHealthEvents, events}, + ext::{DefaultRethNodeCommandConfig, RethCliExt, RethNodeCommandConfig}, }, dirs::{ChainPath, DataDirPath, MaybePlatformPath}, + engine_api_store::EngineApiStore, + events, init::init_genesis, prometheus_exporter, utils::{get_single_header, write_peers_to_file}, @@ -119,8 +117,8 @@ pub static PROMETHEUS_RECORDER_HANDLE: Lazy = /// # Example /// ```rust /// # use reth_tasks::{TaskManager, TaskSpawner}; -/// # use reth::{ -/// # builder::NodeConfig, +/// # use reth_node_core::{ +/// # node_config::NodeConfig, /// # cli::{ /// # ext::DefaultRethNodeCommandConfig, /// # }, @@ -153,8 +151,8 @@ pub static PROMETHEUS_RECORDER_HANDLE: Lazy = /// # Example /// ```rust /// # use reth_tasks::{TaskManager, TaskSpawner}; -/// # use reth::{ -/// # builder::NodeConfig, +/// # use reth_node_core::{ +/// # node_config::NodeConfig, /// # cli::{ /// # ext::DefaultRethNodeCommandConfig, /// # }, @@ -368,8 +366,8 @@ impl NodeConfig { /// # Example /// ```rust /// # use reth_tasks::{TaskManager, TaskSpawner}; - /// # use reth::builder::NodeConfig; - /// # use reth::cli::{ + /// # use reth_node_core::node_config::NodeConfig; + /// # use reth_node_core::cli::{ /// # ext::DefaultRethNodeCommandConfig, /// # }; /// # use tokio::runtime::Handle; @@ -1390,8 +1388,8 @@ impl NodeHandle { /// /// # Example /// ``` -/// # use reth::{ -/// # builder::{NodeConfig, spawn_node}, +/// # use reth_node_core::{ +/// # node_config::{NodeConfig, spawn_node}, /// # args::RpcServerArgs, /// # }; /// async fn t() { @@ -1404,7 +1402,6 @@ impl NodeHandle { /// let (_handle, _manager) = spawn_node(builder).await.unwrap(); /// } /// ``` - pub async fn spawn_node(config: NodeConfig) -> eyre::Result<(NodeHandle, TaskManager)> { let task_manager = TaskManager::current(); let ext = DefaultRethNodeCommandConfig::default(); diff --git a/bin/reth/src/prometheus_exporter.rs b/crates/node-core/src/prometheus_exporter.rs similarity index 98% rename from bin/reth/src/prometheus_exporter.rs rename to crates/node-core/src/prometheus_exporter.rs index 29bca94959..c5d8893d5a 100644 --- a/bin/reth/src/prometheus_exporter.rs +++ b/crates/node-core/src/prometheus_exporter.rs @@ -16,7 +16,7 @@ pub(crate) trait Hook: Fn() + Send + Sync {} impl Hook for T {} /// Installs Prometheus as the metrics recorder. -pub(crate) fn install_recorder() -> eyre::Result { +pub fn install_recorder() -> eyre::Result { let recorder = PrometheusBuilder::new().build_recorder(); let handle = recorder.handle(); @@ -74,7 +74,7 @@ async fn start_endpoint( } /// Serves Prometheus metrics over HTTP with database and process metrics. -pub(crate) async fn serve( +pub async fn serve( listen_addr: SocketAddr, handle: PrometheusHandle, db: Metrics, @@ -244,7 +244,7 @@ fn describe_io_stats() {} #[cfg(test)] mod tests { - use crate::builder::PROMETHEUS_RECORDER_HANDLE; + use crate::node_config::PROMETHEUS_RECORDER_HANDLE; use std::ops::Deref; // Dependencies using different version of the `metrics` crate (to be exact, 0.21 vs 0.22) diff --git a/crates/node-core/src/utils.rs b/crates/node-core/src/utils.rs index 8b63fda6da..73f3fadc8e 100644 --- a/crates/node-core/src/utils.rs +++ b/crates/node-core/src/utils.rs @@ -1,15 +1,8 @@ -//! Common CLI utility functions. +//! Utility functions for node startup and shutdown, for example path parsing and retrieving single +//! blocks from the network. -use boyer_moore_magiclen::BMByte; use eyre::Result; use reth_consensus_common::validation::validate_block_standalone; -use reth_db::{ - cursor::{DbCursorRO, DbDupCursorRO}, - database::Database, - table::{Decode, Decompress, DupSort, Table, TableRow}, - transaction::{DbTx, DbTxMut}, - DatabaseError, RawTable, TableRawRow, -}; use reth_interfaces::p2p::{ bodies::client::BodiesClient, headers::client::{HeadersClient, HeadersRequest}, @@ -24,14 +17,48 @@ use reth_rpc::{JwtError, JwtSecret}; use std::{ env::VarError, path::{Path, PathBuf}, - rc::Rc, sync::Arc, }; use tracing::{debug, info, trace, warn}; -/// Exposing `open_db_read_only` function -pub mod db { - pub use reth_db::open_db_read_only; +/// Parses a user-specified path with support for environment variables and common shorthands (e.g. +/// ~ for the user's home directory). +pub fn parse_path(value: &str) -> Result> { + shellexpand::full(value).map(|path| PathBuf::from(path.into_owned())) +} + +/// Attempts to retrieve or create a JWT secret from the specified path. +pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result { + if path.exists() { + debug!(target: "reth::cli", ?path, "Reading JWT auth secret file"); + JwtSecret::from_file(path) + } else { + info!(target: "reth::cli", ?path, "Creating JWT auth secret file"); + JwtSecret::try_create(path) + } +} + +/// Collect the peers from the [NetworkManager] and write them to the given `persistent_peers_file`, +/// if configured. +pub fn write_peers_to_file(network: &NetworkManager, persistent_peers_file: Option) +where + C: BlockReader + Unpin, +{ + if let Some(file_path) = persistent_peers_file { + let known_peers = network.all_peers().collect::>(); + if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) { + trace!(target: "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers"); + let parent_dir = file_path.parent().map(fs::create_dir_all).transpose(); + match parent_dir.and_then(|_| fs::write(&file_path, known_peers)) { + Ok(_) => { + info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file"); + } + Err(err) => { + warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file"); + } + } + } + } } /// Get a single header from network @@ -99,198 +126,3 @@ where Ok(block) } - -/// Wrapper over DB that implements many useful DB queries. -#[derive(Debug)] -pub struct DbTool<'a, DB: Database> { - /// The database that the db tool will use. - pub db: &'a DB, - /// The [ChainSpec] that the db tool will use. - pub chain: Arc, -} - -impl<'a, DB: Database> DbTool<'a, DB> { - /// Takes a DB where the tables have already been created. - pub fn new(db: &'a DB, chain: Arc) -> eyre::Result { - Ok(Self { db, chain }) - } - - /// Grabs the contents of the table within a certain index range and places the - /// entries into a [`HashMap`][std::collections::HashMap]. - /// - /// [`ListFilter`] can be used to further - /// filter down the desired results. (eg. List only rows which include `0xd3adbeef`) - pub fn list(&self, filter: &ListFilter) -> Result<(Vec>, usize)> { - let bmb = Rc::new(BMByte::from(&filter.search)); - if bmb.is_none() && filter.has_search() { - eyre::bail!("Invalid search.") - } - - let mut hits = 0; - - let data = self.db.view(|tx| { - let mut cursor = - tx.cursor_read::>().expect("Was not able to obtain a cursor."); - - let map_filter = |row: Result, _>| { - if let Ok((k, v)) = row { - let (key, value) = (k.into_key(), v.into_value()); - - if key.len() + value.len() < filter.min_row_size { - return None - } - if key.len() < filter.min_key_size { - return None - } - if value.len() < filter.min_value_size { - return None - } - - let result = || { - if filter.only_count { - return None - } - Some(( - ::Key::decode(&key).unwrap(), - ::Value::decompress(&value).unwrap(), - )) - }; - - match &*bmb { - Some(searcher) => { - if searcher.find_first_in(&value).is_some() || - searcher.find_first_in(&key).is_some() - { - hits += 1; - return result() - } - } - None => { - hits += 1; - return result() - } - } - } - None - }; - - if filter.reverse { - Ok(cursor - .walk_back(None)? - .skip(filter.skip) - .filter_map(map_filter) - .take(filter.len) - .collect::>()) - } else { - Ok(cursor - .walk(None)? - .skip(filter.skip) - .filter_map(map_filter) - .take(filter.len) - .collect::>()) - } - })?; - - Ok((data.map_err(|e: DatabaseError| eyre::eyre!(e))?, hits)) - } - - /// Grabs the content of the table for the given key - pub fn get(&self, key: T::Key) -> Result> { - self.db.view(|tx| tx.get::(key))?.map_err(|e| eyre::eyre!(e)) - } - - /// Grabs the content of the DupSort table for the given key and subkey - pub fn get_dup(&self, key: T::Key, subkey: T::SubKey) -> Result> { - self.db - .view(|tx| tx.cursor_dup_read::()?.seek_by_key_subkey(key, subkey))? - .map_err(|e| eyre::eyre!(e)) - } - - /// Drops the database at the given path. - pub fn drop(&mut self, path: impl AsRef) -> Result<()> { - let path = path.as_ref(); - info!(target: "reth::cli", "Dropping database at {:?}", path); - fs::remove_dir_all(path)?; - Ok(()) - } - - /// Drops the provided table from the database. - pub fn drop_table(&mut self) -> Result<()> { - self.db.update(|tx| tx.clear::())??; - Ok(()) - } -} - -/// Parses a user-specified path with support for environment variables and common shorthands (e.g. -/// ~ for the user's home directory). -pub fn parse_path(value: &str) -> Result> { - shellexpand::full(value).map(|path| PathBuf::from(path.into_owned())) -} - -/// Filters the results coming from the database. -#[derive(Debug)] -pub struct ListFilter { - /// Skip first N entries. - pub skip: usize, - /// Take N entries. - pub len: usize, - /// Sequence of bytes that will be searched on values and keys from the database. - pub search: Vec, - /// Minimum row size. - pub min_row_size: usize, - /// Minimum key size. - pub min_key_size: usize, - /// Minimum value size. - pub min_value_size: usize, - /// Reverse order of entries. - pub reverse: bool, - /// Only counts the number of filtered entries without decoding and returning them. - pub only_count: bool, -} - -impl ListFilter { - /// If `search` has a list of bytes, then filter for rows that have this sequence. - pub fn has_search(&self) -> bool { - !self.search.is_empty() - } - - /// Updates the page with new `skip` and `len` values. - pub fn update_page(&mut self, skip: usize, len: usize) { - self.skip = skip; - self.len = len; - } -} - -/// Attempts to retrieve or create a JWT secret from the specified path. -pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result { - if path.exists() { - debug!(target: "reth::cli", ?path, "Reading JWT auth secret file"); - JwtSecret::from_file(path) - } else { - info!(target: "reth::cli", ?path, "Creating JWT auth secret file"); - JwtSecret::try_create(path) - } -} - -/// Collect the peers from the [NetworkManager] and write them to the given `persistent_peers_file`, -/// if configured. -pub fn write_peers_to_file(network: &NetworkManager, persistent_peers_file: Option) -where - C: BlockReader + Unpin, -{ - if let Some(file_path) = persistent_peers_file { - let known_peers = network.all_peers().collect::>(); - if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) { - trace!(target: "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers"); - let parent_dir = file_path.parent().map(fs::create_dir_all).transpose(); - match parent_dir.and_then(|_| fs::write(&file_path, known_peers)) { - Ok(_) => { - info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file"); - } - Err(err) => { - warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file"); - } - } - } - } -}