feat: sync.execution.mgas_processed_total metric (#1868)

This commit is contained in:
Bjerg
2023-03-20 19:49:16 +01:00
committed by GitHub
parent 292be93ed2
commit 72f59d222b
4 changed files with 39 additions and 23 deletions

View File

@@ -98,9 +98,8 @@ async fn unwind_and_copy<DB: Database>(
) -> eyre::Result<()> {
let mut unwind_tx = Transaction::new(db_tool.db)?;
let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new(
Arc::new(MAINNET.clone()),
));
let mut exec_stage =
ExecutionStage::new_with_factory(reth_executor::Factory::new(Arc::new(MAINNET.clone())));
exec_stage
.unwind(
@@ -129,9 +128,8 @@ async fn dry_run(
info!(target: "reth::cli", "Executing stage. [dry-run]");
let mut tx = Transaction::new(&output_db)?;
let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new(
Arc::new(MAINNET.clone()),
));
let mut exec_stage =
ExecutionStage::new_with_factory(reth_executor::Factory::new(Arc::new(MAINNET.clone())));
exec_stage
.execute(

View File

@@ -72,11 +72,9 @@ async fn unwind_and_copy<DB: Database>(
MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?;
// Bring Plainstate to TO (hashing stage execution requires it)
let mut exec_stage = ExecutionStage::new_default_threshold(reth_executor::Factory::new(
Arc::new(MAINNET.clone()),
));
let mut exec_stage =
ExecutionStage::new(reth_executor::Factory::new(Arc::new(MAINNET.clone())), u64::MAX);
exec_stage.commit_threshold = u64::MAX;
exec_stage
.unwind(
&mut unwind_tx,

View File

@@ -164,8 +164,7 @@ impl Command {
}
StageEnum::Execution => {
let factory = reth_executor::Factory::new(self.chain.clone());
let mut stage = ExecutionStage::new(factory, 10_000);
stage.commit_threshold = num_blocks;
let mut stage = ExecutionStage::new(factory, num_blocks);
if !self.skip_unwind {
stage.unwind(&mut tx, unwind).await?;
}

View File

@@ -2,6 +2,7 @@ use crate::{
exec_or_return, ExecAction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
UnwindOutput,
};
use metrics_core::Counter;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
@@ -10,6 +11,7 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::provider::ProviderError;
use reth_metrics_derive::Metrics;
use reth_primitives::{Address, Block, U256};
use reth_provider::{
post_state::PostState, BlockExecutor, ExecutorFactory, LatestStateProviderRef, Transaction,
@@ -19,6 +21,14 @@ use tracing::*;
/// The [`StageId`] of the execution stage.
pub const EXECUTION: StageId = StageId("Execution");
/// Execution stage metrics.
#[derive(Metrics)]
#[metrics(scope = "sync.execution")]
pub struct ExecutionStageMetrics {
/// The total amount of gas processed (in millions)
mgas_processed_total: Counter,
}
/// The execution stage executes all transactions and
/// update history indexes.
///
@@ -50,22 +60,28 @@ pub const EXECUTION: StageId = StageId("Execution");
// false positive, we cannot derive it if !DB: Debug.
#[allow(missing_debug_implementations)]
pub struct ExecutionStage<EF: ExecutorFactory> {
metrics: ExecutionStageMetrics,
/// The stage's internal executor
pub executor_factory: EF,
executor_factory: EF,
/// Commit threshold
pub commit_threshold: u64,
commit_threshold: u64,
}
impl<EF: ExecutorFactory> ExecutionStage<EF> {
/// Create new execution stage with specified config.
pub fn new(executor_factory: EF, commit_threshold: u64) -> Self {
Self { executor_factory, commit_threshold }
Self { metrics: ExecutionStageMetrics::default(), executor_factory, commit_threshold }
}
/// Create execution stage with executor factory and default commit threshold set to 10_000
/// blocks
pub fn new_default_threshold(executor_factory: EF) -> Self {
Self { executor_factory, commit_threshold: 10_000 }
/// Create an execution stage with the provided executor factory.
///
/// The commit threshold will be set to 10_000.
pub fn new_with_factory(executor_factory: EF) -> Self {
Self {
metrics: ExecutionStageMetrics::default(),
executor_factory,
commit_threshold: 10_000,
}
}
/// Execute the stage.
@@ -114,7 +130,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx));
// Fetch transactions, execute them and generate results
let mut changesets = PostState::default();
let mut state = PostState::default();
for (header, td, body, ommers, withdrawals) in block_batch.into_iter() {
let block_number = header.number;
tracing::trace!(target: "sync::stages::execution", ?block_number, "Execute block.");
@@ -149,19 +165,24 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
trace!(target: "sync::stages::execution", number = block_number, txs = transactions.len(), "Executing block");
// Configure the executor to use the current state.
let changeset = executor
let block_state = executor
.execute_and_verify_receipt(
&Block { header, body: transactions, ommers, withdrawals },
td,
Some(signers),
)
.map_err(|error| StageError::ExecutionError { block: block_number, error })?;
changesets.extend(changeset);
if let Some(last_receipt) = block_state.receipts().last() {
self.metrics
.mgas_processed_total
.increment(last_receipt.cumulative_gas_used / 1_000_000);
}
state.extend(block_state);
}
// put execution results to database
let first_transition_id = tx.get_block_transition(last_block)?;
changesets.write_to_db(&**tx, first_transition_id)?;
state.write_to_db(&**tx, first_transition_id)?;
let done = !capped;
info!(target: "sync::stages::execution", stage_progress = end_block, done, "Sync iteration finished");