diff --git a/Cargo.lock b/Cargo.lock index 8fc594c9d5..8cff6fecaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1931,26 +1931,26 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bd0d559d5e679b1ab2f869b486a11182923863b1b3ee8b421763cdd707b783a" +checksum = "e0ee76536f6a303b67c13a99ecae0002bb388674dbf416094dde808263ea229c" dependencies = [ + "jsonrpsee-client-transport", "jsonrpsee-core", "jsonrpsee-http-client", - "jsonrpsee-http-server", "jsonrpsee-proc-macros", + "jsonrpsee-server", "jsonrpsee-types", "jsonrpsee-wasm-client", "jsonrpsee-ws-client", - "jsonrpsee-ws-server", "tracing", ] [[package]] name = "jsonrpsee-client-transport" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8752740ecd374bcbf8b69f3e80b0327942df76f793f8d4e60d3355650c31fb74" +checksum = "74c8f8f21b684623d23be8b6fcb101594f4e95d8a505ffd0568de863d93668f4" dependencies = [ "anyhow", "futures-channel", @@ -1973,9 +1973,9 @@ dependencies = [ [[package]] name = "jsonrpsee-core" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3dc3e9cf2ba50b7b1d7d76a667619f82846caa39e8e8daa8a4962d74acaddca" +checksum = "f57020f4c98b6c6e8848fb115e61227fba6993517bee0faa38e4db627a9f7254" dependencies = [ "anyhow", "arrayvec", @@ -1986,10 +1986,8 @@ dependencies = [ "futures-timer", "futures-util", "globset", - "http", "hyper", "jsonrpsee-types", - "lazy_static", "parking_lot 0.12.1", "rand 0.8.5", "rustc-hash", @@ -1999,16 +1997,14 @@ dependencies = [ "thiserror", "tokio", "tracing", - "tracing-futures", - "unicase", "wasm-bindgen-futures", ] [[package]] name = "jsonrpsee-http-client" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52f7c0e2333ab2115c302eeb4f137c8a4af5ab609762df68bbda8f06496677c9" +checksum = "7ca71086fd13ad0991cd4a0e50c9f4c59488b1acfac4a528c448c2e10020aa1e" dependencies = [ "async-trait", "hyper", @@ -2021,33 +2017,15 @@ dependencies = [ "thiserror", "tokio", "tracing", - "tracing-futures", -] - -[[package]] -name = "jsonrpsee-http-server" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03802f0373a38c2420c70b5144742d800b509e2937edc4afb116434f07120117" -dependencies = [ - "futures-channel", - "futures-util", - "hyper", - "jsonrpsee-core", - "jsonrpsee-types", - "serde", - "serde_json", - "tokio", - "tracing", - "tracing-futures", ] [[package]] name = "jsonrpsee-proc-macros" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd67957d4280217247588ac86614ead007b301ca2fa9f19c19f880a536f029e3" +checksum = "d335519bfe970511318f2780b7716f91d99d67fbf32ac3ea94b5f2f6c9818a4d" dependencies = [ + "heck", "proc-macro-crate", "proc-macro2", "quote", @@ -2055,10 +2033,32 @@ dependencies = [ ] [[package]] -name = "jsonrpsee-types" -version = "0.15.1" +name = "jsonrpsee-server" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e290bba767401b646812f608c099b922d8142603c9e73a50fb192d3ac86f4a0d" +checksum = "ff5de9e3d6280c5354882e001494bc6ffb3ea31ac7dd81440f997aa380039e39" +dependencies = [ + "futures-channel", + "futures-util", + "http", + "hyper", + "jsonrpsee-core", + "jsonrpsee-types", + "serde", + "serde_json", + "soketto", + "tokio", + "tokio-stream", + "tokio-util 0.7.4", + "tower", + "tracing", +] + +[[package]] +name = "jsonrpsee-types" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88c88c764104fef883eb8a832d0d77688a63f67d75b385f5cdae7b3db8925288" dependencies = [ "anyhow", "beef", @@ -2070,9 +2070,9 @@ dependencies = [ [[package]] name = "jsonrpsee-wasm-client" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597b4eb94730e7695d0a2a429bc37a12e6e84d12680fdafb9b8f5f53652aab57" +checksum = "d11a058951524f3f6e02e94c26d5c189a5df0f2dea81339147c603b9eb7c511d" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", @@ -2081,9 +2081,9 @@ dependencies = [ [[package]] name = "jsonrpsee-ws-client" -version = "0.15.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee5feddd5188e62ac08fcf0e56478138e581509d4730f3f7be9b57dd402a4ff" +checksum = "ea609539b5062f856a43652fd01d8ed8df40cd4d7067be6f6b7ce81d8bbd03be" dependencies = [ "http", "jsonrpsee-client-transport", @@ -2091,26 +2091,6 @@ dependencies = [ "jsonrpsee-types", ] -[[package]] -name = "jsonrpsee-ws-server" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d488ba74fb369e5ab68926feb75a483458b88e768d44319f37e4ecad283c7325" -dependencies = [ - "futures-channel", - "futures-util", - "http", - "jsonrpsee-core", - "jsonrpsee-types", - "serde_json", - "soketto", - "tokio", - "tokio-stream", - "tokio-util 0.7.4", - "tracing", - "tracing-futures", -] - [[package]] name = "k256" version = "0.11.6" @@ -2544,6 +2524,20 @@ dependencies = [ "syn", ] +[[package]] +name = "parity-tokio-ipc" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9981e32fb75e004cc148f5fb70342f393830e0a4aa62e3cc93b50976218d42b6" +dependencies = [ + "futures", + "libc", + "log", + "rand 0.7.3", + "tokio", + "winapi", +] + [[package]] name = "parking" version = "2.0.0" @@ -3250,6 +3244,25 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "reth-ipc" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes", + "futures", + "jsonrpsee", + "parity-tokio-ipc", + "pin-project", + "serde_json", + "thiserror", + "tokio", + "tokio-util 0.7.4", + "tower", + "tracing", + "tracing-test", +] + [[package]] name = "reth-libmdbx" version = "0.1.6" @@ -4023,6 +4036,7 @@ dependencies = [ "base64", "bytes", "futures", + "http", "httparse", "log", "rand 0.8.5", @@ -4440,6 +4454,23 @@ dependencies = [ "walkdir", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -4629,15 +4660,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "unicase" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.8" diff --git a/Cargo.toml b/Cargo.toml index 1036ccc0ab..d1fa725587 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "crates/net/eth-wire", "crates/net/discv4", "crates/net/network", + "crates/net/ipc", "crates/net/rpc", "crates/net/rpc-api", "crates/net/rpc-types", diff --git a/crates/net/ipc/Cargo.toml b/crates/net/ipc/Cargo.toml new file mode 100644 index 0000000000..490d04abe4 --- /dev/null +++ b/crates/net/ipc/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "reth-ipc" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/foundry-rs/reth" +readme = "README.md" +description = """ +IPC support for reth +""" + +[dependencies] + +# async/net +futures = "0.3" +parity-tokio-ipc = "0.9.0" +tokio = { version = "1", features = ["net", "time", "rt-multi-thread"] } +tokio-util = { version = "0.7", features = ["codec"] } +async-trait = "0.1" +pin-project = "1.0" +tower = "0.4" + +# misc +jsonrpsee = { version = "0.16", features = ["server", "client"] } +serde_json = "1.0" +tracing = "0.1.37" +bytes = "1.2.1" +thiserror = "1.0.37" + +[dev-dependencies] +tracing-test = "0.2" + +[features] +client = ["jsonrpsee/client", "jsonrpsee/async-client"] diff --git a/crates/net/ipc/README.md b/crates/net/ipc/README.md new file mode 100644 index 0000000000..b65fe8ae90 --- /dev/null +++ b/crates/net/ipc/README.md @@ -0,0 +1,3 @@ +#

reth-ipc

+ +IPC server and client implementation for [`jsonrpsee`](https://github.com/paritytech/jsonrpsee/). \ No newline at end of file diff --git a/crates/net/ipc/src/client.rs b/crates/net/ipc/src/client.rs new file mode 100644 index 0000000000..a04bdfa23d --- /dev/null +++ b/crates/net/ipc/src/client.rs @@ -0,0 +1,151 @@ +//! [`jsonrpsee`] transport adapter implementation for IPC. + +use crate::stream_codec::StreamCodec; +use futures::StreamExt; +use jsonrpsee::{ + async_client::{Client, ClientBuilder}, + core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}, +}; +use std::{ + io, + path::{Path, PathBuf}, +}; +use tokio::{io::AsyncWriteExt, net::UnixStream}; +use tokio_util::codec::FramedRead; + +/// Builder type for [`Client`] +#[derive(Clone, Default, Debug)] +#[non_exhaustive] +pub struct IpcClientBuilder; + +impl IpcClientBuilder { + /// Connects to a IPC socket + pub async fn build(self, path: impl AsRef) -> Result { + let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?; + Ok(self.build_with_tokio(tx, rx)) + } + + /// Uses the sender and receiver channels to connect to the socket. + pub fn build_with_tokio(self, sender: S, receiver: R) -> Client + where + S: TransportSenderT + Send, + R: TransportReceiverT + Send, + { + ClientBuilder::default().build_with_tokio(sender, receiver) + } +} + +/// Sending end of IPC transport. +#[derive(Debug)] +pub struct Sender { + inner: tokio::net::unix::OwnedWriteHalf, +} + +#[async_trait::async_trait] +impl TransportSenderT for Sender { + type Error = IpcError; + + /// Sends out a request. Returns a Future that finishes when the request has been successfully + /// sent. + async fn send(&mut self, msg: String) -> Result<(), Self::Error> { + Ok(self.inner.write_all(msg.as_bytes()).await?) + } + + async fn send_ping(&mut self) -> Result<(), Self::Error> { + tracing::trace!("send ping - not implemented"); + Err(IpcError::NotSupported) + } + + /// Close the connection. + async fn close(&mut self) -> Result<(), Self::Error> { + Ok(()) + } +} + +/// Receiving end of IPC transport. +#[derive(Debug)] +pub struct Receiver { + inner: FramedRead, +} + +#[async_trait::async_trait] +impl TransportReceiverT for Receiver { + type Error = IpcError; + + /// Returns a Future resolving when the server sent us something back. + async fn receive(&mut self) -> Result { + match self.inner.next().await { + None => Err(IpcError::Closed), + Some(val) => Ok(ReceivedMessage::Text(val?)), + } + } +} + +/// Builder for IPC transport [`Sender`] and ['Receiver`] pair. +#[derive(Debug, Clone, Default)] +#[non_exhaustive] +pub struct IpcTransportClientBuilder; + +impl IpcTransportClientBuilder { + /// Try to establish the connection. + /// + /// ``` + /// use jsonrpsee::rpc_params; + /// use reth_ipc::client::IpcClientBuilder; + /// use jsonrpsee::core::client::ClientT; + /// # async fn run_client() -> Result<(), Box> { + /// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?; + /// let response: String = client.request("say_hello", rpc_params![]).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn build(self, path: impl AsRef) -> Result<(Sender, Receiver), IpcError> { + let path = path.as_ref(); + let stream = UnixStream::connect(path) + .await + .map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?; + + let (rhlf, whlf) = stream.into_split(); + + Ok(( + Sender { inner: whlf }, + Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) }, + )) + } +} + +/// Error variants that can happen in IPC transport. +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum IpcError { + /// Operation not supported + #[error("Operation not supported")] + NotSupported, + /// Stream was closed + #[error("Stream closed")] + Closed, + /// Thrown when failed to establish a socket connection. + #[error("Failed to connect to socket {path}: {err}")] + FailedToConnect { + /// The path of the socket. + path: PathBuf, + err: io::Error, + }, + #[error(transparent)] + Io(#[from] io::Error), +} + +#[cfg(test)] +mod tests { + use super::*; + use parity_tokio_ipc::{dummy_endpoint, Endpoint}; + + #[tokio::test] + async fn test_connect() { + let endpoint = dummy_endpoint(); + let _incoming = Endpoint::new(endpoint.clone()).incoming().unwrap(); + + let (tx, rx) = IpcTransportClientBuilder::default().build(endpoint).await.unwrap(); + let _ = IpcClientBuilder::default().build_with_tokio(tx, rx); + } +} diff --git a/crates/net/ipc/src/lib.rs b/crates/net/ipc/src/lib.rs new file mode 100644 index 0000000000..f8d4086119 --- /dev/null +++ b/crates/net/ipc/src/lib.rs @@ -0,0 +1,14 @@ +#![warn(missing_debug_implementations, missing_docs, unreachable_pub, unused_crate_dependencies)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! Reth IPC implementation + +pub mod client; +pub mod server; + +/// Json codec implementation +pub mod stream_codec; diff --git a/crates/net/ipc/src/server/connection.rs b/crates/net/ipc/src/server/connection.rs new file mode 100644 index 0000000000..ff0bd4c00a --- /dev/null +++ b/crates/net/ipc/src/server/connection.rs @@ -0,0 +1,115 @@ +//! A IPC connection. + +use crate::stream_codec::StreamCodec; +use futures::{ready, Sink, Stream, StreamExt}; +use std::{ + io, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio_util::codec::Framed; + +pub(crate) type JsonRpcStream = Framed; + +/// Wraps a stream of incoming connections. +#[pin_project::pin_project] +pub(crate) struct Incoming { + #[pin] + inner: T, + _marker: PhantomData, +} +impl Incoming +where + T: Stream> + Unpin + 'static, + Item: AsyncRead + AsyncWrite, +{ + /// Create a new instance. + pub(crate) fn new(inner: T) -> Self { + Self { inner, _marker: Default::default() } + } + + /// Polls to accept a new incoming connection to the endpoint. + pub(crate) fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<::Item> { + let res = match ready!(self.poll_next_unpin(cx)) { + None => Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ipc connection closed")), + Some(conn) => conn, + }; + Poll::Ready(res) + } +} + +impl Stream for Incoming +where + T: Stream> + 'static, + Item: AsyncRead + AsyncWrite, +{ + type Item = io::Result>>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let res = match ready!(this.inner.poll_next(cx)) { + Some(Ok(item)) => { + let framed = IpcConn(tokio_util::codec::Decoder::framed( + StreamCodec::stream_incoming(), + item, + )); + Ok(framed) + } + Some(Err(err)) => Err(err), + None => return Poll::Ready(None), + }; + Poll::Ready(Some(res)) + } +} + +#[pin_project::pin_project] +pub(crate) struct IpcConn(#[pin] T); + +impl IpcConn> +where + T: AsyncRead + AsyncWrite + Unpin, +{ + /// Create a response for when the server is busy and can't accept more requests. + pub(crate) async fn reject_connection(self) { + let mut parts = self.0.into_parts(); + let _ = parts.io.write_all(b"Too many connections. Please try again later.").await; + } +} + +impl Stream for IpcConn> +where + T: AsyncRead + AsyncWrite, +{ + type Item = io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_next(cx) + } +} + +impl Sink for IpcConn> +where + T: AsyncRead + AsyncWrite, +{ + type Error = io::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // NOTE: we always flush here this prevents buffering in the underlying + // `Framed` impl that would cause stalled requests + self.project().0.poll_flush(cx) + } + + fn start_send(self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> { + self.project().0.start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_close(cx) + } +} diff --git a/crates/net/ipc/src/server/future.rs b/crates/net/ipc/src/server/future.rs new file mode 100644 index 0000000000..942c25f3fc --- /dev/null +++ b/crates/net/ipc/src/server/future.rs @@ -0,0 +1,208 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Utilities for handling async code. + +use futures::FutureExt; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::{ + sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError}, + time::{self, Duration, Interval}, +}; + +/// Polling for server stop monitor interval in milliseconds. +const STOP_MONITOR_POLLING_INTERVAL: Duration = Duration::from_millis(1000); + +/// This is a flexible collection of futures that need to be driven to completion +/// alongside some other future, such as connection handlers that need to be +/// handled along with a listener for new connections. +/// +/// In order to `.await` on these futures and drive them to completion, call +/// `select_with` providing some other future, the result of which you need. +pub(crate) struct FutureDriver { + futures: Vec, + stop_monitor_heartbeat: Interval, +} + +impl Default for FutureDriver { + fn default() -> Self { + let mut heartbeat = time::interval(STOP_MONITOR_POLLING_INTERVAL); + + heartbeat.set_missed_tick_behavior(time::MissedTickBehavior::Skip); + + FutureDriver { futures: Vec::new(), stop_monitor_heartbeat: heartbeat } + } +} + +impl FutureDriver { + /// Add a new future to this driver + pub(crate) fn add(&mut self, future: F) { + self.futures.push(future); + } +} + +impl FutureDriver +where + F: Future + Unpin, +{ + pub(crate) async fn select_with(&mut self, selector: S) -> S::Output { + tokio::pin!(selector); + + DriverSelect { selector, driver: self }.await + } + + fn drive(&mut self, cx: &mut Context<'_>) { + let mut i = 0; + + while i < self.futures.len() { + if self.futures[i].poll_unpin(cx).is_ready() { + // Using `swap_remove` since we don't care about ordering + // but we do care about removing being `O(1)`. + // + // We don't increment `i` in this branch, since we now + // have a shorter length, and potentially a new value at + // current index + self.futures.swap_remove(i); + } else { + i += 1; + } + } + } + + fn poll_stop_monitor_heartbeat(&mut self, cx: &mut Context<'_>) { + // We don't care about the ticks of the heartbeat, it's here only + // to periodically wake the `Waker` on `cx`. + let _ = self.stop_monitor_heartbeat.poll_tick(cx); + } +} + +impl Future for FutureDriver +where + F: Future + Unpin, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = Pin::into_inner(self); + + this.drive(cx); + + if this.futures.is_empty() { + Poll::Ready(()) + } else { + Poll::Pending + } + } +} + +/// This is a glorified select `Future` that will attempt to drive all +/// connection futures `F` to completion on each `poll`, while also +/// handling incoming connections. +struct DriverSelect<'a, S, F> { + selector: S, + driver: &'a mut FutureDriver, +} + +impl<'a, R, F> Future for DriverSelect<'a, R, F> +where + R: Future + Unpin, + F: Future + Unpin, +{ + type Output = R::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = Pin::into_inner(self); + + this.driver.drive(cx); + this.driver.poll_stop_monitor_heartbeat(cx); + + this.selector.poll_unpin(cx) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct StopHandle(watch::Receiver<()>); + +impl StopHandle { + pub(crate) fn new(rx: watch::Receiver<()>) -> Self { + Self(rx) + } + + pub(crate) fn shutdown_requested(&self) -> bool { + // if a message has been seen, it means that `stop` has been called. + self.0.has_changed().unwrap_or(true) + } + + pub(crate) async fn shutdown(&mut self) { + // Err(_) implies that the `sender` has been dropped. + // Ok(_) implies that `stop` has been called. + let _ = self.0.changed().await; + } +} + +/// Server handle. +/// +/// When all [`StopHandle`]'s have been `dropped` or `stop` has been called +/// the server will be stopped. +#[derive(Debug, Clone)] +pub(crate) struct ServerHandle(Arc>); + +impl ServerHandle { + /// Wait for the server to stop. + #[allow(unused)] + pub(crate) async fn stopped(self) { + self.0.closed().await + } +} + +/// Limits the number of connections. +pub(crate) struct ConnectionGuard(Arc); + +impl ConnectionGuard { + pub(crate) fn new(limit: usize) -> Self { + Self(Arc::new(Semaphore::new(limit))) + } + + pub(crate) fn try_acquire(&self) -> Option { + match self.0.clone().try_acquire_owned() { + Ok(guard) => Some(guard), + Err(TryAcquireError::Closed) => { + unreachable!("Semaphore::Close is never called and can't be closed; qed") + } + Err(TryAcquireError::NoPermits) => None, + } + } + + #[allow(unused)] + pub(crate) fn available_connections(&self) -> usize { + self.0.available_permits() + } +} diff --git a/crates/net/ipc/src/server/ipc.rs b/crates/net/ipc/src/server/ipc.rs new file mode 100644 index 0000000000..1e48e43592 --- /dev/null +++ b/crates/net/ipc/src/server/ipc.rs @@ -0,0 +1,291 @@ +//! IPC request handling adapted from [`jsonrpsee`] http request handling +use futures::{stream::FuturesOrdered, StreamExt}; +use jsonrpsee::{ + core::{ + server::{ + helpers::{prepare_error, BatchResponse, BatchResponseBuilder, MethodResponse}, + resource_limiting::Resources, + rpc_module::{MethodKind, Methods}, + }, + tracing::{rx_log_from_json, tx_log_from_str}, + JsonRawValue, + }, + server::{ + logger, + logger::{Logger, TransportProtocol}, + }, + types::{error::ErrorCode, ErrorObject, Id, InvalidRequest, Notification, Params, Request}, +}; +use std::sync::Arc; +use tokio::sync::OwnedSemaphorePermit; +use tokio_util::either::Either; +use tracing::instrument; + +type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>; + +#[derive(Debug, Clone)] +pub(crate) struct Batch<'a, L: Logger> { + data: Vec, + call: CallData<'a, L>, +} + +#[derive(Debug, Clone)] +pub(crate) struct CallData<'a, L: Logger> { + conn_id: usize, + logger: &'a L, + methods: &'a Methods, + max_response_body_size: u32, + max_log_length: u32, + resources: &'a Resources, + request_start: L::Instant, +} + +// Batch responses must be sent back as a single message so we read the results from each +// request in the batch and read the results off of a new channel, `rx_batch`, and then send the +// complete batch response back to the client over `tx`. +#[instrument(name = "batch", skip(b), level = "TRACE")] +pub(crate) async fn process_batch_request(b: Batch<'_, L>) -> BatchResponse +where + L: Logger, +{ + let Batch { data, call } = b; + + if let Ok(batch) = serde_json::from_slice::>(&data) { + let mut got_notif = false; + let mut batch_response = + BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize); + + let mut pending_calls: FuturesOrdered<_> = batch + .into_iter() + .filter_map(|v| { + if let Ok(req) = serde_json::from_str::>(v.get()) { + Some(Either::Right(execute_call(req, call.clone()))) + } else if let Ok(_notif) = + serde_json::from_str::>(v.get()) + { + // notifications should not be answered. + got_notif = true; + None + } else { + // valid JSON but could be not parsable as `InvalidRequest` + let id = match serde_json::from_str::>(v.get()) { + Ok(err) => err.id, + Err(_) => Id::Null, + }; + + Some(Either::Left(async { + MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest)) + })) + } + }) + .collect(); + + while let Some(response) = pending_calls.next().await { + if let Err(too_large) = batch_response.append(&response) { + return too_large + } + } + + if got_notif && batch_response.is_empty() { + BatchResponse { result: String::new(), success: true } + } else { + batch_response.finish() + } + } else { + BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::ParseError)) + } +} + +pub(crate) async fn process_single_request( + data: Vec, + call: CallData<'_, L>, +) -> MethodResponse { + if let Ok(req) = serde_json::from_slice::>(&data) { + execute_call_with_tracing(req, call).await + } else if let Ok(notif) = serde_json::from_slice::>(&data) { + execute_notification(notif, call.max_log_length) + } else { + let (id, code) = prepare_error(&data); + MethodResponse::error(id, ErrorObject::from(code)) + } +} + +#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(call, req), level = "TRACE")] +pub(crate) async fn execute_call_with_tracing<'a, L: Logger>( + req: Request<'a>, + call: CallData<'_, L>, +) -> MethodResponse { + execute_call(req, call).await +} + +pub(crate) async fn execute_call( + req: Request<'_>, + call: CallData<'_, L>, +) -> MethodResponse { + let CallData { + resources, + methods, + logger, + max_response_body_size, + max_log_length, + conn_id, + request_start, + } = call; + + rx_log_from_json(&req, call.max_log_length); + + let params = Params::new(req.params.map(|params| params.get())); + let name = &req.method; + let id = req.id; + + let response = match methods.method_with_name(name) { + None => { + logger.on_call( + name, + params.clone(), + logger::MethodKind::Unknown, + TransportProtocol::Http, + ); + MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)) + } + Some((name, method)) => match &method.inner() { + MethodKind::Sync(callback) => { + logger.on_call( + name, + params.clone(), + logger::MethodKind::MethodCall, + TransportProtocol::Http, + ); + + match method.claim(name, resources) { + Ok(guard) => { + let r = (callback)(id, params, max_response_body_size as usize); + drop(guard); + r + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {}", + err + ); + MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy)) + } + } + } + MethodKind::Async(callback) => { + logger.on_call( + name, + params.clone(), + logger::MethodKind::MethodCall, + TransportProtocol::Http, + ); + match method.claim(name, resources) { + Ok(guard) => { + let id = id.into_owned(); + let params = params.into_owned(); + + (callback)( + id, + params, + conn_id, + max_response_body_size as usize, + Some(guard), + ) + .await + } + Err(err) => { + tracing::error!( + "[Methods::execute_with_resources] failed to lock resources: {}", + err + ); + MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy)) + } + } + } + MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { + logger.on_call( + name, + params.clone(), + logger::MethodKind::Unknown, + TransportProtocol::Http, + ); + tracing::error!("Subscriptions not supported on HTTP"); + MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)) + } + }, + }; + + tx_log_from_str(&response.result, max_log_length); + logger.on_result(name, response.success, request_start, TransportProtocol::Http); + response +} + +#[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")] +fn execute_notification(notif: Notif<'_>, max_log_length: u32) -> MethodResponse { + rx_log_from_json(¬if, max_log_length); + let response = MethodResponse { result: String::new(), success: true }; + tx_log_from_str(&response.result, max_log_length); + response +} + +#[allow(unused)] +pub(crate) struct HandleRequest { + pub(crate) methods: Methods, + pub(crate) resources: Resources, + pub(crate) max_request_body_size: u32, + pub(crate) max_response_body_size: u32, + pub(crate) max_log_length: u32, + pub(crate) batch_requests_supported: bool, + pub(crate) logger: L, + pub(crate) conn: Arc, +} + +pub(crate) async fn handle_request(request: String, input: HandleRequest) -> String { + let HandleRequest { + methods, + resources, + max_response_body_size, + max_log_length, + logger, + conn, + .. + } = input; + + enum Kind { + Single, + Batch, + } + + let request_kind = request + .chars() + .find_map(|c| match c { + '{' => Some(Kind::Single), + '[' => Some(Kind::Batch), + _ => None, + }) + .unwrap_or(Kind::Single); + + let request_start = logger.on_request(TransportProtocol::Http); + + let call = CallData { + conn_id: 0, + logger: &logger, + methods: &methods, + max_response_body_size, + max_log_length, + resources: &resources, + request_start, + }; + // Single request or notification + let res = if matches!(request_kind, Kind::Single) { + let response = process_single_request(request.into_bytes(), call).await; + response.result + } else { + let response = process_batch_request(Batch { data: request.into_bytes(), call }).await; + response.result + }; + + drop(conn); + + res +} diff --git a/crates/net/ipc/src/server/mod.rs b/crates/net/ipc/src/server/mod.rs new file mode 100644 index 0000000000..699f654924 --- /dev/null +++ b/crates/net/ipc/src/server/mod.rs @@ -0,0 +1,558 @@ +//! JSON-RPC IPC server implementation + +use crate::server::{ + connection::{Incoming, IpcConn, JsonRpcStream}, + future::{ConnectionGuard, FutureDriver, StopHandle}, +}; +use futures::{FutureExt, SinkExt, Stream, StreamExt}; +use jsonrpsee::{ + core::{ + server::{resource_limiting::Resources, rpc_module::Methods}, + Error, TEN_MB_SIZE_BYTES, + }, + server::{logger::Logger, IdProvider, RandomIntegerIdProvider, ServerHandle}, +}; +use parity_tokio_ipc::Endpoint; +use std::{ + future::Future, + io, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::{oneshot, watch, OwnedSemaphorePermit}, +}; +use tower::{layer::util::Identity, Service}; +use tracing::{trace, warn}; + +mod connection; +mod future; +mod ipc; + +/// Ipc Server implementation + +// This is an adapted `jsonrpsee` Server, but for `Ipc` connections. +pub struct IpcServer { + /// The endpoint we listen for incoming transactions + endpoint: Endpoint, + resources: Resources, + logger: L, + id_provider: Arc, + cfg: Settings, + service_builder: tower::ServiceBuilder, +} + +impl IpcServer { + /// Start responding to connections requests. + /// + /// This will run on the tokio runtime until the server is stopped or the ServerHandle is + /// dropped. + /// + /// ``` + /// use jsonrpsee::RpcModule; + /// use reth_ipc::server::Builder; + /// async fn run_server() -> Result<(), Box> { + /// let server = Builder::default().build("/tmp/my-uds")?; + /// let mut module = RpcModule::new(()); + /// module.register_method("say_hello", |_, _| Ok("lo"))?; + /// let handle = server.start(module).await?; + /// + /// // In this example we don't care about doing shutdown so let's it run forever. + /// // You may use the `ServerHandle` to shut it down or manage it yourself. + /// let server = tokio::spawn(handle.stopped()); + /// server.await.unwrap(); + /// Ok(()) + /// } + /// ``` + pub async fn start(mut self, methods: impl Into) -> Result { + let methods = methods.into().initialize_resources(&self.resources)?; + let (stop_tx, stop_rx) = watch::channel(()); + + let stop_handle = StopHandle::new(stop_rx); + + // use a signal channel to wait until we're ready to accept connections + let (tx, rx) = oneshot::channel(); + + match self.cfg.tokio_runtime.take() { + Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)), + None => tokio::spawn(self.start_inner(methods, stop_handle, tx)), + }; + rx.await.expect("channel is open").map_err(Error::Custom)?; + + Ok(ServerHandle::new(stop_tx)) + } + + #[allow(clippy::let_unit_value)] + async fn start_inner( + self, + methods: Methods, + stop_handle: StopHandle, + on_ready: oneshot::Sender>, + ) -> io::Result<()> { + trace!( endpoint=?self.endpoint.path(), "starting ipc server" ); + + if cfg!(unix) { + // ensure the file does not exist + if std::fs::remove_file(self.endpoint.path()).is_ok() { + warn!( endpoint=?self.endpoint.path(), "removed existing file"); + } + } + + let max_request_body_size = self.cfg.max_request_body_size; + let max_response_body_size = self.cfg.max_response_body_size; + let max_log_length = self.cfg.max_log_length; + let resources = self.resources; + let id_provider = self.id_provider; + let max_subscriptions_per_connection = self.cfg.max_subscriptions_per_connection; + let logger = self.logger; + + let mut id: u32 = 0; + let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize); + + let mut connections = FutureDriver::default(); + let incoming = match self.endpoint.incoming() { + Ok(connections) => Incoming::new(connections), + Err(err) => { + on_ready.send(Err(err.to_string())).ok(); + return Err(err) + } + }; + // signal that we're ready to accept connections + on_ready.send(Ok(())).ok(); + + let mut incoming = Monitored::new(incoming, &stop_handle); + + trace!("accepting ipc connections"); + loop { + match connections.select_with(&mut incoming).await { + Ok(ipc) => { + trace!("established new connection"); + let conn = match connection_guard.try_acquire() { + Some(conn) => conn, + None => { + warn!("Too many connections. Please try again later."); + connections.add(ipc.reject_connection().boxed()); + continue + } + }; + + let tower_service = TowerService { + inner: ServiceData { + methods: methods.clone(), + resources: resources.clone(), + max_request_body_size, + max_response_body_size, + max_log_length, + id_provider: id_provider.clone(), + stop_handle: stop_handle.clone(), + max_subscriptions_per_connection, + conn_id: id, + logger, + conn: Arc::new(conn), + }, + }; + + let service = self.service_builder.service(tower_service); + connections.add(Box::pin(spawn_connection(ipc, service, stop_handle.clone()))); + + id = id.wrapping_add(1); + } + Err(MonitoredError::Selector(err)) => { + tracing::error!("Error while awaiting a new connection: {:?}", err); + } + Err(MonitoredError::Shutdown) => break, + } + } + + connections.await; + Ok(()) + } +} + +impl std::fmt::Debug for IpcServer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IpcServer") + .field("endpoint", &self.endpoint.path()) + .field("cfg", &self.cfg) + .field("id_provider", &self.id_provider) + .field("resources", &self.resources) + .finish() + } +} + +/// Data required by the server to handle requests. +#[derive(Debug, Clone)] +#[allow(unused)] +pub(crate) struct ServiceData { + /// Registered server methods. + pub(crate) methods: Methods, + /// Tracker for currently used resources on the server. + pub(crate) resources: Resources, + /// Max request body size. + pub(crate) max_request_body_size: u32, + /// Max request body size. + pub(crate) max_response_body_size: u32, + /// Max length for logging for request and response + /// + /// Logs bigger than this limit will be truncated. + pub(crate) max_log_length: u32, + /// Subscription ID provider. + pub(crate) id_provider: Arc, + /// Stop handle. + pub(crate) stop_handle: StopHandle, + /// Max subscriptions per connection. + pub(crate) max_subscriptions_per_connection: u32, + /// Connection ID + pub(crate) conn_id: u32, + /// Logger. + pub(crate) logger: L, + /// Handle to hold a `connection permit`. + pub(crate) conn: Arc, +} + +/// JsonRPSee service compatible with `tower`. +/// +/// # Note +/// This is similar to [`hyper::service::service_fn`]. +#[derive(Debug)] +pub struct TowerService { + inner: ServiceData, +} + +impl Service for TowerService { + type Response = String; + + type Error = Box; + + type Future = Pin> + Send>>; + + /// Opens door for back pressure implementation. + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: String) -> Self::Future { + trace!("{:?}", request); + + // handle the request + let data = ipc::HandleRequest { + methods: self.inner.methods.clone(), + resources: self.inner.resources.clone(), + max_request_body_size: self.inner.max_request_body_size, + max_response_body_size: self.inner.max_response_body_size, + max_log_length: self.inner.max_log_length, + batch_requests_supported: true, + logger: self.inner.logger.clone(), + conn: self.inner.conn.clone(), + }; + Box::pin(ipc::handle_request(request, data).map(Ok)) + } +} + +/// Spawns the connection in a new task +async fn spawn_connection( + conn: IpcConn>, + mut service: S, + mut stop_handle: StopHandle, +) where + S: Service + Send + 'static, + S::Error: Into>, + S::Future: Send, + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let task = tokio::task::spawn(async move { + tokio::pin!(conn); + + loop { + let request = tokio::select! { + res = conn.next() => { + match res { + Some(Ok(request)) => { + request + }, + Some(Err(e)) => { + tracing::warn!("Request failed: {:?}", e); + break + } + None => { + return + } + } + } + _ = stop_handle.shutdown() => { + break + } + }; + + // handle the RPC request + let resp = match service.call(request).await { + Ok(resp) => resp, + Err(err) => err.into().to_string(), + }; + + // send back + if let Err(err) = conn.send(resp).await { + warn!("Failed to send response: {:?}", err); + break + } + } + }); + + task.await.ok(); +} + +/// This is a glorified select listening for new messages, while also checking the `stop_receiver` +/// signal. +struct Monitored<'a, F> { + future: F, + stop_monitor: &'a StopHandle, +} + +impl<'a, F> Monitored<'a, F> { + fn new(future: F, stop_monitor: &'a StopHandle) -> Self { + Monitored { future, stop_monitor } + } +} + +enum MonitoredError { + Shutdown, + Selector(E), +} + +impl<'a, T, Item> Future for Monitored<'a, Incoming> +where + T: Stream> + Unpin + 'static, + Item: AsyncRead + AsyncWrite, +{ + type Output = Result>, MonitoredError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + if this.stop_monitor.shutdown_requested() { + return Poll::Ready(Err(MonitoredError::Shutdown)) + } + + this.future.poll_accept(cx).map_err(MonitoredError::Selector) + } +} + +/// JSON-RPC IPC server settings. +#[derive(Debug, Clone)] +pub struct Settings { + /// Maximum size in bytes of a request. + max_request_body_size: u32, + /// Maximum size in bytes of a response. + max_response_body_size: u32, + /// Max length for logging for requests and responses + /// + /// Logs bigger than this limit will be truncated. + max_log_length: u32, + /// Maximum number of incoming connections allowed. + max_connections: u32, + /// Maximum number of subscriptions per connection. + max_subscriptions_per_connection: u32, + /// Custom tokio runtime to run the server on. + tokio_runtime: Option, +} + +impl Default for Settings { + fn default() -> Self { + Self { + max_request_body_size: TEN_MB_SIZE_BYTES, + max_response_body_size: TEN_MB_SIZE_BYTES, + max_log_length: 4096, + max_connections: 100, + max_subscriptions_per_connection: 1024, + tokio_runtime: None, + } + } +} + +/// Builder to configure and create a JSON-RPC server +#[derive(Debug)] +pub struct Builder { + settings: Settings, + resources: Resources, + logger: L, + id_provider: Arc, + service_builder: tower::ServiceBuilder, +} + +impl Default for Builder { + fn default() -> Self { + Builder { + settings: Settings::default(), + resources: Resources::default(), + logger: (), + id_provider: Arc::new(RandomIntegerIdProvider), + service_builder: tower::ServiceBuilder::new(), + } + } +} + +impl Builder { + /// Set the maximum size of a request body in bytes. Default is 10 MiB. + pub fn max_request_body_size(mut self, size: u32) -> Self { + self.settings.max_request_body_size = size; + self + } + + /// Set the maximum size of a response body in bytes. Default is 10 MiB. + pub fn max_response_body_size(mut self, size: u32) -> Self { + self.settings.max_response_body_size = size; + self + } + + /// Set the maximum size of a log + pub fn max_log_length(mut self, size: u32) -> Self { + self.settings.max_log_length = size; + self + } + + /// Set the maximum number of connections allowed. Default is 100. + pub fn max_connections(mut self, max: u32) -> Self { + self.settings.max_connections = max; + self + } + + /// Set the maximum number of connections allowed. Default is 1024. + pub fn max_subscriptions_per_connection(mut self, max: u32) -> Self { + self.settings.max_subscriptions_per_connection = max; + self + } + + /// Register a new resource kind. Errors if `label` is already registered, or if the number of + /// registered resources on this server instance would exceed 8. + /// + /// See the module documentation for + /// [`resurce_limiting`](../jsonrpsee_utils/server/resource_limiting/index.html# + /// resource-limiting) for details. + pub fn register_resource( + mut self, + label: &'static str, + capacity: u16, + default: u16, + ) -> Result { + self.resources.register(label, capacity, default)?; + Ok(self) + } + + /// Add a logger to the builder [`Logger`]. + pub fn set_logger(self, logger: T) -> Builder { + Builder { + settings: self.settings, + resources: self.resources, + logger, + id_provider: self.id_provider, + service_builder: self.service_builder, + } + } + + /// Configure a custom [`tokio::runtime::Handle`] to run the server on. + /// + /// Default: [`tokio::spawn`] + pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self { + self.settings.tokio_runtime = Some(rt); + self + } + + /// Configure custom `subscription ID` provider for the server to use + /// to when getting new subscription calls. + /// + /// You may choose static dispatch or dynamic dispatch because + /// `IdProvider` is implemented for `Box`. + /// + /// Default: [`RandomIntegerIdProvider`]. + /// + /// # Examples + /// + /// ```rust + /// use jsonrpsee::server::RandomStringIdProvider; + /// use reth_ipc::server::Builder; + /// + /// // static dispatch + /// let builder1 = Builder::default().set_id_provider(RandomStringIdProvider::new(16)); + /// + /// // or dynamic dispatch + /// let builder2 = Builder::default().set_id_provider(Box::new(RandomStringIdProvider::new(16))); + /// ``` + pub fn set_id_provider(mut self, id_provider: I) -> Self { + self.id_provider = Arc::new(id_provider); + self + } + + /// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied + /// to the RPC service. + /// + /// Default: No tower layers are applied to the RPC service. + /// + /// # Examples + /// + /// ```rust + /// + /// #[tokio::main] + /// async fn main() { + /// let builder = tower::ServiceBuilder::new(); + /// + /// let server = reth_ipc::server::Builder::default() + /// .set_middleware(builder) + /// .build("/tmp/my-uds") + /// .unwrap(); + /// } + /// ``` + pub fn set_middleware(self, service_builder: tower::ServiceBuilder) -> Builder { + Builder { + settings: self.settings, + resources: self.resources, + logger: self.logger, + id_provider: self.id_provider, + service_builder, + } + } + + /// Finalize the configuration of the server. Consumes the [`Builder`]. + pub fn build(self, endpoint: impl AsRef) -> Result, Error> { + let endpoint = Endpoint::new(endpoint.as_ref().to_string()); + self.build_with_endpoint(endpoint) + } + + /// Finalize the configuration of the server. Consumes the [`Builder`]. + pub fn build_with_endpoint(self, endpoint: Endpoint) -> Result, Error> { + Ok(IpcServer { + endpoint, + cfg: self.settings, + resources: self.resources, + logger: self.logger, + id_provider: self.id_provider, + service_builder: self.service_builder, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::IpcClientBuilder; + use jsonrpsee::{core::client::ClientT, rpc_params, RpcModule}; + use parity_tokio_ipc::dummy_endpoint; + use tracing_test::traced_test; + + #[tokio::test] + #[traced_test] + async fn test_rpc_request() { + let endpoint = dummy_endpoint(); + let server = Builder::default().build(&endpoint).unwrap(); + let mut module = RpcModule::new(()); + let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#; + module.register_method("eth_chainId", move |_, _| Ok(msg)).unwrap(); + let handle = server.start(module).await.unwrap(); + tokio::spawn(handle.stopped()); + + let client = IpcClientBuilder::default().build(endpoint).await.unwrap(); + let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap(); + assert_eq!(response, msg); + } +} diff --git a/crates/net/ipc/src/stream_codec.rs b/crates/net/ipc/src/stream_codec.rs new file mode 100644 index 0000000000..b163aa1319 --- /dev/null +++ b/crates/net/ipc/src/stream_codec.rs @@ -0,0 +1,307 @@ +// Copyright (c) 2015-2017 Parity Technologies Limited +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +// This basis of this file has been taken from the deprecated jsonrpc codebase: +// https://github.com/paritytech/jsonrpc + +use bytes::BytesMut; +use std::{io, str}; + +/// Separator for enveloping messages in streaming codecs +#[derive(Debug, Clone)] +pub enum Separator { + /// No envelope is expected between messages. Decoder will try to figure out + /// message boundaries by accumulating incoming bytes until valid JSON is formed. + /// Encoder will send messages without any boundaries between requests. + Empty, + /// Byte is used as a sentinel between messages + Byte(u8), +} + +impl Default for Separator { + fn default() -> Self { + Separator::Byte(b'\n') + } +} + +/// Stream codec for streaming protocols (ipc, tcp) +#[derive(Debug, Default)] +pub struct StreamCodec { + incoming_separator: Separator, + outgoing_separator: Separator, +} + +impl StreamCodec { + /// Default codec with streaming input data. Input can be both enveloped and not. + pub fn stream_incoming() -> Self { + StreamCodec::new(Separator::Empty, Default::default()) + } + + /// New custom stream codec + pub fn new(incoming_separator: Separator, outgoing_separator: Separator) -> Self { + StreamCodec { incoming_separator, outgoing_separator } + } +} + +#[inline] +fn is_whitespace(byte: u8) -> bool { + matches!(byte, 0x0D | 0x0A | 0x20 | 0x09) +} + +impl tokio_util::codec::Decoder for StreamCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { + if let Separator::Byte(separator) = self.incoming_separator { + if let Some(i) = buf.as_ref().iter().position(|&b| b == separator) { + let line = buf.split_to(i); + let _ = buf.split_to(1); + + match str::from_utf8(line.as_ref()) { + Ok(s) => Ok(Some(s.to_string())), + Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid UTF-8")), + } + } else { + Ok(None) + } + } else { + let mut depth = 0; + let mut in_str = false; + let mut is_escaped = false; + let mut start_idx = 0; + let mut whitespaces = 0; + + for idx in 0..buf.as_ref().len() { + let byte = buf.as_ref()[idx]; + + if (byte == b'{' || byte == b'[') && !in_str { + if depth == 0 { + start_idx = idx; + } + depth += 1; + } else if (byte == b'}' || byte == b']') && !in_str { + depth -= 1; + } else if byte == b'"' && !is_escaped { + in_str = !in_str; + } else if is_whitespace(byte) { + whitespaces += 1; + } + if byte == b'\\' && !is_escaped && in_str { + is_escaped = true; + } else { + is_escaped = false; + } + + if depth == 0 && idx != start_idx && idx - start_idx + 1 > whitespaces { + let bts = buf.split_to(idx + 1); + return match String::from_utf8(bts.as_ref().to_vec()) { + Ok(val) => Ok(Some(val)), + Err(_) => Ok(None), + } + } + } + Ok(None) + } + } +} + +impl tokio_util::codec::Encoder for StreamCodec { + type Error = io::Error; + + fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> { + let mut payload = msg.into_bytes(); + if let Separator::Byte(separator) = self.outgoing_separator { + payload.push(separator); + } + buf.extend_from_slice(&payload); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::{BufMut, BytesMut}; + use tokio_util::codec::Decoder; + + #[test] + fn simple_encode() { + let mut buf = BytesMut::with_capacity(2048); + buf.put_slice(b"{ test: 1 }{ test: 2 }{ test: 3 }"); + + let mut codec = StreamCodec::stream_incoming(); + + let request = codec + .decode(&mut buf) + .expect("There should be no error in simple test") + .expect("There should be at least one request in simple test"); + + assert_eq!(request, "{ test: 1 }"); + } + + #[test] + fn escape() { + let mut buf = BytesMut::with_capacity(2048); + buf.put_slice(br#"{ test: "\"\\" }{ test: "\ " }{ test: "\}" }[ test: "\]" ]"#); + + let mut codec = StreamCodec::stream_incoming(); + + let request = codec + .decode(&mut buf) + .expect("There should be no error in first escape test") + .expect("There should be a request in first escape test"); + + assert_eq!(request, r#"{ test: "\"\\" }"#); + + let request2 = codec + .decode(&mut buf) + .expect("There should be no error in 2nd escape test") + .expect("There should be a request in 2nd escape test"); + assert_eq!(request2, r#"{ test: "\ " }"#); + + let request3 = codec + .decode(&mut buf) + .expect("There should be no error in 3rd escape test") + .expect("There should be a request in 3rd escape test"); + assert_eq!(request3, r#"{ test: "\}" }"#); + + let request4 = codec + .decode(&mut buf) + .expect("There should be no error in 4th escape test") + .expect("There should be a request in 4th escape test"); + assert_eq!(request4, r#"[ test: "\]" ]"#); + } + + #[test] + fn whitespace() { + let mut buf = BytesMut::with_capacity(2048); + buf.put_slice(b"{ test: 1 }\n\n\n\n{ test: 2 }\n\r{\n test: 3 } "); + + let mut codec = StreamCodec::stream_incoming(); + + let request = codec + .decode(&mut buf) + .expect("There should be no error in first whitespace test") + .expect("There should be a request in first whitespace test"); + + assert_eq!(request, "{ test: 1 }"); + + let request2 = codec + .decode(&mut buf) + .expect("There should be no error in first 2nd test") + .expect("There should be aa request in 2nd whitespace test"); + // TODO: maybe actually trim it out + assert_eq!(request2, "\n\n\n\n{ test: 2 }"); + + let request3 = codec + .decode(&mut buf) + .expect("There should be no error in first 3rd test") + .expect("There should be a request in 3rd whitespace test"); + assert_eq!(request3, "\n\r{\n test: 3 }"); + + let request4 = codec.decode(&mut buf).expect("There should be no error in first 4th test"); + assert!( + request4.is_none(), + "There should be no 4th request because it contains only whitespaces" + ); + } + + #[test] + fn fragmented_encode() { + let mut buf = BytesMut::with_capacity(2048); + buf.put_slice(b"{ test: 1 }{ test: 2 }{ tes"); + + let mut codec = StreamCodec::stream_incoming(); + + let request = codec + .decode(&mut buf) + .expect("There should be no error in first fragmented test") + .expect("There should be at least one request in first fragmented test"); + assert_eq!(request, "{ test: 1 }"); + codec + .decode(&mut buf) + .expect("There should be no error in second fragmented test") + .expect("There should be at least one request in second fragmented test"); + assert_eq!(String::from_utf8(buf.as_ref().to_vec()).unwrap(), "{ tes"); + + buf.put_slice(b"t: 3 }"); + let request = codec + .decode(&mut buf) + .expect("There should be no error in third fragmented test") + .expect("There should be at least one request in third fragmented test"); + assert_eq!(request, "{ test: 3 }"); + } + + #[test] + fn huge() { + let request = r#" + { + "jsonrpc":"2.0", + "method":"say_hello", + "params": [ + 42, + 0, + { + "from":"0xb60e8dd61c5d32be8058bb8eb970870f07233155", + "gas":"0x2dc6c0", + "data":"0x606060405260003411156010576002565b6001805433600160a060020a0319918216811790925560028054909116909117905561291f806100406000396000f3606060405236156100e55760e060020a600035046304029f2381146100ed5780630a1273621461015f57806317c1dd87146102335780631f9ea25d14610271578063266fa0e91461029357806349593f5314610429578063569aa0d8146104fc57806359a4669f14610673578063647a4d5f14610759578063656104f5146108095780636e9febfe1461082b57806370de8c6e1461090d57806371bde852146109ed5780638f30435d14610ab4578063916dbc1714610da35780639f5a7cd414610eef578063c91540f614610fe6578063eae99e1c146110b5578063fedc2a281461115a575b61122d610002565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435600154600090600160a060020a03908116339091161461233357610002565b61122f6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505093359350506044359150506064355b60006000600060005086604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905042816005016000508560ff1660028110156100025760040201835060010154604060020a90046001604060020a0316116115df576115d6565b6112416004355b604080516001604060020a038316408152606060020a33600160a060020a031602602082015290519081900360340190205b919050565b61122d600435600254600160a060020a0390811633909116146128e357610002565b61125e6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050505060006000600060006000600060005087604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905080600001600050600087600160a060020a0316815260200190815260200160002060005060000160059054906101000a90046001604060020a03169450845080600001600050600087600160a060020a03168152602001908152602001600020600050600001600d9054906101000a90046001604060020a03169350835080600001600050600087600160a060020a0316815260200190815260200160002060005060000160009054906101000a900460ff169250825080600001600050600087600160a060020a0316815260200190815260200160002060005060000160019054906101000a900463ffffffff16915081505092959194509250565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435608435600060006000600060005088604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509250346000141515611c0e5760405133600160a060020a0316908290349082818181858883f193505050501515611c1a57610002565b6112996004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050600060006000600060006000600060006000508a604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050806001016000508960ff16600281101561000257600160a060020a038a168452828101600101602052604084205463ffffffff1698506002811015610002576040842054606060020a90046001604060020a031697506002811015610002576040842054640100000000900463ffffffff169650600281101561000257604084206001015495506002811015610002576040842054604060020a900463ffffffff169450600281101561000257505060409091205495999498509296509094509260a060020a90046001604060020a0316919050565b61122d6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505050505050506000600060005082604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050348160050160005082600d0160009054906101000a900460ff1660ff16600281101561000257600402830160070180546001608060020a0381169093016001608060020a03199390931692909217909155505b5050565b6112e26004808035906020019082018035906020019191908080601f01602080910003423423094734987103498712093847102938740192387401349857109487501938475" + } + ] + }"#; + + let mut buf = BytesMut::with_capacity(65536); + buf.put_slice(request.as_bytes()); + + let mut codec = StreamCodec::stream_incoming(); + + let parsed_request = codec + .decode(&mut buf) + .expect("There should be no error in huge test") + .expect("There should be at least one request huge test"); + assert_eq!(request, parsed_request); + } + + #[test] + fn simple_line_codec() { + let mut buf = BytesMut::with_capacity(2048); + buf.put_slice(b"{ test: 1 }\n{ test: 2 }\n{ test: 3 }"); + + let mut codec = StreamCodec::default(); + + let request = codec + .decode(&mut buf) + .expect("There should be no error in simple test") + .expect("There should be at least one request in simple test"); + let request2 = codec + .decode(&mut buf) + .expect("There should be no error in simple test") + .expect("There should be at least one request in simple test"); + + assert_eq!(request, "{ test: 1 }"); + assert_eq!(request2, "{ test: 2 }"); + } +} diff --git a/crates/net/rpc-api/Cargo.toml b/crates/net/rpc-api/Cargo.toml index 5e011b025f..76a0ca651f 100644 --- a/crates/net/rpc-api/Cargo.toml +++ b/crates/net/rpc-api/Cargo.toml @@ -15,7 +15,7 @@ reth-primitives = { path = "../../primitives" } reth-rpc-types = { path = "../rpc-types" } # misc -jsonrpsee = { version = "0.15", features = ["server", "macros"] } +jsonrpsee = { version = "0.16", features = ["server", "macros"] } serde_json = "1.0" [features] diff --git a/crates/net/rpc/Cargo.toml b/crates/net/rpc/Cargo.toml index d22a01d050..212ee4007f 100644 --- a/crates/net/rpc/Cargo.toml +++ b/crates/net/rpc/Cargo.toml @@ -17,7 +17,7 @@ reth-rpc-types = { path = "../rpc-types" } reth-transaction-pool = { path = "../../transaction-pool" } # rpc -jsonrpsee = { version = "0.15" } +jsonrpsee = { version = "0.16" } # misc async-trait = "0.1"