feat: add metrics to engine RPC API (#4912)

This commit is contained in:
Dan Cline
2023-10-05 15:51:50 -04:00
committed by GitHub
parent 50b5215f4b
commit 49ac2f5110
5 changed files with 100 additions and 15 deletions

View File

@@ -1,5 +1,6 @@
use crate::{
payload::PayloadOrAttributes, EngineApiError, EngineApiMessageVersion, EngineApiResult,
metrics::EngineApiMetrics, payload::PayloadOrAttributes, EngineApiError,
EngineApiMessageVersion, EngineApiResult,
};
use async_trait::async_trait;
use jsonrpsee_core::RpcResult;
@@ -19,7 +20,7 @@ use reth_rpc_types_compat::engine::payload::{
convert_payload_input_v2_to_payload, convert_to_payload_body_v1,
};
use reth_tasks::TaskSpawner;
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use tokio::sync::oneshot;
use tracing::trace;
@@ -46,6 +47,8 @@ struct EngineApiInner<Provider> {
payload_store: PayloadStore,
/// For spawning and executing async tasks
task_spawner: Box<dyn TaskSpawner>,
/// The metrics for engine api calls
metrics: EngineApiMetrics,
}
impl<Provider> EngineApi<Provider>
@@ -66,6 +69,7 @@ where
beacon_consensus,
payload_store,
task_spawner,
metrics: EngineApiMetrics::default(),
});
Self { inner }
}
@@ -591,14 +595,20 @@ where
/// Caution: This should not accept the `withdrawals` field
async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
Ok(EngineApi::new_payload_v1(self, payload).await?)
let start = Instant::now();
let res = EngineApi::new_payload_v1(self, payload).await;
self.inner.metrics.new_payload_v1.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_newPayloadV2`
/// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
Ok(EngineApi::new_payload_v2(self, payload).await?)
let start = Instant::now();
let res = EngineApi::new_payload_v2(self, payload).await;
self.inner.metrics.new_payload_v2.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_newPayloadV3`
@@ -610,8 +620,12 @@ where
parent_beacon_block_root: B256,
) -> RpcResult<PayloadStatus> {
trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
Ok(EngineApi::new_payload_v3(self, payload, versioned_hashes, parent_beacon_block_root)
.await?)
let start = Instant::now();
let res =
EngineApi::new_payload_v3(self, payload, versioned_hashes, parent_beacon_block_root)
.await;
self.inner.metrics.new_payload_v3.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_forkchoiceUpdatedV1`
@@ -624,7 +638,11 @@ where
payload_attributes: Option<PayloadAttributes>,
) -> RpcResult<ForkchoiceUpdated> {
trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
Ok(EngineApi::fork_choice_updated_v1(self, fork_choice_state, payload_attributes).await?)
let start = Instant::now();
let res =
EngineApi::fork_choice_updated_v1(self, fork_choice_state, payload_attributes).await;
self.inner.metrics.fork_choice_updated_v1.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_forkchoiceUpdatedV2`
@@ -635,7 +653,11 @@ where
payload_attributes: Option<PayloadAttributes>,
) -> RpcResult<ForkchoiceUpdated> {
trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
Ok(EngineApi::fork_choice_updated_v2(self, fork_choice_state, payload_attributes).await?)
let start = Instant::now();
let res =
EngineApi::fork_choice_updated_v2(self, fork_choice_state, payload_attributes).await;
self.inner.metrics.fork_choice_updated_v2.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_forkchoiceUpdatedV2`
@@ -647,7 +669,11 @@ where
payload_attributes: Option<PayloadAttributes>,
) -> RpcResult<ForkchoiceUpdated> {
trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
Ok(EngineApi::fork_choice_updated_v3(self, fork_choice_state, payload_attributes).await?)
let start = Instant::now();
let res =
EngineApi::fork_choice_updated_v3(self, fork_choice_state, payload_attributes).await;
self.inner.metrics.fork_choice_updated_v3.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_getPayloadV1`
@@ -663,7 +689,10 @@ where
/// > Provider software MAY stop the corresponding build process after serving this call.
async fn get_payload_v1(&self, payload_id: PayloadId) -> RpcResult<ExecutionPayloadV1> {
trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
Ok(EngineApi::get_payload_v1(self, payload_id).await?)
let start = Instant::now();
let res = EngineApi::get_payload_v1(self, payload_id).await;
self.inner.metrics.get_payload_v1.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_getPayloadV2`
@@ -677,7 +706,10 @@ where
/// > Provider software MAY stop the corresponding build process after serving this call.
async fn get_payload_v2(&self, payload_id: PayloadId) -> RpcResult<ExecutionPayloadEnvelopeV2> {
trace!(target: "rpc::engine", "Serving engine_getPayloadV2");
Ok(EngineApi::get_payload_v2(self, payload_id).await?)
let start = Instant::now();
let res = EngineApi::get_payload_v2(self, payload_id).await;
self.inner.metrics.get_payload_v2.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_getPayloadV3`
@@ -691,7 +723,10 @@ where
/// > Provider software MAY stop the corresponding build process after serving this call.
async fn get_payload_v3(&self, payload_id: PayloadId) -> RpcResult<ExecutionPayloadEnvelopeV3> {
trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
Ok(EngineApi::get_payload_v3(self, payload_id).await?)
let start = Instant::now();
let res = EngineApi::get_payload_v3(self, payload_id).await;
self.inner.metrics.get_payload_v3.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_getPayloadBodiesByHashV1`
@@ -701,7 +736,10 @@ where
block_hashes: Vec<BlockHash>,
) -> RpcResult<ExecutionPayloadBodiesV1> {
trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
Ok(EngineApi::get_payload_bodies_by_hash(self, block_hashes)?)
let start = Instant::now();
let res = EngineApi::get_payload_bodies_by_hash(self, block_hashes);
self.inner.metrics.get_payload_bodies_by_hash_v1.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_getPayloadBodiesByRangeV1`
@@ -726,7 +764,10 @@ where
count: U64,
) -> RpcResult<ExecutionPayloadBodiesV1> {
trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
Ok(EngineApi::get_payload_bodies_by_range(self, start.to(), count.to()).await?)
let start_time = Instant::now();
let res = EngineApi::get_payload_bodies_by_range(self, start.to(), count.to()).await;
self.inner.metrics.get_payload_bodies_by_range_v1.record(start_time.elapsed());
Ok(res?)
}
/// Handler for `engine_exchangeTransitionConfigurationV1`
@@ -736,7 +777,10 @@ where
config: TransitionConfiguration,
) -> RpcResult<TransitionConfiguration> {
trace!(target: "rpc::engine", "Serving engine_exchangeTransitionConfigurationV1");
Ok(EngineApi::exchange_transition_configuration(self, config).await?)
let start = Instant::now();
let res = EngineApi::exchange_transition_configuration(self, config).await;
self.inner.metrics.exchange_transition_configuration.record(start.elapsed());
Ok(res?)
}
/// Handler for `engine_exchangeCapabilitiesV1`