rpc: Implement HTTP-based JSON-RPC server and client

This commit is contained in:
parazyd
2024-11-27 14:44:18 +01:00
parent 4fbf5fc8e1
commit 61b9f538b8
6 changed files with 370 additions and 28 deletions

1
Cargo.lock generated
View File

@@ -1950,6 +1950,7 @@ dependencies = [
"futures-rustls",
"halo2_gadgets",
"halo2_proofs",
"httparse",
"lazy_static",
"libc",
"log",

View File

@@ -86,6 +86,7 @@ x509-parser = {version = "0.16.0", features = ["validate", "verify"], optional =
bs58 = {version = "0.5.1", optional = true}
serde = {version = "1.0.210", features = ["derive"], optional = true}
tinyjson = {version = "2.5.1", optional = true}
httparse = {version = "1.9.5", optional = true}
semver = {version = "1.0.23", optional = true}
structopt = {version= "0.3.26", optional = true}
structopt-toml = {version= "0.5.1", optional = true}
@@ -233,6 +234,7 @@ net = [
rpc = [
"async-trait",
"httparse",
"net",
]

View File

@@ -24,7 +24,10 @@ use tinyjson::JsonValue;
use url::Url;
use super::{
common::{read_from_stream, write_to_stream, INIT_BUF_SIZE, READ_TIMEOUT},
common::{
http_read_from_stream_response, http_write_to_stream, read_from_stream, write_to_stream,
INIT_BUF_SIZE, READ_TIMEOUT,
},
jsonrpc::*,
};
use crate::{
@@ -56,9 +59,18 @@ impl RpcClient {
let (rep_send, rep_recv) = channel::unbounded();
let (req_skip_send, req_skip_recv) = channel::unbounded();
// Figure out if we're using HTTP and rewrite the URL accordingly.
let mut dialer_url = endpoint.clone();
if endpoint.scheme().starts_with("http+") {
let scheme = endpoint.scheme().strip_prefix("http+").unwrap();
let url_str = endpoint.as_str().replace(endpoint.scheme(), scheme);
dialer_url = url_str.parse()?;
}
let use_http = endpoint.scheme().starts_with("http+");
// Instantiate Dialer and dial the server
// TODO: Could add a timeout here
let dialer = Dialer::new(endpoint, None).await?;
let dialer = Dialer::new(dialer_url, None).await?;
let stream = dialer.dial(None).await?;
// Create the StoppableTask running the request-reply loop.
@@ -66,7 +78,7 @@ impl RpcClient {
// using `RpcClient::stop()`.
let task = StoppableTask::new();
task.clone().start(
Self::reqrep_loop(stream, rep_send, req_recv, req_skip_recv),
Self::reqrep_loop(use_http, stream, rep_send, req_recv, req_skip_recv),
|res| async move {
match res {
Ok(()) | Err(Error::RpcClientStopped) => {}
@@ -89,6 +101,7 @@ impl RpcClient {
/// Internal function that loops on a given stream and multiplexes the data
async fn reqrep_loop(
use_http: bool,
stream: Box<dyn PtStream>,
rep_send: channel::Sender<JsonResult>,
req_recv: channel::Receiver<(JsonRequest, bool)>,
@@ -111,7 +124,11 @@ impl RpcClient {
with_timeout = timeout;
let request = JsonResult::Request(request);
write_to_stream(&mut writer, &request).await?;
if use_http {
http_write_to_stream(&mut writer, &request).await?;
} else {
write_to_stream(&mut writer, &request).await?;
}
Ok::<(), crate::Error>(())
},
async {
@@ -122,9 +139,23 @@ impl RpcClient {
.await?;
if with_timeout {
let _ = io_timeout(READ_TIMEOUT, read_from_stream(&mut reader, &mut buf)).await?;
if use_http {
let _ = io_timeout(
READ_TIMEOUT,
http_read_from_stream_response(&mut reader, &mut buf),
)
.await?;
} else {
let _ =
io_timeout(READ_TIMEOUT, read_from_stream(&mut reader, &mut buf)).await?;
}
} else {
let _ = read_from_stream(&mut reader, &mut buf).await?;
#[allow(clippy::collapsible_else_if)]
if use_http {
let _ = http_read_from_stream_response(&mut reader, &mut buf).await?;
} else {
let _ = read_from_stream(&mut reader, &mut buf).await?;
}
}
let val: JsonValue = String::from_utf8(buf)?.parse()?;
@@ -280,9 +311,18 @@ impl RpcChadClient {
let (req_send, req_recv) = channel::unbounded();
let (rep_send, rep_recv) = channel::unbounded();
// Figure out if we're using HTTP and rewrite the URL accordingly.
let mut dialer_url = endpoint.clone();
if endpoint.scheme().starts_with("http+") {
let scheme = endpoint.scheme().strip_prefix("http+").unwrap();
let url_str = endpoint.as_str().replace(endpoint.scheme(), scheme);
dialer_url = url_str.parse()?;
}
let use_http = endpoint.scheme().starts_with("http+");
// Instantiate Dialer and dial the server
// TODO: Could add a timeout here
let dialer = Dialer::new(endpoint, None).await?;
let dialer = Dialer::new(dialer_url, None).await?;
let stream = dialer.dial(None).await?;
// Create the StoppableTask running the request-reply loop.
@@ -290,7 +330,7 @@ impl RpcChadClient {
// using `RpcChadClient::stop()`.
let task = StoppableTask::new();
task.clone().start(
Self::reqrep_loop(stream, rep_send, req_recv),
Self::reqrep_loop(use_http, stream, rep_send, req_recv),
|res| async move {
match res {
Ok(()) | Err(Error::RpcClientStopped) => {}
@@ -313,6 +353,7 @@ impl RpcChadClient {
/// Internal function that loops on a given stream and multiplexes the data
async fn reqrep_loop(
use_http: bool,
stream: Box<dyn PtStream>,
rep_send: channel::Sender<JsonResult>,
req_recv: channel::Receiver<JsonRequest>,
@@ -330,11 +371,19 @@ impl RpcChadClient {
async {
let request = req_recv.recv().await?;
let request = JsonResult::Request(request);
write_to_stream(&mut writer, &request).await?;
if use_http {
http_write_to_stream(&mut writer, &request).await?;
} else {
write_to_stream(&mut writer, &request).await?;
}
Ok::<(), crate::Error>(())
},
async {
let _ = read_from_stream(&mut reader, &mut buf).await?;
if use_http {
let _ = http_read_from_stream_response(&mut reader, &mut buf).await?;
} else {
let _ = read_from_stream(&mut reader, &mut buf).await?;
}
let val: JsonValue = String::from_utf8(buf)?.parse()?;
let rep = JsonResult::try_from_value(&val)?;
rep_send.send(rep).await?;

View File

@@ -18,15 +18,156 @@
use std::{io, time::Duration};
use log::error;
use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use super::jsonrpc::*;
use crate::net::transport::PtStream;
pub(super) const INIT_BUF_SIZE: usize = 4096; // 4K
pub(super) const MAX_BUF_SIZE: usize = 1024 * 8192; // 8M
pub(super) const MAX_BUF_SIZE: usize = 1024 * 1024 * 16; // 16M
pub(super) const READ_TIMEOUT: Duration = Duration::from_secs(30);
/// Internal read function that reads from the active stream into a buffer.
/// Performs HTTP POST request parsing. Returns the request body length.
pub(super) async fn http_read_from_stream_request(
reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
buf: &mut Vec<u8>,
) -> io::Result<usize> {
let mut total_read = 0;
// Intermediate buffer we use to read byte-by-byte.
let mut tmpbuf = [0_u8];
while total_read < MAX_BUF_SIZE {
buf.resize(total_read + INIT_BUF_SIZE, 0u8);
match reader.read(&mut tmpbuf).await {
Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
Ok(0) => break, // Finished reading
Ok(_) => {
// Copy the read byte to the destination buffer.
buf[total_read] = tmpbuf[0];
total_read += 1;
// In HTTP, when we reach '\r\n\r\n' we know we've read the headers.
// The rest is the body. Headers should contain Content-Length which
// tells us the remaining amount of bytes to read.
if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n']
{
break
}
}
Err(e) => return Err(e),
}
}
// Here we parse the HTTP for correctness and find Content-Length
let mut headers = [httparse::EMPTY_HEADER; 8];
let mut req = httparse::Request::new(&mut headers);
let _body_offset = match req.parse(buf) {
Ok(v) => v.unwrap(), // TODO: This should check httparse::Status::is_partial()
Err(e) => {
error!("[RPC] Failed parsing HTTP request: {}", e);
return Err(io::ErrorKind::InvalidData.into())
}
};
let mut content_length: usize = 0;
for header in headers {
if header.name.to_lowercase() == "content-length" {
let s = String::from_utf8_lossy(header.value);
content_length = match s.parse() {
Ok(v) => v,
Err(_) => return Err(io::ErrorKind::InvalidData.into()),
};
}
}
if content_length == 0 || content_length > MAX_BUF_SIZE {
return Err(io::ErrorKind::InvalidData.into())
}
// Now we know the request body size. Read it into the buffer.
buf.clear();
buf.resize(content_length, 0_u8);
reader.read(buf).await?;
assert!(buf.len() == content_length);
Ok(content_length)
}
/// Internal read function that reads from the active stream into a buffer.
/// Performs HTTP POST response parsing. Returns the response body length.
pub(super) async fn http_read_from_stream_response(
reader: &mut BufReader<ReadHalf<Box<dyn PtStream>>>,
buf: &mut Vec<u8>,
) -> io::Result<usize> {
let mut total_read = 0;
// Intermediate buffer we use to read byte-by-byte.
let mut tmpbuf = [0_u8];
while total_read < MAX_BUF_SIZE {
buf.resize(total_read + INIT_BUF_SIZE, 0u8);
match reader.read(&mut tmpbuf).await {
Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
Ok(0) => break, // Finished reading
Ok(_) => {
// Copy the read byte to the destination buffer.
buf[total_read] = tmpbuf[0];
total_read += 1;
// In HTTP, when we reach '\r\n\r\n' we know we've read the headers.
// The rest is the body. Headers should contain Content-Length which
// tells us the remaining amount of bytes to read.
if total_read > 4 && buf[total_read - 4..total_read] == [b'\r', b'\n', b'\r', b'\n']
{
break
}
}
Err(e) => return Err(e),
}
}
// Here we parse the HTTP for correctness and find Content-Length
let mut headers = [httparse::EMPTY_HEADER; 8];
let mut resp = httparse::Response::new(&mut headers);
let _body_offset = match resp.parse(buf) {
Ok(v) => v.unwrap(), // TODO: This should check httparse::Status::is_partial()
Err(e) => {
error!("[RPC] Failed parsing HTTP response: {}", e);
return Err(io::ErrorKind::InvalidData.into())
}
};
let mut content_length: usize = 0;
for header in headers {
if header.name.to_lowercase() == "content-length" {
let s = String::from_utf8_lossy(header.value);
content_length = match s.parse() {
Ok(v) => v,
Err(_) => return Err(io::ErrorKind::InvalidData.into()),
};
}
}
if content_length == 0 || content_length > MAX_BUF_SIZE {
return Err(io::ErrorKind::InvalidData.into())
}
// Now we know the response body size. Read it into the buffer.
buf.clear();
buf.resize(content_length, 0_u8);
reader.read(buf).await?;
assert!(buf.len() == content_length);
Ok(content_length)
}
/// Internal read function that reads from the active stream into a buffer.
/// Reading stops upon reaching CRLF or LF, or when `MAX_BUF_SIZE` is reached.
pub(super) async fn read_from_stream(
@@ -39,7 +180,7 @@ pub(super) async fn read_from_stream(
let mut tmpbuf = [0_u8];
while total_read < MAX_BUF_SIZE {
buf.resize(total_read + INIT_BUF_SIZE, 0);
buf.resize(total_read + INIT_BUF_SIZE, 0u8);
match reader.read(&mut tmpbuf).await {
Ok(0) if total_read == 0 => return Err(io::ErrorKind::ConnectionAborted.into()),
@@ -68,6 +209,29 @@ pub(super) async fn read_from_stream(
Ok(total_read)
}
/// Internal write function that writes a JSON-RPC object to the active stream.
/// Sent as an HTTP response.
pub(super) async fn http_write_to_stream(
writer: &mut WriteHalf<Box<dyn PtStream>>,
object: &JsonResult,
) -> io::Result<()> {
let (status_line, object_str) = match object {
JsonResult::Notification(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()),
JsonResult::Response(v) => ("HTTP/1.1 200 OK", v.stringify().unwrap()),
JsonResult::Error(v) => ("HTTP/1.1 400 Bad Request", v.stringify().unwrap()),
JsonResult::Request(v) => ("POST /json_rpc HTTP/1.1", v.stringify().unwrap()),
_ => unreachable!(),
};
let length = object_str.len();
let data = format!("{status_line}\r\nContent-Length: {length}\r\nContent-Type: application/json\r\n\r\n{object_str}");
writer.write_all(data.as_bytes()).await?;
writer.flush().await?;
Ok(())
}
/// Internal write function that writes a JSON-RPC object to the active stream.
pub(super) async fn write_to_stream(
writer: &mut WriteHalf<Box<dyn PtStream>>,

View File

@@ -28,7 +28,10 @@ use tinyjson::JsonValue;
use url::Url;
use super::{
common::{read_from_stream, write_to_stream, INIT_BUF_SIZE},
common::{
http_read_from_stream_request, http_write_to_stream, read_from_stream, write_to_stream,
INIT_BUF_SIZE,
},
jsonrpc::*,
};
use crate::{
@@ -80,6 +83,7 @@ async fn handle_request(
rh: Arc<impl RequestHandler + 'static>,
ex: Arc<smol::Executor<'_>>,
tasks: Arc<Mutex<HashSet<Arc<StoppableTask>>>>,
use_http: bool,
req: JsonRequest,
) -> Result<()> {
let rep = rh.handle_request(req).await;
@@ -107,10 +111,20 @@ async fn handle_request(
let notification = JsonResult::Notification(notification);
let mut writer_lock = writer_.lock().await;
if let Err(e) = write_to_stream(&mut writer_lock, &notification).await {
subscription.unsubscribe().await;
return Err(e.into())
#[allow(clippy::collapsible_else_if)]
if use_http {
if let Err(e) = http_write_to_stream(&mut writer_lock, &notification).await {
subscription.unsubscribe().await;
return Err(e.into())
}
} else {
if let Err(e) = write_to_stream(&mut writer_lock, &notification).await {
subscription.unsubscribe().await;
return Err(e.into())
}
}
drop(writer_lock);
}
},
@@ -133,7 +147,11 @@ async fn handle_request(
// Write the response
debug!(target: "rpc::server", "{} <-- {}", addr, reply.stringify()?);
let mut writer_lock = writer.lock().await;
write_to_stream(&mut writer_lock, &reply.into()).await?;
if use_http {
http_write_to_stream(&mut writer_lock, &reply.into()).await?;
} else {
write_to_stream(&mut writer_lock, &reply.into()).await?;
}
drop(writer_lock);
let task = StoppableTask::new();
@@ -157,10 +175,19 @@ async fn handle_request(
let notification = JsonResult::Notification(notification);
let mut writer_lock = writer_.lock().await;
if let Err(e) = write_to_stream(&mut writer_lock, &notification).await {
subscription.unsubscribe().await;
drop(writer_lock);
return Err(e.into())
#[allow(clippy::collapsible_else_if)]
if use_http {
if let Err(e) = http_write_to_stream(&mut writer_lock, &notification).await {
subscription.unsubscribe().await;
drop(writer_lock);
return Err(e.into())
}
} else {
if let Err(e) = write_to_stream(&mut writer_lock, &notification).await {
subscription.unsubscribe().await;
drop(writer_lock);
return Err(e.into())
}
}
drop(writer_lock);
}
@@ -187,14 +214,22 @@ async fn handle_request(
JsonResult::Response(ref v) => {
debug!(target: "rpc::server", "{} <-- {}", addr, v.stringify()?);
let mut writer_lock = writer.lock().await;
write_to_stream(&mut writer_lock, &rep).await?;
if use_http {
http_write_to_stream(&mut writer_lock, &rep).await?;
} else {
write_to_stream(&mut writer_lock, &rep).await?;
}
drop(writer_lock);
}
JsonResult::Error(ref v) => {
debug!(target: "rpc::server", "{} <-- {}", addr, v.stringify()?);
let mut writer_lock = writer.lock().await;
write_to_stream(&mut writer_lock, &rep).await?;
if use_http {
http_write_to_stream(&mut writer_lock, &rep).await?;
} else {
write_to_stream(&mut writer_lock, &rep).await?;
}
drop(writer_lock);
}
}
@@ -211,6 +246,7 @@ pub async fn accept(
addr: Url,
rh: Arc<impl RequestHandler + 'static>,
conn_limit: Option<usize>,
use_http: bool,
ex: Arc<smol::Executor<'_>>,
) -> Result<()> {
// If there's a connection limit set, we will refuse connections
@@ -232,7 +268,11 @@ pub async fn accept(
let mut buf = Vec::with_capacity(INIT_BUF_SIZE);
let mut reader_lock = reader.lock().await;
let _ = read_from_stream(&mut reader_lock, &mut buf).await?;
if use_http {
let _ = http_read_from_stream_request(&mut reader_lock, &mut buf).await?;
} else {
let _ = read_from_stream(&mut reader_lock, &mut buf).await?;
}
drop(reader_lock);
let line = match String::from_utf8(buf) {
@@ -287,6 +327,7 @@ pub async fn accept(
rh.clone(),
ex.clone(),
tasks.clone(),
use_http,
req,
),
move |_| async move {
@@ -311,6 +352,7 @@ async fn run_accept_loop(
listener: Box<dyn PtListener>,
rh: Arc<impl RequestHandler + 'static>,
conn_limit: Option<usize>,
use_http: bool,
ex: Arc<smol::Executor<'_>>,
) -> Result<()> {
loop {
@@ -327,7 +369,7 @@ async fn run_accept_loop(
let task_ = task.clone();
let ex_ = ex.clone();
task.clone().start(
accept(reader, writer, url.clone(), rh.clone(), conn_limit, ex_),
accept(reader, writer, url.clone(), rh.clone(), conn_limit, use_http, ex_),
|_| async move {
info!(target: "rpc::server", "[RPC] Closed conn from {}", url);
rh_.clone().unmark_connection(task_.clone()).await;
@@ -374,16 +416,29 @@ async fn run_accept_loop(
}
}
/// Start a JSON-RPC server bound to the given accept URL and use the
/// Start a JSON-RPC server bound to the givven accept URL and use the
/// given [`RequestHandler`] to handle incoming requests.
///
/// The supported network schemes can be prefixed with `http+` to serve
/// JSON-RPC over HTTP/1.1.
pub async fn listen_and_serve(
accept_url: Url,
rh: Arc<impl RequestHandler + 'static>,
conn_limit: Option<usize>,
ex: Arc<smol::Executor<'_>>,
) -> Result<()> {
let listener = Listener::new(accept_url, None).await?.listen().await?;
run_accept_loop(listener, rh, conn_limit, ex.clone()).await
// Figure out if we're using HTTP and rewrite the URL accordingly.
let mut listen_url = accept_url.clone();
if accept_url.scheme().starts_with("http+") {
let scheme = accept_url.scheme().strip_prefix("http+").unwrap();
let url_str = accept_url.as_str().replace(accept_url.scheme(), scheme);
listen_url = url_str.parse()?;
}
let listener = Listener::new(listen_url, None).await?.listen().await?;
let use_http = accept_url.scheme().starts_with("http+");
run_accept_loop(listener, rh, conn_limit, use_http, ex.clone()).await
}
#[cfg(test)]

View File

@@ -71,8 +71,29 @@ impl RequestHandler for RpcSrv {
}
}
/// Initialize the logging mechanism
fn init_logger() {
let mut cfg = simplelog::ConfigBuilder::new();
// We check this error so we can execute same file tests in parallel,
// otherwise second one fails to init logger here.
if simplelog::TermLogger::init(
//simplelog::LevelFilter::Info,
simplelog::LevelFilter::Debug,
//simplelog::LevelFilter::Trace,
cfg.build(),
simplelog::TerminalMode::Mixed,
simplelog::ColorChoice::Auto,
)
.is_err()
{
log::debug!("Logger initialized");
}
}
#[test]
fn jsonrpc_reqrep() -> Result<()> {
init_logger();
let executor = Arc::new(Executor::new());
smol::block_on(executor.run(async {
@@ -119,3 +140,53 @@ fn jsonrpc_reqrep() -> Result<()> {
Ok(())
}))
}
#[test]
fn http_jsonrpc_reqrep() -> Result<()> {
init_logger();
let executor = Arc::new(Executor::new());
smol::block_on(executor.run(async {
// Find an available port
let listener = TcpListener::bind("127.0.0.1:0").await?;
let sockaddr = listener.local_addr()?;
let endpoint = Url::parse(&format!("http+tcp://127.0.0.1:{}", sockaddr.port()))?;
drop(listener);
let rpcsrv = Arc::new(RpcSrv {
stop_sub: smol::channel::unbounded(),
rpc_connections: Mutex::new(HashSet::new()),
});
let rpcsrv_ = Arc::clone(&rpcsrv);
let rpc_task = StoppableTask::new();
rpc_task.clone().start(
listen_and_serve(endpoint.clone(), rpcsrv.clone(), None, executor.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => rpcsrv_.stop_connections().await,
Err(e) => eprintln!("Failed starting JSON-RPC server: {}", e),
}
},
Error::RpcServerStopped,
executor.clone(),
);
msleep(500).await;
let client = RpcClient::new(endpoint, executor.clone()).await?;
let req = JsonRequest::new("ping", vec![].into());
let rep = client.request(req).await?;
let rep = String::try_from(rep).unwrap();
assert_eq!(&rep, "pong");
let req = JsonRequest::new("kill", vec![].into());
let rep = client.request(req).await?;
let rep = String::try_from(rep).unwrap();
assert_eq!(&rep, "bye");
Ok(())
}))
}