chore(deps): migrate to jsonrpsee 0.22 (#5894)

Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2024-04-05 18:02:05 +02:00
committed by GitHub
parent 3236baef5e
commit 67cfb06fbb
29 changed files with 715 additions and 549 deletions

View File

@@ -7,8 +7,9 @@ use crate::{
use hyper::header::AUTHORIZATION;
pub use jsonrpsee::server::ServerBuilder;
use jsonrpsee::{
core::RegisterMethodError,
http_client::HeaderMap,
server::{RpcModule, ServerHandle},
server::{AlreadyStoppedError, RpcModule, ServerHandle},
Methods,
};
use reth_network_api::{NetworkInfo, Peers};
@@ -32,6 +33,7 @@ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tower::layer::util::Identity;
/// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace.
#[allow(clippy::too_many_arguments)]
@@ -129,12 +131,14 @@ where
// By default, both http and ws are enabled.
let server = ServerBuilder::new()
.set_middleware(middleware)
.set_http_middleware(middleware)
.build(socket_addr)
.await
.map_err(|err| RpcError::from_jsonrpsee_error(err, ServerKind::Auth(socket_addr)))?;
.map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
let local_addr = server.local_addr()?;
let local_addr = server
.local_addr()
.map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
let handle = server.start(module);
Ok(AuthServerHandle { handle, local_addr, secret })
@@ -148,7 +152,7 @@ pub struct AuthServerConfig {
/// The secret for the auth layer of the server.
pub(crate) secret: JwtSecret,
/// Configs for JSON-RPC Http.
pub(crate) server_config: ServerBuilder,
pub(crate) server_config: ServerBuilder<Identity, Identity>,
}
// === impl AuthServerConfig ===
@@ -173,12 +177,15 @@ impl AuthServerConfig {
.layer(AuthLayer::new(JwtAuthValidator::new(secret.clone())));
// By default, both http and ws are enabled.
let server =
server_config.set_middleware(middleware).build(socket_addr).await.map_err(|err| {
RpcError::from_jsonrpsee_error(err, ServerKind::Auth(socket_addr))
})?;
let server = server_config
.set_http_middleware(middleware)
.build(socket_addr)
.await
.map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
let local_addr = server.local_addr()?;
let local_addr = server
.local_addr()
.map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
let handle = server.start(module.inner);
Ok(AuthServerHandle { handle, local_addr, secret })
@@ -190,7 +197,7 @@ impl AuthServerConfig {
pub struct AuthServerConfigBuilder {
socket_addr: Option<SocketAddr>,
secret: JwtSecret,
server_config: Option<ServerBuilder>,
server_config: Option<ServerBuilder<Identity, Identity>>,
}
// === impl AuthServerConfigBuilder ===
@@ -223,7 +230,7 @@ impl AuthServerConfigBuilder {
///
/// Note: this always configures an [EthSubscriptionIdProvider]
/// [IdProvider](jsonrpsee::server::IdProvider) for convenience.
pub fn with_server_config(mut self, config: ServerBuilder) -> Self {
pub fn with_server_config(mut self, config: ServerBuilder<Identity, Identity>) -> Self {
self.server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
self
}
@@ -286,7 +293,7 @@ impl AuthRpcModule {
pub fn merge_auth_methods(
&mut self,
other: impl Into<Methods>,
) -> Result<bool, jsonrpsee::core::error::Error> {
) -> Result<bool, RegisterMethodError> {
self.module_mut().merge(other.into()).map(|_| true)
}
@@ -320,8 +327,8 @@ impl AuthServerHandle {
}
/// Tell the server to stop without waiting for the server to stop.
pub fn stop(self) -> Result<(), RpcError> {
Ok(self.handle.stop()?)
pub fn stop(self) -> Result<(), AlreadyStoppedError> {
self.handle.stop()
}
/// Returns the url to the http server

View File

@@ -1,8 +1,6 @@
use std::net::SocketAddr;
use crate::RethRpcModule;
use jsonrpsee::core::Error as JsonRpseeError;
use std::{io, io::ErrorKind};
use reth_ipc::server::IpcServerStartError;
use std::{io, io::ErrorKind, net::SocketAddr};
/// Rpc server kind.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
@@ -40,12 +38,17 @@ impl std::fmt::Display for ServerKind {
}
}
/// Rpc Errors.
/// Rpc Server related errors
#[derive(Debug, thiserror::Error)]
pub enum RpcError {
/// Wrapper for `jsonrpsee::core::Error`.
#[error(transparent)]
RpcError(#[from] JsonRpseeError),
/// Thrown during server start.
#[error("Failed to start {kind} server: {error}")]
ServerError {
/// Server kind.
kind: ServerKind,
/// IO error.
error: io::Error,
},
/// Address already in use.
#[error("address {kind} is already in use (os error 98). Choose a different port using {}", kind.flags())]
AddressAlreadyInUse {
@@ -57,28 +60,21 @@ pub enum RpcError {
/// Http and WS server configured on the same port but with conflicting settings.
#[error(transparent)]
WsHttpSamePortError(#[from] WsHttpSamePortError),
/// Thrown when IPC server fails to start.
#[error(transparent)]
IpcServerError(#[from] IpcServerStartError),
/// Custom error.
#[error("{0}")]
Custom(String),
}
impl RpcError {
/// Converts a `jsonrpsee::core::Error` to a more descriptive `RpcError`.
pub fn from_jsonrpsee_error(err: JsonRpseeError, kind: ServerKind) -> RpcError {
match err {
JsonRpseeError::Transport(err) => {
if let Some(io_error) = err.downcast_ref::<io::Error>() {
if io_error.kind() == ErrorKind::AddrInUse {
return RpcError::AddressAlreadyInUse {
kind,
error: io::Error::from(io_error.kind()),
}
}
}
RpcError::RpcError(JsonRpseeError::Transport(err))
}
_ => err.into(),
/// Converts an [io::Error] to a more descriptive `RpcError`.
pub fn server_error(io_error: io::Error, kind: ServerKind) -> RpcError {
if io_error.kind() == ErrorKind::AddrInUse {
return RpcError::AddressAlreadyInUse { kind, error: io_error }
}
RpcError::ServerError { kind, error: io_error }
}
}

View File

@@ -155,7 +155,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use crate::{
auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcServerMetrics,
auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics,
RpcModuleSelection::Selection,
};
use constants::*;
@@ -163,7 +163,8 @@ use error::{RpcError, ServerKind};
use hyper::{header::AUTHORIZATION, HeaderMap};
pub use jsonrpsee::server::ServerBuilder;
use jsonrpsee::{
server::{IdProvider, Server, ServerHandle},
core::RegisterMethodError,
server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, Server, ServerHandle},
Methods, RpcModule,
};
use reth_ipc::server::IpcServer;
@@ -200,7 +201,7 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};
use strum::{AsRefStr, EnumIter, IntoStaticStr, ParseError, VariantArray, VariantNames};
use tower::layer::util::{Identity, Stack};
pub use tower::layer::util::{Identity, Stack};
use tower_http::cors::CorsLayer;
use tracing::{instrument, trace};
@@ -1440,13 +1441,13 @@ where
#[derive(Default)]
pub struct RpcServerConfig {
/// Configs for JSON-RPC Http.
http_server_config: Option<ServerBuilder>,
http_server_config: Option<ServerBuilder<Identity, Identity>>,
/// Allowed CORS Domains for http
http_cors_domains: Option<String>,
/// Address where to bind the http server to
http_addr: Option<SocketAddr>,
/// Configs for WS server
ws_server_config: Option<ServerBuilder>,
ws_server_config: Option<ServerBuilder<Identity, Identity>>,
/// Allowed CORS Domains for ws.
ws_cors_domains: Option<String>,
/// Address where to bind the ws server to
@@ -1478,12 +1479,12 @@ impl fmt::Debug for RpcServerConfig {
impl RpcServerConfig {
/// Creates a new config with only http set
pub fn http(config: ServerBuilder) -> Self {
pub fn http(config: ServerBuilder<Identity, Identity>) -> Self {
Self::default().with_http(config)
}
/// Creates a new config with only ws set
pub fn ws(config: ServerBuilder) -> Self {
pub fn ws(config: ServerBuilder<Identity, Identity>) -> Self {
Self::default().with_ws(config)
}
@@ -1496,7 +1497,7 @@ impl RpcServerConfig {
///
/// Note: this always configures an [EthSubscriptionIdProvider] [IdProvider] for convenience.
/// To set a custom [IdProvider], please use [Self::with_id_provider].
pub fn with_http(mut self, config: ServerBuilder) -> Self {
pub fn with_http(mut self, config: ServerBuilder<Identity, Identity>) -> Self {
self.http_server_config =
Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
self
@@ -1523,7 +1524,7 @@ impl RpcServerConfig {
///
/// Note: this always configures an [EthSubscriptionIdProvider] [IdProvider] for convenience.
/// To set a custom [IdProvider], please use [Self::with_id_provider].
pub fn with_ws(mut self, config: ServerBuilder) -> Self {
pub fn with_ws(mut self, config: ServerBuilder<Identity, Identity>) -> Self {
self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
self
}
@@ -1671,7 +1672,7 @@ impl RpcServerConfig {
.http
.as_ref()
.or(modules.ws.as_ref())
.map(RpcServerMetrics::new)
.map(RpcRequestMetrics::same_port)
.unwrap_or_default(),
)
.await?;
@@ -1696,7 +1697,7 @@ impl RpcServerConfig {
self.ws_cors_domains.take(),
self.jwt_secret.clone(),
ServerKind::WS(ws_socket_addr),
modules.ws.as_ref().map(RpcServerMetrics::new).unwrap_or_default(),
modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default(),
)
.await?;
ws_local_addr = Some(addr);
@@ -1711,7 +1712,7 @@ impl RpcServerConfig {
self.http_cors_domains.take(),
self.jwt_secret.clone(),
ServerKind::Http(http_socket_addr),
modules.http.as_ref().map(RpcServerMetrics::new).unwrap_or_default(),
modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(),
)
.await?;
http_local_addr = Some(addr);
@@ -1736,11 +1737,14 @@ impl RpcServerConfig {
server.ws_http = self.build_ws_http(modules).await?;
if let Some(builder) = self.ipc_server_config {
let metrics = modules.ipc.as_ref().map(RpcServerMetrics::new).unwrap_or_default();
// let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::new).unwrap_or_default();
let ipc_path = self
.ipc_endpoint
.unwrap_or_else(|| Endpoint::new(DEFAULT_IPC_ENDPOINT.to_string()));
let ipc = builder.set_logger(metrics).build(ipc_path.path())?;
let ipc = builder
// TODO(mattsse): add metrics middleware for IPC
// .set_middleware(metrics)
.build(ipc_path.path());
server.ipc = Some(ipc);
}
@@ -1874,10 +1878,7 @@ impl TransportRpcModules {
/// Fails if any of the methods in other is present already.
///
/// Returns [Ok(false)] if no http transport is configured.
pub fn merge_http(
&mut self,
other: impl Into<Methods>,
) -> Result<bool, jsonrpsee::core::error::Error> {
pub fn merge_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
if let Some(ref mut http) = self.http {
return http.merge(other.into()).map(|_| true)
}
@@ -1889,10 +1890,7 @@ impl TransportRpcModules {
/// Fails if any of the methods in other is present already.
///
/// Returns [Ok(false)] if no ws transport is configured.
pub fn merge_ws(
&mut self,
other: impl Into<Methods>,
) -> Result<bool, jsonrpsee::core::error::Error> {
pub fn merge_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
if let Some(ref mut ws) = self.ws {
return ws.merge(other.into()).map(|_| true)
}
@@ -1904,10 +1902,7 @@ impl TransportRpcModules {
/// Fails if any of the methods in other is present already.
///
/// Returns [Ok(false)] if no ipc transport is configured.
pub fn merge_ipc(
&mut self,
other: impl Into<Methods>,
) -> Result<bool, jsonrpsee::core::error::Error> {
pub fn merge_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
if let Some(ref mut ipc) = self.ipc {
return ipc.merge(other.into()).map(|_| true)
}
@@ -1920,7 +1915,7 @@ impl TransportRpcModules {
pub fn merge_configured(
&mut self,
other: impl Into<Methods>,
) -> Result<(), jsonrpsee::core::error::Error> {
) -> Result<(), RegisterMethodError> {
let other = other.into();
self.merge_http(other.clone())?;
self.merge_ws(other.clone())?;
@@ -2004,16 +1999,22 @@ impl Default for WsHttpServers {
}
/// Http Servers Enum
#[allow(clippy::type_complexity)]
enum WsHttpServerKind {
/// Http server
Plain(Server<Identity, RpcServerMetrics>),
Plain(Server<Identity, Stack<RpcRequestMetrics, Identity>>),
/// Http server with cors
WithCors(Server<Stack<CorsLayer, Identity>, RpcServerMetrics>),
WithCors(Server<Stack<CorsLayer, Identity>, Stack<RpcRequestMetrics, Identity>>),
/// Http server with auth
WithAuth(Server<Stack<AuthLayer<JwtAuthValidator>, Identity>, RpcServerMetrics>),
WithAuth(
Server<Stack<AuthLayer<JwtAuthValidator>, Identity>, Stack<RpcRequestMetrics, Identity>>,
),
/// Http server with cors and auth
WithCorsAuth(
Server<Stack<AuthLayer<JwtAuthValidator>, Stack<CorsLayer, Identity>>, RpcServerMetrics>,
Server<
Stack<AuthLayer<JwtAuthValidator>, Stack<CorsLayer, Identity>>,
Stack<RpcRequestMetrics, Identity>,
>,
),
}
@@ -2034,12 +2035,12 @@ impl WsHttpServerKind {
///
/// Returns the address of the started server.
async fn build(
builder: ServerBuilder,
builder: ServerBuilder<Identity, Identity>,
socket_addr: SocketAddr,
cors_domains: Option<String>,
jwt_secret: Option<JwtSecret>,
server_kind: ServerKind,
metrics: RpcServerMetrics,
metrics: RpcRequestMetrics,
) -> Result<(Self, SocketAddr), RpcError> {
if let Some(cors) = cors_domains.as_deref().map(cors::create_cors_layer) {
let cors = cors.map_err(|err| RpcError::Custom(err.to_string()))?;
@@ -2051,23 +2052,25 @@ impl WsHttpServerKind {
.layer(AuthLayer::new(JwtAuthValidator::new(secret.clone())));
let server = builder
.set_middleware(middleware)
.set_logger(metrics)
.set_http_middleware(middleware)
.set_rpc_middleware(RpcServiceBuilder::new().layer(metrics))
.build(socket_addr)
.await
.map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?;
let local_addr = server.local_addr()?;
.map_err(|err| RpcError::server_error(err, server_kind))?;
let local_addr =
server.local_addr().map_err(|err| RpcError::server_error(err, server_kind))?;
let server = WsHttpServerKind::WithCorsAuth(server);
Ok((server, local_addr))
} else {
let middleware = tower::ServiceBuilder::new().layer(cors);
let server = builder
.set_middleware(middleware)
.set_logger(metrics)
.set_http_middleware(middleware)
.set_rpc_middleware(RpcServiceBuilder::new().layer(metrics))
.build(socket_addr)
.await
.map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?;
let local_addr = server.local_addr()?;
.map_err(|err| RpcError::server_error(err, server_kind))?;
let local_addr =
server.local_addr().map_err(|err| RpcError::server_error(err, server_kind))?;
let server = WsHttpServerKind::WithCors(server);
Ok((server, local_addr))
}
@@ -2076,24 +2079,24 @@ impl WsHttpServerKind {
let middleware = tower::ServiceBuilder::new()
.layer(AuthLayer::new(JwtAuthValidator::new(secret.clone())));
let server = builder
.set_middleware(middleware)
.set_logger(metrics)
.set_http_middleware(middleware)
.set_rpc_middleware(RpcServiceBuilder::new().layer(metrics))
.build(socket_addr)
.await
.map_err(|err| {
RpcError::from_jsonrpsee_error(err, ServerKind::Auth(socket_addr))
})?;
let local_addr = server.local_addr()?;
.map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
let local_addr =
server.local_addr().map_err(|err| RpcError::server_error(err, server_kind))?;
let server = WsHttpServerKind::WithAuth(server);
Ok((server, local_addr))
} else {
// plain server without any middleware
let server = builder
.set_logger(metrics)
.set_rpc_middleware(RpcServiceBuilder::new().layer(metrics))
.build(socket_addr)
.await
.map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?;
let local_addr = server.local_addr()?;
.map_err(|err| RpcError::server_error(err, server_kind))?;
let local_addr =
server.local_addr().map_err(|err| RpcError::server_error(err, server_kind))?;
let server = WsHttpServerKind::Plain(server);
Ok((server, local_addr))
}
@@ -2105,7 +2108,7 @@ pub struct RpcServer {
/// Configured ws,http servers
ws_http: WsHttpServer,
/// ipc server
ipc: Option<IpcServer<Identity, RpcServerMetrics>>,
ipc: Option<IpcServer>,
}
// === impl RpcServer ===
@@ -2190,7 +2193,7 @@ pub struct RpcServerHandle {
http: Option<ServerHandle>,
ws: Option<ServerHandle>,
ipc_endpoint: Option<String>,
ipc: Option<ServerHandle>,
ipc: Option<reth_ipc::server::ServerHandle>,
jwt_secret: Option<JwtSecret>,
}
@@ -2224,7 +2227,7 @@ impl RpcServerHandle {
}
/// Tell the server to stop without waiting for the server to stop.
pub fn stop(self) -> Result<(), RpcError> {
pub fn stop(self) -> Result<(), AlreadyStoppedError> {
if let Some(handle) = self.http {
handle.stop()?
}

View File

@@ -1,63 +1,199 @@
use jsonrpsee::{
helpers::MethodResponseResult,
server::logger::{HttpRequest, Logger, MethodKind, Params, TransportProtocol},
RpcModule,
};
use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse, RpcModule};
use reth_metrics::{
metrics::{Counter, Histogram},
Metrics,
};
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Instant};
use std::{
collections::HashMap,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
use tower::Layer;
/// Metrics for the RPC server
#[derive(Default, Clone)]
pub(crate) struct RpcServerMetrics {
/// Metrics for the RPC server.
///
/// Metrics are divided into two categories:
/// - Connection metrics: metrics for the connection (e.g. number of connections opened, relevant
/// for WS and IPC)
/// - Request metrics: metrics for each RPC method (e.g. number of calls started, time taken to
/// process a call)
#[derive(Default, Debug, Clone)]
pub(crate) struct RpcRequestMetrics {
inner: Arc<RpcServerMetricsInner>,
}
/// Metrics for the RPC server
#[derive(Default, Clone)]
struct RpcServerMetricsInner {
/// Connection metrics per transport type
connection_metrics: ConnectionMetrics,
/// Call metrics per RPC method
call_metrics: HashMap<&'static str, RpcServerCallMetrics>,
}
impl RpcServerMetrics {
pub(crate) fn new(module: &RpcModule<()>) -> Self {
impl RpcRequestMetrics {
pub(crate) fn new(module: &RpcModule<()>, transport: RpcTransport) -> Self {
Self {
inner: Arc::new(RpcServerMetricsInner {
connection_metrics: ConnectionMetrics::default(),
connection_metrics: transport.connection_metrics(),
call_metrics: HashMap::from_iter(module.method_names().map(|method| {
(method, RpcServerCallMetrics::new_with_labels(&[("method", method)]))
})),
}),
}
}
/// Creates a new instance of the metrics layer for HTTP.
pub(crate) fn http(module: &RpcModule<()>) -> Self {
Self::new(module, RpcTransport::Http)
}
/// Creates a new instance of the metrics layer for same port.
///
/// Note: currently it's not possible to track transport specific metrics for a server that runs http and ws on the same port: <https://github.com/paritytech/jsonrpsee/issues/1345> until we have this feature we will use the http metrics for this case.
pub(crate) fn same_port(module: &RpcModule<()>) -> Self {
Self::http(module)
}
/// Creates a new instance of the metrics layer for Ws.
pub(crate) fn ws(module: &RpcModule<()>) -> Self {
Self::new(module, RpcTransport::WebSocket)
}
/// Creates a new instance of the metrics layer for Ws.
#[allow(unused)]
pub(crate) fn ipc(module: &RpcModule<()>) -> Self {
Self::new(module, RpcTransport::Ipc)
}
}
impl<S> Layer<S> for RpcRequestMetrics {
type Service = RpcRequestMetricsService<S>;
fn layer(&self, inner: S) -> Self::Service {
RpcRequestMetricsService::new(inner, self.clone())
}
}
/// Metrics for the RPC server
#[derive(Default, Clone, Debug)]
struct RpcServerMetricsInner {
/// Connection metrics per transport type
connection_metrics: RpcServerConnectionMetrics,
/// Call metrics per RPC method
call_metrics: HashMap<&'static str, RpcServerCallMetrics>,
}
/// A [RpcServiceT] middleware that captures RPC metrics for the server.
///
/// This is created per connection and captures metrics for each request.
#[derive(Clone)]
struct ConnectionMetrics {
http: RpcServerConnectionMetrics,
ws: RpcServerConnectionMetrics,
pub(crate) struct RpcRequestMetricsService<S> {
metrics: RpcRequestMetrics,
inner: S,
}
impl ConnectionMetrics {
fn get_metrics(&self, transport: TransportProtocol) -> &RpcServerConnectionMetrics {
match transport {
TransportProtocol::Http => &self.http,
TransportProtocol::WebSocket => &self.ws,
impl<S> RpcRequestMetricsService<S> {
pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self {
// this instance is kept alive for the duration of the connection
metrics.inner.connection_metrics.connections_opened.increment(1);
Self { inner: service, metrics }
}
}
impl<'a, S> RpcServiceT<'a> for RpcRequestMetricsService<S>
where
S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
{
type Future = MeteredRequestFuture<S::Future>;
fn call(&self, req: Request<'a>) -> Self::Future {
self.metrics.inner.connection_metrics.requests_started.increment(1);
let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
if let Some((_, call_metrics)) = &call_metrics {
call_metrics.started.increment(1);
}
MeteredRequestFuture {
fut: self.inner.call(req),
started_at: Instant::now(),
metrics: self.metrics.clone(),
method: call_metrics.map(|(method, _)| *method),
}
}
}
impl Default for ConnectionMetrics {
fn default() -> Self {
Self {
http: RpcServerConnectionMetrics::new_with_labels(&[("transport", "http")]),
ws: RpcServerConnectionMetrics::new_with_labels(&[("transport", "ws")]),
impl<S> Drop for RpcRequestMetricsService<S> {
fn drop(&mut self) {
// update connection metrics, connection closed
self.metrics.inner.connection_metrics.connections_closed.increment(1);
}
}
/// Response future to update the metrics for a single request/response pair.
#[pin_project::pin_project]
pub(crate) struct MeteredRequestFuture<F> {
#[pin]
fut: F,
/// time when the request started
started_at: Instant,
/// metrics for the method call
metrics: RpcRequestMetrics,
/// the method name if known
method: Option<&'static str>,
}
impl<F> std::fmt::Debug for MeteredRequestFuture<F> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("MeteredRequestFuture")
}
}
impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = this.fut.poll(cx);
if let Poll::Ready(resp) = &res {
let elapsed = this.started_at.elapsed().as_secs_f64();
// update transport metrics
this.metrics.inner.connection_metrics.requests_finished.increment(1);
this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed);
// update call metrics
if let Some(call_metrics) =
this.method.and_then(|method| this.metrics.inner.call_metrics.get(method))
{
call_metrics.time_seconds.record(elapsed);
if resp.is_success() {
call_metrics.successful.increment(1);
} else {
call_metrics.failed.increment(1);
}
}
}
res
}
}
/// The transport protocol used for the RPC connection.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum RpcTransport {
Http,
WebSocket,
#[allow(unused)]
Ipc,
}
impl RpcTransport {
/// Returns the string representation of the transport protocol.
pub(crate) const fn as_str(&self) -> &'static str {
match self {
RpcTransport::Http => "http",
RpcTransport::WebSocket => "ws",
RpcTransport::Ipc => "ipc",
}
}
/// Returns the connection metrics for the transport protocol.
fn connection_metrics(&self) -> RpcServerConnectionMetrics {
RpcServerConnectionMetrics::new_with_labels(&[("transport", self.as_str())])
}
}
@@ -90,61 +226,3 @@ struct RpcServerCallMetrics {
/// Response for a single call
time_seconds: Histogram,
}
impl Logger for RpcServerMetrics {
type Instant = Instant;
fn on_connect(
&self,
_remote_addr: SocketAddr,
_request: &HttpRequest,
transport: TransportProtocol,
) {
self.inner.connection_metrics.get_metrics(transport).connections_opened.increment(1)
}
fn on_request(&self, transport: TransportProtocol) -> Self::Instant {
self.inner.connection_metrics.get_metrics(transport).requests_started.increment(1);
Instant::now()
}
fn on_call(
&self,
method_name: &str,
_params: Params<'_>,
_kind: MethodKind,
_transport: TransportProtocol,
) {
let Some(call_metrics) = self.inner.call_metrics.get(method_name) else { return };
call_metrics.started.increment(1);
}
fn on_result(
&self,
method_name: &str,
success: MethodResponseResult,
started_at: Self::Instant,
_transport: TransportProtocol,
) {
let Some(call_metrics) = self.inner.call_metrics.get(method_name) else { return };
// capture call latency
call_metrics.time_seconds.record(started_at.elapsed().as_secs_f64());
if success.is_success() {
call_metrics.successful.increment(1);
} else {
call_metrics.failed.increment(1);
}
}
fn on_response(&self, _result: &str, started_at: Self::Instant, transport: TransportProtocol) {
let metrics = self.inner.connection_metrics.get_metrics(transport);
// capture request latency for this request/response pair
metrics.request_time_seconds.record(started_at.elapsed().as_secs_f64());
metrics.requests_finished.increment(1);
}
fn on_disconnect(&self, _remote_addr: SocketAddr, transport: TransportProtocol) {
self.inner.connection_metrics.get_metrics(transport).connections_closed.increment(1)
}
}