From 505571188d2f5bbfbb9abecf46764e92f539b740 Mon Sep 17 00:00:00 2001 From: skoupidi Date: Thu, 22 Feb 2024 20:46:04 +0200 Subject: [PATCH] rpc/client: new fn chad_request added --- bin/darkfid/src/rpc.rs | 2 +- src/rpc/client.rs | 63 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/bin/darkfid/src/rpc.rs b/bin/darkfid/src/rpc.rs index e74fbf9cb..7a2373693 100644 --- a/bin/darkfid/src/rpc.rs +++ b/bin/darkfid/src/rpc.rs @@ -170,7 +170,7 @@ impl Darkfid { debug!(target: "darkfid::rpc::miner_daemon_request", "Executing request {} with params: {:?}", method, params); let latency = Instant::now(); let req = JsonRequest::new(method, params); - let rep = rpc_client.request(req).await?; + let rep = rpc_client.chad_request(req).await?; let latency = latency.elapsed(); debug!(target: "darkfid::rpc::miner_daemon_request", "Got reply: {:?}", rep); debug!(target: "darkfid::rpc::miner_daemon_request", "Latency: {:?}", latency); diff --git a/src/rpc/client.rs b/src/rpc/client.rs index b4d29048c..ea2811573 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -253,4 +253,67 @@ impl RpcClient { } } } + + /// Highly experimental request when you know what you and the other + /// side is doing. This should be used when you have consecutive requests + /// and want to skip unconsumed data from the multiplexer, until you + /// reach the ones corresponding to the provided request. + /// Additionally, this request is executed without a timeout. + pub async fn chad_request(&self, req: JsonRequest) -> Result { + // Consume anything existing in the multiplexer + let _ = self.rep_recv.try_recv(); + + // Perform initial request + let req_id = req.id; + debug!(target: "rpc::client", "--> {}", req.stringify()?); + + // If the connection is closed, the sender will get an error + // for sending to a closed channel. + self.req_send.send((req, false)).await?; + + // Now loop until we receive our response + loop { + // If the connection is closed, the receiver will get an error + // for waiting on a closed channel. + let reply = self.rep_recv.recv().await?; + + // Handle the response + match reply { + JsonResult::Response(rep) | JsonResult::SubscriberWithReply(_, rep) => { + debug!(target: "rpc::client", "<-- {}", rep.stringify()?); + + // Check if the IDs match + if req_id != rep.id { + self.req_skip_send.send(()).await?; + continue + } + + return Ok(rep.result) + } + + JsonResult::Error(e) => { + debug!(target: "rpc::client", "<-- {}", e.stringify()?); + return Err(Error::JsonRpcError((e.error.code, e.error.message))) + } + + JsonResult::Notification(n) => { + debug!(target: "rpc::client", "<-- {}", n.stringify()?); + let e = JsonError::new(ErrorCode::InvalidReply, None, req_id); + return Err(Error::JsonRpcError((e.error.code, e.error.message))) + } + + JsonResult::Request(r) => { + debug!(target: "rpc::client", "<-- {}", r.stringify()?); + let e = JsonError::new(ErrorCode::InvalidReply, None, req_id); + return Err(Error::JsonRpcError((e.error.code, e.error.message))) + } + + JsonResult::Subscriber(_) => { + // When? + let e = JsonError::new(ErrorCode::InvalidReply, None, req_id); + return Err(Error::JsonRpcError((e.error.code, e.error.message))) + } + } + } + } }