From d920493173ccd83713d32c8ce7b44183d46d5df9 Mon Sep 17 00:00:00 2001 From: skoupidi Date: Thu, 29 Feb 2024 14:25:23 +0200 Subject: [PATCH] rpc/client: new RpcChadClient added --- src/rpc/client.rs | 113 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 97 insertions(+), 16 deletions(-) diff --git a/src/rpc/client.rs b/src/rpc/client.rs index ea2811573..3b0c25d98 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -253,23 +253,105 @@ impl RpcClient { } } } +} - /// Highly experimental request when you know what you and the other - /// side is doing. This should be used when you have consecutive requests - /// and want to skip unconsumed data from the multiplexer, until you - /// reach the ones corresponding to the provided request. - /// Additionally, this request is executed without a timeout. - pub async fn chad_request(&self, req: JsonRequest) -> Result { - // Consume anything existing in the multiplexer - let _ = self.rep_recv.try_recv(); +/// Highly experimental JSON-RPC client implementation using asynchronous channels, +/// with each new request canceling waiting for the previous one. All requests are +/// executed without a timeout. +pub struct RpcChadClient { + /// The channel used to send JSON-RPC request objects + req_send: channel::Sender, + /// The channel used to read the JSON-RPC response object + rep_recv: channel::Receiver, + /// The stoppable task pointer, used on [`RpcChadClient::stop()`] + task: StoppableTaskPtr, +} - // Perform initial request +impl RpcChadClient { + /// Instantiate a new JSON-RPC client that connects to the given endpoint. + /// The function takes an `Executor` object, which is needed to start the + /// `StoppableTask` which represents the client-server connection. + pub async fn new(endpoint: Url, ex: Arc>) -> Result { + // Instantiate communication channels + let (req_send, req_recv) = channel::unbounded(); + let (rep_send, rep_recv) = channel::unbounded(); + + // Instantiate Dialer and dial the server + // TODO: Could add a timeout here + let dialer = Dialer::new(endpoint).await?; + let stream = dialer.dial(None).await?; + + // Create the StoppableTask running the request-reply loop. + // This represents the actual connection, which can be stopped + // using `RpcChadClient::stop()`. + let task = StoppableTask::new(); + task.clone().start( + Self::reqrep_loop(stream, rep_send, req_recv), + |res| async move { + match res { + Ok(()) | Err(Error::RpcClientStopped) => {} + Err(e) => error!(target: "rpc::chad_client", "[RPC] Client error: {}", e), + } + }, + Error::RpcClientStopped, + ex.clone(), + ); + + Ok(Self { req_send, rep_recv, task }) + } + + /// Stop the JSON-RPC client. This will trigger `stop()` on the inner + /// `StoppableTaskPtr` resulting in stopping the internal reqrep loop + /// and therefore closing the connection. + pub async fn stop(&self) { + self.task.stop().await; + } + + /// Internal function that loops on a given stream and multiplexes the data + async fn reqrep_loop( + stream: Box, + rep_send: channel::Sender, + req_recv: channel::Receiver, + ) -> Result<()> { + debug!(target: "rpc::chad_client::reqrep_loop()", "Starting reqrep loop"); + + let (reader, mut writer) = smol::io::split(stream); + let mut reader = BufReader::new(reader); + + loop { + let mut buf = Vec::with_capacity(INIT_BUF_SIZE); + + // Read an incoming client request, or wait for a response + smol::future::or( + async { + let request = req_recv.recv().await?; + let request = JsonResult::Request(request); + write_to_stream(&mut writer, &request).await?; + Ok::<(), crate::Error>(()) + }, + async { + let _ = read_from_stream(&mut reader, &mut buf).await?; + let val: JsonValue = String::from_utf8(buf)?.parse()?; + let rep = JsonResult::try_from_value(&val)?; + rep_send.send(rep).await?; + Ok::<(), crate::Error>(()) + }, + ) + .await?; + } + } + + /// Send a given JSON-RPC request over the instantiated client and + /// return a possible result. If the response is an error, returns + /// a `JsonRpcError`. + pub async fn request(&self, req: JsonRequest) -> Result { + // Perform request let req_id = req.id; - debug!(target: "rpc::client", "--> {}", req.stringify()?); + debug!(target: "rpc::chad_client", "--> {}", req.stringify()?); // If the connection is closed, the sender will get an error // for sending to a closed channel. - self.req_send.send((req, false)).await?; + self.req_send.send(req).await?; // Now loop until we receive our response loop { @@ -280,11 +362,10 @@ impl RpcClient { // Handle the response match reply { JsonResult::Response(rep) | JsonResult::SubscriberWithReply(_, rep) => { - debug!(target: "rpc::client", "<-- {}", rep.stringify()?); + debug!(target: "rpc::chad_client", "<-- {}", rep.stringify()?); // Check if the IDs match if req_id != rep.id { - self.req_skip_send.send(()).await?; continue } @@ -292,18 +373,18 @@ impl RpcClient { } JsonResult::Error(e) => { - debug!(target: "rpc::client", "<-- {}", e.stringify()?); + debug!(target: "rpc::chad_client", "<-- {}", e.stringify()?); return Err(Error::JsonRpcError((e.error.code, e.error.message))) } JsonResult::Notification(n) => { - debug!(target: "rpc::client", "<-- {}", n.stringify()?); + debug!(target: "rpc::chad_client", "<-- {}", n.stringify()?); let e = JsonError::new(ErrorCode::InvalidReply, None, req_id); return Err(Error::JsonRpcError((e.error.code, e.error.message))) } JsonResult::Request(r) => { - debug!(target: "rpc::client", "<-- {}", r.stringify()?); + debug!(target: "rpc::chad_client", "<-- {}", r.stringify()?); let e = JsonError::new(ErrorCode::InvalidReply, None, req_id); return Err(Error::JsonRpcError((e.error.code, e.error.message))) }