From 9567b256c8cb4a8cd439bd7e6e7b1be3b032efde Mon Sep 17 00:00:00 2001 From: Abner Zheng Date: Thu, 25 Apr 2024 23:51:31 +0800 Subject: [PATCH] feat: support max_request_body_size (#7880) --- crates/rpc/ipc/src/server/ipc.rs | 22 ++++++--- crates/rpc/ipc/src/server/mod.rs | 80 +++++++++++++++++++++++++++++++- 2 files changed, 93 insertions(+), 9 deletions(-) diff --git a/crates/rpc/ipc/src/server/ipc.rs b/crates/rpc/ipc/src/server/ipc.rs index 1fd600c033..daf7d1dc0e 100644 --- a/crates/rpc/ipc/src/server/ipc.rs +++ b/crates/rpc/ipc/src/server/ipc.rs @@ -11,7 +11,10 @@ use jsonrpsee::{ JsonRawValue, }, server::middleware::rpc::RpcServiceT, - types::{error::ErrorCode, ErrorObject, Id, InvalidRequest, Notification, Request}, + types::{ + error::{reject_too_big_request, ErrorCode}, + ErrorObject, Id, InvalidRequest, Notification, Request, + }, BatchResponseBuilder, MethodResponse, ResponsePayload, }; use tokio::sync::OwnedSemaphorePermit; @@ -124,6 +127,7 @@ pub(crate) async fn call_with_service( request: String, rpc_service: S, max_response_body_size: usize, + max_request_body_size: usize, conn: Arc, ) -> Option where @@ -143,9 +147,17 @@ where }) .unwrap_or(Kind::Single); + let data = request.into_bytes(); + if data.len() > max_request_body_size { + return Some(batch_response_error( + Id::Null, + reject_too_big_request(max_request_body_size as u32), + )); + } + // Single request or notification let res = if matches!(request_kind, Kind::Single) { - let response = process_single_request(request.into_bytes(), &rpc_service).await; + let response = process_single_request(data, &rpc_service).await; match response { Some(response) if response.is_method_call() => Some(response.to_result()), _ => { @@ -155,11 +167,7 @@ where } } } else { - process_batch_request( - Batch { data: request.into_bytes(), rpc_service }, - max_response_body_size, - ) - .await + process_batch_request(Batch { data, rpc_service }, max_response_body_size).await }; drop(conn); diff --git a/crates/rpc/ipc/src/server/mod.rs b/crates/rpc/ipc/src/server/mod.rs index 7afb6bb7d1..5301c7d219 100644 --- a/crates/rpc/ipc/src/server/mod.rs +++ b/crates/rpc/ipc/src/server/mod.rs @@ -378,6 +378,7 @@ where }; let max_response_body_size = self.inner.max_response_body_size as usize; + let max_request_body_size = self.inner.max_request_body_size as usize; let rpc_service = self.rpc_middleware.service(RpcService::new( self.inner.methods.clone(), max_response_body_size, @@ -392,7 +393,14 @@ where // work to a separate task takes the pressure off the connection so all concurrent responses // are also serialized concurrently and the connection can focus on read+write let f = tokio::task::spawn(async move { - ipc::call_with_service(request, rpc_service, max_response_body_size, conn).await + ipc::call_with_service( + request, + rpc_service, + max_response_body_size, + max_request_body_size, + conn, + ) + .await }); Box::pin(async move { f.await.map_err(|err| err.into()) }) @@ -780,7 +788,11 @@ mod tests { use crate::client::IpcClientBuilder; use futures::future::{select, Either}; use jsonrpsee::{ - core::client::{ClientT, Subscription, SubscriptionClientT}, + core::{ + client, + client::{ClientT, Error, Subscription, SubscriptionClientT}, + params::BatchRequestBuilder, + }, rpc_params, types::Request, PendingSubscriptionSink, RpcModule, SubscriptionMessage, @@ -834,6 +846,46 @@ mod tests { } } + #[tokio::test] + async fn can_set_the_max_response_body_size() { + let endpoint = dummy_endpoint(); + let server = Builder::default().max_response_body_size(100).build(&endpoint); + let mut module = RpcModule::new(()); + module.register_method("anything", |_, _| "a".repeat(101)).unwrap(); + let handle = server.start(module).await.unwrap(); + tokio::spawn(handle.stopped()); + + let client = IpcClientBuilder::default().build(endpoint).await.unwrap(); + let response: Result = client.request("anything", rpc_params![]).await; + assert!(response.unwrap_err().to_string().contains("Exceeded max limit of")); + } + + #[tokio::test] + async fn can_set_the_max_request_body_size() { + let endpoint = dummy_endpoint(); + let server = Builder::default().max_request_body_size(100).build(&endpoint); + let mut module = RpcModule::new(()); + module.register_method("anything", |_, _| "succeed").unwrap(); + let handle = server.start(module).await.unwrap(); + tokio::spawn(handle.stopped()); + + let client = IpcClientBuilder::default().build(endpoint).await.unwrap(); + let response: Result = + client.request("anything", rpc_params!["a".repeat(101)]).await; + assert!(response.is_err()); + let mut batch_request_builder = BatchRequestBuilder::new(); + let _ = batch_request_builder.insert("anything", rpc_params![]); + let _ = batch_request_builder.insert("anything", rpc_params![]); + let _ = batch_request_builder.insert("anything", rpc_params![]); + // the raw request string is: + // [{"jsonrpc":"2.0","id":0,"method":"anything"},{"jsonrpc":"2.0","id":1, \ + // "method":"anything"},{"jsonrpc":"2.0","id":2,"method":"anything"}]" + // which is 136 bytes, more than 100 bytes. + let response: Result, Error> = + client.batch_request(batch_request_builder).await; + assert!(response.is_err()); + } + #[tokio::test] async fn test_rpc_request() { let endpoint = dummy_endpoint(); @@ -849,6 +901,30 @@ mod tests { assert_eq!(response, msg); } + #[tokio::test] + async fn test_batch_request() { + let endpoint = dummy_endpoint(); + let server = Builder::default().build(&endpoint); + let mut module = RpcModule::new(()); + module.register_method("anything", |_, _| "ok").unwrap(); + let handle = server.start(module).await.unwrap(); + tokio::spawn(handle.stopped()); + + let client = IpcClientBuilder::default().build(endpoint).await.unwrap(); + let mut batch_request_builder = BatchRequestBuilder::new(); + let _ = batch_request_builder.insert("anything", rpc_params![]); + let _ = batch_request_builder.insert("anything", rpc_params![]); + let _ = batch_request_builder.insert("anything", rpc_params![]); + let result = client + .batch_request(batch_request_builder) + .await + .unwrap() + .into_ok() + .unwrap() + .collect::>(); + assert_eq!(result, vec!["ok", "ok", "ok"]); + } + #[tokio::test] async fn test_ipc_modules() { reth_tracing::init_test_tracing();