From 72f59d222b39dd213a97aa009689dd576ff79c37 Mon Sep 17 00:00:00 2001 From: Bjerg Date: Mon, 20 Mar 2023 19:49:16 +0100 Subject: [PATCH] feat: `sync.execution.mgas_processed_total` metric (#1868) --- bin/reth/src/dump_stage/execution.rs | 10 +++---- bin/reth/src/dump_stage/merkle.rs | 6 ++-- bin/reth/src/stage/mod.rs | 3 +- crates/stages/src/stages/execution.rs | 43 ++++++++++++++++++++------- 4 files changed, 39 insertions(+), 23 deletions(-) diff --git a/bin/reth/src/dump_stage/execution.rs b/bin/reth/src/dump_stage/execution.rs index d06f0345a9..a196c8ead2 100644 --- a/bin/reth/src/dump_stage/execution.rs +++ b/bin/reth/src/dump_stage/execution.rs @@ -98,9 +98,8 @@ async fn unwind_and_copy( ) -> eyre::Result<()> { let mut unwind_tx = Transaction::new(db_tool.db)?; - let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new( - Arc::new(MAINNET.clone()), - )); + let mut exec_stage = + ExecutionStage::new_with_factory(reth_executor::Factory::new(Arc::new(MAINNET.clone()))); exec_stage .unwind( @@ -129,9 +128,8 @@ async fn dry_run( info!(target: "reth::cli", "Executing stage. [dry-run]"); let mut tx = Transaction::new(&output_db)?; - let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new( - Arc::new(MAINNET.clone()), - )); + let mut exec_stage = + ExecutionStage::new_with_factory(reth_executor::Factory::new(Arc::new(MAINNET.clone()))); exec_stage .execute( diff --git a/bin/reth/src/dump_stage/merkle.rs b/bin/reth/src/dump_stage/merkle.rs index 4b8e1bc98a..176f631c8a 100644 --- a/bin/reth/src/dump_stage/merkle.rs +++ b/bin/reth/src/dump_stage/merkle.rs @@ -72,11 +72,9 @@ async fn unwind_and_copy( MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?; // Bring Plainstate to TO (hashing stage execution requires it) - let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new( - Arc::new(MAINNET.clone()), - )); + let mut exec_stage = + ExecutionStage::new(reth_executor::Factory::new(Arc::new(MAINNET.clone())), u64::MAX); - exec_stage.commit_threshold = u64::MAX; exec_stage .unwind( &mut unwind_tx, diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index eb4409dfdc..3d18769bae 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -164,8 +164,7 @@ impl Command { } StageEnum::Execution => { let factory = reth_executor::Factory::new(self.chain.clone()); - let mut stage = ExecutionStage::new(factory, 10_000); - stage.commit_threshold = num_blocks; + let mut stage = ExecutionStage::new(factory, num_blocks); if !self.skip_unwind { stage.unwind(&mut tx, unwind).await?; } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index d0d44d8967..82a9ce1fc1 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -2,6 +2,7 @@ use crate::{ exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, }; +use metrics_core::Counter; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, database::Database, @@ -10,6 +11,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_interfaces::provider::ProviderError; +use reth_metrics_derive::Metrics; use reth_primitives::{Address, Block, U256}; use reth_provider::{ post_state::PostState, BlockExecutor, ExecutorFactory, LatestStateProviderRef, Transaction, @@ -19,6 +21,14 @@ use tracing::*; /// The [`StageId`] of the execution stage. pub const EXECUTION: StageId = StageId("Execution"); +/// Execution stage metrics. +#[derive(Metrics)] +#[metrics(scope = "sync.execution")] +pub struct ExecutionStageMetrics { + /// The total amount of gas processed (in millions) + mgas_processed_total: Counter, +} + /// The execution stage executes all transactions and /// update history indexes. /// @@ -50,22 +60,28 @@ pub const EXECUTION: StageId = StageId("Execution"); // false positive, we cannot derive it if !DB: Debug. #[allow(missing_debug_implementations)] pub struct ExecutionStage { + metrics: ExecutionStageMetrics, /// The stage's internal executor - pub executor_factory: EF, + executor_factory: EF, /// Commit threshold - pub commit_threshold: u64, + commit_threshold: u64, } impl ExecutionStage { /// Create new execution stage with specified config. pub fn new(executor_factory: EF, commit_threshold: u64) -> Self { - Self { executor_factory, commit_threshold } + Self { metrics: ExecutionStageMetrics::default(), executor_factory, commit_threshold } } - /// Create execution stage with executor factory and default commit threshold set to 10_000 - /// blocks - pub fn new_default_threshold(executor_factory: EF) -> Self { - Self { executor_factory, commit_threshold: 10_000 } + /// Create an execution stage with the provided executor factory. + /// + /// The commit threshold will be set to 10_000. + pub fn new_with_factory(executor_factory: EF) -> Self { + Self { + metrics: ExecutionStageMetrics::default(), + executor_factory, + commit_threshold: 10_000, + } } /// Execute the stage. @@ -114,7 +130,7 @@ impl ExecutionStage { let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx)); // Fetch transactions, execute them and generate results - let mut changesets = PostState::default(); + let mut state = PostState::default(); for (header, td, body, ommers, withdrawals) in block_batch.into_iter() { let block_number = header.number; tracing::trace!(target: "sync::stages::execution", ?block_number, "Execute block."); @@ -149,19 +165,24 @@ impl ExecutionStage { trace!(target: "sync::stages::execution", number = block_number, txs = transactions.len(), "Executing block"); // Configure the executor to use the current state. - let changeset = executor + let block_state = executor .execute_and_verify_receipt( &Block { header, body: transactions, ommers, withdrawals }, td, Some(signers), ) .map_err(|error| StageError::ExecutionError { block: block_number, error })?; - changesets.extend(changeset); + if let Some(last_receipt) = block_state.receipts().last() { + self.metrics + .mgas_processed_total + .increment(last_receipt.cumulative_gas_used / 1_000_000); + } + state.extend(block_state); } // put execution results to database let first_transition_id = tx.get_block_transition(last_block)?; - changesets.write_to_db(&**tx, first_transition_id)?; + state.write_to_db(&**tx, first_transition_id)?; let done = !capped; info!(target: "sync::stages::execution", stage_progress = end_block, done, "Sync iteration finished");