chore: node-core cleanup (#6189)

This commit is contained in:
Dan Cline
2024-01-23 14:49:24 -05:00
committed by GitHub
parent b9f45a7ed8
commit 3fafa8c50f
20 changed files with 469 additions and 387 deletions

33
Cargo.lock generated
View File

@@ -5762,6 +5762,7 @@ dependencies = [
"aquamarine",
"assert_matches",
"backon",
"boyer-moore-magiclen",
"clap",
"comfy-table",
"confy",
@@ -5783,7 +5784,6 @@ dependencies = [
"once_cell",
"pin-project",
"pretty_assertions",
"procfs",
"proptest",
"rand 0.8.5",
"ratatui",
@@ -5833,7 +5833,6 @@ dependencies = [
"tokio",
"toml 0.8.8",
"tracing",
"vergen",
]
[[package]]
@@ -6447,42 +6446,70 @@ dependencies = [
name = "reth-node-core"
version = "0.1.0-alpha.16"
dependencies = [
"alloy-chains",
"alloy-rlp",
"boyer-moore-magiclen",
"assert_matches",
"clap",
"confy",
"const-str",
"dirs-next",
"eyre",
"fdlimit",
"futures",
"humantime",
"hyper",
"jsonrpsee",
"metrics",
"metrics-exporter-prometheus",
"metrics-process",
"metrics-util",
"once_cell",
"pin-project",
"procfs",
"proptest",
"rand 0.8.5",
"reth-auto-seal-consensus",
"reth-basic-payload-builder",
"reth-beacon-consensus",
"reth-blockchain-tree",
"reth-config",
"reth-consensus-common",
"reth-db",
"reth-discv4",
"reth-downloaders",
"reth-ethereum-payload-builder",
"reth-interfaces",
"reth-metrics",
"reth-net-nat",
"reth-network",
"reth-network-api",
"reth-node-api",
"reth-node-builder",
"reth-optimism-payload-builder",
"reth-payload-builder",
"reth-primitives",
"reth-provider",
"reth-prune",
"reth-revm",
"reth-rpc",
"reth-rpc-api",
"reth-rpc-builder",
"reth-rpc-engine-api",
"reth-rpc-types",
"reth-rpc-types-compat",
"reth-snapshot",
"reth-stages",
"reth-tasks",
"reth-tracing",
"reth-transaction-pool",
"revm-inspectors",
"secp256k1 0.27.0",
"serde",
"serde_json",
"shellexpand",
"tempfile",
"thiserror",
"tokio",
"tracing",
"vergen",
]

View File

@@ -123,6 +123,7 @@ reth-ecies = { path = "crates/net/ecies" }
reth-eth-wire = { path = "crates/net/eth-wire" }
reth-ethereum-forks = { path = "crates/ethereum-forks" }
reth-ethereum-payload-builder = { path = "crates/payload/ethereum" }
reth-optimism-payload-builder = { path = "crates/payload/optimism" }
reth-interfaces = { path = "crates/interfaces" }
reth-ipc = { path = "crates/rpc/ipc" }
reth-libmdbx = { path = "crates/storage/libmdbx-rs" }

View File

@@ -43,7 +43,7 @@ reth-network-api.workspace = true
reth-downloaders.workspace = true
reth-tracing.workspace = true
reth-tasks.workspace = true
reth-optimism-payload-builder = { path = "../../crates/payload/optimism", optional = true }
reth-optimism-payload-builder = { workspace = true, optional = true }
reth-ethereum-payload-builder.workspace = true
reth-payload-builder.workspace = true
reth-payload-validator.workspace = true
@@ -109,14 +109,12 @@ pretty_assertions = "1.3.0"
humantime = "2.1.0"
itertools.workspace = true
rayon.workspace = true
boyer-moore-magiclen = "0.2.16"
[target.'cfg(not(windows))'.dependencies]
jemallocator = { version = "0.5.0", optional = true }
jemalloc-ctl = { version = "0.5.0", optional = true }
[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.16.0" }
[dev-dependencies]
jsonrpsee.workspace = true
assert_matches = "1.5.0"
@@ -159,9 +157,6 @@ optimism = [
# no-op feature flag for switching between the `optimism` and default functionality in CI matrices
ethereum = []
[build-dependencies]
vergen = { version = "8.0.0", features = ["build", "cargo", "git", "git2"] }
[[bin]]
name = "reth"
path = "src/main.rs"

View File

@@ -1,15 +0,0 @@
#![allow(missing_docs)]
use std::error::Error;
use vergen::EmitBuilder;
fn main() -> Result<(), Box<dyn Error>> {
// Emit the instructions
EmitBuilder::builder()
.git_sha(true)
.build_timestamp()
.cargo_features()
.cargo_target_triple()
.emit()?;
Ok(())
}

View File

@@ -24,8 +24,6 @@ use std::sync::Arc;
/// change.
pub use crate::core::cli::*;
pub mod db_type;
/// The main reth cli interface.
///
/// This is the entrypoint to the executable.

View File

@@ -8,9 +8,7 @@ mod build_block;
mod execution;
mod in_memory_merkle;
mod merkle;
mod replay_engine;
pub(crate) use replay_engine::EngineApiStore;
/// `reth debug` command
#[derive(Debug, Parser)]

View File

@@ -4,15 +4,14 @@ use crate::{
utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
DatabaseArgs, NetworkArgs,
},
core::engine_api_store::{EngineApiStore, StoredEngineApiMessage},
dirs::{DataDirPath, MaybePlatformPath},
runner::CliContext,
};
use clap::Parser;
use eyre::Context;
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
use reth_beacon_consensus::{
hooks::EngineHooks, BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage,
};
use reth_beacon_consensus::{hooks::EngineHooks, BeaconConsensus, BeaconConsensusEngine};
use reth_blockchain_tree::{
BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals,
};
@@ -21,37 +20,24 @@ use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv};
use reth_interfaces::consensus::Consensus;
use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo;
use reth_node_api::EngineTypes;
#[cfg(not(feature = "optimism"))]
use reth_node_builder::EthEngineTypes;
#[cfg(feature = "optimism")]
use reth_node_builder::OptimismEngineTypes;
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_primitives::{
fs::{self},
ChainSpec,
};
use reth_primitives::{fs, ChainSpec};
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory};
use reth_revm::EvmProcessorFactory;
use reth_rpc_types::{
engine::{CancunPayloadFields, ForkchoiceState},
ExecutionPayload,
};
use reth_stages::Pipeline;
use reth_tasks::TaskExecutor;
use reth_transaction_pool::noop::NoopTransactionPool;
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
net::{SocketAddr, SocketAddrV4},
path::PathBuf,
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot,
time::Duration,
};
use tokio::sync::{mpsc, oneshot};
use tracing::*;
/// `reth debug replay-engine` command
@@ -257,97 +243,3 @@ impl Command {
Ok(())
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
enum StoredEngineApiMessage<Attributes> {
ForkchoiceUpdated { state: ForkchoiceState, payload_attrs: Option<Attributes> },
NewPayload { payload: ExecutionPayload, cancun_fields: Option<CancunPayloadFields> },
}
#[derive(Debug)]
pub(crate) struct EngineApiStore {
path: PathBuf,
}
impl EngineApiStore {
pub(crate) fn new(path: PathBuf) -> Self {
Self { path }
}
fn on_message<Engine>(
&self,
msg: &BeaconEngineMessage<Engine>,
received_at: SystemTime,
) -> eyre::Result<()>
where
Engine: EngineTypes,
{
fs::create_dir_all(&self.path)?; // ensure that store path had been created
let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx: _tx } => {
let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
fs::write(
self.path.join(filename),
serde_json::to_vec(&StoredEngineApiMessage::ForkchoiceUpdated {
state: *state,
payload_attrs: payload_attrs.clone(),
})?,
)?;
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx: _tx } => {
let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
fs::write(
self.path.join(filename),
serde_json::to_vec(
&StoredEngineApiMessage::<Engine::PayloadAttributes>::NewPayload {
payload: payload.clone(),
cancun_fields: cancun_fields.clone(),
},
)?,
)?;
}
// noop
BeaconEngineMessage::TransitionConfigurationExchanged |
BeaconEngineMessage::EventListener(_) => (),
};
Ok(())
}
pub(crate) fn engine_messages_iter(&self) -> eyre::Result<impl Iterator<Item = PathBuf>> {
let mut filenames_by_ts = BTreeMap::<u64, Vec<PathBuf>>::default();
for entry in fs::read_dir(&self.path)? {
let entry = entry?;
let filename = entry.file_name();
if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) {
if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::<u64>()) {
filenames_by_ts.entry(timestamp).or_default().push(entry.path());
tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message");
} else {
tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename")
}
} else {
tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
}
}
Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
}
pub(crate) async fn intercept<Engine>(
self,
mut rx: UnboundedReceiver<BeaconEngineMessage<Engine>>,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
) where
Engine: EngineTypes,
BeaconEngineMessage<Engine>: std::fmt::Debug,
{
loop {
let Some(msg) = rx.recv().await else { break };
if let Err(error) = self.on_message(&msg, SystemTime::now()) {
error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message");
}
let _ = to_engine.send(msg);
}
}
}

View File

@@ -9,8 +9,16 @@ use reth_interfaces::consensus::Consensus;
use reth_primitives::ChainSpec;
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
pub mod cl_events;
pub mod events;
/// Re-export from `reth_node_core` for backwards compatibility.
pub mod events {
pub use crate::core::events::*;
}
/// Re-export from `reth_node_core` for backwards compatibility.
pub mod cl_events {
pub use crate::core::cl_events::*;
}
use crate::{
args::{
utils::{chain_help, genesis_value_parser, parse_socket_address, SUPPORTED_CHAINS},

View File

@@ -26,11 +26,10 @@
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub mod builder;
pub mod cli;
pub mod commands;
pub mod prometheus_exporter;
pub mod runner;
pub mod utils;
/// Re-exported payload related types
pub mod payload {
@@ -43,6 +42,16 @@ pub mod core {
pub use reth_node_core::*;
}
/// Re-exported from `reth_node_core`.
pub mod builder {
pub use reth_node_core::node_config::*;
}
/// Re-exported from `reth_node_core`.
pub mod prometheus_exporter {
pub use reth_node_core::prometheus_exporter::*;
}
/// Re-export of the `reth_node_core` types specifically in the `args` module.
///
/// This is re-exported because the types in `reth_node_core::args` originally existed in
@@ -58,12 +67,6 @@ pub mod version {
pub use reth_node_core::version::*;
}
/// Re-exported from `reth_node_core`, also to prevent a breaking change. See the comment on
/// the `reth_node_core::args` re-export for more details.
pub mod utils {
pub use reth_node_core::utils::*;
}
/// Re-exported from `reth_node_core`, also to prevent a breaking change. See the comment on
/// the `reth_node_core::args` re-export for more details.
pub mod init {

178
bin/reth/src/utils.rs Normal file
View File

@@ -0,0 +1,178 @@
//! Common CLI utility functions.
use boyer_moore_magiclen::BMByte;
use eyre::Result;
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRO},
database::Database,
table::{Decode, Decompress, DupSort, Table, TableRow},
transaction::{DbTx, DbTxMut},
DatabaseError, RawTable, TableRawRow,
};
use reth_primitives::{fs, ChainSpec};
use std::{path::Path, rc::Rc, sync::Arc};
use tracing::info;
/// Exposing `open_db_read_only` function
pub mod db {
pub use reth_db::open_db_read_only;
}
/// Re-exported from `reth_node_core`, also to prevent a breaking change. See the comment on
/// the `reth_node_core::args` re-export for more details.
pub use reth_node_core::utils::*;
/// Wrapper over DB that implements many useful DB queries.
#[derive(Debug)]
pub struct DbTool<'a, DB: Database> {
/// The database that the db tool will use.
pub db: &'a DB,
/// The [ChainSpec] that the db tool will use.
pub chain: Arc<ChainSpec>,
}
impl<'a, DB: Database> DbTool<'a, DB> {
/// Takes a DB where the tables have already been created.
pub fn new(db: &'a DB, chain: Arc<ChainSpec>) -> eyre::Result<Self> {
Ok(Self { db, chain })
}
/// Grabs the contents of the table within a certain index range and places the
/// entries into a [`HashMap`][std::collections::HashMap].
///
/// [`ListFilter`] can be used to further
/// filter down the desired results. (eg. List only rows which include `0xd3adbeef`)
pub fn list<T: Table>(&self, filter: &ListFilter) -> Result<(Vec<TableRow<T>>, usize)> {
let bmb = Rc::new(BMByte::from(&filter.search));
if bmb.is_none() && filter.has_search() {
eyre::bail!("Invalid search.")
}
let mut hits = 0;
let data = self.db.view(|tx| {
let mut cursor =
tx.cursor_read::<RawTable<T>>().expect("Was not able to obtain a cursor.");
let map_filter = |row: Result<TableRawRow<T>, _>| {
if let Ok((k, v)) = row {
let (key, value) = (k.into_key(), v.into_value());
if key.len() + value.len() < filter.min_row_size {
return None
}
if key.len() < filter.min_key_size {
return None
}
if value.len() < filter.min_value_size {
return None
}
let result = || {
if filter.only_count {
return None
}
Some((
<T as Table>::Key::decode(&key).unwrap(),
<T as Table>::Value::decompress(&value).unwrap(),
))
};
match &*bmb {
Some(searcher) => {
if searcher.find_first_in(&value).is_some() ||
searcher.find_first_in(&key).is_some()
{
hits += 1;
return result()
}
}
None => {
hits += 1;
return result()
}
}
}
None
};
if filter.reverse {
Ok(cursor
.walk_back(None)?
.skip(filter.skip)
.filter_map(map_filter)
.take(filter.len)
.collect::<Vec<(_, _)>>())
} else {
Ok(cursor
.walk(None)?
.skip(filter.skip)
.filter_map(map_filter)
.take(filter.len)
.collect::<Vec<(_, _)>>())
}
})?;
Ok((data.map_err(|e: DatabaseError| eyre::eyre!(e))?, hits))
}
/// Grabs the content of the table for the given key
pub fn get<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>> {
self.db.view(|tx| tx.get::<T>(key))?.map_err(|e| eyre::eyre!(e))
}
/// Grabs the content of the DupSort table for the given key and subkey
pub fn get_dup<T: DupSort>(&self, key: T::Key, subkey: T::SubKey) -> Result<Option<T::Value>> {
self.db
.view(|tx| tx.cursor_dup_read::<T>()?.seek_by_key_subkey(key, subkey))?
.map_err(|e| eyre::eyre!(e))
}
/// Drops the database at the given path.
pub fn drop(&mut self, path: impl AsRef<Path>) -> Result<()> {
let path = path.as_ref();
info!(target: "reth::cli", "Dropping database at {:?}", path);
fs::remove_dir_all(path)?;
Ok(())
}
/// Drops the provided table from the database.
pub fn drop_table<T: Table>(&mut self) -> Result<()> {
self.db.update(|tx| tx.clear::<T>())??;
Ok(())
}
}
/// Filters the results coming from the database.
#[derive(Debug)]
pub struct ListFilter {
/// Skip first N entries.
pub skip: usize,
/// Take N entries.
pub len: usize,
/// Sequence of bytes that will be searched on values and keys from the database.
pub search: Vec<u8>,
/// Minimum row size.
pub min_row_size: usize,
/// Minimum key size.
pub min_key_size: usize,
/// Minimum value size.
pub min_value_size: usize,
/// Reverse order of entries.
pub reverse: bool,
/// Only counts the number of filtered entries without decoding and returning them.
pub only_count: bool,
}
impl ListFilter {
/// If `search` has a list of bytes, then filter for rows that have this sequence.
pub fn has_search(&self) -> bool {
!self.search.is_empty()
}
/// Updates the page with new `skip` and `len` values.
pub fn update_page(&mut self, skip: usize, len: usize) {
self.skip = skip;
self.len = len;
}
}

View File

@@ -36,37 +36,70 @@ reth-tasks.workspace = true
reth-payload-builder.workspace = true
reth-basic-payload-builder.workspace = true
reth-consensus-common.workspace = true
reth-auto-seal-consensus.workspace = true
reth-beacon-consensus.workspace = true
reth-downloaders.workspace = true
reth-node-builder.workspace = true
reth-revm.workspace = true
reth-stages.workspace = true
reth-prune.workspace = true
reth-blockchain-tree.workspace = true
revm-inspectors.workspace = true
reth-snapshot.workspace = true
reth-ethereum-payload-builder.workspace = true
reth-optimism-payload-builder = { workspace = true, optional = true }
# async
tokio.workspace = true
# metrics
metrics-exporter-prometheus = "0.12.1"
once_cell.workspace = true
metrics-util = "0.15.0"
metrics-process = "=1.0.14"
metrics.workspace = true
reth-metrics.workspace = true
# misc
boyer-moore-magiclen = "0.2.16"
eyre.workspace = true
clap = { workspace = true, features = ["derive"] }
humantime = "2.1.0"
thiserror.workspace = true
const-str = "0.5.6"
rand.workspace = true
fdlimit = "0.3.0"
pin-project.workspace = true
# io
dirs-next = "2.0.0"
shellexpand = "3.0.0"
confy.workspace = true
serde.workspace = true
serde_json.workspace = true
# http/rpc
hyper = "0.14.25"
# tracing
tracing.workspace = true
# crypto
alloy-rlp.workspace = true
alloy-chains.workspace = true
secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] }
# async
futures.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
procfs = { version = "0.16.0" }
[dev-dependencies]
# test vectors generation
proptest.workspace = true
tempfile.workspace = true
jsonrpsee.workspace = true
assert_matches.version = "1.5.0"
[features]
optimism = [
@@ -79,10 +112,10 @@ optimism = [
"reth-network/optimism",
"reth-network-api/optimism",
"reth-payload-builder/optimism",
"reth-optimism-payload-builder/optimism",
"reth-ethereum-payload-builder/optimism",
"reth-node-api/optimism",
]
[build-dependencies]
vergen = { version = "8.0.0", features = ["build", "cargo", "git", "git2"] }

View File

@@ -2,4 +2,5 @@
pub mod components;
pub mod config;
pub mod db_type;
pub mod ext;

View File

@@ -0,0 +1,130 @@
//! Stores engine API messages to disk for later inspection and replay.
use reth_beacon_consensus::BeaconEngineMessage;
use reth_node_api::EngineTypes;
use reth_primitives::fs::{self};
use reth_rpc_types::{
engine::{CancunPayloadFields, ForkchoiceState},
ExecutionPayload,
};
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, path::PathBuf, time::SystemTime};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::*;
/// A message from the engine API that has been stored to disk.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum StoredEngineApiMessage<Attributes> {
/// The on-disk representation of an `engine_forkchoiceUpdated` method call.
ForkchoiceUpdated {
/// The [ForkchoiceState] sent in the persisted call.
state: ForkchoiceState,
/// The payload attributes sent in the persisted call, if any.
payload_attrs: Option<Attributes>,
},
/// The on-disk representation of an `engine_newPayload` method call.
NewPayload {
/// The [ExecutionPayload] sent in the persisted call.
payload: ExecutionPayload,
/// The Cancun-specific fields sent in the persisted call, if any.
cancun_fields: Option<CancunPayloadFields>,
},
}
/// This can read and write engine API messages in a specific directory.
#[derive(Debug)]
pub struct EngineApiStore {
/// The path to the directory that stores the engine API messages.
path: PathBuf,
}
impl EngineApiStore {
/// Creates a new [EngineApiStore] at the given path.
///
/// The path is expected to be a directory, where individual message JSON files will be stored.
pub fn new(path: PathBuf) -> Self {
Self { path }
}
/// Stores the received [BeaconEngineMessage] to disk, appending the `received_at` time to the
/// path.
pub fn on_message<Engine>(
&self,
msg: &BeaconEngineMessage<Engine>,
received_at: SystemTime,
) -> eyre::Result<()>
where
Engine: EngineTypes,
{
fs::create_dir_all(&self.path)?; // ensure that store path had been created
let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx: _tx } => {
let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
fs::write(
self.path.join(filename),
serde_json::to_vec(&StoredEngineApiMessage::ForkchoiceUpdated {
state: *state,
payload_attrs: payload_attrs.clone(),
})?,
)?;
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx: _tx } => {
let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
fs::write(
self.path.join(filename),
serde_json::to_vec(
&StoredEngineApiMessage::<Engine::PayloadAttributes>::NewPayload {
payload: payload.clone(),
cancun_fields: cancun_fields.clone(),
},
)?,
)?;
}
// noop
BeaconEngineMessage::TransitionConfigurationExchanged |
BeaconEngineMessage::EventListener(_) => (),
};
Ok(())
}
/// Finds and iterates through any stored engine API message files, ordered by timestamp.
pub fn engine_messages_iter(&self) -> eyre::Result<impl Iterator<Item = PathBuf>> {
let mut filenames_by_ts = BTreeMap::<u64, Vec<PathBuf>>::default();
for entry in fs::read_dir(&self.path)? {
let entry = entry?;
let filename = entry.file_name();
if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) {
if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::<u64>()) {
filenames_by_ts.entry(timestamp).or_default().push(entry.path());
tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message");
} else {
tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename")
}
} else {
tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
}
}
Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
}
/// Intercepts an incoming engine API message, storing it to disk and forwarding it to the
/// engine channel.
pub async fn intercept<Engine>(
self,
mut rx: UnboundedReceiver<BeaconEngineMessage<Engine>>,
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
) where
Engine: EngineTypes,
BeaconEngineMessage<Engine>: std::fmt::Debug,
{
loop {
let Some(msg) = rx.recv().await else { break };
if let Err(error) = self.on_message(&msg, SystemTime::now()) {
error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message");
}
let _ = to_engine.send(msg);
}
}
}

View File

@@ -1,6 +1,6 @@
//! Support for handling events emitted by node components.
use crate::commands::node::cl_events::ConsensusLayerHealthEvent;
use crate::cl_events::ConsensusLayerHealthEvent;
use futures::Stream;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_db::{database::Database, database_metrics::DatabaseMetadata};
@@ -466,7 +466,7 @@ impl Display for Eta {
#[cfg(test)]
mod tests {
use crate::commands::node::events::Eta;
use crate::events::Eta;
use std::time::{Duration, Instant};
#[test]

View File

@@ -6,12 +6,16 @@
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![warn(unused_crate_dependencies)]
pub mod args;
pub mod cl_events;
pub mod cli;
pub mod dirs;
pub mod engine_api_store;
pub mod events;
pub mod init;
pub mod node_config;
pub mod prometheus_exporter;
pub mod utils;
pub mod version;

View File

@@ -1,22 +1,20 @@
//! Support for customizing the node
use super::cli::{components::RethRpcServerHandles, ext::DefaultRethNodeCommandConfig};
use crate::{
args::{
get_secret_key, DatabaseArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs,
PruningArgs, RpcServerArgs, TxPoolArgs,
},
cl_events::ConsensusLayerHealthEvents,
cli::{
components::RethNodeComponentsImpl,
components::{RethNodeComponentsImpl, RethRpcServerHandles},
config::{RethRpcConfig, RethTransactionPoolConfig},
db_type::{DatabaseBuilder, DatabaseInstance},
ext::{RethCliExt, RethNodeCommandConfig},
},
commands::{
debug_cmd::EngineApiStore,
node::{cl_events::ConsensusLayerHealthEvents, events},
ext::{DefaultRethNodeCommandConfig, RethCliExt, RethNodeCommandConfig},
},
dirs::{ChainPath, DataDirPath, MaybePlatformPath},
engine_api_store::EngineApiStore,
events,
init::init_genesis,
prometheus_exporter,
utils::{get_single_header, write_peers_to_file},
@@ -119,8 +117,8 @@ pub static PROMETHEUS_RECORDER_HANDLE: Lazy<PrometheusHandle> =
/// # Example
/// ```rust
/// # use reth_tasks::{TaskManager, TaskSpawner};
/// # use reth::{
/// # builder::NodeConfig,
/// # use reth_node_core::{
/// # node_config::NodeConfig,
/// # cli::{
/// # ext::DefaultRethNodeCommandConfig,
/// # },
@@ -153,8 +151,8 @@ pub static PROMETHEUS_RECORDER_HANDLE: Lazy<PrometheusHandle> =
/// # Example
/// ```rust
/// # use reth_tasks::{TaskManager, TaskSpawner};
/// # use reth::{
/// # builder::NodeConfig,
/// # use reth_node_core::{
/// # node_config::NodeConfig,
/// # cli::{
/// # ext::DefaultRethNodeCommandConfig,
/// # },
@@ -368,8 +366,8 @@ impl NodeConfig {
/// # Example
/// ```rust
/// # use reth_tasks::{TaskManager, TaskSpawner};
/// # use reth::builder::NodeConfig;
/// # use reth::cli::{
/// # use reth_node_core::node_config::NodeConfig;
/// # use reth_node_core::cli::{
/// # ext::DefaultRethNodeCommandConfig,
/// # };
/// # use tokio::runtime::Handle;
@@ -1390,8 +1388,8 @@ impl NodeHandle {
///
/// # Example
/// ```
/// # use reth::{
/// # builder::{NodeConfig, spawn_node},
/// # use reth_node_core::{
/// # node_config::{NodeConfig, spawn_node},
/// # args::RpcServerArgs,
/// # };
/// async fn t() {
@@ -1404,7 +1402,6 @@ impl NodeHandle {
/// let (_handle, _manager) = spawn_node(builder).await.unwrap();
/// }
/// ```
pub async fn spawn_node(config: NodeConfig) -> eyre::Result<(NodeHandle, TaskManager)> {
let task_manager = TaskManager::current();
let ext = DefaultRethNodeCommandConfig::default();

View File

@@ -16,7 +16,7 @@ pub(crate) trait Hook: Fn() + Send + Sync {}
impl<T: Fn() + Send + Sync> Hook for T {}
/// Installs Prometheus as the metrics recorder.
pub(crate) fn install_recorder() -> eyre::Result<PrometheusHandle> {
pub fn install_recorder() -> eyre::Result<PrometheusHandle> {
let recorder = PrometheusBuilder::new().build_recorder();
let handle = recorder.handle();
@@ -74,7 +74,7 @@ async fn start_endpoint<F: Hook + 'static>(
}
/// Serves Prometheus metrics over HTTP with database and process metrics.
pub(crate) async fn serve<Metrics>(
pub async fn serve<Metrics>(
listen_addr: SocketAddr,
handle: PrometheusHandle,
db: Metrics,
@@ -244,7 +244,7 @@ fn describe_io_stats() {}
#[cfg(test)]
mod tests {
use crate::builder::PROMETHEUS_RECORDER_HANDLE;
use crate::node_config::PROMETHEUS_RECORDER_HANDLE;
use std::ops::Deref;
// Dependencies using different version of the `metrics` crate (to be exact, 0.21 vs 0.22)

View File

@@ -1,15 +1,8 @@
//! Common CLI utility functions.
//! Utility functions for node startup and shutdown, for example path parsing and retrieving single
//! blocks from the network.
use boyer_moore_magiclen::BMByte;
use eyre::Result;
use reth_consensus_common::validation::validate_block_standalone;
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRO},
database::Database,
table::{Decode, Decompress, DupSort, Table, TableRow},
transaction::{DbTx, DbTxMut},
DatabaseError, RawTable, TableRawRow,
};
use reth_interfaces::p2p::{
bodies::client::BodiesClient,
headers::client::{HeadersClient, HeadersRequest},
@@ -24,14 +17,48 @@ use reth_rpc::{JwtError, JwtSecret};
use std::{
env::VarError,
path::{Path, PathBuf},
rc::Rc,
sync::Arc,
};
use tracing::{debug, info, trace, warn};
/// Exposing `open_db_read_only` function
pub mod db {
pub use reth_db::open_db_read_only;
/// Parses a user-specified path with support for environment variables and common shorthands (e.g.
/// ~ for the user's home directory).
pub fn parse_path(value: &str) -> Result<PathBuf, shellexpand::LookupError<VarError>> {
shellexpand::full(value).map(|path| PathBuf::from(path.into_owned()))
}
/// Attempts to retrieve or create a JWT secret from the specified path.
pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result<JwtSecret, JwtError> {
if path.exists() {
debug!(target: "reth::cli", ?path, "Reading JWT auth secret file");
JwtSecret::from_file(path)
} else {
info!(target: "reth::cli", ?path, "Creating JWT auth secret file");
JwtSecret::try_create(path)
}
}
/// Collect the peers from the [NetworkManager] and write them to the given `persistent_peers_file`,
/// if configured.
pub fn write_peers_to_file<C>(network: &NetworkManager<C>, persistent_peers_file: Option<PathBuf>)
where
C: BlockReader + Unpin,
{
if let Some(file_path) = persistent_peers_file {
let known_peers = network.all_peers().collect::<Vec<_>>();
if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) {
trace!(target: "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers");
let parent_dir = file_path.parent().map(fs::create_dir_all).transpose();
match parent_dir.and_then(|_| fs::write(&file_path, known_peers)) {
Ok(_) => {
info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file");
}
Err(err) => {
warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file");
}
}
}
}
}
/// Get a single header from network
@@ -99,198 +126,3 @@ where
Ok(block)
}
/// Wrapper over DB that implements many useful DB queries.
#[derive(Debug)]
pub struct DbTool<'a, DB: Database> {
/// The database that the db tool will use.
pub db: &'a DB,
/// The [ChainSpec] that the db tool will use.
pub chain: Arc<ChainSpec>,
}
impl<'a, DB: Database> DbTool<'a, DB> {
/// Takes a DB where the tables have already been created.
pub fn new(db: &'a DB, chain: Arc<ChainSpec>) -> eyre::Result<Self> {
Ok(Self { db, chain })
}
/// Grabs the contents of the table within a certain index range and places the
/// entries into a [`HashMap`][std::collections::HashMap].
///
/// [`ListFilter`] can be used to further
/// filter down the desired results. (eg. List only rows which include `0xd3adbeef`)
pub fn list<T: Table>(&self, filter: &ListFilter) -> Result<(Vec<TableRow<T>>, usize)> {
let bmb = Rc::new(BMByte::from(&filter.search));
if bmb.is_none() && filter.has_search() {
eyre::bail!("Invalid search.")
}
let mut hits = 0;
let data = self.db.view(|tx| {
let mut cursor =
tx.cursor_read::<RawTable<T>>().expect("Was not able to obtain a cursor.");
let map_filter = |row: Result<TableRawRow<T>, _>| {
if let Ok((k, v)) = row {
let (key, value) = (k.into_key(), v.into_value());
if key.len() + value.len() < filter.min_row_size {
return None
}
if key.len() < filter.min_key_size {
return None
}
if value.len() < filter.min_value_size {
return None
}
let result = || {
if filter.only_count {
return None
}
Some((
<T as Table>::Key::decode(&key).unwrap(),
<T as Table>::Value::decompress(&value).unwrap(),
))
};
match &*bmb {
Some(searcher) => {
if searcher.find_first_in(&value).is_some() ||
searcher.find_first_in(&key).is_some()
{
hits += 1;
return result()
}
}
None => {
hits += 1;
return result()
}
}
}
None
};
if filter.reverse {
Ok(cursor
.walk_back(None)?
.skip(filter.skip)
.filter_map(map_filter)
.take(filter.len)
.collect::<Vec<(_, _)>>())
} else {
Ok(cursor
.walk(None)?
.skip(filter.skip)
.filter_map(map_filter)
.take(filter.len)
.collect::<Vec<(_, _)>>())
}
})?;
Ok((data.map_err(|e: DatabaseError| eyre::eyre!(e))?, hits))
}
/// Grabs the content of the table for the given key
pub fn get<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>> {
self.db.view(|tx| tx.get::<T>(key))?.map_err(|e| eyre::eyre!(e))
}
/// Grabs the content of the DupSort table for the given key and subkey
pub fn get_dup<T: DupSort>(&self, key: T::Key, subkey: T::SubKey) -> Result<Option<T::Value>> {
self.db
.view(|tx| tx.cursor_dup_read::<T>()?.seek_by_key_subkey(key, subkey))?
.map_err(|e| eyre::eyre!(e))
}
/// Drops the database at the given path.
pub fn drop(&mut self, path: impl AsRef<Path>) -> Result<()> {
let path = path.as_ref();
info!(target: "reth::cli", "Dropping database at {:?}", path);
fs::remove_dir_all(path)?;
Ok(())
}
/// Drops the provided table from the database.
pub fn drop_table<T: Table>(&mut self) -> Result<()> {
self.db.update(|tx| tx.clear::<T>())??;
Ok(())
}
}
/// Parses a user-specified path with support for environment variables and common shorthands (e.g.
/// ~ for the user's home directory).
pub fn parse_path(value: &str) -> Result<PathBuf, shellexpand::LookupError<VarError>> {
shellexpand::full(value).map(|path| PathBuf::from(path.into_owned()))
}
/// Filters the results coming from the database.
#[derive(Debug)]
pub struct ListFilter {
/// Skip first N entries.
pub skip: usize,
/// Take N entries.
pub len: usize,
/// Sequence of bytes that will be searched on values and keys from the database.
pub search: Vec<u8>,
/// Minimum row size.
pub min_row_size: usize,
/// Minimum key size.
pub min_key_size: usize,
/// Minimum value size.
pub min_value_size: usize,
/// Reverse order of entries.
pub reverse: bool,
/// Only counts the number of filtered entries without decoding and returning them.
pub only_count: bool,
}
impl ListFilter {
/// If `search` has a list of bytes, then filter for rows that have this sequence.
pub fn has_search(&self) -> bool {
!self.search.is_empty()
}
/// Updates the page with new `skip` and `len` values.
pub fn update_page(&mut self, skip: usize, len: usize) {
self.skip = skip;
self.len = len;
}
}
/// Attempts to retrieve or create a JWT secret from the specified path.
pub fn get_or_create_jwt_secret_from_path(path: &Path) -> Result<JwtSecret, JwtError> {
if path.exists() {
debug!(target: "reth::cli", ?path, "Reading JWT auth secret file");
JwtSecret::from_file(path)
} else {
info!(target: "reth::cli", ?path, "Creating JWT auth secret file");
JwtSecret::try_create(path)
}
}
/// Collect the peers from the [NetworkManager] and write them to the given `persistent_peers_file`,
/// if configured.
pub fn write_peers_to_file<C>(network: &NetworkManager<C>, persistent_peers_file: Option<PathBuf>)
where
C: BlockReader + Unpin,
{
if let Some(file_path) = persistent_peers_file {
let known_peers = network.all_peers().collect::<Vec<_>>();
if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) {
trace!(target: "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers");
let parent_dir = file_path.parent().map(fs::create_dir_all).transpose();
match parent_dir.and_then(|_| fs::write(&file_path, known_peers)) {
Ok(_) => {
info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file");
}
Err(err) => {
warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file");
}
}
}
}
}