diff --git a/bin/tau/taud/src/error.rs b/bin/tau/taud/src/error.rs index 63518ef82..313e21dd8 100644 --- a/bin/tau/taud/src/error.rs +++ b/bin/tau/taud/src/error.rs @@ -1,3 +1,7 @@ +use serde_json::Value; + +use darkfi::rpc::jsonrpc::{error as jsonerr, response as jsonresp, ErrorCode, JsonResult}; + #[derive(Debug, thiserror::Error)] pub enum TaudError { #[error("Due timestamp invalid")] @@ -6,7 +10,7 @@ pub enum TaudError { InvalidId, #[error("Invalid Data/Params: `{0}` ")] InvalidData(String), - #[error(transparent)] + #[error("InternalError")] Darkfi(#[from] darkfi::error::Error), #[error("Json serialization error: `{0}`")] SerdeJsonError(String), @@ -19,3 +23,27 @@ impl From for TaudError { TaudError::SerdeJsonError(err.to_string()) } } + +pub fn to_json_result(res: TaudResult, id: Value) -> JsonResult { + match res { + Ok(v) => JsonResult::Resp(jsonresp(v, id)), + Err(err) => match err { + TaudError::InvalidId => JsonResult::Err(jsonerr( + ErrorCode::InvalidParams, + Some("invalid task's id".into()), + id, + )), + TaudError::InvalidData(e) | TaudError::SerdeJsonError(e) => { + JsonResult::Err(jsonerr(ErrorCode::InvalidParams, Some(e), id)) + } + TaudError::InvalidDueTime => JsonResult::Err(jsonerr( + ErrorCode::InvalidParams, + Some("invalid due time".into()), + id, + )), + TaudError::Darkfi(e) => { + JsonResult::Err(jsonerr(ErrorCode::InternalError, Some(e.to_string()), id)) + } + }, + } +} diff --git a/bin/tau/taud/src/jsonrpc.rs b/bin/tau/taud/src/jsonrpc.rs index a849b30d9..f33e14971 100644 --- a/bin/tau/taud/src/jsonrpc.rs +++ b/bin/tau/taud/src/jsonrpc.rs @@ -8,14 +8,14 @@ use serde_json::{json, Value}; use darkfi::{ rpc::{ - jsonrpc::{error as jsonerr, response as jsonresp, ErrorCode, JsonRequest, JsonResult}, + jsonrpc::{error as jsonerr, ErrorCode, JsonRequest, JsonResult}, rpcserver::RequestHandler, }, Error, }; use crate::{ - error::{TaudError, TaudResult}, + error::{to_json_result, TaudError, TaudResult}, month_tasks::MonthTasks, task_info::{Comment, TaskInfo}, util::{Settings, Timestamp}, @@ -23,7 +23,7 @@ use crate::{ pub struct JsonRpcInterface { settings: Settings, - notify_queue_sender: async_channel::Sender, + notify_queue_sender: async_channel::Sender>, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -43,6 +43,10 @@ impl RequestHandler for JsonRpcInterface { return JsonResult::Err(jsonerr(ErrorCode::InvalidParams, None, req.id)) } + if let Err(_) = self.notify_queue_sender.send(None).await { + return JsonResult::Err(jsonerr(ErrorCode::InternalError, None, req.id)) + } + debug!(target: "RPC", "--> {}", serde_json::to_string(&req).unwrap()); let rep = match req.method.as_str() { @@ -58,12 +62,15 @@ impl RequestHandler for JsonRpcInterface { } }; - from_taud_result(rep, req.id) + to_json_result(rep, req.id) } } impl JsonRpcInterface { - pub fn new(notify_queue_sender: async_channel::Sender, settings: Settings) -> Self { + pub fn new( + notify_queue_sender: async_channel::Sender>, + settings: Settings, + ) -> Self { Self { notify_queue_sender, settings } } @@ -91,7 +98,7 @@ impl JsonRpcInterface { new_task.set_project(&task.project); new_task.set_assign(&task.assign); - self.notify_queue_sender.send(new_task).await.map_err(Error::from)?; + self.notify_queue_sender.send(Some(new_task)).await.map_err(Error::from)?; Ok(json!(true)) } @@ -118,7 +125,7 @@ impl JsonRpcInterface { let task = self.check_data_for_update(&args[0], &args[1])?; - self.notify_queue_sender.send(task).await.map_err(Error::from)?; + self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?; Ok(json!(true)) } @@ -155,7 +162,7 @@ impl JsonRpcInterface { let mut task: TaskInfo = self.load_task_by_id(&args[0])?; task.set_state(&state); - self.notify_queue_sender.send(task).await.map_err(Error::from)?; + self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?; Ok(json!(true)) } @@ -177,7 +184,7 @@ impl JsonRpcInterface { let mut task: TaskInfo = self.load_task_by_id(&args[0])?; task.set_comment(Comment::new(&comment_content, &comment_author)); - self.notify_queue_sender.send(task).await.map_err(Error::from)?; + self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?; Ok(json!(true)) } @@ -254,27 +261,3 @@ impl JsonRpcInterface { Ok(task) } } - -fn from_taud_result(res: TaudResult, id: Value) -> JsonResult { - match res { - Ok(v) => JsonResult::Resp(jsonresp(v, id)), - Err(err) => match err { - TaudError::InvalidId => JsonResult::Err(jsonerr( - ErrorCode::InvalidParams, - Some("invalid task's id".into()), - id, - )), - TaudError::InvalidData(e) | TaudError::SerdeJsonError(e) => { - JsonResult::Err(jsonerr(ErrorCode::InvalidParams, Some(e), id)) - } - TaudError::InvalidDueTime => JsonResult::Err(jsonerr( - ErrorCode::InvalidParams, - Some("invalid due time".into()), - id, - )), - TaudError::Darkfi(e) => { - JsonResult::Err(jsonerr(ErrorCode::InternalError, Some(e.to_string()), id)) - } - }, - } -} diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 4954ae719..2aafb25ad 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -72,22 +72,21 @@ async fn start(config: TauConfig, args: CliTaud, executor: Arc>) -> identity_pass: Default::default(), }; - let (snd, rcv) = async_channel::unbounded::(); + let (rpc_snd, rpc_rcv) = async_channel::unbounded::>(); - let rpc_interface = Arc::new(JsonRpcInterface::new(snd, settings)); - - let recv_update_from_rpc: smol::Task> = executor.spawn(async move { - loop { - let task_info = rcv.recv().await?; - raft_sender.send(task_info).await?; - } - }); + let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, settings)); let recv_update_from_raft: smol::Task> = executor.spawn(async move { loop { - // FIXME TODO - // this should update once receive rpc request from the tau-cli + let task_info = rpc_rcv.recv().await.map_err(Error::from)?; + + if let Some(tk) = task_info { + raft_sender.send(tk).await.map_err(Error::from)?; + } + + // XXX THIS FOR DEBUGING sleep(1).await; + let recv_commits = commits.lock().await; for task_info in recv_commits.iter() { @@ -107,7 +106,6 @@ async fn start(config: TauConfig, args: CliTaud, executor: Arc>) -> raft.start(p2p_settings.clone(), executor.clone()).await?; - recv_update_from_rpc.cancel().await; recv_update_from_raft.cancel().await; Ok(()) }