rpc/client: new RpcChadClient added

This commit is contained in:
skoupidi
2024-02-29 14:25:23 +02:00
parent 9643b5af87
commit d920493173

View File

@@ -253,23 +253,105 @@ impl RpcClient {
}
}
}
}
/// Highly experimental request when you know what you and the other
/// side is doing. This should be used when you have consecutive requests
/// and want to skip unconsumed data from the multiplexer, until you
/// reach the ones corresponding to the provided request.
/// Additionally, this request is executed without a timeout.
pub async fn chad_request(&self, req: JsonRequest) -> Result<JsonValue> {
// Consume anything existing in the multiplexer
let _ = self.rep_recv.try_recv();
/// Highly experimental JSON-RPC client implementation using asynchronous channels,
/// with each new request canceling waiting for the previous one. All requests are
/// executed without a timeout.
pub struct RpcChadClient {
/// The channel used to send JSON-RPC request objects
req_send: channel::Sender<JsonRequest>,
/// The channel used to read the JSON-RPC response object
rep_recv: channel::Receiver<JsonResult>,
/// The stoppable task pointer, used on [`RpcChadClient::stop()`]
task: StoppableTaskPtr,
}
// Perform initial request
impl RpcChadClient {
/// Instantiate a new JSON-RPC client that connects to the given endpoint.
/// The function takes an `Executor` object, which is needed to start the
/// `StoppableTask` which represents the client-server connection.
pub async fn new(endpoint: Url, ex: Arc<Executor<'_>>) -> Result<Self> {
// Instantiate communication channels
let (req_send, req_recv) = channel::unbounded();
let (rep_send, rep_recv) = channel::unbounded();
// Instantiate Dialer and dial the server
// TODO: Could add a timeout here
let dialer = Dialer::new(endpoint).await?;
let stream = dialer.dial(None).await?;
// Create the StoppableTask running the request-reply loop.
// This represents the actual connection, which can be stopped
// using `RpcChadClient::stop()`.
let task = StoppableTask::new();
task.clone().start(
Self::reqrep_loop(stream, rep_send, req_recv),
|res| async move {
match res {
Ok(()) | Err(Error::RpcClientStopped) => {}
Err(e) => error!(target: "rpc::chad_client", "[RPC] Client error: {}", e),
}
},
Error::RpcClientStopped,
ex.clone(),
);
Ok(Self { req_send, rep_recv, task })
}
/// Stop the JSON-RPC client. This will trigger `stop()` on the inner
/// `StoppableTaskPtr` resulting in stopping the internal reqrep loop
/// and therefore closing the connection.
pub async fn stop(&self) {
self.task.stop().await;
}
/// Internal function that loops on a given stream and multiplexes the data
async fn reqrep_loop(
stream: Box<dyn PtStream>,
rep_send: channel::Sender<JsonResult>,
req_recv: channel::Receiver<JsonRequest>,
) -> Result<()> {
debug!(target: "rpc::chad_client::reqrep_loop()", "Starting reqrep loop");
let (reader, mut writer) = smol::io::split(stream);
let mut reader = BufReader::new(reader);
loop {
let mut buf = Vec::with_capacity(INIT_BUF_SIZE);
// Read an incoming client request, or wait for a response
smol::future::or(
async {
let request = req_recv.recv().await?;
let request = JsonResult::Request(request);
write_to_stream(&mut writer, &request).await?;
Ok::<(), crate::Error>(())
},
async {
let _ = read_from_stream(&mut reader, &mut buf).await?;
let val: JsonValue = String::from_utf8(buf)?.parse()?;
let rep = JsonResult::try_from_value(&val)?;
rep_send.send(rep).await?;
Ok::<(), crate::Error>(())
},
)
.await?;
}
}
/// Send a given JSON-RPC request over the instantiated client and
/// return a possible result. If the response is an error, returns
/// a `JsonRpcError`.
pub async fn request(&self, req: JsonRequest) -> Result<JsonValue> {
// Perform request
let req_id = req.id;
debug!(target: "rpc::client", "--> {}", req.stringify()?);
debug!(target: "rpc::chad_client", "--> {}", req.stringify()?);
// If the connection is closed, the sender will get an error
// for sending to a closed channel.
self.req_send.send((req, false)).await?;
self.req_send.send(req).await?;
// Now loop until we receive our response
loop {
@@ -280,11 +362,10 @@ impl RpcClient {
// Handle the response
match reply {
JsonResult::Response(rep) | JsonResult::SubscriberWithReply(_, rep) => {
debug!(target: "rpc::client", "<-- {}", rep.stringify()?);
debug!(target: "rpc::chad_client", "<-- {}", rep.stringify()?);
// Check if the IDs match
if req_id != rep.id {
self.req_skip_send.send(()).await?;
continue
}
@@ -292,18 +373,18 @@ impl RpcClient {
}
JsonResult::Error(e) => {
debug!(target: "rpc::client", "<-- {}", e.stringify()?);
debug!(target: "rpc::chad_client", "<-- {}", e.stringify()?);
return Err(Error::JsonRpcError((e.error.code, e.error.message)))
}
JsonResult::Notification(n) => {
debug!(target: "rpc::client", "<-- {}", n.stringify()?);
debug!(target: "rpc::chad_client", "<-- {}", n.stringify()?);
let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
return Err(Error::JsonRpcError((e.error.code, e.error.message)))
}
JsonResult::Request(r) => {
debug!(target: "rpc::client", "<-- {}", r.stringify()?);
debug!(target: "rpc::chad_client", "<-- {}", r.stringify()?);
let e = JsonError::new(ErrorCode::InvalidReply, None, req_id);
return Err(Error::JsonRpcError((e.error.code, e.error.message)))
}