bin/tau: catagorize methods

This commit is contained in:
Dastan-glitch
2022-05-25 21:55:16 +00:00
parent 28699968d8
commit 35fd132dae
6 changed files with 347 additions and 316 deletions

View File

@@ -0,0 +1,56 @@
use crypto_box::{aead::Aead, Box, SecretKey};
use log::{debug, error};
use rand::rngs::OsRng;
use darkfi::{
util::serial::{deserialize, serialize, SerialDecodable, SerialEncodable},
Error, Result,
};
use crate::task_info::TaskInfo;
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct EncryptedTask {
nonce: Vec<u8>,
payload: Vec<u8>,
}
/// Encrypt a task given the task and a secret key.
pub fn encrypt_task(task: &TaskInfo, secret_key: &SecretKey) -> Result<EncryptedTask> {
debug!("start encrypting task");
// Get public key from secret key and create a new Salsa box
let public_key = secret_key.public_key();
let msg_box = Box::new(&public_key, secret_key);
// Generate a nonce and use it to encrypt serialized task (payload)
let nonce = crypto_box::generate_nonce(&mut OsRng);
let payload = &serialize(task)[..];
let payload = match msg_box.encrypt(&nonce, payload) {
Ok(p) => p,
Err(e) => {
error!("Unable to encrypt task: {}", e);
return Err(Error::OperationFailed)
}
};
let nonce = nonce.to_vec();
Ok(EncryptedTask { nonce, payload })
}
/// Decrypt a task given the encrypted task and the secret key (same used to encrypt).
pub fn decrypt_task(encrypted_task: &EncryptedTask, secret_key: &SecretKey) -> Option<TaskInfo> {
debug!("start decrypting task");
// Get public key from secret key and create a new Salsa box
let public_key = secret_key.public_key();
let msg_box = Box::new(&public_key, secret_key);
// Extract the nonce nad use it to decrypt the payload
let nonce = encrypted_task.nonce.as_slice();
let decrypted_task = match msg_box.decrypt(nonce.into(), &encrypted_task.payload[..]) {
Ok(m) => m,
Err(_) => return None,
};
// Deserialize to get the task
deserialize(&decrypted_task).ok()
}

View File

@@ -1,273 +0,0 @@
use std::path::PathBuf;
use async_trait::async_trait;
use log::debug;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use darkfi::{
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
server::RequestHandler,
},
util::Timestamp,
Error,
};
use crate::{
error::{to_json_result, TaudError, TaudResult},
month_tasks::MonthTasks,
task_info::{Comment, TaskInfo},
};
pub struct JsonRpcInterface {
dataset_path: PathBuf,
notify_queue_sender: async_channel::Sender<Option<TaskInfo>>,
nickname: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BaseTaskInfo {
title: String,
desc: String,
assign: Vec<String>,
project: Vec<String>,
due: Option<Timestamp>,
rank: Option<f32>,
}
// TODO: Make more like RPC in darkfid, this implies the method categories,
// and function signatures, and safety checks.
#[async_trait]
impl RequestHandler for JsonRpcInterface {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
if req.params.as_array().is_none() {
return JsonError::new(ErrorCode::InvalidParams, None, req.id).into()
}
if self.notify_queue_sender.send(None).await.is_err() {
return JsonError::new(ErrorCode::InternalError, None, req.id).into()
}
let rep = match req.method.as_str() {
Some("add") => self.add(req.params).await,
Some("get_ids") => self.get_ids(req.params).await,
Some("update") => self.update(req.params).await,
Some("set_state") => self.set_state(req.params).await,
Some("set_comment") => self.set_comment(req.params).await,
Some("get_task_by_id") => self.get_task_by_id(req.params).await,
Some(_) | None => return JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
};
to_json_result(rep, req.id)
}
}
impl JsonRpcInterface {
pub fn new(
notify_queue_sender: async_channel::Sender<Option<TaskInfo>>,
dataset_path: PathBuf,
nickname: String,
) -> Self {
Self { notify_queue_sender, dataset_path, nickname }
}
// RPCAPI:
// Add new task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "add",
// "params":
// [{
// "title": "..",
// "desc": "..",
// assign: [..],
// project: [..],
// "due": ..,
// "rank": ..
// }],
// "id": 1
// }
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
async fn add(&self, params: Value) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::add() params {}", params);
let args = params.as_array().unwrap();
let task: BaseTaskInfo = serde_json::from_value(args[0].clone())?;
let mut new_task: TaskInfo = TaskInfo::new(
&task.title,
&task.desc,
&self.nickname,
task.due,
task.rank.unwrap_or(0.0),
&self.dataset_path,
)?;
new_task.set_project(&task.project);
new_task.set_assign(&task.assign);
self.notify_queue_sender.send(Some(new_task)).await.map_err(Error::from)?;
Ok(json!(true))
}
// RPCAPI:
// List tasks
// --> {"jsonrpc": "2.0", "method": "get_ids", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [task_id, ...], "id": 1}
async fn get_ids(&self, params: Value) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::get_ids() params {}", params);
let tasks = MonthTasks::load_current_open_tasks(&self.dataset_path)?;
let task_ids: Vec<u32> = tasks.iter().map(|task| task.get_id()).collect();
Ok(json!(task_ids))
}
// RPCAPI:
// Update task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "update", "params": [task_id, {"title": "new title"} ], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
async fn update(&self, params: Value) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::update() params {}", params);
let args = params.as_array().unwrap();
if args.len() != 2 {
return Err(TaudError::InvalidData("len of params should be 2".into()))
}
let task = self.check_params_for_update(&args[0], &args[1])?;
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
Ok(json!(true))
}
// RPCAPI:
// Set state for a task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "set_state", "params": [task_id, state], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
async fn set_state(&self, params: Value) -> TaudResult<Value> {
// TODO: BUG: Validate that the state string is correct and not something arbitrary
debug!(target: "tau", "JsonRpc::set_state() params {}", params);
let args = params.as_array().unwrap();
if args.len() != 2 {
return Err(TaudError::InvalidData("len of params should be 2".into()))
}
let state: String = serde_json::from_value(args[1].clone())?;
let mut task: TaskInfo = self.load_task_by_id(&args[0])?;
task.set_state(&state);
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
Ok(json!(true))
}
// RPCAPI:
// Set comment for a task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "set_comment", "params": [task_id, comment_content], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
async fn set_comment(&self, params: Value) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::set_comment() params {}", params);
let args = params.as_array().unwrap();
if args.len() != 2 {
return Err(TaudError::InvalidData("len of params should be 3".into()))
}
let comment_content: String = serde_json::from_value(args[1].clone())?;
let mut task: TaskInfo = self.load_task_by_id(&args[0])?;
task.set_comment(Comment::new(&comment_content, &self.nickname));
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
Ok(json!(true))
}
// RPCAPI:
// Get a task by id.
// --> {"jsonrpc": "2.0", "method": "get_task_by_id", "params": [task_id], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "task", "id": 1}
async fn get_task_by_id(&self, params: Value) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::get_task_by_id() params {}", params);
let args = params.as_array().unwrap();
if args.len() != 1 {
return Err(TaudError::InvalidData("len of params should be 1".into()))
}
let task: TaskInfo = self.load_task_by_id(&args[0])?;
Ok(json!(task))
}
fn load_task_by_id(&self, task_id: &Value) -> TaudResult<TaskInfo> {
let task_id: u64 = serde_json::from_value(task_id.clone())?;
let tasks = MonthTasks::load_current_open_tasks(&self.dataset_path)?;
let task = tasks.into_iter().find(|t| (t.get_id() as u64) == task_id);
task.ok_or(TaudError::InvalidId)
}
fn check_params_for_update(&self, task_id: &Value, fields: &Value) -> TaudResult<TaskInfo> {
let mut task: TaskInfo = self.load_task_by_id(task_id)?;
if !fields.is_object() {
return Err(TaudError::InvalidData("Invalid task's data".into()))
}
let fields = fields.as_object().unwrap();
if fields.contains_key("title") {
let title = fields.get("title").unwrap().clone();
let title: String = serde_json::from_value(title)?;
if !title.is_empty() {
task.set_title(&title);
}
}
if fields.contains_key("desc") {
let description = fields.get("description");
if let Some(description) = description {
let description: String = serde_json::from_value(description.clone())?;
task.set_desc(&description);
}
}
if fields.contains_key("rank") {
let rank_opt = fields.get("rank");
if let Some(rank) = rank_opt {
let rank: Option<f32> = serde_json::from_value(rank.clone())?;
if let Some(r) = rank {
task.set_rank(r);
}
}
}
if fields.contains_key("due") {
let due = fields.get("due").unwrap().clone();
let due: Option<Option<Timestamp>> = serde_json::from_value(due)?;
if let Some(d) = due {
task.set_due(d);
}
}
if fields.contains_key("assign") {
let assign = fields.get("assign").unwrap().clone();
let assign: Vec<String> = serde_json::from_value(assign)?;
if !assign.is_empty() {
task.set_assign(&assign);
}
}
if fields.contains_key("project") {
let project = fields.get("project").unwrap().clone();
let project: Vec<String> = serde_json::from_value(project)?;
if !project.is_empty() {
task.set_project(&project);
}
}
Ok(task)
}
}

View File

@@ -1,11 +1,14 @@
use async_std::sync::{Arc, Mutex};
use std::{env, fs::create_dir_all};
use std::{env, fs::create_dir_all, path::PathBuf};
use async_executor::Executor;
use crypto_box::{aead::Aead, Box, SecretKey, KEY_SIZE};
use async_trait::async_trait;
use crypto::EncryptedTask;
use crypto_box::{SecretKey, KEY_SIZE};
use easy_parallel::Parallel;
use futures::{select, FutureExt};
use log::{debug, error, info, warn};
use log::{error, info, warn};
use serde_json::Value;
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use smol::future;
use structopt_toml::StructOptToml;
@@ -14,72 +17,85 @@ use url::Url;
use darkfi::{
async_daemonize, net,
raft::{NetMsg, ProtocolRaft, Raft},
rpc::server::listen_and_serve,
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
server::{listen_and_serve, RequestHandler},
},
util::{
cli::{log_config, spawn_config},
expand_path,
path::get_config_path,
serial::{deserialize, serialize, SerialDecodable, SerialEncodable},
},
Error, Result,
};
mod crypto;
mod error;
mod jsonrpc;
mod month_tasks;
mod rpc_add;
mod rpc_get;
mod rpc_update;
mod settings;
mod task_info;
mod util;
use crate::{
error::TaudResult,
jsonrpc::JsonRpcInterface,
crypto::{decrypt_task, encrypt_task},
error::{to_json_result, TaudError, TaudResult},
month_tasks::MonthTasks,
settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS},
task_info::TaskInfo,
util::{load, save},
};
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct EncryptedTask {
nonce: Vec<u8>,
payload: Vec<u8>,
pub struct JsonRpcInterface {
dataset_path: PathBuf,
notify_queue_sender: async_channel::Sender<Option<TaskInfo>>,
nickname: String,
}
fn encrypt_task(
task: &TaskInfo,
secret_key: &SecretKey,
rng: &mut crypto_box::rand_core::OsRng,
) -> Result<EncryptedTask> {
debug!("start encrypting task");
let public_key = secret_key.public_key();
let msg_box = Box::new(&public_key, secret_key);
let nonce = crypto_box::generate_nonce(rng);
let payload = &serialize(task)[..];
let payload = match msg_box.encrypt(&nonce, payload) {
Ok(p) => p,
Err(e) => {
error!("Unable to encrypt task: {}", e);
return Err(Error::OperationFailed)
#[async_trait]
impl RequestHandler for JsonRpcInterface {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
if !req.params.is_array() {
return JsonError::new(ErrorCode::InvalidParams, None, req.id).into()
}
};
let nonce = nonce.to_vec();
Ok(EncryptedTask { nonce, payload })
if self.notify_queue_sender.send(None).await.is_err() {
return JsonError::new(ErrorCode::InternalError, None, req.id).into()
}
let rep = match req.method.as_str() {
Some("add") => self.add(req.params).await,
Some("update") => self.update(req.params).await,
Some("get_ids") => self.get_ids(req.params).await,
Some("set_state") => self.set_state(req.params).await,
Some("set_comment") => self.set_comment(req.params).await,
Some("get_task_by_id") => self.get_task_by_id(req.params).await,
Some(_) | None => return JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
};
to_json_result(rep, req.id)
}
}
fn decrypt_task(encrypt_task: &EncryptedTask, secret_key: &SecretKey) -> Option<TaskInfo> {
debug!("start decrypting task");
let public_key = secret_key.public_key();
let msg_box = Box::new(&public_key, secret_key);
impl JsonRpcInterface {
pub fn new(
notify_queue_sender: async_channel::Sender<Option<TaskInfo>>,
dataset_path: PathBuf,
nickname: String,
) -> Self {
Self { notify_queue_sender, dataset_path, nickname }
}
let nonce = encrypt_task.nonce.as_slice();
let decrypted_task = match msg_box.decrypt(nonce.into(), &encrypt_task.payload[..]) {
Ok(m) => m,
Err(_) => return None,
};
pub fn load_task_by_id(&self, task_id: &Value) -> TaudResult<TaskInfo> {
let task_id: u64 = serde_json::from_value(task_id.clone())?;
deserialize(&decrypted_task).ok()
let tasks = MonthTasks::load_current_open_tasks(&self.dataset_path)?;
let task = tasks.into_iter().find(|t| (t.get_id() as u64) == task_id);
task.ok_or(TaudError::InvalidId)
}
}
async_daemonize!(realmain);
@@ -139,7 +155,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let net_settings = settings.net;
//
//Raft
// Raft
//
let datastore_raft = datastore_path.join("tau.db");
let mut raft = Raft::<EncryptedTask>::new(net_settings.inbound.clone(), datastore_raft)?;
@@ -156,7 +172,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let task = task.map_err(Error::from)?;
if let Some(tk) = task {
info!(target: "tau", "save the received task {:?}", tk);
let encrypted_task = encrypt_task(&tk, &secret_key,&mut rng)?;
let encrypted_task = encrypt_task(&tk, &secret_key)?;
tk.save(&datastore_path_cloned)?;
raft_sender.send(encrypted_task).await.map_err(Error::from)?;
}

View File

@@ -0,0 +1,55 @@
use log::debug;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use darkfi::{util::Timestamp, Error};
use crate::{error::TaudResult, task_info::TaskInfo, JsonRpcInterface};
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BaseTaskInfo {
title: String,
desc: String,
assign: Vec<String>,
project: Vec<String>,
due: Option<Timestamp>,
rank: Option<f32>,
}
impl JsonRpcInterface {
// RPCAPI:
// Add new task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "add",
// "params":
// [{
// "title": "..",
// "desc": "..",
// assign: [..],
// project: [..],
// "due": ..,
// "rank": ..
// }],
// "id": 1
// }
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn add(&self, params: Value) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::add() params {}", params);
let args = params.as_array().unwrap();
let task: BaseTaskInfo = serde_json::from_value(args[0].clone())?;
let mut new_task: TaskInfo = TaskInfo::new(
&task.title,
&task.desc,
&self.nickname,
task.due,
task.rank.unwrap_or(0.0),
&self.dataset_path,
)?;
new_task.set_project(&task.project);
new_task.set_assign(&task.assign);
self.notify_queue_sender.send(Some(new_task)).await.map_err(Error::from)?;
Ok(json!(true))
}
}

View File

@@ -0,0 +1,39 @@
use log::debug;
use serde_json::{json, Value};
use crate::{
error::{TaudError, TaudResult},
month_tasks::MonthTasks,
task_info::TaskInfo,
JsonRpcInterface,
};
impl JsonRpcInterface {
// RPCAPI:
// List tasks
// --> {"jsonrpc": "2.0", "method": "get_ids", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [task_id, ...], "id": 1}
pub async fn get_ids(&self, params: Value) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::get_ids() params {}", params);
let tasks = MonthTasks::load_current_open_tasks(&self.dataset_path)?;
let task_ids: Vec<u32> = tasks.iter().map(|task| task.get_id()).collect();
Ok(json!(task_ids))
}
// RPCAPI:
// Get a task by id.
// --> {"jsonrpc": "2.0", "method": "get_task_by_id", "params": [task_id], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "task", "id": 1}
pub async fn get_task_by_id(&self, params: Value) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::get_task_by_id() params {}", params);
let args = params.as_array().unwrap();
if args.len() != 1 {
return Err(TaudError::InvalidData("len of params should be 1".into()))
}
let task: TaskInfo = self.load_task_by_id(&args[0])?;
Ok(json!(task))
}
}

View File

@@ -0,0 +1,138 @@
use log::debug;
use serde_json::{json, Value};
use darkfi::{util::Timestamp, Error};
use crate::{
error::{TaudError, TaudResult},
task_info::{Comment, TaskInfo},
JsonRpcInterface,
};
impl JsonRpcInterface {
// RPCAPI:
// Update task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "update", "params": [task_id, {"title": "new title"} ], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn update(&self, params: Value) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::update() params {}", params);
let args = params.as_array().unwrap();
if args.len() != 2 {
return Err(TaudError::InvalidData("len of params should be 2".into()))
}
let task = self.check_params_for_update(&args[0], &args[1])?;
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
Ok(json!(true))
}
// RPCAPI:
// Set state for a task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "set_state", "params": [task_id, state], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn set_state(&self, params: Value) -> TaudResult<Value> {
// TODO: BUG: Validate that the state string is correct and not something arbitrary
debug!(target: "tau", "JsonRpc::set_state() params {}", params);
let args = params.as_array().unwrap();
if args.len() != 2 {
return Err(TaudError::InvalidData("len of params should be 2".into()))
}
let state: String = serde_json::from_value(args[1].clone())?;
let mut task: TaskInfo = self.load_task_by_id(&args[0])?;
task.set_state(&state);
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
Ok(json!(true))
}
// RPCAPI:
// Set comment for a task and returns `true` upon success.
// --> {"jsonrpc": "2.0", "method": "set_comment", "params": [task_id, comment_content], "id": 1}
// <-- {"jsonrpc": "2.0", "result": true, "id": 1}
pub async fn set_comment(&self, params: Value) -> TaudResult<Value> {
debug!(target: "tau", "JsonRpc::set_comment() params {}", params);
let args = params.as_array().unwrap();
if args.len() != 2 {
return Err(TaudError::InvalidData("len of params should be 3".into()))
}
let comment_content: String = serde_json::from_value(args[1].clone())?;
let mut task: TaskInfo = self.load_task_by_id(&args[0])?;
task.set_comment(Comment::new(&comment_content, &self.nickname));
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
Ok(json!(true))
}
fn check_params_for_update(&self, task_id: &Value, fields: &Value) -> TaudResult<TaskInfo> {
let mut task: TaskInfo = self.load_task_by_id(task_id)?;
if !fields.is_object() {
return Err(TaudError::InvalidData("Invalid task's data".into()))
}
let fields = fields.as_object().unwrap();
if fields.contains_key("title") {
let title = fields.get("title").unwrap().clone();
let title: String = serde_json::from_value(title)?;
if !title.is_empty() {
task.set_title(&title);
}
}
if fields.contains_key("desc") {
let description = fields.get("description");
if let Some(description) = description {
let description: String = serde_json::from_value(description.clone())?;
task.set_desc(&description);
}
}
if fields.contains_key("rank") {
let rank_opt = fields.get("rank");
if let Some(rank) = rank_opt {
let rank: Option<f32> = serde_json::from_value(rank.clone())?;
if let Some(r) = rank {
task.set_rank(r);
}
}
}
if fields.contains_key("due") {
let due = fields.get("due").unwrap().clone();
let due: Option<Option<Timestamp>> = serde_json::from_value(due)?;
if let Some(d) = due {
task.set_due(d);
}
}
if fields.contains_key("assign") {
let assign = fields.get("assign").unwrap().clone();
let assign: Vec<String> = serde_json::from_value(assign)?;
if !assign.is_empty() {
task.set_assign(&assign);
}
}
if fields.contains_key("project") {
let project = fields.get("project").unwrap().clone();
let project: Vec<String> = serde_json::from_value(project)?;
if !project.is_empty() {
task.set_project(&project);
}
}
Ok(task)
}
}