From 19f2c79ea7d0126e6be02e6ec16e9751bbfb8124 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Fri, 13 May 2022 07:19:01 +0300 Subject: [PATCH] rpc: create RpcClient for handling opened channels --- bin/tau/tau-cli/src/jsonrpc.rs | 55 ++++++--------------------- bin/tau/tau-cli/src/main.rs | 6 ++- src/rpc/mod.rs | 1 + src/rpc/rpcclient.rs | 68 ++++++++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 46 deletions(-) create mode 100644 src/rpc/rpcclient.rs diff --git a/bin/tau/tau-cli/src/jsonrpc.rs b/bin/tau/tau-cli/src/jsonrpc.rs index aaae2859a..786164571 100644 --- a/bin/tau/tau-cli/src/jsonrpc.rs +++ b/bin/tau/tau-cli/src/jsonrpc.rs @@ -1,48 +1,15 @@ -use async_std::sync::Arc; - -use async_executor::Executor; -use log::debug; use serde_json::{json, Value}; -use url::Url; use darkfi::{ - rpc::jsonrpc::{self, JsonResult}, - Error, Result, + rpc::{jsonrpc, rpcclient::RpcClient}, + Result, }; -pub struct JsonRpcClient { - sender: async_channel::Sender, - receiver: async_channel::Receiver, +pub struct Rpc { + pub client: RpcClient, } -impl JsonRpcClient { - pub async fn new(url: Url, executor: Arc>) -> Result { - let (sender, receiver) = jsonrpc::open_channels(&url, executor).await?; - Ok(Self { sender, receiver }) - } - - async fn request(&self, value: Value) -> Result { - self.sender.send(value).await?; - let reply: JsonResult = self.receiver.recv().await?; - - match reply { - JsonResult::Resp(r) => { - debug!(target: "RPC", "<-- {}", serde_json::to_string(&r)?); - Ok(r.result) - } - - JsonResult::Err(e) => { - debug!(target: "RPC", "<-- {}", serde_json::to_string(&e)?); - Err(Error::JsonRpcError(e.error.message.to_string())) - } - - JsonResult::Notif(n) => { - debug!(target: "RPC", "<-- {}", serde_json::to_string(&n)?); - Err(Error::JsonRpcError("Unexpected reply".to_string())) - } - } - } - +impl Rpc { // RPCAPI: // Add new task and returns `true` upon success. // --> {"jsonrpc": "2.0", "method": "add", @@ -60,7 +27,7 @@ impl JsonRpcClient { // <-- {"jsonrpc": "2.0", "result": true, "id": 1} pub async fn add(&self, params: Value) -> Result { let req = jsonrpc::request(json!("add"), params); - self.request(json!(req)).await + self.client.request(req).await } // List tasks @@ -68,7 +35,7 @@ impl JsonRpcClient { // <-- {"jsonrpc": "2.0", "result": [task_id, ...], "id": 1} pub async fn get_ids(&self, params: Value) -> Result { let req = jsonrpc::request(json!("get_ids"), json!(params)); - self.request(json!(req)).await + self.client.request(req).await } // Update task and returns `true` upon success. @@ -76,7 +43,7 @@ impl JsonRpcClient { // <-- {"jsonrpc": "2.0", "result": true, "id": 1} pub async fn update(&self, id: u64, data: Value) -> Result { let req = jsonrpc::request(json!("update"), json!([id, data])); - self.request(json!(req)).await + self.client.request(req).await } // Set state for a task and returns `true` upon success. @@ -84,7 +51,7 @@ impl JsonRpcClient { // <-- {"jsonrpc": "2.0", "result": true, "id": 1} pub async fn set_state(&self, id: u64, state: &str) -> Result { let req = jsonrpc::request(json!("set_state"), json!([id, state])); - self.request(json!(req)).await + self.client.request(req).await } // Set comment for a task and returns `true` upon success. @@ -92,7 +59,7 @@ impl JsonRpcClient { // <-- {"jsonrpc": "2.0", "result": true, "id": 1} pub async fn set_comment(&self, id: u64, content: &str) -> Result { let req = jsonrpc::request(json!("set_comment"), json!([id, content])); - self.request(json!(req)).await + self.client.request(req).await } // Get task by id. @@ -100,6 +67,6 @@ impl JsonRpcClient { // <-- {"jsonrpc": "2.0", "result": "task", "id": 1} pub async fn get_task_by_id(&self, id: u64) -> Result { let req = jsonrpc::request(json!("get_task_by_id"), json!([id])); - self.request(json!(req)).await + self.client.request(req).await } } diff --git a/bin/tau/tau-cli/src/main.rs b/bin/tau/tau-cli/src/main.rs index 71e831b13..071480808 100644 --- a/bin/tau/tau-cli/src/main.rs +++ b/bin/tau/tau-cli/src/main.rs @@ -9,6 +9,7 @@ use structopt_toml::StructOptToml; use url::Url; use darkfi::{ + rpc::rpcclient::RpcClient, util::{ cli::{log_config, spawn_config}, path::get_config_path, @@ -24,13 +25,14 @@ mod util; mod view; use cli::CliTauSubCommands; -use jsonrpc::JsonRpcClient; +use jsonrpc::Rpc; use primitives::{TaskEvent, TaskInfo}; use util::{desc_in_editor, CONFIG_FILE, CONFIG_FILE_CONTENTS}; use view::{comments_as_string, print_list_of_task, print_task_info}; async fn start(mut options: cli::CliTau, executor: Arc>) -> Result<()> { - let rpc_client = JsonRpcClient::new(Url::parse(&options.rpc_listen)?, executor).await?; + let rpc_client = + Rpc { client: RpcClient::new(Url::parse(&options.rpc_listen)?, executor).await? }; let states: Vec = vec!["stop".into(), "open".into(), "pause".into()]; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 63bf26870..d559839c2 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,3 +1,4 @@ pub mod jsonrpc; +pub mod rpcclient; pub mod rpcserver; pub mod websockets; diff --git a/src/rpc/rpcclient.rs b/src/rpc/rpcclient.rs new file mode 100644 index 000000000..4905293bb --- /dev/null +++ b/src/rpc/rpcclient.rs @@ -0,0 +1,68 @@ +use async_std::sync::Arc; + +use async_executor::Executor; +use log::debug; +use serde_json::{json, Value}; +use url::Url; + +use crate::{Error, Result}; + +use super::jsonrpc::{self, ErrorCode, JsonRequest, JsonResult}; + +pub struct RpcClient { + sender: async_channel::Sender, + receiver: async_channel::Receiver, + stop_signal: async_channel::Sender<()>, +} + +impl RpcClient { + pub async fn new(url: Url, executor: Arc>) -> Result { + let (sender, receiver, stop_signal) = jsonrpc::open_channels(&url, executor).await?; + Ok(Self { sender, receiver, stop_signal }) + } + + pub async fn request(&self, value: JsonRequest) -> Result { + let req_id = value.id.clone().as_u64().unwrap_or(0); + let value = json!(value); + + self.sender.send(value).await?; + + let reply: JsonResult = self.receiver.recv().await?; + + match reply { + JsonResult::Resp(r) => { + // check if the ids match + let resp_id = r.id.as_u64(); + if resp_id.is_none() { + let error = jsonrpc::error(ErrorCode::InvalidId, None, r.id); + self.stop_signal.send(()).await?; + return Err(Error::JsonRpcError(error.error.message.to_string())) + } + if resp_id.unwrap() != req_id { + let error = jsonrpc::error( + ErrorCode::InvalidId, + Some("Ids doesn't match".into()), + r.id, + ); + self.stop_signal.send(()).await?; + return Err(Error::JsonRpcError(error.error.message.to_string())) + } + + debug!(target: "RPC", "<-- {}", serde_json::to_string(&r)?); + Ok(r.result) + } + + JsonResult::Err(e) => { + debug!(target: "RPC", "<-- {}", serde_json::to_string(&e)?); + // close the server connection + self.stop_signal.send(()).await?; + Err(Error::JsonRpcError(e.error.message.to_string())) + } + + JsonResult::Notif(n) => { + debug!(target: "RPC", "<-- {}", serde_json::to_string(&n)?); + Err(Error::JsonRpcError("Unexpected reply".to_string())) + } + } + } +}