rpc: create RpcClient for handling opened channels

This commit is contained in:
ghassmo
2022-05-13 07:19:01 +03:00
parent 9d57fd368d
commit 19f2c79ea7
4 changed files with 84 additions and 46 deletions

View File

@@ -1,48 +1,15 @@
use async_std::sync::Arc;
use async_executor::Executor;
use log::debug;
use serde_json::{json, Value};
use url::Url;
use darkfi::{
rpc::jsonrpc::{self, JsonResult},
Error, Result,
rpc::{jsonrpc, rpcclient::RpcClient},
Result,
};
pub struct JsonRpcClient {
sender: async_channel::Sender<Value>,
receiver: async_channel::Receiver<JsonResult>,
pub struct Rpc {
pub client: RpcClient,
}
impl JsonRpcClient {
pub async fn new(url: Url, executor: Arc<Executor<'_>>) -> Result<Self> {
let (sender, receiver) = jsonrpc::open_channels(&url, executor).await?;
Ok(Self { sender, receiver })
}
async fn request(&self, value: Value) -> Result<Value> {
self.sender.send(value).await?;
let reply: JsonResult = self.receiver.recv().await?;
match reply {
JsonResult::Resp(r) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&r)?);
Ok(r.result)
}
JsonResult::Err(e) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&e)?);
Err(Error::JsonRpcError(e.error.message.to_string()))
}
JsonResult::Notif(n) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&n)?);
Err(Error::JsonRpcError("Unexpected reply".to_string()))
}
}
}
impl Rpc {
// RPCAPI:
// Add new task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "add",
@@ -60,7 +27,7 @@ impl JsonRpcClient {
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn add(&self, params: Value) -> Result<Value> {
let req = jsonrpc::request(json!("add"), params);
self.request(json!(req)).await
self.client.request(req).await
}
// List tasks
@@ -68,7 +35,7 @@ impl JsonRpcClient {
// <-- {"jsonrpc": "2.0", "result": [task_id, ...], "id": 1}
pub async fn get_ids(&self, params: Value) -> Result<Value> {
let req = jsonrpc::request(json!("get_ids"), json!(params));
self.request(json!(req)).await
self.client.request(req).await
}
// Update task and returns `true` upon success.
@@ -76,7 +43,7 @@ impl JsonRpcClient {
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn update(&self, id: u64, data: Value) -> Result<Value> {
let req = jsonrpc::request(json!("update"), json!([id, data]));
self.request(json!(req)).await
self.client.request(req).await
}
// Set state for a task and returns `true` upon success.
@@ -84,7 +51,7 @@ impl JsonRpcClient {
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn set_state(&self, id: u64, state: &str) -> Result<Value> {
let req = jsonrpc::request(json!("set_state"), json!([id, state]));
self.request(json!(req)).await
self.client.request(req).await
}
// Set comment for a task and returns `true` upon success.
@@ -92,7 +59,7 @@ impl JsonRpcClient {
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn set_comment(&self, id: u64, content: &str) -> Result<Value> {
let req = jsonrpc::request(json!("set_comment"), json!([id, content]));
self.request(json!(req)).await
self.client.request(req).await
}
// Get task by id.
@@ -100,6 +67,6 @@ impl JsonRpcClient {
// <-- {"jsonrpc": "2.0", "result": "task", "id": 1}
pub async fn get_task_by_id(&self, id: u64) -> Result<Value> {
let req = jsonrpc::request(json!("get_task_by_id"), json!([id]));
self.request(json!(req)).await
self.client.request(req).await
}
}

View File

@@ -9,6 +9,7 @@ use structopt_toml::StructOptToml;
use url::Url;
use darkfi::{
rpc::rpcclient::RpcClient,
util::{
cli::{log_config, spawn_config},
path::get_config_path,
@@ -24,13 +25,14 @@ mod util;
mod view;
use cli::CliTauSubCommands;
use jsonrpc::JsonRpcClient;
use jsonrpc::Rpc;
use primitives::{TaskEvent, TaskInfo};
use util::{desc_in_editor, CONFIG_FILE, CONFIG_FILE_CONTENTS};
use view::{comments_as_string, print_list_of_task, print_task_info};
async fn start(mut options: cli::CliTau, executor: Arc<Executor<'_>>) -> Result<()> {
let rpc_client = JsonRpcClient::new(Url::parse(&options.rpc_listen)?, executor).await?;
let rpc_client =
Rpc { client: RpcClient::new(Url::parse(&options.rpc_listen)?, executor).await? };
let states: Vec<String> = vec!["stop".into(), "open".into(), "pause".into()];

View File

@@ -1,3 +1,4 @@
pub mod jsonrpc;
pub mod rpcclient;
pub mod rpcserver;
pub mod websockets;

68
src/rpc/rpcclient.rs Normal file
View File

@@ -0,0 +1,68 @@
use async_std::sync::Arc;
use async_executor::Executor;
use log::debug;
use serde_json::{json, Value};
use url::Url;
use crate::{Error, Result};
use super::jsonrpc::{self, ErrorCode, JsonRequest, JsonResult};
pub struct RpcClient {
sender: async_channel::Sender<Value>,
receiver: async_channel::Receiver<JsonResult>,
stop_signal: async_channel::Sender<()>,
}
impl RpcClient {
pub async fn new(url: Url, executor: Arc<Executor<'_>>) -> Result<Self> {
let (sender, receiver, stop_signal) = jsonrpc::open_channels(&url, executor).await?;
Ok(Self { sender, receiver, stop_signal })
}
pub async fn request(&self, value: JsonRequest) -> Result<Value> {
let req_id = value.id.clone().as_u64().unwrap_or(0);
let value = json!(value);
self.sender.send(value).await?;
let reply: JsonResult = self.receiver.recv().await?;
match reply {
JsonResult::Resp(r) => {
// check if the ids match
let resp_id = r.id.as_u64();
if resp_id.is_none() {
let error = jsonrpc::error(ErrorCode::InvalidId, None, r.id);
self.stop_signal.send(()).await?;
return Err(Error::JsonRpcError(error.error.message.to_string()))
}
if resp_id.unwrap() != req_id {
let error = jsonrpc::error(
ErrorCode::InvalidId,
Some("Ids doesn't match".into()),
r.id,
);
self.stop_signal.send(()).await?;
return Err(Error::JsonRpcError(error.error.message.to_string()))
}
debug!(target: "RPC", "<-- {}", serde_json::to_string(&r)?);
Ok(r.result)
}
JsonResult::Err(e) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&e)?);
// close the server connection
self.stop_signal.send(()).await?;
Err(Error::JsonRpcError(e.error.message.to_string()))
}
JsonResult::Notif(n) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&n)?);
Err(Error::JsonRpcError("Unexpected reply".to_string()))
}
}
}
}