diff --git a/Cargo.lock b/Cargo.lock index 15f461430..2238b65b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1950,6 +1950,7 @@ dependencies = [ "futures-rustls", "halo2_gadgets", "halo2_proofs", + "httparse", "lazy_static", "libc", "log", diff --git a/Cargo.toml b/Cargo.toml index ad980f694..0e482ca30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ x509-parser = {version = "0.16.0", features = ["validate", "verify"], optional = bs58 = {version = "0.5.1", optional = true} serde = {version = "1.0.210", features = ["derive"], optional = true} tinyjson = {version = "2.5.1", optional = true} +httparse = {version = "1.9.5", optional = true} semver = {version = "1.0.23", optional = true} structopt = {version= "0.3.26", optional = true} structopt-toml = {version= "0.5.1", optional = true} @@ -233,6 +234,7 @@ net = [ rpc = [ "async-trait", + "httparse", "net", ] diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 92c9f1962..5ee2c05b6 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -24,7 +24,10 @@ use tinyjson::JsonValue; use url::Url; use super::{ - common::{read_from_stream, write_to_stream, INIT_BUF_SIZE, READ_TIMEOUT}, + common::{ + http_read_from_stream_response, http_write_to_stream, read_from_stream, write_to_stream, + INIT_BUF_SIZE, READ_TIMEOUT, + }, jsonrpc::*, }; use crate::{ @@ -56,9 +59,18 @@ impl RpcClient { let (rep_send, rep_recv) = channel::unbounded(); let (req_skip_send, req_skip_recv) = channel::unbounded(); + // Figure out if we're using HTTP and rewrite the URL accordingly. + let mut dialer_url = endpoint.clone(); + if endpoint.scheme().starts_with("http+") { + let scheme = endpoint.scheme().strip_prefix("http+").unwrap(); + let url_str = endpoint.as_str().replace(endpoint.scheme(), scheme); + dialer_url = url_str.parse()?; + } + let use_http = endpoint.scheme().starts_with("http+"); + // Instantiate Dialer and dial the server // TODO: Could add a timeout here - let dialer = Dialer::new(endpoint, None).await?; + let dialer = Dialer::new(dialer_url, None).await?; let stream = dialer.dial(None).await?; // Create the StoppableTask running the request-reply loop. @@ -66,7 +78,7 @@ impl RpcClient { // using `RpcClient::stop()`. let task = StoppableTask::new(); task.clone().start( - Self::reqrep_loop(stream, rep_send, req_recv, req_skip_recv), + Self::reqrep_loop(use_http, stream, rep_send, req_recv, req_skip_recv), |res| async move { match res { Ok(()) | Err(Error::RpcClientStopped) => {} @@ -89,6 +101,7 @@ impl RpcClient { /// Internal function that loops on a given stream and multiplexes the data async fn reqrep_loop( + use_http: bool, stream: Box, rep_send: channel::Sender, req_recv: channel::Receiver<(JsonRequest, bool)>, @@ -111,7 +124,11 @@ impl RpcClient { with_timeout = timeout; let request = JsonResult::Request(request); - write_to_stream(&mut writer, &request).await?; + if use_http { + http_write_to_stream(&mut writer, &request).await?; + } else { + write_to_stream(&mut writer, &request).await?; + } Ok::<(), crate::Error>(()) }, async { @@ -122,9 +139,23 @@ impl RpcClient { .await?; if with_timeout { - let _ = io_timeout(READ_TIMEOUT, read_from_stream(&mut reader, &mut buf)).await?; + if use_http { + let _ = io_timeout( + READ_TIMEOUT, + http_read_from_stream_response(&mut reader, &mut buf), + ) + .await?; + } else { + let _ = + io_timeout(READ_TIMEOUT, read_from_stream(&mut reader, &mut buf)).await?; + } } else { - let _ = read_from_stream(&mut reader, &mut buf).await?; + #[allow(clippy::collapsible_else_if)] + if use_http { + let _ = http_read_from_stream_response(&mut reader, &mut buf).await?; + } else { + let _ = read_from_stream(&mut reader, &mut buf).await?; + } } let val: JsonValue = String::from_utf8(buf)?.parse()?; @@ -280,9 +311,18 @@ impl RpcChadClient { let (req_send, req_recv) = channel::unbounded(); let (rep_send, rep_recv) = channel::unbounded(); + // Figure out if we're using HTTP and rewrite the URL accordingly. + let mut dialer_url = endpoint.clone(); + if endpoint.scheme().starts_with("http+") { + let scheme = endpoint.scheme().strip_prefix("http+").unwrap(); + let url_str = endpoint.as_str().replace(endpoint.scheme(), scheme); + dialer_url = url_str.parse()?; + } + let use_http = endpoint.scheme().starts_with("http+"); + // Instantiate Dialer and dial the server // TODO: Could add a timeout here - let dialer = Dialer::new(endpoint, None).await?; + let dialer = Dialer::new(dialer_url, None).await?; let stream = dialer.dial(None).await?; // Create the StoppableTask running the request-reply loop. @@ -290,7 +330,7 @@ impl RpcChadClient { // using `RpcChadClient::stop()`. let task = StoppableTask::new(); task.clone().start( - Self::reqrep_loop(stream, rep_send, req_recv), + Self::reqrep_loop(use_http, stream, rep_send, req_recv), |res| async move { match res { Ok(()) | Err(Error::RpcClientStopped) => {} @@ -313,6 +353,7 @@ impl RpcChadClient { /// Internal function that loops on a given stream and multiplexes the data async fn reqrep_loop( + use_http: bool, stream: Box, rep_send: channel::Sender, req_recv: channel::Receiver, @@ -330,11 +371,19 @@ impl RpcChadClient { async { let request = req_recv.recv().await?; let request = JsonResult::Request(request); - write_to_stream(&mut writer, &request).await?; + if use_http { + http_write_to_stream(&mut writer, &request).await?; + } else { + write_to_stream(&mut writer, &request).await?; + } Ok::<(), crate::Error>(()) }, async { - let _ = read_from_stream(&mut reader, &mut buf).await?; + if use_http { + let _ = http_read_from_stream_response(&mut reader, &mut buf).await?; + } else { + let _ = read_from_stream(&mut reader, &mut buf).await?; + } let val: JsonValue = String::from_utf8(buf)?.parse()?; let rep = JsonResult::try_from_value(&val)?; rep_send.send(rep).await?; diff --git a/src/rpc/common.rs b/src/rpc/common.rs index e8d36bcb7..e8a8b84a3 100644 --- a/src/rpc/common.rs +++ b/src/rpc/common.rs @@ -18,15 +18,156 @@ use std::{io, time::Duration}; +use log::error; use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use super::jsonrpc::*; use crate::net::transport::PtStream; pub(super) const INIT_BUF_SIZE: usize = 4096; // 4K -pub(super) const MAX_BUF_SIZE: usize = 1024 * 8192; // 8M +pub(super) const MAX_BUF_SIZE: usize = 1024 * 1024 * 16; // 16M pub(super) const READ_TIMEOUT: Duration = Duration::from_secs(30); +/// Internal read function that reads from the active stream into a buffer. +/// Performs HTTP POST request parsing. Returns the request body length. +pub(super) async fn http_read_from_stream_request( + reader: &mut BufReader>>, + buf: &mut Vec, +) -> io::Result { + let mut total_read = 0; + + // Intermediate buffer we use to read byte-by-byte. + let mut tmpbuf = [0_u8]; + + while total_read < MAX_BUF_SIZE { + buf.resize(total_read + INIT_BUF_SIZE, 0u8); + + match reader.read(&mut tmpbuf).await { + Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()), + Ok(0) => break, // Finished reading + Ok(_) => { + // Copy the read byte to the destination buffer. + buf[total_read] = tmpbuf[0]; + total_read += 1; + + // In HTTP, when we reach '\r\n\r\n' we know we've read the headers. + // The rest is the body. Headers should contain Content-Length which + // tells us the remaining amount of bytes to read. + if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n'] + { + break + } + } + + Err(e) => return Err(e), + } + } + + // Here we parse the HTTP for correctness and find Content-Length + let mut headers = [httparse::EMPTY_HEADER; 8]; + let mut req = httparse::Request::new(&mut headers); + let _body_offset = match req.parse(buf) { + Ok(v) => v.unwrap(), // TODO: This should check httparse::Status::is_partial() + Err(e) => { + error!("[RPC] Failed parsing HTTP request: {}", e); + return Err(io::ErrorKind::InvalidData.into()) + } + }; + + let mut content_length: usize = 0; + for header in headers { + if header.name.to_lowercase() == "content-length" { + let s = String::from_utf8_lossy(header.value); + content_length = match s.parse() { + Ok(v) => v, + Err(_) => return Err(io::ErrorKind::InvalidData.into()), + }; + } + } + + if content_length == 0 || content_length > MAX_BUF_SIZE { + return Err(io::ErrorKind::InvalidData.into()) + } + + // Now we know the request body size. Read it into the buffer. + buf.clear(); + buf.resize(content_length, 0_u8); + reader.read(buf).await?; + + assert!(buf.len() == content_length); + Ok(content_length) +} + +/// Internal read function that reads from the active stream into a buffer. +/// Performs HTTP POST response parsing. Returns the response body length. +pub(super) async fn http_read_from_stream_response( + reader: &mut BufReader>>, + buf: &mut Vec, +) -> io::Result { + let mut total_read = 0; + + // Intermediate buffer we use to read byte-by-byte. + let mut tmpbuf = [0_u8]; + + while total_read < MAX_BUF_SIZE { + buf.resize(total_read + INIT_BUF_SIZE, 0u8); + + match reader.read(&mut tmpbuf).await { + Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()), + Ok(0) => break, // Finished reading + Ok(_) => { + // Copy the read byte to the destination buffer. + buf[total_read] = tmpbuf[0]; + total_read += 1; + + // In HTTP, when we reach '\r\n\r\n' we know we've read the headers. + // The rest is the body. Headers should contain Content-Length which + // tells us the remaining amount of bytes to read. + if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n'] + { + break + } + } + + Err(e) => return Err(e), + } + } + + // Here we parse the HTTP for correctness and find Content-Length + let mut headers = [httparse::EMPTY_HEADER; 8]; + let mut resp = httparse::Response::new(&mut headers); + let _body_offset = match resp.parse(buf) { + Ok(v) => v.unwrap(), // TODO: This should check httparse::Status::is_partial() + Err(e) => { + error!("[RPC] Failed parsing HTTP response: {}", e); + return Err(io::ErrorKind::InvalidData.into()) + } + }; + + let mut content_length: usize = 0; + for header in headers { + if header.name.to_lowercase() == "content-length" { + let s = String::from_utf8_lossy(header.value); + content_length = match s.parse() { + Ok(v) => v, + Err(_) => return Err(io::ErrorKind::InvalidData.into()), + }; + } + } + + if content_length == 0 || content_length > MAX_BUF_SIZE { + return Err(io::ErrorKind::InvalidData.into()) + } + + // Now we know the response body size. Read it into the buffer. + buf.clear(); + buf.resize(content_length, 0_u8); + reader.read(buf).await?; + + assert!(buf.len() == content_length); + Ok(content_length) +} + /// Internal read function that reads from the active stream into a buffer. /// Reading stops upon reaching CRLF or LF, or when `MAX_BUF_SIZE` is reached. pub(super) async fn read_from_stream( @@ -39,7 +180,7 @@ pub(super) async fn read_from_stream( let mut tmpbuf = [0_u8]; while total_read < MAX_BUF_SIZE { - buf.resize(total_read + INIT_BUF_SIZE, 0); + buf.resize(total_read + INIT_BUF_SIZE, 0u8); match reader.read(&mut tmpbuf).await { Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()), @@ -68,6 +209,29 @@ pub(super) async fn read_from_stream( Ok(total_read) } +/// Internal write function that writes a JSON-RPC object to the active stream. +/// Sent as an HTTP response. +pub(super) async fn http_write_to_stream( + writer: &mut WriteHalf>, + object: &JsonResult, +) -> io::Result<()> { + let (status_line, object_str) = match object { + JsonResult::Notification(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()), + JsonResult::Response(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()), + JsonResult::Error(v) => ("HTTP/1.1 400 Bad Request", v.stringify().unwrap()), + JsonResult::Request(v) => ("POST /json_rpc HTTP/1.1", v.stringify().unwrap()), + _ => unreachable!(), + }; + + let length = object_str.len(); + let data = format!("{status_line}\r\nContent-Length: {length}\r\nContent-Type: application/json\r\n\r\n{object_str}"); + + writer.write_all(data.as_bytes()).await?; + writer.flush().await?; + + Ok(()) +} + /// Internal write function that writes a JSON-RPC object to the active stream. pub(super) async fn write_to_stream( writer: &mut WriteHalf>, diff --git a/src/rpc/server.rs b/src/rpc/server.rs index 045220d7f..63736f1eb 100644 --- a/src/rpc/server.rs +++ b/src/rpc/server.rs @@ -28,7 +28,10 @@ use tinyjson::JsonValue; use url::Url; use super::{ - common::{read_from_stream, write_to_stream, INIT_BUF_SIZE}, + common::{ + http_read_from_stream_request, http_write_to_stream, read_from_stream, write_to_stream, + INIT_BUF_SIZE, + }, jsonrpc::*, }; use crate::{ @@ -80,6 +83,7 @@ async fn handle_request( rh: Arc, ex: Arc>, tasks: Arc>>>, + use_http: bool, req: JsonRequest, ) -> Result<()> { let rep = rh.handle_request(req).await; @@ -107,10 +111,20 @@ async fn handle_request( let notification = JsonResult::Notification(notification); let mut writer_lock = writer_.lock().await; - if let Err(e) = write_to_stream(&mut writer_lock, ¬ification).await { - subscription.unsubscribe().await; - return Err(e.into()) + + #[allow(clippy::collapsible_else_if)] + if use_http { + if let Err(e) = http_write_to_stream(&mut writer_lock, ¬ification).await { + subscription.unsubscribe().await; + return Err(e.into()) + } + } else { + if let Err(e) = write_to_stream(&mut writer_lock, ¬ification).await { + subscription.unsubscribe().await; + return Err(e.into()) + } } + drop(writer_lock); } }, @@ -133,7 +147,11 @@ async fn handle_request( // Write the response debug!(target: "rpc::server", "{} <-- {}", addr, reply.stringify()?); let mut writer_lock = writer.lock().await; - write_to_stream(&mut writer_lock, &reply.into()).await?; + if use_http { + http_write_to_stream(&mut writer_lock, &reply.into()).await?; + } else { + write_to_stream(&mut writer_lock, &reply.into()).await?; + } drop(writer_lock); let task = StoppableTask::new(); @@ -157,10 +175,19 @@ async fn handle_request( let notification = JsonResult::Notification(notification); let mut writer_lock = writer_.lock().await; - if let Err(e) = write_to_stream(&mut writer_lock, ¬ification).await { - subscription.unsubscribe().await; - drop(writer_lock); - return Err(e.into()) + #[allow(clippy::collapsible_else_if)] + if use_http { + if let Err(e) = http_write_to_stream(&mut writer_lock, ¬ification).await { + subscription.unsubscribe().await; + drop(writer_lock); + return Err(e.into()) + } + } else { + if let Err(e) = write_to_stream(&mut writer_lock, ¬ification).await { + subscription.unsubscribe().await; + drop(writer_lock); + return Err(e.into()) + } } drop(writer_lock); } @@ -187,14 +214,22 @@ async fn handle_request( JsonResult::Response(ref v) => { debug!(target: "rpc::server", "{} <-- {}", addr, v.stringify()?); let mut writer_lock = writer.lock().await; - write_to_stream(&mut writer_lock, &rep).await?; + if use_http { + http_write_to_stream(&mut writer_lock, &rep).await?; + } else { + write_to_stream(&mut writer_lock, &rep).await?; + } drop(writer_lock); } JsonResult::Error(ref v) => { debug!(target: "rpc::server", "{} <-- {}", addr, v.stringify()?); let mut writer_lock = writer.lock().await; - write_to_stream(&mut writer_lock, &rep).await?; + if use_http { + http_write_to_stream(&mut writer_lock, &rep).await?; + } else { + write_to_stream(&mut writer_lock, &rep).await?; + } drop(writer_lock); } } @@ -211,6 +246,7 @@ pub async fn accept( addr: Url, rh: Arc, conn_limit: Option, + use_http: bool, ex: Arc>, ) -> Result<()> { // If there's a connection limit set, we will refuse connections @@ -232,7 +268,11 @@ pub async fn accept( let mut buf = Vec::with_capacity(INIT_BUF_SIZE); let mut reader_lock = reader.lock().await; - let _ = read_from_stream(&mut reader_lock, &mut buf).await?; + if use_http { + let _ = http_read_from_stream_request(&mut reader_lock, &mut buf).await?; + } else { + let _ = read_from_stream(&mut reader_lock, &mut buf).await?; + } drop(reader_lock); let line = match String::from_utf8(buf) { @@ -287,6 +327,7 @@ pub async fn accept( rh.clone(), ex.clone(), tasks.clone(), + use_http, req, ), move |_| async move { @@ -311,6 +352,7 @@ async fn run_accept_loop( listener: Box, rh: Arc, conn_limit: Option, + use_http: bool, ex: Arc>, ) -> Result<()> { loop { @@ -327,7 +369,7 @@ async fn run_accept_loop( let task_ = task.clone(); let ex_ = ex.clone(); task.clone().start( - accept(reader, writer, url.clone(), rh.clone(), conn_limit, ex_), + accept(reader, writer, url.clone(), rh.clone(), conn_limit, use_http, ex_), |_| async move { info!(target: "rpc::server", "[RPC] Closed conn from {}", url); rh_.clone().unmark_connection(task_.clone()).await; @@ -374,16 +416,29 @@ async fn run_accept_loop( } } -/// Start a JSON-RPC server bound to the given accept URL and use the +/// Start a JSON-RPC server bound to the givven accept URL and use the /// given [`RequestHandler`] to handle incoming requests. +/// +/// The supported network schemes can be prefixed with `http+` to serve +/// JSON-RPC over HTTP/1.1. pub async fn listen_and_serve( accept_url: Url, rh: Arc, conn_limit: Option, ex: Arc>, ) -> Result<()> { - let listener = Listener::new(accept_url, None).await?.listen().await?; - run_accept_loop(listener, rh, conn_limit, ex.clone()).await + // Figure out if we're using HTTP and rewrite the URL accordingly. + let mut listen_url = accept_url.clone(); + if accept_url.scheme().starts_with("http+") { + let scheme = accept_url.scheme().strip_prefix("http+").unwrap(); + let url_str = accept_url.as_str().replace(accept_url.scheme(), scheme); + listen_url = url_str.parse()?; + } + + let listener = Listener::new(listen_url, None).await?.listen().await?; + + let use_http = accept_url.scheme().starts_with("http+"); + run_accept_loop(listener, rh, conn_limit, use_http, ex.clone()).await } #[cfg(test)] diff --git a/tests/jsonrpc.rs b/tests/jsonrpc.rs index b11f2bd46..ae1c7df3c 100644 --- a/tests/jsonrpc.rs +++ b/tests/jsonrpc.rs @@ -71,8 +71,29 @@ impl RequestHandler for RpcSrv { } } +/// Initialize the logging mechanism +fn init_logger() { + let mut cfg = simplelog::ConfigBuilder::new(); + + // We check this error so we can execute same file tests in parallel, + // otherwise second one fails to init logger here. + if simplelog::TermLogger::init( + //simplelog::LevelFilter::Info, + simplelog::LevelFilter::Debug, + //simplelog::LevelFilter::Trace, + cfg.build(), + simplelog::TerminalMode::Mixed, + simplelog::ColorChoice::Auto, + ) + .is_err() + { + log::debug!("Logger initialized"); + } +} + #[test] fn jsonrpc_reqrep() -> Result<()> { + init_logger(); let executor = Arc::new(Executor::new()); smol::block_on(executor.run(async { @@ -119,3 +140,53 @@ fn jsonrpc_reqrep() -> Result<()> { Ok(()) })) } + +#[test] +fn http_jsonrpc_reqrep() -> Result<()> { + init_logger(); + let executor = Arc::new(Executor::new()); + + smol::block_on(executor.run(async { + // Find an available port + let listener = TcpListener::bind("127.0.0.1:0").await?; + let sockaddr = listener.local_addr()?; + let endpoint = Url::parse(&format!("http+tcp://127.0.0.1:{}", sockaddr.port()))?; + drop(listener); + + let rpcsrv = Arc::new(RpcSrv { + stop_sub: smol::channel::unbounded(), + rpc_connections: Mutex::new(HashSet::new()), + }); + let rpcsrv_ = Arc::clone(&rpcsrv); + + let rpc_task = StoppableTask::new(); + rpc_task.clone().start( + listen_and_serve(endpoint.clone(), rpcsrv.clone(), None, executor.clone()), + |res| async move { + match res { + Ok(()) | Err(Error::RpcServerStopped) => rpcsrv_.stop_connections().await, + Err(e) => eprintln!("Failed starting JSON-RPC server: {}", e), + } + }, + Error::RpcServerStopped, + executor.clone(), + ); + + msleep(500).await; + + let client = RpcClient::new(endpoint, executor.clone()).await?; + let req = JsonRequest::new("ping", vec![].into()); + let rep = client.request(req).await?; + + let rep = String::try_from(rep).unwrap(); + assert_eq!(&rep, "pong"); + + let req = JsonRequest::new("kill", vec![].into()); + let rep = client.request(req).await?; + + let rep = String::try_from(rep).unwrap(); + assert_eq!(&rep, "bye"); + + Ok(()) + })) +}