bin/taud: udpate commits from raft when receive rpc request

This commit is contained in:
ghassmo
2022-04-11 07:29:50 +04:00
parent ae6d27001c
commit dca3d745b4
3 changed files with 55 additions and 46 deletions

View File

@@ -1,3 +1,7 @@
use serde_json::Value;
use darkfi::rpc::jsonrpc::{error as jsonerr, response as jsonresp, ErrorCode, JsonResult};
#[derive(Debug, thiserror::Error)]
pub enum TaudError {
#[error("Due timestamp invalid")]
@@ -6,7 +10,7 @@ pub enum TaudError {
InvalidId,
#[error("Invalid Data/Params: `{0}` ")]
InvalidData(String),
#[error(transparent)]
#[error("InternalError")]
Darkfi(#[from] darkfi::error::Error),
#[error("Json serialization error: `{0}`")]
SerdeJsonError(String),
@@ -19,3 +23,27 @@ impl From<serde_json::Error> for TaudError {
TaudError::SerdeJsonError(err.to_string())
}
}
pub fn to_json_result(res: TaudResult<Value>, id: Value) -> JsonResult {
match res {
Ok(v) => JsonResult::Resp(jsonresp(v, id)),
Err(err) => match err {
TaudError::InvalidId => JsonResult::Err(jsonerr(
ErrorCode::InvalidParams,
Some("invalid task's id".into()),
id,
)),
TaudError::InvalidData(e) | TaudError::SerdeJsonError(e) => {
JsonResult::Err(jsonerr(ErrorCode::InvalidParams, Some(e), id))
}
TaudError::InvalidDueTime => JsonResult::Err(jsonerr(
ErrorCode::InvalidParams,
Some("invalid due time".into()),
id,
)),
TaudError::Darkfi(e) => {
JsonResult::Err(jsonerr(ErrorCode::InternalError, Some(e.to_string()), id))
}
},
}
}

View File

@@ -8,14 +8,14 @@ use serde_json::{json, Value};
use darkfi::{
rpc::{
jsonrpc::{error as jsonerr, response as jsonresp, ErrorCode, JsonRequest, JsonResult},
jsonrpc::{error as jsonerr, ErrorCode, JsonRequest, JsonResult},
rpcserver::RequestHandler,
},
Error,
};
use crate::{
error::{TaudError, TaudResult},
error::{to_json_result, TaudError, TaudResult},
month_tasks::MonthTasks,
task_info::{Comment, TaskInfo},
util::{Settings, Timestamp},
@@ -23,7 +23,7 @@ use crate::{
pub struct JsonRpcInterface {
settings: Settings,
notify_queue_sender: async_channel::Sender<TaskInfo>,
notify_queue_sender: async_channel::Sender<Option<TaskInfo>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -43,6 +43,10 @@ impl RequestHandler for JsonRpcInterface {
return JsonResult::Err(jsonerr(ErrorCode::InvalidParams, None, req.id))
}
if let Err(_) = self.notify_queue_sender.send(None).await {
return JsonResult::Err(jsonerr(ErrorCode::InternalError, None, req.id))
}
debug!(target: "RPC", "--> {}", serde_json::to_string(&req).unwrap());
let rep = match req.method.as_str() {
@@ -58,12 +62,15 @@ impl RequestHandler for JsonRpcInterface {
}
};
from_taud_result(rep, req.id)
to_json_result(rep, req.id)
}
}
impl JsonRpcInterface {
pub fn new(notify_queue_sender: async_channel::Sender<TaskInfo>, settings: Settings) -> Self {
pub fn new(
notify_queue_sender: async_channel::Sender<Option<TaskInfo>>,
settings: Settings,
) -> Self {
Self { notify_queue_sender, settings }
}
@@ -91,7 +98,7 @@ impl JsonRpcInterface {
new_task.set_project(&task.project);
new_task.set_assign(&task.assign);
self.notify_queue_sender.send(new_task).await.map_err(Error::from)?;
self.notify_queue_sender.send(Some(new_task)).await.map_err(Error::from)?;
Ok(json!(true))
}
@@ -118,7 +125,7 @@ impl JsonRpcInterface {
let task = self.check_data_for_update(&args[0], &args[1])?;
self.notify_queue_sender.send(task).await.map_err(Error::from)?;
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
Ok(json!(true))
}
@@ -155,7 +162,7 @@ impl JsonRpcInterface {
let mut task: TaskInfo = self.load_task_by_id(&args[0])?;
task.set_state(&state);
self.notify_queue_sender.send(task).await.map_err(Error::from)?;
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
Ok(json!(true))
}
@@ -177,7 +184,7 @@ impl JsonRpcInterface {
let mut task: TaskInfo = self.load_task_by_id(&args[0])?;
task.set_comment(Comment::new(&comment_content, &comment_author));
self.notify_queue_sender.send(task).await.map_err(Error::from)?;
self.notify_queue_sender.send(Some(task)).await.map_err(Error::from)?;
Ok(json!(true))
}
@@ -254,27 +261,3 @@ impl JsonRpcInterface {
Ok(task)
}
}
fn from_taud_result(res: TaudResult<Value>, id: Value) -> JsonResult {
match res {
Ok(v) => JsonResult::Resp(jsonresp(v, id)),
Err(err) => match err {
TaudError::InvalidId => JsonResult::Err(jsonerr(
ErrorCode::InvalidParams,
Some("invalid task's id".into()),
id,
)),
TaudError::InvalidData(e) | TaudError::SerdeJsonError(e) => {
JsonResult::Err(jsonerr(ErrorCode::InvalidParams, Some(e), id))
}
TaudError::InvalidDueTime => JsonResult::Err(jsonerr(
ErrorCode::InvalidParams,
Some("invalid due time".into()),
id,
)),
TaudError::Darkfi(e) => {
JsonResult::Err(jsonerr(ErrorCode::InternalError, Some(e.to_string()), id))
}
},
}
}

View File

@@ -72,22 +72,21 @@ async fn start(config: TauConfig, args: CliTaud, executor: Arc<Executor<'_>>) ->
identity_pass: Default::default(),
};
let (snd, rcv) = async_channel::unbounded::<TaskInfo>();
let (rpc_snd, rpc_rcv) = async_channel::unbounded::<Option<TaskInfo>>();
let rpc_interface = Arc::new(JsonRpcInterface::new(snd, settings));
let recv_update_from_rpc: smol::Task<Result<()>> = executor.spawn(async move {
loop {
let task_info = rcv.recv().await?;
raft_sender.send(task_info).await?;
}
});
let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, settings));
let recv_update_from_raft: smol::Task<TaudResult<()>> = executor.spawn(async move {
loop {
// FIXME TODO
// this should update once receive rpc request from the tau-cli
let task_info = rpc_rcv.recv().await.map_err(Error::from)?;
if let Some(tk) = task_info {
raft_sender.send(tk).await.map_err(Error::from)?;
}
// XXX THIS FOR DEBUGING
sleep(1).await;
let recv_commits = commits.lock().await;
for task_info in recv_commits.iter() {
@@ -107,7 +106,6 @@ async fn start(config: TauConfig, args: CliTaud, executor: Arc<Executor<'_>>) ->
raft.start(p2p_settings.clone(), executor.clone()).await?;
recv_update_from_rpc.cancel().await;
recv_update_from_raft.cancel().await;
Ok(())
}