rpc/client: new fn chad_request added

This commit is contained in:
skoupidi
2024-02-22 20:46:04 +02:00
parent 20117e3e7d
commit 505571188d
2 changed files with 64 additions and 1 deletions

View File

@@ -170,7 +170,7 @@ impl Darkfid {
debug!(target: "darkfid::rpc::miner_daemon_request", "Executing request {} with params: {:?}", method, params);
let latency = Instant::now();
let req = JsonRequest::new(method, params);
let rep = rpc_client.request(req).await?;
let rep = rpc_client.chad_request(req).await?;
let latency = latency.elapsed();
debug!(target: "darkfid::rpc::miner_daemon_request", "Got reply: {:?}", rep);
debug!(target: "darkfid::rpc::miner_daemon_request", "Latency: {:?}", latency);

View File

@@ -253,4 +253,67 @@ impl RpcClient {
}
}
}
/// Highly experimental request when you know what you and the other
/// side is doing. This should be used when you have consecutive requests
/// and want to skip unconsumed data from the multiplexer, until you
/// reach the ones corresponding to the provided request.
/// Additionally, this request is executed without a timeout.
pub async fn chad_request(&self, req: JsonRequest) -> Result<JsonValue> {
// Consume anything existing in the multiplexer
let _ = self.rep_recv.try_recv();
// Perform initial request
let req_id = req.id;
debug!(target: "rpc::client", "--> {}", req.stringify()?);
// If the connection is closed, the sender will get an error
// for sending to a closed channel.
self.req_send.send((req, false)).await?;
// Now loop until we receive our response
loop {
// If the connection is closed, the receiver will get an error
// for waiting on a closed channel.
let reply = self.rep_recv.recv().await?;
// Handle the response
match reply {
JsonResult::Response(rep) | JsonResult::SubscriberWithReply(_, rep) => {
debug!(target: "rpc::client", "<-- {}", rep.stringify()?);
// Check if the IDs match
if req_id != rep.id {
self.req_skip_send.send(()).await?;
continue
}
return Ok(rep.result)
}
JsonResult::Error(e) => {
debug!(target: "rpc::client", "<-- {}", e.stringify()?);
return Err(Error::JsonRpcError((e.error.code, e.error.message)))
}
JsonResult::Notification(n) => {
debug!(target: "rpc::client", "<-- {}", n.stringify()?);
let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
return Err(Error::JsonRpcError((e.error.code, e.error.message)))
}
JsonResult::Request(r) => {
debug!(target: "rpc::client", "<-- {}", r.stringify()?);
let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
return Err(Error::JsonRpcError((e.error.code, e.error.message)))
}
JsonResult::Subscriber(_) => {
// When?
let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
return Err(Error::JsonRpcError((e.error.code, e.error.message)))
}
}
}
}
}