chore(deps): migrate to jsonrpsee 0.25 (#15956)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Z
2025-05-14 15:41:54 +08:00
committed by GitHub
parent 288ce76b53
commit 4e84e42f1e
30 changed files with 595 additions and 505 deletions

480
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -455,13 +455,13 @@ revm-context = { version = "4.0.0", default-features = false }
revm-context-interface = { version = "4.0.0", default-features = false }
revm-database-interface = { version = "4.0.0", default-features = false }
op-revm = { version = "4.0.2", default-features = false }
revm-inspectors = "0.21.0"
revm-inspectors = "0.22.0"
# eth
alloy-chains = { version = "0.2.0", default-features = false }
alloy-dyn-abi = "1.1.0"
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-evm = { version = "0.7.1", default-features = false }
alloy-evm = { version = "0.8.0", default-features = false }
alloy-primitives = { version = "1.1.0", default-features = false, features = ["map-foldhash"] }
alloy-rlp = { version = "0.3.10", default-features = false, features = ["core-net"] }
alloy-sol-macro = "1.1.0"
@@ -470,42 +470,42 @@ alloy-trie = { version = "0.8.1", default-features = false }
alloy-hardforks = "0.2.0"
alloy-consensus = { version = "0.15.11", default-features = false }
alloy-contract = { version = "0.15.11", default-features = false }
alloy-eips = { version = "0.15.11", default-features = false }
alloy-genesis = { version = "0.15.11", default-features = false }
alloy-json-rpc = { version = "0.15.11", default-features = false }
alloy-network = { version = "0.15.11", default-features = false }
alloy-network-primitives = { version = "0.15.11", default-features = false }
alloy-provider = { version = "0.15.11", features = ["reqwest"], default-features = false }
alloy-pubsub = { version = "0.15.11", default-features = false }
alloy-rpc-client = { version = "0.15.11", default-features = false }
alloy-rpc-types = { version = "0.15.11", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "0.15.11", default-features = false }
alloy-rpc-types-anvil = { version = "0.15.11", default-features = false }
alloy-rpc-types-beacon = { version = "0.15.11", default-features = false }
alloy-rpc-types-debug = { version = "0.15.11", default-features = false }
alloy-rpc-types-engine = { version = "0.15.11", default-features = false }
alloy-rpc-types-eth = { version = "0.15.11", default-features = false }
alloy-rpc-types-mev = { version = "0.15.11", default-features = false }
alloy-rpc-types-trace = { version = "0.15.11", default-features = false }
alloy-rpc-types-txpool = { version = "0.15.11", default-features = false }
alloy-serde = { version = "0.15.11", default-features = false }
alloy-signer = { version = "0.15.11", default-features = false }
alloy-signer-local = { version = "0.15.11", default-features = false }
alloy-transport = { version = "0.15.11" }
alloy-transport-http = { version = "0.15.11", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "0.15.11", default-features = false }
alloy-transport-ws = { version = "0.15.11", default-features = false }
alloy-consensus = { version = "1.0.1", default-features = false }
alloy-contract = { version = "1.0.1", default-features = false }
alloy-eips = { version = "1.0.1", default-features = false }
alloy-genesis = { version = "1.0.1", default-features = false }
alloy-json-rpc = { version = "1.0.1", default-features = false }
alloy-network = { version = "1.0.1", default-features = false }
alloy-network-primitives = { version = "1.0.1", default-features = false }
alloy-provider = { version = "1.0.1", features = ["reqwest"], default-features = false }
alloy-pubsub = { version = "1.0.1", default-features = false }
alloy-rpc-client = { version = "1.0.1", default-features = false }
alloy-rpc-types = { version = "1.0.1", features = ["eth"], default-features = false }
alloy-rpc-types-admin = { version = "1.0.1", default-features = false }
alloy-rpc-types-anvil = { version = "1.0.1", default-features = false }
alloy-rpc-types-beacon = { version = "1.0.1", default-features = false }
alloy-rpc-types-debug = { version = "1.0.1", default-features = false }
alloy-rpc-types-engine = { version = "1.0.1", default-features = false }
alloy-rpc-types-eth = { version = "1.0.1", default-features = false }
alloy-rpc-types-mev = { version = "1.0.1", default-features = false }
alloy-rpc-types-trace = { version = "1.0.1", default-features = false }
alloy-rpc-types-txpool = { version = "1.0.1", default-features = false }
alloy-serde = { version = "1.0.1", default-features = false }
alloy-signer = { version = "1.0.1", default-features = false }
alloy-signer-local = { version = "1.0.1", default-features = false }
alloy-transport = { version = "1.0.1" }
alloy-transport-http = { version = "1.0.1", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ipc = { version = "1.0.1", default-features = false }
alloy-transport-ws = { version = "1.0.1", default-features = false }
# op
alloy-op-evm = { version = "0.7.1", default-features = false }
alloy-op-evm = { version = "0.8.0", default-features = false }
alloy-op-hardforks = "0.2.0"
op-alloy-rpc-types = { version = "0.15.6", default-features = false }
op-alloy-rpc-types-engine = { version = "0.15.6", default-features = false }
op-alloy-network = { version = "0.15.6", default-features = false }
op-alloy-consensus = { version = "0.15.6", default-features = false }
op-alloy-rpc-jsonrpsee = { version = "0.15.6", default-features = false }
op-alloy-rpc-types = { version = "0.16.0", default-features = false }
op-alloy-rpc-types-engine = { version = "0.16.0", default-features = false }
op-alloy-network = { version = "0.16.0", default-features = false }
op-alloy-consensus = { version = "0.16.0", default-features = false }
op-alloy-rpc-jsonrpsee = { version = "0.16.0", default-features = false }
op-alloy-flz = { version = "0.13.0", default-features = false }
# misc
@@ -588,7 +588,7 @@ hyper-util = "0.1.5"
pin-project = "1.0.12"
reqwest = { version = "0.12", default-features = false }
tracing-futures = "0.2"
tower = "0.4"
tower = "0.5"
tower-http = "0.6"
# p2p
@@ -596,11 +596,11 @@ discv5 = "0.9"
if-addrs = "0.13"
# rpc
jsonrpsee = "0.24.9"
jsonrpsee-core = "0.24.9"
jsonrpsee-server = "0.24.9"
jsonrpsee-http-client = "0.24.9"
jsonrpsee-types = "0.24.9"
jsonrpsee = "0.25.1"
jsonrpsee-core = "0.25.1"
jsonrpsee-server = "0.25.1"
jsonrpsee-http-client = "0.25.1"
jsonrpsee-types = "0.25.1"
# http
http = "1.0"
@@ -699,7 +699,7 @@ visibility = "0.1.1"
walkdir = "2.3.3"
vergen-git2 = "1.0.5"
# [patch.crates-io]
[patch.crates-io]
# alloy-consensus = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-contract = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
# alloy-eips = { git = "https://github.com/alloy-rs/alloy", branch = "main" }
@@ -734,3 +734,9 @@ vergen-git2 = "1.0.5"
# op-alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/op-alloy", rev = "ad607c1" }
#
# revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "1207e33" }
jsonrpsee = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }
jsonrpsee-core = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }
jsonrpsee-server = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }
jsonrpsee-http-client = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }
jsonrpsee-types = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }

View File

@@ -11,3 +11,10 @@ reth-exex = { path = "../../crates/exex/exex" }
reth-node-ethereum = { path = "../../crates/ethereum/node" }
reth-tracing = { path = "../../crates/tracing" }
reth-node-api = { path = "../../crates/node/api" }
[patch.crates-io]
jsonrpsee = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }
jsonrpsee-core = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }
jsonrpsee-server = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }
jsonrpsee-http-client = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }
jsonrpsee-types = { git = "https://github.com/paradigmxyz/jsonrpsee", branch = "matt/make-rpc-service-pub" }

View File

@@ -2541,7 +2541,7 @@ Post-merge hard forks (timestamp based):
update_fraction: 3338477,
min_blob_fee: BLOB_TX_MIN_BLOB_GASPRICE,
},
scheduled: Default::default(),
..Default::default()
};
assert_eq!(hardfork_params, expected);
}

View File

@@ -25,13 +25,7 @@ impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
convert: impl Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync + 'static,
) -> eyre::Result<Self> {
Ok(Self {
provider: Arc::new(
ProviderBuilder::new()
.disable_recommended_fillers()
.network::<N>()
.connect(rpc_url)
.await?,
),
provider: Arc::new(ProviderBuilder::default().connect(rpc_url).await?),
url: rpc_url.to_string(),
convert: Arc::new(convert),
})

View File

@@ -6,7 +6,10 @@ use alloy_rpc_types_engine::ForkchoiceState;
use alloy_rpc_types_eth::BlockNumberOrTag;
use eyre::Ok;
use futures_util::Future;
use jsonrpsee::http_client::{transport::HttpBackend, HttpClient};
use jsonrpsee::{
core::middleware::layer::RpcLogger,
http_client::{transport::HttpBackend, HttpClient, RpcService},
};
use reth_chainspec::EthereumHardforks;
use reth_network_api::test_utils::PeersHandleProvider;
use reth_node_api::{
@@ -302,7 +305,9 @@ where
}
/// Returns an Engine API client.
pub fn engine_api_client(&self) -> HttpClient<AuthClientService<HttpBackend>> {
pub fn engine_api_client(
&self,
) -> HttpClient<RpcLogger<RpcService<AuthClientService<HttpBackend>>>> {
self.inner.auth_server_handle().http_client()
}
}

View File

@@ -6,7 +6,7 @@ use crate::{
};
use alloy_primitives::B256;
use eyre::Result;
use jsonrpsee::http_client::{transport::HttpBackend, HttpClient};
use jsonrpsee::http_client::{transport::HttpBackend, HttpClient, RpcService};
use reth_engine_local::LocalPayloadAttributesBuilder;
use reth_node_api::{NodeTypes, PayloadTypes};
use reth_payload_builder::PayloadId;
@@ -16,6 +16,7 @@ use std::{collections::HashMap, marker::PhantomData};
pub mod actions;
pub mod setup;
use alloy_rpc_types_engine::{ForkchoiceState, PayloadAttributes};
use jsonrpsee::core::middleware::layer::RpcLogger;
#[cfg(test)]
mod examples;
@@ -26,7 +27,7 @@ pub struct NodeClient {
/// Regular JSON-RPC client
pub rpc: HttpClient,
/// Engine API client
pub engine: HttpClient<AuthClientService<HttpBackend>>,
pub engine: HttpClient<RpcLogger<RpcService<AuthClientService<HttpBackend>>>>,
}
/// Represents the latest block information.

View File

@@ -26,8 +26,11 @@ impl<T> BlockRecoveryError<T> {
}
}
impl<T> From<BlockRecoveryError<T>> for RecoveryError {
fn from(_: BlockRecoveryError<T>) -> Self {
Self
impl<T> From<BlockRecoveryError<T>> for RecoveryError
where
T: core::fmt::Debug + Send + Sync + 'static,
{
fn from(err: BlockRecoveryError<T>) -> Self {
Self::from_source(err)
}
}

View File

@@ -94,7 +94,7 @@ pub use alloy_consensus::{
pub use transaction::{
execute::FillTxEnv,
signed::{FullSignedTx, SignedTransaction},
FullTransaction, Transaction,
FullTransaction, SignerRecoverable, Transaction,
};
pub mod block;

View File

@@ -7,7 +7,7 @@ pub mod signed;
pub mod error;
pub mod recover;
pub use alloy_consensus::transaction::{TransactionInfo, TransactionMeta};
pub use alloy_consensus::transaction::{SignerRecoverable, TransactionInfo, TransactionMeta};
use crate::{InMemorySize, MaybeCompact, MaybeSerde};
use core::{fmt, hash::Hash};

View File

@@ -17,7 +17,6 @@ futures.workspace = true
tokio = { workspace = true, features = ["net", "time", "rt-multi-thread"] }
tokio-util = { workspace = true, features = ["codec"] }
tokio-stream.workspace = true
async-trait.workspace = true
pin-project.workspace = true
tower.workspace = true

View File

@@ -20,7 +20,6 @@ pub(crate) struct Sender {
inner: SendHalf,
}
#[async_trait::async_trait]
impl TransportSenderT for Sender {
type Error = IpcError;
@@ -47,7 +46,6 @@ pub(crate) struct Receiver {
pub(crate) inner: FramedRead<RecvHalf, StreamCodec>,
}
#[async_trait::async_trait]
impl TransportReceiverT for Receiver {
type Error = IpcError;

View File

@@ -3,17 +3,13 @@
use futures::{stream::FuturesOrdered, StreamExt};
use jsonrpsee::{
batch_response_error,
core::{
server::helpers::prepare_error,
tracing::server::{rx_log_from_json, tx_log_from_str},
JsonRawValue,
},
core::{server::helpers::prepare_error, JsonRawValue},
server::middleware::rpc::RpcServiceT,
types::{
error::{reject_too_big_request, ErrorCode},
ErrorObject, Id, InvalidRequest, Notification, Request,
},
BatchResponseBuilder, MethodResponse, ResponsePayload,
BatchResponseBuilder, MethodResponse,
};
use std::sync::Arc;
use tokio::sync::OwnedSemaphorePermit;
@@ -37,7 +33,7 @@ pub(crate) async fn process_batch_request<S>(
max_response_body_size: usize,
) -> Option<String>
where
for<'a> S: RpcServiceT<'a> + Send,
S: RpcServiceT<MethodResponse = MethodResponse> + Send,
{
let Batch { data, rpc_service } = b;
@@ -69,8 +65,8 @@ where
.collect();
while let Some(response) = pending_calls.next().await {
if let Err(too_large) = batch_response.append(&response) {
return Some(too_large.to_result())
if let Err(too_large) = batch_response.append(response) {
return Some(too_large.to_json().to_string())
}
}
@@ -78,10 +74,10 @@ where
None
} else {
let batch_resp = batch_response.finish();
Some(MethodResponse::from_batch(batch_resp).to_result())
Some(MethodResponse::from_batch(batch_resp).to_json().to_string())
}
} else {
Some(batch_response_error(Id::Null, ErrorObject::from(ErrorCode::ParseError)))
Some(batch_response_error(Id::Null, ErrorObject::from(ErrorCode::ParseError)).to_string())
}
}
@@ -90,7 +86,7 @@ pub(crate) async fn process_single_request<S>(
rpc_service: &S,
) -> Option<MethodResponse>
where
for<'a> S: RpcServiceT<'a> + Send,
S: RpcServiceT<MethodResponse = MethodResponse> + Send,
{
if let Ok(req) = serde_json::from_slice::<Request<'_>>(&data) {
Some(execute_call_with_tracing(req, rpc_service).await)
@@ -108,20 +104,11 @@ pub(crate) async fn execute_call_with_tracing<'a, S>(
rpc_service: &S,
) -> MethodResponse
where
for<'b> S: RpcServiceT<'b> + Send,
S: RpcServiceT<MethodResponse = MethodResponse> + Send,
{
rpc_service.call(req).await
}
#[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")]
fn execute_notification(notif: &Notif<'_>, max_log_length: u32) -> MethodResponse {
rx_log_from_json(notif, max_log_length);
let response =
MethodResponse::response(Id::Null, ResponsePayload::success(String::new()), usize::MAX);
tx_log_from_str(response.as_result(), max_log_length);
response
}
pub(crate) async fn call_with_service<S>(
request: String,
rpc_service: S,
@@ -130,7 +117,7 @@ pub(crate) async fn call_with_service<S>(
conn: Arc<OwnedSemaphorePermit>,
) -> Option<String>
where
for<'a> S: RpcServiceT<'a> + Send,
S: RpcServiceT<MethodResponse = MethodResponse> + Send,
{
enum Kind {
Single,
@@ -148,17 +135,17 @@ where
let data = request.into_bytes();
if data.len() > max_request_body_size {
return Some(batch_response_error(
Id::Null,
reject_too_big_request(max_request_body_size as u32),
))
return Some(
batch_response_error(Id::Null, reject_too_big_request(max_request_body_size as u32))
.to_string(),
)
}
// Single request or notification
let res = if matches!(request_kind, Kind::Single) {
let response = process_single_request(data, &rpc_service).await;
match response {
Some(response) if response.is_method_call() => Some(response.to_result()),
Some(response) if response.is_method_call() => Some(response.to_json().to_string()),
_ => {
// subscription responses are sent directly over the sink, return a response here
// would lead to duplicate responses for the subscription response

View File

@@ -9,13 +9,12 @@ use interprocess::local_socket::{
GenericFilePath, ListenerOptions, ToFsName,
};
use jsonrpsee::{
core::TEN_MB_SIZE_BYTES,
core::{middleware::layer::RpcLoggerLayer, JsonRawValue, TEN_MB_SIZE_BYTES},
server::{
middleware::rpc::{RpcLoggerLayer, RpcServiceT},
stop_channel, ConnectionGuard, ConnectionPermit, IdProvider, RandomIntegerIdProvider,
ServerHandle, StopHandle,
middleware::rpc::RpcServiceT, stop_channel, ConnectionGuard, ConnectionPermit, IdProvider,
RandomIntegerIdProvider, ServerHandle, StopHandle,
},
BoundedSubscriptions, MethodSink, Methods,
BoundedSubscriptions, MethodResponse, MethodSink, Methods,
};
use std::{
future::Future,
@@ -66,7 +65,7 @@ impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
where
RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT<'a>> + Clone + Send + 'static,
RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT> + Clone + Send + 'static,
HttpMiddleware: Layer<
TowerServiceNoHttp<RpcMiddleware>,
Service: Service<
@@ -292,7 +291,7 @@ impl Default for RpcServiceBuilder<Identity> {
impl RpcServiceBuilder<Identity> {
/// Create a new [`RpcServiceBuilder`].
pub fn new() -> Self {
pub const fn new() -> Self {
Self(tower::ServiceBuilder::new())
}
}
@@ -357,7 +356,8 @@ pub struct TowerServiceNoHttp<L> {
impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
where
RpcMiddleware: for<'a> Layer<RpcService>,
for<'a> <RpcMiddleware as Layer<RpcService>>::Service: Send + Sync + 'static + RpcServiceT<'a>,
for<'a> <RpcMiddleware as Layer<RpcService>>::Service:
Send + Sync + 'static + RpcServiceT<MethodResponse = MethodResponse>,
{
/// The response of a handled RPC call
///
@@ -435,7 +435,7 @@ fn process_connection<'b, RpcMiddleware, HttpMiddleware>(
params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
) where
RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT<'a>,
for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
+ Service<
@@ -464,7 +464,7 @@ fn process_connection<'b, RpcMiddleware, HttpMiddleware>(
local_socket_stream,
));
let (tx, rx) = mpsc::channel::<String>(server_cfg.message_buffer_capacity as usize);
let (tx, rx) = mpsc::channel::<Box<JsonRawValue>>(server_cfg.message_buffer_capacity as usize);
let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
let tower_service = TowerServiceNoHttp {
inner: ServiceData {
@@ -493,7 +493,7 @@ async fn to_ipc_service<S, T>(
ipc: IpcConn<JsonRpcStream<T>>,
service: S,
stop_handle: StopHandle,
rx: mpsc::Receiver<String>,
rx: mpsc::Receiver<Box<JsonRawValue>>,
) where
S: Service<String, Response = Option<String>> + Send + 'static,
S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
@@ -520,7 +520,7 @@ async fn to_ipc_service<S, T>(
}
item = rx_item.next() => {
if let Some(item) = item {
conn.push_back(item);
conn.push_back(item.to_string());
}
}
_ = &mut stopped => {
@@ -712,59 +712,6 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
///
/// The builder itself exposes a similar API as the [`tower::ServiceBuilder`]
/// where it is possible to compose layers to the middleware.
///
/// ```
/// use std::{
/// net::SocketAddr,
/// sync::{
/// atomic::{AtomicUsize, Ordering},
/// Arc,
/// },
/// time::Instant,
/// };
///
/// use futures_util::future::BoxFuture;
/// use jsonrpsee::{
/// server::{middleware::rpc::RpcServiceT, ServerBuilder},
/// types::Request,
/// MethodResponse,
/// };
/// use reth_ipc::server::{Builder, RpcServiceBuilder};
///
/// #[derive(Clone)]
/// struct MyMiddleware<S> {
/// service: S,
/// count: Arc<AtomicUsize>,
/// }
///
/// impl<'a, S> RpcServiceT<'a> for MyMiddleware<S>
/// where
/// S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
/// {
/// type Future = BoxFuture<'a, MethodResponse>;
///
/// fn call(&self, req: Request<'a>) -> Self::Future {
/// tracing::info!("MyMiddleware processed call {}", req.method);
/// let count = self.count.clone();
/// let service = self.service.clone();
///
/// Box::pin(async move {
/// let rp = service.call(req).await;
/// // Modify the state.
/// count.fetch_add(1, Ordering::Relaxed);
/// rp
/// })
/// }
/// }
///
/// // Create a state per connection
/// // NOTE: The service type can be omitted once `start` is called on the server.
/// let m = RpcServiceBuilder::new().layer_fn(move |service: ()| MyMiddleware {
/// service,
/// count: Arc::new(AtomicUsize::new(0)),
/// });
/// let builder = Builder::default().set_rpc_middleware(m);
/// ```
pub fn set_rpc_middleware<T>(
self,
rpc_middleware: RpcServiceBuilder<T>,
@@ -808,8 +755,8 @@ mod tests {
use futures::future::select;
use jsonrpsee::{
core::{
client,
client::{ClientT, Error, Subscription, SubscriptionClientT},
client::{self, ClientT, Error, Subscription, SubscriptionClientT},
middleware::{Batch, BatchEntry, Notification},
params::BatchRequestBuilder,
},
rpc_params,
@@ -838,7 +785,8 @@ mod tests {
// received new item from the stream.
Either::Right((Some(Ok(item)), c)) => {
let notif = SubscriptionMessage::from_json(&item)?;
let raw_value = serde_json::value::to_raw_value(&item)?;
let notif = SubscriptionMessage::from(raw_value);
// NOTE: this will block until there a spot in the queue
// and you might want to do something smarter if it's
@@ -1035,13 +983,18 @@ mod tests {
#[derive(Clone)]
struct ModifyRequestIf<S>(S);
impl<'a, S> RpcServiceT<'a> for ModifyRequestIf<S>
impl<S> RpcServiceT for ModifyRequestIf<S>
where
S: Send + Sync + RpcServiceT<'a>,
S: Send + Sync + RpcServiceT,
{
type Future = S::Future;
type MethodResponse = S::MethodResponse;
type NotificationResponse = S::NotificationResponse;
type BatchResponse = S::BatchResponse;
fn call(&self, mut req: Request<'a>) -> Self::Future {
fn call<'a>(
&self,
mut req: Request<'a>,
) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
// Re-direct all calls that isn't `say_hello` to `say_goodbye`
if req.method == "say_hello" {
req.method = "say_goodbye".into();
@@ -1051,6 +1004,46 @@ mod tests {
self.0.call(req)
}
fn batch<'a>(
&self,
mut batch: Batch<'a>,
) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
for call in batch.iter_mut() {
match call {
Ok(BatchEntry::Call(req)) => {
if req.method == "say_hello" {
req.method = "say_goodbye".into();
} else if req.method == "say_goodbye" {
req.method = "say_hello".into();
}
}
Ok(BatchEntry::Notification(n)) => {
if n.method == "say_hello" {
n.method = "say_goodbye".into();
} else if n.method == "say_goodbye" {
n.method = "say_hello".into();
}
}
// Invalid request, we don't care about it.
Err(_err) => {}
}
}
self.0.batch(batch)
}
fn notification<'a>(
&self,
mut n: Notification<'a>,
) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
if n.method == "say_hello" {
n.method = "say_goodbye".into();
} else if n.method == "say_goodbye" {
n.method = "say_hello".into();
}
self.0.notification(n)
}
}
reth_tracing::init_test_tracing();

View File

@@ -1,15 +1,19 @@
//! JSON-RPC service middleware.
use futures_util::future::BoxFuture;
use futures::{
future::Either,
stream::{FuturesOrdered, StreamExt},
};
use jsonrpsee::{
core::middleware::{Batch, BatchEntry},
server::{
middleware::rpc::{ResponseFuture, RpcServiceT},
IdProvider,
},
types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Request},
BoundedSubscriptions, ConnectionId, MethodCallback, MethodResponse, MethodSink, Methods,
SubscriptionState,
types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Id, Request},
BatchResponse, BatchResponseBuilder, BoundedSubscriptions, ConnectionId, MethodCallback,
MethodResponse, MethodSink, Methods, SubscriptionState,
};
use std::sync::Arc;
use std::{future::Future, sync::Arc};
/// JSON-RPC service middleware.
#[derive(Clone, Debug)]
@@ -46,12 +50,12 @@ impl RpcService {
}
}
impl<'a> RpcServiceT<'a> for RpcService {
// The rpc module is already boxing the futures and
// it's used to under the hood by the RpcService.
type Future = ResponseFuture<BoxFuture<'a, MethodResponse>>;
impl RpcServiceT for RpcService {
type MethodResponse = MethodResponse;
type NotificationResponse = Option<MethodResponse>;
type BatchResponse = BatchResponse;
fn call(&self, req: Request<'a>) -> Self::Future {
fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
let conn_id = self.conn_id;
let max_response_body_size = self.max_response_body_size;
@@ -123,4 +127,44 @@ impl<'a> RpcServiceT<'a> for RpcService {
},
}
}
fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
let entries: Vec<_> = req.into_iter().collect();
let mut got_notif = false;
let mut batch_response = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
let mut pending_calls: FuturesOrdered<_> = entries
.into_iter()
.filter_map(|v| match v {
Ok(BatchEntry::Call(call)) => Some(Either::Right(self.call(call))),
Ok(BatchEntry::Notification(_n)) => {
got_notif = true;
None
}
Err(_err) => Some(Either::Left(async {
MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
})),
})
.collect();
async move {
while let Some(response) = pending_calls.next().await {
if let Err(too_large) = batch_response.append(response) {
let mut error_batch = BatchResponseBuilder::new_with_limit(1);
let _ = error_batch.append(too_large);
return error_batch.finish();
}
}
batch_response.finish()
}
}
#[allow(clippy::manual_async_fn)]
fn notification<'a>(
&self,
_n: jsonrpsee::core::middleware::Notification<'a>,
) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
async move { None }
}
}

View File

@@ -1,8 +1,8 @@
use crate::error::{RpcError, ServerKind};
use http::header::AUTHORIZATION;
use jsonrpsee::{
core::RegisterMethodError,
http_client::{transport::HttpBackend, HeaderMap},
core::{middleware::layer::RpcLogger, RegisterMethodError},
http_client::{transport::HttpBackend, HeaderMap, HttpClient, RpcService},
server::{AlreadyStoppedError, RpcModule},
Methods,
};
@@ -17,6 +17,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tower::layer::util::Identity;
pub use jsonrpsee::server::ServerBuilder;
use jsonrpsee::server::{ServerConfig, ServerConfigBuilder};
pub use reth_ipc::server::Builder as IpcServerBuilder;
/// Server configuration for the auth server.
@@ -27,7 +28,7 @@ pub struct AuthServerConfig {
/// The secret for the auth layer of the server.
pub(crate) secret: JwtSecret,
/// Configs for JSON-RPC Http.
pub(crate) server_config: ServerBuilder<Identity, Identity>,
pub(crate) server_config: ServerConfigBuilder,
/// Configs for IPC server
pub(crate) ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
/// IPC endpoint
@@ -56,7 +57,8 @@ impl AuthServerConfig {
tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret)));
// By default, both http and ws are enabled.
let server = server_config
let server = ServerBuilder::new()
.set_config(server_config.build())
.set_http_middleware(middleware)
.build(socket_addr)
.await
@@ -87,7 +89,7 @@ impl AuthServerConfig {
pub struct AuthServerConfigBuilder {
socket_addr: Option<SocketAddr>,
secret: JwtSecret,
server_config: Option<ServerBuilder<Identity, Identity>>,
server_config: Option<ServerConfigBuilder>,
ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
ipc_endpoint: Option<String>,
}
@@ -128,7 +130,7 @@ impl AuthServerConfigBuilder {
///
/// Note: this always configures an [`EthSubscriptionIdProvider`]
/// [`IdProvider`](jsonrpsee::server::IdProvider) for convenience.
pub fn with_server_config(mut self, config: ServerBuilder<Identity, Identity>) -> Self {
pub fn with_server_config(mut self, config: ServerConfigBuilder) -> Self {
self.server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
self
}
@@ -155,18 +157,20 @@ impl AuthServerConfigBuilder {
}),
secret: self.secret,
server_config: self.server_config.unwrap_or_else(|| {
ServerBuilder::new()
// This needs to large enough to handle large eth_getLogs responses and maximum
// payload bodies limit for `engine_getPayloadBodiesByRangeV`
// ~750MB per response should be enough
ServerConfig::builder()
// This needs to large enough to handle large eth_getLogs responses and
// maximum payload bodies limit for
// `engine_getPayloadBodiesByRangeV` ~750MB per
// response should be enough
.max_response_body_size(750 * 1024 * 1024)
// Connections to this server are always authenticated, hence this only affects
// connections from the CL or any other client that uses JWT, this should be
// more than enough so that the CL (or multiple CL nodes) will never get rate
// limited
// Connections to this server are always authenticated, hence this only
// affects connections from the CL or any other
// client that uses JWT, this should be
// more than enough so that the CL (or multiple CL nodes) will never get
// rate limited
.max_connections(500)
// bump the default request size slightly, there aren't any methods exposed with
// dynamic request params that can exceed this
// bump the default request size slightly, there aren't any methods exposed
// with dynamic request params that can exceed this
.max_request_body_size(128 * 1024 * 1024)
.set_id_provider(EthSubscriptionIdProvider::default())
}),
@@ -297,9 +301,7 @@ impl AuthServerHandle {
}
/// Returns a http client connected to the server.
pub fn http_client(
&self,
) -> jsonrpsee::http_client::HttpClient<AuthClientService<HttpBackend>> {
pub fn http_client(&self) -> HttpClient<RpcLogger<RpcService<AuthClientService<HttpBackend>>>> {
// Create a middleware that adds a new JWT token to every request.
let secret_layer = AuthClientLayer::new(self.secret);
let middleware = tower::ServiceBuilder::default().layer(secret_layer);

View File

@@ -1,11 +1,10 @@
use std::{net::SocketAddr, path::PathBuf};
use jsonrpsee::server::ServerBuilder;
use jsonrpsee::server::ServerConfigBuilder;
use reth_node_core::{args::RpcServerArgs, utils::get_or_create_jwt_secret_from_path};
use reth_rpc::ValidationApiConfig;
use reth_rpc_eth_types::{EthConfig, EthStateCacheConfig, GasPriceOracleConfig};
use reth_rpc_layer::{JwtError, JwtSecret};
use reth_rpc_server_types::RpcModuleSelection;
use std::{net::SocketAddr, path::PathBuf};
use tower::layer::util::Identity;
use tracing::{debug, warn};
@@ -49,8 +48,8 @@ pub trait RethRpcServerConfig {
/// settings in the [`TransportRpcModuleConfig`].
fn transport_rpc_module_config(&self) -> TransportRpcModuleConfig;
/// Returns the default server builder for http/ws
fn http_ws_server_builder(&self) -> ServerBuilder<Identity, Identity>;
/// Returns the default server config for http/ws
fn http_ws_server_builder(&self) -> ServerConfigBuilder;
/// Returns the default ipc server builder
fn ipc_server_builder(&self) -> IpcServerBuilder<Identity, Identity>;
@@ -161,8 +160,8 @@ impl RethRpcServerConfig for RpcServerArgs {
config
}
fn http_ws_server_builder(&self) -> ServerBuilder<Identity, Identity> {
ServerBuilder::new()
fn http_ws_server_builder(&self) -> ServerConfigBuilder {
ServerConfigBuilder::new()
.max_connections(self.rpc_max_connections.get())
.max_request_body_size(self.rpc_max_request_size_bytes())
.max_response_body_size(self.rpc_max_response_size_bytes())

View File

@@ -27,10 +27,10 @@ use http::{header::AUTHORIZATION, HeaderMap};
use jsonrpsee::{
core::RegisterMethodError,
server::{
middleware::rpc::{RpcService, RpcServiceT},
AlreadyStoppedError, IdProvider, RpcServiceBuilder, ServerHandle,
middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceT},
AlreadyStoppedError, IdProvider, ServerHandle,
},
Methods, RpcModule,
MethodResponse, Methods, RpcModule,
};
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
use reth_consensus::{ConsensusError, FullConsensus};
@@ -69,6 +69,7 @@ pub use cors::CorsDomainError;
// re-export for convenience
pub use jsonrpsee::server::ServerBuilder;
use jsonrpsee::server::ServerConfigBuilder;
pub use reth_ipc::server::{
Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
};
@@ -1071,13 +1072,13 @@ where
#[derive(Debug)]
pub struct RpcServerConfig<RpcMiddleware = Identity> {
/// Configs for JSON-RPC Http.
http_server_config: Option<ServerBuilder<Identity, Identity>>,
http_server_config: Option<ServerConfigBuilder>,
/// Allowed CORS Domains for http
http_cors_domains: Option<String>,
/// Address where to bind the http server to
http_addr: Option<SocketAddr>,
/// Configs for WS server
ws_server_config: Option<ServerBuilder<Identity, Identity>>,
ws_server_config: Option<ServerConfigBuilder>,
/// Allowed CORS Domains for ws.
ws_cors_domains: Option<String>,
/// Address where to bind the ws server to
@@ -1114,12 +1115,12 @@ impl Default for RpcServerConfig<Identity> {
impl RpcServerConfig {
/// Creates a new config with only http set
pub fn http(config: ServerBuilder<Identity, Identity>) -> Self {
pub fn http(config: ServerConfigBuilder) -> Self {
Self::default().with_http(config)
}
/// Creates a new config with only ws set
pub fn ws(config: ServerBuilder<Identity, Identity>) -> Self {
pub fn ws(config: ServerConfigBuilder) -> Self {
Self::default().with_ws(config)
}
@@ -1132,7 +1133,7 @@ impl RpcServerConfig {
///
/// Note: this always configures an [`EthSubscriptionIdProvider`] [`IdProvider`] for
/// convenience. To set a custom [`IdProvider`], please use [`Self::with_id_provider`].
pub fn with_http(mut self, config: ServerBuilder<Identity, Identity>) -> Self {
pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
self.http_server_config =
Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
self
@@ -1142,7 +1143,7 @@ impl RpcServerConfig {
///
/// Note: this always configures an [`EthSubscriptionIdProvider`] [`IdProvider`] for
/// convenience. To set a custom [`IdProvider`], please use [`Self::with_id_provider`].
pub fn with_ws(mut self, config: ServerBuilder<Identity, Identity>) -> Self {
pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
self
}
@@ -1216,11 +1217,11 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
where
I: IdProvider + Clone + 'static,
{
if let Some(http) = self.http_server_config {
self.http_server_config = Some(http.set_id_provider(id_provider.clone()));
if let Some(config) = self.http_server_config {
self.http_server_config = Some(config.set_id_provider(id_provider.clone()));
}
if let Some(ws) = self.ws_server_config {
self.ws_server_config = Some(ws.set_id_provider(id_provider.clone()));
if let Some(config) = self.ws_server_config {
self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
}
if let Some(ipc) = self.ipc_server_config {
self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
@@ -1292,7 +1293,14 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
where
RpcMiddleware: Layer<RpcRequestMetricsService<RpcService>> + Clone + Send + 'static,
for<'a> <RpcMiddleware as Layer<RpcRequestMetricsService<RpcService>>>::Service:
Send + Sync + 'static + RpcServiceT<'a>,
Send
+ Sync
+ 'static
+ RpcServiceT<
MethodResponse = MethodResponse,
BatchResponse = MethodResponse,
NotificationResponse = MethodResponse,
>,
{
let mut http_handle = None;
let mut ws_handle = None;
@@ -1342,8 +1350,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
// we merge this into one server using the http setup
modules.config.ensure_ws_http_identical()?;
if let Some(builder) = self.http_server_config {
let server = builder
if let Some(config) = self.http_server_config {
let server = ServerBuilder::new()
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_cors_layer(cors)?)
@@ -1360,6 +1368,7 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
.unwrap_or_default(),
),
)
.set_config(config.build())
.build(http_socket_addr)
.await
.map_err(|err| {
@@ -1390,9 +1399,9 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
let mut http_local_addr = None;
let mut http_server = None;
if let Some(builder) = self.ws_server_config {
let server = builder
.ws_only()
if let Some(config) = self.ws_server_config {
let server = ServerBuilder::new()
.set_config(config.ws_only().build())
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
@@ -1415,9 +1424,9 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
ws_server = Some(server);
}
if let Some(builder) = self.http_server_config {
let server = builder
.http_only()
if let Some(config) = self.http_server_config {
let server = ServerBuilder::new()
.set_config(config.http_only().build())
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)

View File

@@ -1,4 +1,9 @@
use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse, RpcModule};
use jsonrpsee::{
core::middleware::{Batch, Notification},
server::middleware::rpc::RpcServiceT,
types::Request,
MethodResponse, RpcModule,
};
use reth_metrics::{
metrics::{Counter, Histogram},
Metrics,
@@ -99,13 +104,15 @@ impl<S> RpcRequestMetricsService<S> {
}
}
impl<'a, S> RpcServiceT<'a> for RpcRequestMetricsService<S>
impl<S> RpcServiceT for RpcRequestMetricsService<S>
where
S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
S: RpcServiceT<MethodResponse = MethodResponse> + Send + Sync + Clone + 'static,
{
type Future = MeteredRequestFuture<S::Future>;
type MethodResponse = S::MethodResponse;
type NotificationResponse = S::NotificationResponse;
type BatchResponse = S::BatchResponse;
fn call(&self, req: Request<'a>) -> Self::Future {
fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = S::MethodResponse> + Send + 'a {
self.metrics.inner.connection_metrics.requests_started_total.increment(1);
let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
if let Some((_, call_metrics)) = &call_metrics {
@@ -118,6 +125,17 @@ where
method: call_metrics.map(|(method, _)| *method),
}
}
fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
self.inner.batch(req)
}
fn notification<'a>(
&self,
n: Notification<'a>,
) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
self.inner.notification(n)
}
}
impl<S> Drop for RpcRequestMetricsService<S> {

View File

@@ -1,6 +1,6 @@
//! [`jsonrpsee`] helper layer for rate limiting certain methods.
use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse};
use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request};
use std::{
future::Future,
pin::Pin,
@@ -61,13 +61,15 @@ impl<S> RpcRequestRateLimitingService<S> {
}
}
impl<'a, S> RpcServiceT<'a> for RpcRequestRateLimitingService<S>
impl<S> RpcServiceT for RpcRequestRateLimitingService<S>
where
S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
S: RpcServiceT + Send + Sync + Clone + 'static,
{
type Future = RateLimitingRequestFuture<S::Future>;
type MethodResponse = S::MethodResponse;
type NotificationResponse = S::NotificationResponse;
type BatchResponse = S::BatchResponse;
fn call(&self, req: Request<'a>) -> Self::Future {
fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
let method_name = req.method_name();
if method_name.starts_with("trace_") || method_name.starts_with("debug_") {
RateLimitingRequestFuture {
@@ -81,6 +83,20 @@ where
RateLimitingRequestFuture { fut: self.inner.call(req), guard: None, permit: None }
}
}
fn batch<'a>(
&self,
requests: jsonrpsee::core::middleware::Batch<'a>,
) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
self.inner.batch(requests)
}
fn notification<'a>(
&self,
n: jsonrpsee::core::middleware::Notification<'a>,
) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
self.inner.notification(n)
}
}
/// Response future.
@@ -98,7 +114,7 @@ impl<F> std::fmt::Debug for RateLimitingRequestFuture<F> {
}
}
impl<F: Future<Output = MethodResponse>> Future for RateLimitingRequestFuture<F> {
impl<F: Future> Future for RateLimitingRequestFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@@ -1,16 +1,15 @@
use crate::utils::{test_address, test_rpc_builder};
use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction};
use jsonrpsee::{
server::{middleware::rpc::RpcServiceT, RpcServiceBuilder},
core::middleware::{Batch, Notification},
server::middleware::rpc::{RpcServiceBuilder, RpcServiceT},
types::Request,
MethodResponse,
};
use reth_rpc_builder::{RpcServerConfig, TransportRpcModuleConfig};
use reth_rpc_eth_api::EthApiClient;
use reth_rpc_server_types::RpcModuleSelection;
use std::{
future::Future,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
@@ -37,13 +36,15 @@ struct MyMiddlewareService<S> {
count: Arc<AtomicUsize>,
}
impl<'a, S> RpcServiceT<'a> for MyMiddlewareService<S>
impl<S> RpcServiceT for MyMiddlewareService<S>
where
S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
S: RpcServiceT + Send + Sync + Clone + 'static,
{
type Future = Pin<Box<dyn Future<Output = MethodResponse> + Send + 'a>>;
type MethodResponse = S::MethodResponse;
type NotificationResponse = S::NotificationResponse;
type BatchResponse = S::BatchResponse;
fn call(&self, req: Request<'a>) -> Self::Future {
fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
tracing::info!("MyMiddleware processed call {}", req.method);
let count = self.count.clone();
let service = self.service.clone();
@@ -54,6 +55,17 @@ where
rp
})
}
fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
self.service.batch(req)
}
fn notification<'a>(
&self,
n: Notification<'a>,
) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
self.service.notification(n)
}
}
#[tokio::test(flavor = "multi_thread")]

View File

@@ -155,7 +155,7 @@ mod tests {
use crate::JwtAuthValidator;
use alloy_rpc_types_engine::{Claims, JwtError, JwtSecret};
use jsonrpsee::{
server::{RandomStringIdProvider, ServerBuilder, ServerHandle},
server::{RandomStringIdProvider, ServerBuilder, ServerConfig, ServerHandle},
RpcModule,
};
use reqwest::{header, StatusCode};
@@ -260,7 +260,9 @@ mod tests {
// Create a layered server
let server = ServerBuilder::default()
.set_id_provider(RandomStringIdProvider::new(16))
.set_config(
ServerConfig::builder().set_id_provider(RandomStringIdProvider::new(16)).build(),
)
.set_http_middleware(middleware)
.build(addr.parse::<SocketAddr>().unwrap())
.await

View File

@@ -132,11 +132,13 @@ where
let stream =
futures::stream::iter(blocks.into_iter().map(move |(block, opts)| async move {
let trace_future = match block {
BlockId::Hash(hash) => self.debug_trace_block_by_hash(hash.block_hash, opts),
BlockId::Number(tag) => self.debug_trace_block_by_number(tag, opts),
BlockId::Hash(hash) => {
self.debug_trace_block_by_hash(hash.block_hash, opts).await
}
BlockId::Number(tag) => self.debug_trace_block_by_number(tag, opts).await,
};
match trace_future.await {
match trace_future {
Ok(result) => Ok((result, block)),
Err(err) => Err((err, block)),
}

View File

@@ -159,8 +159,8 @@ where
let current_sub_res = pubsub.sync_status(initial_sync_status);
// send the current status immediately
let msg = SubscriptionMessage::from_json(&current_sub_res)
.map_err(SubscriptionSerializeError::new)?;
let msg = to_subscription_message(&current_sub_res)?;
if accepted_sink.send(msg).await.is_err() {
return Ok(())
}
@@ -174,8 +174,7 @@ where
// send a new message now that the status changed
let sync_status = pubsub.sync_status(current_syncing);
let msg = SubscriptionMessage::from_json(&sync_status)
.map_err(SubscriptionSerializeError::new)?;
let msg = to_subscription_message(&sync_status)?;
if accepted_sink.send(msg).await.is_err() {
break
}
@@ -227,7 +226,7 @@ where
break Ok(())
},
};
let msg = SubscriptionMessage::from_json(&item).map_err(SubscriptionSerializeError::new)?;
let msg = to_subscription_message(&item)?;
if sink.send(msg).await.is_err() {
break Ok(());
}
@@ -325,3 +324,10 @@ where
})
}
}
/// Serializes a messages into a raw [`SubscriptionMessage`].
fn to_subscription_message<T: Serialize>(
msg: &T,
) -> Result<SubscriptionMessage, SubscriptionSerializeError> {
serde_json::value::to_raw_value(msg).map(Into::into).map_err(SubscriptionSerializeError::new)
}

View File

@@ -1055,6 +1055,7 @@ mod tests {
use alloy_eips::eip2718::Decodable2718;
use alloy_primitives::{hex, U256};
use reth_ethereum_primitives::PooledTransaction;
use reth_primitives_traits::SignedTransaction;
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
fn get_transaction() -> EthPooledTransaction {

View File

@@ -88,4 +88,5 @@ allow-git = [
"https://github.com/paradigmxyz/revm-inspectors",
"https://github.com/alloy-rs/evm",
"https://github.com/alloy-rs/hardforks",
"https://github.com/paradigmxyz/jsonrpsee",
]

View File

@@ -20,4 +20,5 @@ clap = { workspace = true, features = ["derive"] }
jsonrpsee = { workspace = true, features = ["server", "macros"] }
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true

View File

@@ -85,7 +85,9 @@ impl StorageWatcherApiServer for StorageWatcherRpc {
let Ok(mut rx) = resp_rx.await else { return };
while let Some(diff) = rx.recv().await {
let msg = SubscriptionMessage::from_json(&diff).expect("serialize");
let msg = SubscriptionMessage::from(
serde_json::value::to_raw_value(&diff).expect("serialize"),
);
if sink.send(msg).await.is_err() {
break;
}

View File

@@ -11,5 +11,6 @@ reth-ethereum = { workspace = true, features = ["node", "pool", "cli"] }
clap = { workspace = true, features = ["derive"] }
jsonrpsee = { workspace = true, features = ["server", "macros"] }
tokio.workspace = true
serde_json.workspace = true
[dev-dependencies]
tokio.workspace = true

View File

@@ -117,8 +117,9 @@ where
loop {
sleep(Duration::from_secs(delay)).await;
let msg = SubscriptionMessage::from_json(&pool.pool_size().total)
.expect("Failed to serialize `usize`");
let msg = SubscriptionMessage::from(
serde_json::value::to_raw_value(&pool.pool_size().total).expect("serialize"),
);
let _ = sink.send(msg).await;
}
});
@@ -164,8 +165,10 @@ mod tests {
// Send pool size repeatedly, with a 10-second delay
loop {
sleep(Duration::from_millis(delay)).await;
let message = SubscriptionMessage::from_json(&pool.pool_size().total)
.expect("serialize usize");
let message = SubscriptionMessage::from(
serde_json::value::to_raw_value(&pool.pool_size().total)
.expect("serialize usize"),
);
// Just ignore errors if a client has dropped
let _ = sink.send(message).await;