From 9d57fd368de9274a55e9bb7ff05c85ce77cf05fd Mon Sep 17 00:00:00 2001 From: ghassmo Date: Fri, 13 May 2022 05:10:27 +0300 Subject: [PATCH] tau-cli: use open_channels function to communicate to rpc server --- bin/tau/tau-cli/src/jsonrpc.rs | 155 ++++++++++++++++++--------------- bin/tau/tau-cli/src/main.rs | 40 ++++++--- 2 files changed, 110 insertions(+), 85 deletions(-) diff --git a/bin/tau/tau-cli/src/jsonrpc.rs b/bin/tau/tau-cli/src/jsonrpc.rs index 5bb1fc5c8..aaae2859a 100644 --- a/bin/tau/tau-cli/src/jsonrpc.rs +++ b/bin/tau/tau-cli/src/jsonrpc.rs @@ -1,3 +1,6 @@ +use async_std::sync::Arc; + +use async_executor::Executor; use log::debug; use serde_json::{json, Value}; use url::Url; @@ -7,86 +10,96 @@ use darkfi::{ Error, Result, }; -pub async fn request(r: jsonrpc::JsonRequest, url: String) -> Result { - let reply: JsonResult = match jsonrpc::send_request(&Url::parse(&url)?, json!(r)).await { - Ok(v) => v, - Err(e) => return Err(e), - }; +pub struct JsonRpcClient { + sender: async_channel::Sender, + receiver: async_channel::Receiver, +} - match reply { - JsonResult::Resp(r) => { - debug!(target: "RPC", "<-- {}", serde_json::to_string(&r)?); - Ok(r.result) - } +impl JsonRpcClient { + pub async fn new(url: Url, executor: Arc>) -> Result { + let (sender, receiver) = jsonrpc::open_channels(&url, executor).await?; + Ok(Self { sender, receiver }) + } - JsonResult::Err(e) => { - debug!(target: "RPC", "<-- {}", serde_json::to_string(&e)?); - Err(Error::JsonRpcError(e.error.message.to_string())) - } + async fn request(&self, value: Value) -> Result { + self.sender.send(value).await?; + let reply: JsonResult = self.receiver.recv().await?; - JsonResult::Notif(n) => { - debug!(target: "RPC", "<-- {}", serde_json::to_string(&n)?); - Err(Error::JsonRpcError("Unexpected reply".to_string())) + match reply { + JsonResult::Resp(r) => { + debug!(target: "RPC", "<-- {}", serde_json::to_string(&r)?); + Ok(r.result) + } + + JsonResult::Err(e) => { + debug!(target: "RPC", "<-- {}", serde_json::to_string(&e)?); + Err(Error::JsonRpcError(e.error.message.to_string())) + } + + JsonResult::Notif(n) => { + debug!(target: "RPC", "<-- {}", serde_json::to_string(&n)?); + Err(Error::JsonRpcError("Unexpected reply".to_string())) + } } } -} -// RPCAPI: -// Add new task and returns `true` upon success. -// --> {"jsonrpc": "2.0", "method": "add", -// "params": -// [{ -// "title": "..", -// "desc": "..", -// assign: [..], -// project: [..], -// "due": .., -// "rank": .. -// }], -// "id": 1 -// } -// <-- {"jsonrpc": "2.0", "result": true, "id": 1} -pub async fn add(url: &str, params: Value) -> Result { - let req = jsonrpc::request(json!("add"), params); - request(req, url.to_string()).await -} + // RPCAPI: + // Add new task and returns `true` upon success. + // --> {"jsonrpc": "2.0", "method": "add", + // "params": + // [{ + // "title": "..", + // "desc": "..", + // assign: [..], + // project: [..], + // "due": .., + // "rank": .. + // }], + // "id": 1 + // } + // <-- {"jsonrpc": "2.0", "result": true, "id": 1} + pub async fn add(&self, params: Value) -> Result { + let req = jsonrpc::request(json!("add"), params); + self.request(json!(req)).await + } -// List tasks -// --> {"jsonrpc": "2.0", "method": "get_ids", "params": [], "id": 1} -// <-- {"jsonrpc": "2.0", "result": [task_id, ...], "id": 1} -pub async fn get_ids(url: &str, params: Value) -> Result { - let req = jsonrpc::request(json!("get_ids"), json!(params)); - request(req, url.to_string()).await -} + // List tasks + // --> {"jsonrpc": "2.0", "method": "get_ids", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "result": [task_id, ...], "id": 1} + pub async fn get_ids(&self, params: Value) -> Result { + let req = jsonrpc::request(json!("get_ids"), json!(params)); + self.request(json!(req)).await + } -// Update task and returns `true` upon success. -// --> {"jsonrpc": "2.0", "method": "update", "params": [task_id, {"title": "new title"} ], "id": 1} -// <-- {"jsonrpc": "2.0", "result": true, "id": 1} -pub async fn update(url: &str, id: u64, data: Value) -> Result { - let req = jsonrpc::request(json!("update"), json!([id, data])); - request(req, url.to_string()).await -} + // Update task and returns `true` upon success. + // --> {"jsonrpc": "2.0", "method": "update", "params": [task_id, {"title": "new title"} ], "id": 1} + // <-- {"jsonrpc": "2.0", "result": true, "id": 1} + pub async fn update(&self, id: u64, data: Value) -> Result { + let req = jsonrpc::request(json!("update"), json!([id, data])); + self.request(json!(req)).await + } -// Set state for a task and returns `true` upon success. -// --> {"jsonrpc": "2.0", "method": "set_state", "params": [task_id, state], "id": 1} -// <-- {"jsonrpc": "2.0", "result": true, "id": 1} -pub async fn set_state(url: &str, id: u64, state: &str) -> Result { - let req = jsonrpc::request(json!("set_state"), json!([id, state])); - request(req, url.to_string()).await -} + // Set state for a task and returns `true` upon success. + // --> {"jsonrpc": "2.0", "method": "set_state", "params": [task_id, state], "id": 1} + // <-- {"jsonrpc": "2.0", "result": true, "id": 1} + pub async fn set_state(&self, id: u64, state: &str) -> Result { + let req = jsonrpc::request(json!("set_state"), json!([id, state])); + self.request(json!(req)).await + } -// Set comment for a task and returns `true` upon success. -// --> {"jsonrpc": "2.0", "method": "set_comment", "params": [task_id, comment_content], "id": 1} -// <-- {"jsonrpc": "2.0", "result": true, "id": 1} -pub async fn set_comment(url: &str, id: u64, content: &str) -> Result { - let req = jsonrpc::request(json!("set_comment"), json!([id, content])); - request(req, url.to_string()).await -} + // Set comment for a task and returns `true` upon success. + // --> {"jsonrpc": "2.0", "method": "set_comment", "params": [task_id, comment_content], "id": 1} + // <-- {"jsonrpc": "2.0", "result": true, "id": 1} + pub async fn set_comment(&self, id: u64, content: &str) -> Result { + let req = jsonrpc::request(json!("set_comment"), json!([id, content])); + self.request(json!(req)).await + } -// Get task by id. -// --> {"jsonrpc": "2.0", "method": "get_task_by_id", "params": [task_id], "id": 1} -// <-- {"jsonrpc": "2.0", "result": "task", "id": 1} -pub async fn get_task_by_id(url: &str, id: u64) -> Result { - let req = jsonrpc::request(json!("get_task_by_id"), json!([id])); - request(req, url.to_string()).await + // Get task by id. + // --> {"jsonrpc": "2.0", "method": "get_task_by_id", "params": [task_id], "id": 1} + // <-- {"jsonrpc": "2.0", "result": "task", "id": 1} + pub async fn get_task_by_id(&self, id: u64) -> Result { + let req = jsonrpc::request(json!("get_task_by_id"), json!([id])); + self.request(json!(req)).await + } } diff --git a/bin/tau/tau-cli/src/main.rs b/bin/tau/tau-cli/src/main.rs index 924f8f292..71e831b13 100644 --- a/bin/tau/tau-cli/src/main.rs +++ b/bin/tau/tau-cli/src/main.rs @@ -1,7 +1,12 @@ +use async_std::sync::Arc; + +use async_executor::Executor; use log::error; use serde_json::json; use simplelog::{ColorChoice, TermLogger, TerminalMode}; +use smol::future; use structopt_toml::StructOptToml; +use url::Url; use darkfi::{ util::{ @@ -19,18 +24,19 @@ mod util; mod view; use cli::CliTauSubCommands; +use jsonrpc::JsonRpcClient; use primitives::{TaskEvent, TaskInfo}; use util::{desc_in_editor, CONFIG_FILE, CONFIG_FILE_CONTENTS}; use view::{comments_as_string, print_list_of_task, print_task_info}; -async fn start(mut options: cli::CliTau) -> Result<()> { - let rpc_addr = &options.rpc_listen.clone(); +async fn start(mut options: cli::CliTau, executor: Arc>) -> Result<()> { + let rpc_client = JsonRpcClient::new(Url::parse(&options.rpc_listen)?, executor).await?; let states: Vec = vec!["stop".into(), "open".into(), "pause".into()]; match options.id { Some(id) if id.len() < 4 && id.parse::().is_ok() => { - let task = jsonrpc::get_task_by_id(rpc_addr, id.parse::().unwrap()).await?; + let task = rpc_client.get_task_by_id(id.parse::().unwrap()).await?; let taskinfo: TaskInfo = serde_json::from_value(task.clone())?; print_task_info(taskinfo)?; return Ok(()) @@ -51,25 +57,25 @@ async fn start(mut options: cli::CliTau) -> Result<()> { task.desc = desc_in_editor()?; }; - jsonrpc::add(rpc_addr, json!([task])).await?; + rpc_client.add(json!([task])).await?; } Some(CliTauSubCommands::Update { id, values }) => { let task = cli::task_from_cli_values(values)?; - jsonrpc::update(rpc_addr, id, json!([task])).await?; + rpc_client.update(id, json!([task])).await?; } Some(CliTauSubCommands::State { id, state }) => match state { Some(state) => { let state = state.trim().to_lowercase(); if states.contains(&state) { - jsonrpc::set_state(rpc_addr, id, &state).await?; + rpc_client.set_state(id, &state).await?; } else { error!("Task state could only be one of three states: open, pause or stop"); } } None => { - let task = jsonrpc::get_task_by_id(rpc_addr, id).await?; + let task = rpc_client.get_task_by_id(id).await?; let taskinfo: TaskInfo = serde_json::from_value(task.clone())?; let default_event = TaskEvent::default(); let state = &taskinfo.events.last().unwrap_or(&default_event).action; @@ -79,10 +85,10 @@ async fn start(mut options: cli::CliTau) -> Result<()> { Some(CliTauSubCommands::Comment { id, content }) => match content { Some(content) => { - jsonrpc::set_comment(rpc_addr, id, content.trim()).await?; + rpc_client.set_comment(id, content.trim()).await?; } None => { - let task = jsonrpc::get_task_by_id(rpc_addr, id).await?; + let task = rpc_client.get_task_by_id(id).await?; let taskinfo: TaskInfo = serde_json::from_value(task.clone())?; let comments = comments_as_string(taskinfo.comments); println!("Comments {}:\n{}", id, comments); @@ -90,12 +96,12 @@ async fn start(mut options: cli::CliTau) -> Result<()> { }, Some(CliTauSubCommands::List {}) | None => { - let task_ids = jsonrpc::get_ids(rpc_addr, json!([])).await?; + let task_ids = rpc_client.get_ids(json!([])).await?; let mut tasks: Vec = vec![]; if let Some(ids) = task_ids.as_array() { for id in ids { let id = if id.is_u64() { id.as_u64().unwrap() } else { continue }; - let task = jsonrpc::get_task_by_id(&rpc_addr, id).await?; + let task = rpc_client.get_task_by_id(id).await?; let taskinfo: TaskInfo = serde_json::from_value(task.clone())?; tasks.push(taskinfo); } @@ -109,8 +115,7 @@ async fn start(mut options: cli::CliTau) -> Result<()> { Ok(()) } -#[async_std::main] -async fn main() -> Result<()> { +fn main() -> Result<()> { let args = cli::CliTau::from_args_with_toml("").unwrap(); let cfg_path = get_config_path(args.config, CONFIG_FILE)?; spawn_config(&cfg_path, CONFIG_FILE_CONTENTS.as_bytes())?; @@ -119,5 +124,12 @@ async fn main() -> Result<()> { let (lvl, conf) = log_config(args.verbose.into())?; TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto)?; - start(args).await + let executor = Arc::new(Executor::new()); + + let task = executor.spawn(start(args, executor.clone())); + + // Run the executor until the task completes. + future::block_on(executor.run(task))?; + + Ok(()) }