tau-cli: use open_channels function to communicate to rpc server

This commit is contained in:
ghassmo
2022-05-13 05:10:27 +03:00
parent 1d6e92bf55
commit 9d57fd368d
2 changed files with 110 additions and 85 deletions

View File

@@ -1,3 +1,6 @@
use async_std::sync::Arc;
use async_executor::Executor;
use log::debug;
use serde_json::{json, Value};
use url::Url;
@@ -7,86 +10,96 @@ use darkfi::{
Error, Result,
};
pub async fn request(r: jsonrpc::JsonRequest, url: String) -> Result<Value> {
let reply: JsonResult = match jsonrpc::send_request(&Url::parse(&url)?, json!(r)).await {
Ok(v) => v,
Err(e) => return Err(e),
};
pub struct JsonRpcClient {
sender: async_channel::Sender<Value>,
receiver: async_channel::Receiver<JsonResult>,
}
match reply {
JsonResult::Resp(r) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&r)?);
Ok(r.result)
}
impl JsonRpcClient {
pub async fn new(url: Url, executor: Arc<Executor<'_>>) -> Result<Self> {
let (sender, receiver) = jsonrpc::open_channels(&url, executor).await?;
Ok(Self { sender, receiver })
}
JsonResult::Err(e) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&e)?);
Err(Error::JsonRpcError(e.error.message.to_string()))
}
async fn request(&self, value: Value) -> Result<Value> {
self.sender.send(value).await?;
let reply: JsonResult = self.receiver.recv().await?;
JsonResult::Notif(n) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&n)?);
Err(Error::JsonRpcError("Unexpected reply".to_string()))
match reply {
JsonResult::Resp(r) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&r)?);
Ok(r.result)
}
JsonResult::Err(e) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&e)?);
Err(Error::JsonRpcError(e.error.message.to_string()))
}
JsonResult::Notif(n) => {
debug!(target: "RPC", "<-- {}", serde_json::to_string(&n)?);
Err(Error::JsonRpcError("Unexpected reply".to_string()))
}
}
}
}
// RPCAPI:
// Add new task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "add",
// "params":
// [{
// "title": "..",
// "desc": "..",
// assign: [..],
// project: [..],
// "due": ..,
// "rank": ..
// }],
// "id": 1
// }
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn add(url: &str, params: Value) -> Result<Value> {
let req = jsonrpc::request(json!("add"), params);
request(req, url.to_string()).await
}
// RPCAPI:
// Add new task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "add",
// "params":
// [{
// "title": "..",
// "desc": "..",
// assign: [..],
// project: [..],
// "due": ..,
// "rank": ..
// }],
// "id": 1
// }
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn add(&self, params: Value) -> Result<Value> {
let req = jsonrpc::request(json!("add"), params);
self.request(json!(req)).await
}
// List tasks
// --> {"jsonrpc": "2.0", "method": "get_ids", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [task_id, ...], "id": 1}
pub async fn get_ids(url: &str, params: Value) -> Result<Value> {
let req = jsonrpc::request(json!("get_ids"), json!(params));
request(req, url.to_string()).await
}
// List tasks
// --> {"jsonrpc": "2.0", "method": "get_ids", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [task_id, ...], "id": 1}
pub async fn get_ids(&self, params: Value) -> Result<Value> {
let req = jsonrpc::request(json!("get_ids"), json!(params));
self.request(json!(req)).await
}
// Update task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "update", "params": [task_id, {"title": "new title"} ], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn update(url: &str, id: u64, data: Value) -> Result<Value> {
let req = jsonrpc::request(json!("update"), json!([id, data]));
request(req, url.to_string()).await
}
// Update task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "update", "params": [task_id, {"title": "new title"} ], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn update(&self, id: u64, data: Value) -> Result<Value> {
let req = jsonrpc::request(json!("update"), json!([id, data]));
self.request(json!(req)).await
}
// Set state for a task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "set_state", "params": [task_id, state], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn set_state(url: &str, id: u64, state: &str) -> Result<Value> {
let req = jsonrpc::request(json!("set_state"), json!([id, state]));
request(req, url.to_string()).await
}
// Set state for a task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "set_state", "params": [task_id, state], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn set_state(&self, id: u64, state: &str) -> Result<Value> {
let req = jsonrpc::request(json!("set_state"), json!([id, state]));
self.request(json!(req)).await
}
// Set comment for a task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "set_comment", "params": [task_id, comment_content], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn set_comment(url: &str, id: u64, content: &str) -> Result<Value> {
let req = jsonrpc::request(json!("set_comment"), json!([id, content]));
request(req, url.to_string()).await
}
// Set comment for a task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "set_comment", "params": [task_id, comment_content], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn set_comment(&self, id: u64, content: &str) -> Result<Value> {
let req = jsonrpc::request(json!("set_comment"), json!([id, content]));
self.request(json!(req)).await
}
// Get task by id.
// --> {"jsonrpc": "2.0", "method": "get_task_by_id", "params": [task_id], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "task", "id": 1}
pub async fn get_task_by_id(url: &str, id: u64) -> Result<Value> {
let req = jsonrpc::request(json!("get_task_by_id"), json!([id]));
request(req, url.to_string()).await
// Get task by id.
// --> {"jsonrpc": "2.0", "method": "get_task_by_id", "params": [task_id], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "task", "id": 1}
pub async fn get_task_by_id(&self, id: u64) -> Result<Value> {
let req = jsonrpc::request(json!("get_task_by_id"), json!([id]));
self.request(json!(req)).await
}
}

View File

@@ -1,7 +1,12 @@
use async_std::sync::Arc;
use async_executor::Executor;
use log::error;
use serde_json::json;
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use smol::future;
use structopt_toml::StructOptToml;
use url::Url;
use darkfi::{
util::{
@@ -19,18 +24,19 @@ mod util;
mod view;
use cli::CliTauSubCommands;
use jsonrpc::JsonRpcClient;
use primitives::{TaskEvent, TaskInfo};
use util::{desc_in_editor, CONFIG_FILE, CONFIG_FILE_CONTENTS};
use view::{comments_as_string, print_list_of_task, print_task_info};
async fn start(mut options: cli::CliTau) -> Result<()> {
let rpc_addr = &options.rpc_listen.clone();
async fn start(mut options: cli::CliTau, executor: Arc<Executor<'_>>) -> Result<()> {
let rpc_client = JsonRpcClient::new(Url::parse(&options.rpc_listen)?, executor).await?;
let states: Vec<String> = vec!["stop".into(), "open".into(), "pause".into()];
match options.id {
Some(id) if id.len() < 4 && id.parse::<u64>().is_ok() => {
let task = jsonrpc::get_task_by_id(rpc_addr, id.parse::<u64>().unwrap()).await?;
let task = rpc_client.get_task_by_id(id.parse::<u64>().unwrap()).await?;
let taskinfo: TaskInfo = serde_json::from_value(task.clone())?;
print_task_info(taskinfo)?;
return Ok(())
@@ -51,25 +57,25 @@ async fn start(mut options: cli::CliTau) -> Result<()> {
task.desc = desc_in_editor()?;
};
jsonrpc::add(rpc_addr, json!([task])).await?;
rpc_client.add(json!([task])).await?;
}
Some(CliTauSubCommands::Update { id, values }) => {
let task = cli::task_from_cli_values(values)?;
jsonrpc::update(rpc_addr, id, json!([task])).await?;
rpc_client.update(id, json!([task])).await?;
}
Some(CliTauSubCommands::State { id, state }) => match state {
Some(state) => {
let state = state.trim().to_lowercase();
if states.contains(&state) {
jsonrpc::set_state(rpc_addr, id, &state).await?;
rpc_client.set_state(id, &state).await?;
} else {
error!("Task state could only be one of three states: open, pause or stop");
}
}
None => {
let task = jsonrpc::get_task_by_id(rpc_addr, id).await?;
let task = rpc_client.get_task_by_id(id).await?;
let taskinfo: TaskInfo = serde_json::from_value(task.clone())?;
let default_event = TaskEvent::default();
let state = &taskinfo.events.last().unwrap_or(&default_event).action;
@@ -79,10 +85,10 @@ async fn start(mut options: cli::CliTau) -> Result<()> {
Some(CliTauSubCommands::Comment { id, content }) => match content {
Some(content) => {
jsonrpc::set_comment(rpc_addr, id, content.trim()).await?;
rpc_client.set_comment(id, content.trim()).await?;
}
None => {
let task = jsonrpc::get_task_by_id(rpc_addr, id).await?;
let task = rpc_client.get_task_by_id(id).await?;
let taskinfo: TaskInfo = serde_json::from_value(task.clone())?;
let comments = comments_as_string(taskinfo.comments);
println!("Comments {}:\n{}", id, comments);
@@ -90,12 +96,12 @@ async fn start(mut options: cli::CliTau) -> Result<()> {
},
Some(CliTauSubCommands::List {}) | None => {
let task_ids = jsonrpc::get_ids(rpc_addr, json!([])).await?;
let task_ids = rpc_client.get_ids(json!([])).await?;
let mut tasks: Vec<TaskInfo> = vec![];
if let Some(ids) = task_ids.as_array() {
for id in ids {
let id = if id.is_u64() { id.as_u64().unwrap() } else { continue };
let task = jsonrpc::get_task_by_id(&rpc_addr, id).await?;
let task = rpc_client.get_task_by_id(id).await?;
let taskinfo: TaskInfo = serde_json::from_value(task.clone())?;
tasks.push(taskinfo);
}
@@ -109,8 +115,7 @@ async fn start(mut options: cli::CliTau) -> Result<()> {
Ok(())
}
#[async_std::main]
async fn main() -> Result<()> {
fn main() -> Result<()> {
let args = cli::CliTau::from_args_with_toml("").unwrap();
let cfg_path = get_config_path(args.config, CONFIG_FILE)?;
spawn_config(&cfg_path, CONFIG_FILE_CONTENTS.as_bytes())?;
@@ -119,5 +124,12 @@ async fn main() -> Result<()> {
let (lvl, conf) = log_config(args.verbose.into())?;
TermLogger::init(lvl, conf, TerminalMode::Mixed, ColorChoice::Auto)?;
start(args).await
let executor = Arc::new(Executor::new());
let task = executor.spawn(start(args, executor.clone()));
// Run the executor until the task completes.
future::block_on(executor.run(task))?;
Ok(())
}