From e051bcfd2ddfd0e85caca42e71fe3e3db758c8a4 Mon Sep 17 00:00:00 2001 From: lunar-mining Date: Wed, 10 Aug 2022 07:08:07 +0200 Subject: [PATCH] dnetview: cleaned into 'rpc' and 'parser' modules. simplified main() --- bin/dnetview/src/main.rs | 686 +++++-------------------------------- bin/dnetview/src/model.rs | 19 +- bin/dnetview/src/parser.rs | 529 ++++++++++++++++++++++++++++ bin/dnetview/src/rpc.rs | 38 ++ bin/dnetview/src/view.rs | 11 +- 5 files changed, 665 insertions(+), 618 deletions(-) create mode 100644 bin/dnetview/src/parser.rs create mode 100644 bin/dnetview/src/rpc.rs diff --git a/bin/dnetview/src/main.rs b/bin/dnetview/src/main.rs index 7781a8b61..b41b30848 100644 --- a/bin/dnetview/src/main.rs +++ b/bin/dnetview/src/main.rs @@ -1,10 +1,12 @@ -use async_std::sync::{Arc, Mutex}; -use std::{collections::hash_map::Entry, fs::File, io, io::Read, path::PathBuf}; +use async_std::sync::Arc; +use std::{fs::File, io, io::Read, path::PathBuf}; +use darkfi::util::{ + cli::{get_log_config, get_log_level, spawn_config, Config}, + join_config_path, +}; use easy_parallel::Parallel; -use fxhash::{FxHashMap, FxHashSet}; -use log::{error, info, trace}; -use serde_json::{json, Value}; +use log::{debug, info}; use simplelog::*; use smol::Executor; use termion::{async_stdin, event::Key, input::TermRead, raw::IntoRawMode}; @@ -12,68 +14,99 @@ use tui::{ backend::{Backend, TermionBackend}, Terminal, }; -use url::Url; - -use darkfi::{ - error::Result, - rpc::{client::RpcClient, jsonrpc::JsonRequest}, - util::{ - //async_util, - cli::{get_log_config, get_log_level, spawn_config, Config}, - join_config_path, - NanoTimestamp, - }, -}; pub mod config; pub mod error; pub mod model; pub mod options; +pub mod parser; +pub mod rpc; pub mod util; pub mod view; use crate::{ config::{DnvConfig, CONFIG_FILE_CONTENTS}, error::{DnetViewError, DnetViewResult}, - model::{ConnectInfo, Model, NodeInfo, SelectableObject, Session, SessionInfo}, + model::Model, options::ProgramOptions, - util::{ - is_empty_session, make_connect_id, make_empty_id, make_node_id, make_session_id, sleep, - }, - view::{IdMenu, MsgList, View}, + parser::DataParser, + view::View, }; struct DnetView { - name: String, - rpc_client: RpcClient, + model: Arc, + view: View, } impl DnetView { - async fn new(url: Url, name: String) -> Result { - let rpc_client = RpcClient::new(url).await?; - Ok(Self { name, rpc_client }) + fn new(model: Arc, view: View) -> Self { + Self { model, view } } - // --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42} - // <-- {"jsonrpc": "2.0", "result": "pong", "id": 42} - async fn ping(&self) -> Result { - let req = JsonRequest::new("ping", json!([])); - self.rpc_client.request(req).await - } + async fn render_view(&mut self, terminal: &mut Terminal) -> DnetViewResult<()> { + let mut asi = async_stdin(); - //--> {"jsonrpc": "2.0", "method": "poll", "params": [], "id": 42} - // <-- {"jsonrpc": "2.0", "result": {"nodeID": [], "nodeinfo" [], "id": 42} - async fn get_info(&self) -> DnetViewResult { - let req = JsonRequest::new("get_info", json!([])); - match self.rpc_client.request(req).await { - Ok(req) => Ok(req), - Err(e) => Err(DnetViewError::Darkfi(e)), + terminal.clear()?; + + self.view.id_menu.state.select(Some(0)); + self.view.msg_list.state.select(Some(0)); + + loop { + self.view.update( + self.model.new_id.lock().await.clone(), + self.model.msg_map.lock().await.clone(), + self.model.selectables.lock().await.clone(), + ); + + debug!(target: "dnetview::render_view()", "ID LIST: {:?}", self.view.id_menu.ids); + + let mut err: Option = None; + + terminal.draw(|f| match self.view.render(f) { + Ok(()) => {} + Err(e) => { + err = Some(e); + } + })?; + + match err { + Some(e) => return Err(e), + None => {} + } + + self.view.msg_list.scroll()?; + + for k in asi.by_ref().keys() { + match k.unwrap() { + Key::Char('q') => { + terminal.clear()?; + return Ok(()) + } + Key::Char('j') => { + self.view.id_menu.next(); + } + Key::Char('k') => { + self.view.id_menu.previous(); + } + Key::Char('u') => { + // TODO + //view.msg_list.next(); + } + Key::Char('d') => { + // TODO + //view.msg_list.previous(); + } + _ => (), + } + } + util::sleep(100).await; } } } #[async_std::main] async fn main() -> DnetViewResult<()> { + debug!(target: "dnetview", "main() START"); let options = ProgramOptions::load()?; let verbosity_level = options.app.occurrences_of("verbose"); @@ -97,36 +130,24 @@ async fn main() -> DnetViewResult<()> { terminal.clear()?; - let ids = Mutex::new(FxHashSet::default()); - let nodes = Mutex::new(FxHashMap::default()); - let selectables = Mutex::new(FxHashMap::default()); - let msg_map = Mutex::new(FxHashMap::default()); - let msg_log = Mutex::new(Vec::new()); - let new_id = Mutex::new(Vec::new()); - let model = Arc::new(Model::new(ids, new_id, nodes, msg_map, msg_log, selectables)); - - let msg_map = FxHashMap::default(); - let msg_list = MsgList::new(msg_map.clone(), 0); - let selectables = FxHashMap::default(); - let id_menu = IdMenu::new(Vec::new()); - - let mut view = View::new(id_menu, msg_list, selectables); - - let nthreads = num_cpus::get(); - let (signal, shutdown) = async_channel::unbounded::<()>(); - - let (s, r) = async_channel::unbounded::<()>(); + let model = Model::new(); + let view = View::new(); let ex = Arc::new(Executor::new()); let ex2 = ex.clone(); - let ex3 = ex.clone(); + + let mut dnetview = DnetView::new(model.clone(), view.clone()); + let parser = DataParser::new(model.clone(), config); + + let nthreads = num_cpus::get(); + let (signal, shutdown) = async_channel::unbounded::<()>(); let (_, result) = Parallel::new() .each(0..nthreads, |_| smol::future::block_on(ex.run(shutdown.recv()))) .finish(|| { smol::future::block_on(async move { - start_connect_slots(&config, ex2.clone(), model.clone()).await?; - render_view(&mut terminal, model.clone(), &mut view.clone(), ex3).await?; + parser.start_connect_slots(ex2).await?; + dnetview.render_view(&mut terminal).await?; drop(signal); Ok(()) }) @@ -134,544 +155,3 @@ async fn main() -> DnetViewResult<()> { result } - -async fn start_connect_slots( - config: &DnvConfig, - ex: Arc>, - model: Arc, -) -> DnetViewResult<()> { - for node in &config.nodes { - ex.spawn(try_connect(model.clone(), node.name.clone(), node.rpc_url.clone())).detach(); - } - Ok(()) -} - -async fn try_connect(model: Arc, node_name: String, rpc_url: String) -> DnetViewResult<()> { - loop { - info!("Attempting to poll {}, RPC URL: {}", node_name, rpc_url); - match DnetView::new(Url::parse(&rpc_url)?, node_name.clone()).await { - Ok(client) => { - poll(client, model.clone()).await?; - } - Err(e) => { - error!("{}", e); - parse_offline(node_name.clone(), model.clone()).await?; - util::sleep(2000).await; - } - } - } -} - -async fn poll(client: DnetView, model: Arc) -> DnetViewResult<()> { - loop { - match client.ping().await { - // TODO - Ok(reply) => {} - Err(e) => {} - } - match client.get_info().await { - Ok(reply) => { - if reply.as_object().is_some() && !reply.as_object().unwrap().is_empty() { - parse_data(reply.as_object().unwrap(), &client, model.clone()).await?; - } else { - return Err(DnetViewError::EmptyRpcReply) - } - } - Err(e) => { - error!("{:?}", e); - parse_offline(client.name.clone(), model.clone()).await?; - } - } - util::sleep(2000).await; - } -} - -async fn parse_offline(node_name: String, model: Arc) -> DnetViewResult<()> { - let name = "Offline".to_string(); - let session_type = Session::Offline; - let node_id = make_node_id(&node_name)?; - let session_id = make_session_id(&node_id, &session_type)?; - let mut connects: Vec = Vec::new(); - let mut sessions: Vec = Vec::new(); - - // initialize with empty values - let id = make_empty_id(&node_id, &session_type, 0)?; - let addr = "Null".to_string(); - let state = "Null".to_string(); - let parent = node_id.clone(); - let msg_log = Vec::new(); - let is_empty = true; - let last_msg = "Null".to_string(); - let last_status = "Null".to_string(); - let remote_node_id = "Null".to_string(); - let connect_info = ConnectInfo::new( - id, - addr, - state.clone(), - parent.clone(), - msg_log, - is_empty, - last_msg, - last_status, - remote_node_id, - ); - connects.push(connect_info.clone()); - - let accept_addr = None; - let session_info = - SessionInfo::new(session_id, name, is_empty, parent.clone(), connects, accept_addr); - sessions.push(session_info); - - let node = NodeInfo::new( - node_id.clone(), - node_name.to_string(), - state.clone(), - sessions.clone(), - None, - true, - ); - - update_node(model.clone(), node.clone(), node_id.clone()).await; - update_selectable_and_ids(model.clone(), sessions, node.clone()).await?; - update_new_id(model.clone()).await; - Ok(()) -} - -async fn parse_data( - reply: &serde_json::Map, - client: &DnetView, - model: Arc, -) -> DnetViewResult<()> { - let addr = &reply.get("external_addr"); - let inbound = &reply["session_inbound"]; - let _manual = &reply["session_manual"]; - let outbound = &reply["session_outbound"]; - let state = &reply["state"]; - - let mut sessions: Vec = Vec::new(); - - let node_name = &client.name; - let node_id = make_node_id(node_name)?; - - let ext_addr = parse_external_addr(addr).await?; - let in_session = parse_inbound(inbound, &node_id).await?; - let out_session = parse_outbound(outbound, &node_id).await?; - //let man_session = parse_manual(manual, &node_id).await?; - - sessions.push(in_session.clone()); - sessions.push(out_session.clone()); - //sessions.push(man_session.clone()); - - let node = NodeInfo::new( - node_id.clone(), - node_name.to_string(), - state.as_str().unwrap().to_string(), - sessions.clone(), - ext_addr, - false, - ); - - update_node(model.clone(), node.clone(), node_id.clone()).await; - update_selectable_and_ids(model.clone(), sessions.clone(), node.clone()).await?; - update_msgs(model.clone(), sessions.clone()).await?; - update_new_id(model.clone()).await; - - //trace!("IDS: {:?}", model.ids.lock().await); - //trace!("INFOS: {:?}", model.nodes.lock().await); - - Ok(()) -} - -async fn update_msgs(model: Arc, sessions: Vec) -> DnetViewResult<()> { - for session in sessions { - for connection in session.children { - if !model.msg_map.lock().await.contains_key(&connection.id) { - // we don't have this ID: it is a new node - model.msg_map.lock().await.insert(connection.id, connection.msg_log.clone()); - } else { - // we have this id: append the msg values - match model.msg_map.lock().await.entry(connection.id) { - Entry::Vacant(e) => { - e.insert(connection.msg_log); - } - Entry::Occupied(mut e) => { - for msg in connection.msg_log { - e.get_mut().push(msg); - } - } - } - } - } - } - Ok(()) -} - -async fn update_ids(model: Arc, id: String) { - model.ids.lock().await.insert(id); -} - -async fn update_new_id(model: Arc) { - let ids = model.ids.lock().await.clone(); - - for id in ids.iter() { - model.new_id.lock().await.push(id.to_string()); - } -} -async fn update_node(model: Arc, node: NodeInfo, id: String) { - model.nodes.lock().await.insert(id, node); -} - -async fn update_selectable_and_ids( - model: Arc, - sessions: Vec, - node: NodeInfo, -) -> DnetViewResult<()> { - if node.is_offline == true { - let node_obj = SelectableObject::Node(node.clone()); - model.selectables.lock().await.insert(node.id.clone(), node_obj); - update_ids(model.clone(), node.id.clone()).await; - } else { - let node_obj = SelectableObject::Node(node.clone()); - model.selectables.lock().await.insert(node.id.clone(), node_obj); - update_ids(model.clone(), node.id.clone()).await; - for session in sessions { - if !session.is_empty { - let session_obj = SelectableObject::Session(session.clone()); - model.selectables.lock().await.insert(session.clone().id, session_obj); - update_ids(model.clone(), session.clone().id).await; - for connect in session.children { - let connect_obj = SelectableObject::Connect(connect.clone()); - model.selectables.lock().await.insert(connect.clone().id, connect_obj); - update_ids(model.clone(), connect.clone().id).await; - } - } - } - } - Ok(()) -} - -async fn parse_external_addr(addr: &Option<&Value>) -> DnetViewResult> { - match addr { - Some(addr) => match addr.as_str() { - Some(addr) => Ok(Some(addr.to_string())), - None => Ok(None), - }, - None => Err(DnetViewError::NoExternalAddr), - } -} - -async fn parse_inbound(inbound: &Value, node_id: &String) -> DnetViewResult { - let name = "Inbound".to_string(); - let session_type = Session::Inbound; - let parent = node_id.to_string(); - let id = make_session_id(&parent, &session_type)?; - let mut connects: Vec = Vec::new(); - let connections = &inbound["connected"]; - let mut connect_count = 0; - let mut accept_vec = Vec::new(); - - match connections.as_object() { - Some(connect) => { - match connect.is_empty() { - true => { - connect_count += 1; - // channel is empty. initialize with empty values - let id = make_empty_id(node_id, &session_type, connect_count)?; - let addr = "Null".to_string(); - let state = "Null".to_string(); - let parent = parent.clone(); - let msg_log = Vec::new(); - let is_empty = true; - let last_msg = "Null".to_string(); - let last_status = "Null".to_string(); - let remote_node_id = "Null".to_string(); - let connect_info = ConnectInfo::new( - id, - addr, - state, - parent, - msg_log, - is_empty, - last_msg, - last_status, - remote_node_id, - ); - connects.push(connect_info); - } - false => { - // channel is not empty. initialize with whole values - for k in connect.keys() { - let node = connect.get(k); - let addr = k.to_string(); - let info = node.unwrap().as_array(); - // get the accept address - let accept_addr = info.unwrap().get(0); - let acc_addr = accept_addr - .unwrap() - .get("accept_addr") - .unwrap() - .as_str() - .unwrap() - .to_string(); - accept_vec.push(acc_addr); - let info2 = info.unwrap().get(1); - let id = info2.unwrap().get("random_id").unwrap().as_u64().unwrap(); - let id = make_connect_id(&id)?; - let state = "state".to_string(); - let parent = parent.clone(); - let msg_values = info2.unwrap().get("log").unwrap().as_array().unwrap(); - let mut msg_log: Vec<(NanoTimestamp, String, String)> = Vec::new(); - for msg in msg_values { - let msg: (NanoTimestamp, String, String) = - serde_json::from_value(msg.clone())?; - msg_log.push(msg); - } - let is_empty = false; - let last_msg = - info2.unwrap().get("last_msg").unwrap().as_str().unwrap().to_string(); - let last_status = info2 - .unwrap() - .get("last_status") - .unwrap() - .as_str() - .unwrap() - .to_string(); - let remote_node_id = info2 - .unwrap() - .get("remote_node_id") - .unwrap() - .as_str() - .unwrap() - .to_string(); - let r_node_id: String = match remote_node_id.is_empty() { - true => "no remote id".to_string(), - false => remote_node_id, - }; - let connect_info = ConnectInfo::new( - id, - addr, - state, - parent, - msg_log, - is_empty, - last_msg, - last_status, - r_node_id, - ); - connects.push(connect_info.clone()); - } - } - } - let is_empty = is_empty_session(&connects); - - // TODO: clean this up - if accept_vec.is_empty() { - let accept_addr = None; - let session_info = - SessionInfo::new(id, name, is_empty, parent, connects, accept_addr); - Ok(session_info) - } else { - let accept_addr = Some(accept_vec[0].clone()); - let session_info = - SessionInfo::new(id, name, is_empty, parent, connects, accept_addr); - Ok(session_info) - } - } - None => Err(DnetViewError::ValueIsNotObject), - } -} - -// TODO: placeholder for now -async fn _parse_manual(_manual: &Value, node_id: &String) -> DnetViewResult { - let name = "Manual".to_string(); - let session_type = Session::Manual; - let mut connects: Vec = Vec::new(); - let parent = node_id.to_string(); - - let session_id = make_session_id(&parent, &session_type)?; - //let id: u64 = 0; - let connect_id = make_empty_id(node_id, &session_type, 0)?; - //let connect_id = make_connect_id(&id)?; - let addr = "Null".to_string(); - let state = "Null".to_string(); - let msg_log = Vec::new(); - let is_empty = true; - let msg = "Null".to_string(); - let status = "Null".to_string(); - let remote_node_id = "Null".to_string(); - let connect_info = ConnectInfo::new( - connect_id.clone(), - addr, - state, - parent, - msg_log, - is_empty, - msg, - status, - remote_node_id, - ); - connects.push(connect_info); - let parent = connect_id; - let is_empty = is_empty_session(&connects); - let accept_addr = None; - let session_info = - SessionInfo::new(session_id, name, is_empty, parent, connects.clone(), accept_addr); - - Ok(session_info) -} - -async fn parse_outbound(outbound: &Value, node_id: &String) -> DnetViewResult { - let name = "Outbound".to_string(); - let session_type = Session::Outbound; - let parent = node_id.to_string(); - let id = make_session_id(&parent, &session_type)?; - let mut connects: Vec = Vec::new(); - let slots = &outbound["slots"]; - let mut slot_count = 0; - - match slots.as_array() { - Some(slots) => { - for slot in slots { - slot_count += 1; - match slot["channel"].is_null() { - true => { - // TODO: this is not actually empty - let id = make_empty_id(node_id, &session_type, slot_count)?; - let addr = "Null".to_string(); - let state = &slot["state"]; - let state = state.as_str().unwrap().to_string(); - let parent = parent.clone(); - let msg_log = Vec::new(); - let is_empty = false; - let last_msg = "Null".to_string(); - let last_status = "Null".to_string(); - let remote_node_id = "Null".to_string(); - let connect_info = ConnectInfo::new( - id, - addr, - state, - parent, - msg_log, - is_empty, - last_msg, - last_status, - remote_node_id, - ); - connects.push(connect_info.clone()); - } - false => { - // channel is not empty. initialize with whole values - let channel = &slot["channel"]; - let id = channel["random_id"].as_u64().unwrap(); - let id = make_connect_id(&id)?; - let addr = &slot["addr"]; - let addr = addr.as_str().unwrap().to_string(); - let state = &slot["state"]; - let state = state.as_str().unwrap().to_string(); - let parent = parent.clone(); - let msg_values = channel["log"].as_array().unwrap(); - let mut msg_log: Vec<(NanoTimestamp, String, String)> = Vec::new(); - for msg in msg_values { - let msg: (NanoTimestamp, String, String) = - serde_json::from_value(msg.clone())?; - msg_log.push(msg); - } - let is_empty = false; - let last_msg = channel["last_msg"].as_str().unwrap().to_string(); - let last_status = channel["last_status"].as_str().unwrap().to_string(); - let remote_node_id = - channel["remote_node_id"].as_str().unwrap().to_string(); - let r_node_id: String = match remote_node_id.is_empty() { - true => "no remote id".to_string(), - false => remote_node_id, - }; - let connect_info = ConnectInfo::new( - id, - addr, - state, - parent, - msg_log, - is_empty, - last_msg, - last_status, - r_node_id, - ); - connects.push(connect_info.clone()); - } - } - } - - let is_empty = is_empty_session(&connects); - - let accept_addr = None; - let session_info = SessionInfo::new(id, name, is_empty, parent, connects, accept_addr); - Ok(session_info) - } - None => Err(DnetViewError::ValueIsNotObject), - } -} - -async fn render_view( - terminal: &mut Terminal, - model: Arc, - view: &mut View, - ex: Arc>, -) -> DnetViewResult<()> { - let mut asi = async_stdin(); - - terminal.clear()?; - - view.id_menu.state.select(Some(0)); - view.msg_list.state.select(Some(0)); - - loop { - view.update( - model.new_id.lock().await.clone(), - model.msg_map.lock().await.clone(), - model.selectables.lock().await.clone(), - ); - - trace!(target: "dnetview::render_view()", "ID LIST: {:?}", view.id_menu.ids); - - let mut err: Option = None; - - terminal.draw(|f| match view.render(f) { - Ok(()) => {} - Err(e) => { - err = Some(e); - } - })?; - - match err { - Some(e) => return Err(e), - None => {} - } - - view.msg_list.scroll()?; - - for k in asi.by_ref().keys() { - match k.unwrap() { - Key::Char('q') => { - terminal.clear()?; - return Ok(()) - } - Key::Char('j') => { - view.id_menu.next(); - } - Key::Char('k') => { - view.id_menu.previous(); - } - Key::Char('u') => { - // TODO - //view.msg_list.next(); - } - Key::Char('d') => { - // TODO - //view.msg_list.previous(); - } - _ => (), - } - } - util::sleep(200).await; - } -} diff --git a/bin/dnetview/src/model.rs b/bin/dnetview/src/model.rs index a45d2fbfe..102afde79 100644 --- a/bin/dnetview/src/model.rs +++ b/bin/dnetview/src/model.rs @@ -1,4 +1,4 @@ -use async_std::sync::Mutex; +use async_std::sync::{Arc, Mutex}; use fxhash::{FxHashMap, FxHashSet}; use serde::{Deserialize, Serialize}; @@ -34,15 +34,14 @@ pub struct Model { } impl Model { - pub fn new( - ids: Mutex>, - new_id: Mutex>, - nodes: Mutex>, - msg_map: MsgMap, - msg_log: Mutex, - selectables: Mutex>, - ) -> Model { - Model { ids, new_id, nodes, msg_map, msg_log, selectables } + pub fn new() -> Arc { + let ids = Mutex::new(FxHashSet::default()); + let nodes = Mutex::new(FxHashMap::default()); + let selectables = Mutex::new(FxHashMap::default()); + let msg_map = Mutex::new(FxHashMap::default()); + let msg_log = Mutex::new(Vec::new()); + let new_id = Mutex::new(Vec::new()); + Arc::new(Model { ids, new_id, nodes, msg_map, msg_log, selectables }) } } diff --git a/bin/dnetview/src/parser.rs b/bin/dnetview/src/parser.rs new file mode 100644 index 000000000..1176ff318 --- /dev/null +++ b/bin/dnetview/src/parser.rs @@ -0,0 +1,529 @@ +use async_std::sync::Arc; +use std::collections::hash_map::Entry; + +use log::{debug, error, info}; +use serde_json::Value; +use smol::Executor; +use url::Url; + +use darkfi::util::NanoTimestamp; + +use crate::{ + config::DnvConfig, + error::{DnetViewError, DnetViewResult}, + model::{ConnectInfo, Model, NodeInfo, SelectableObject, Session, SessionInfo}, + rpc::RpcConnect, + util::{is_empty_session, make_connect_id, make_empty_id, make_node_id, make_session_id}, +}; + +pub struct DataParser { + model: Arc, + config: DnvConfig, +} + +impl DataParser { + pub fn new(model: Arc, config: DnvConfig) -> Arc { + Arc::new(Self { model, config }) + } + + pub async fn start_connect_slots(self: Arc, ex: Arc>) -> DnetViewResult<()> { + debug!(target: "dnetview", "start_connect_slots() START"); + for node in &self.config.nodes { + let self2 = self.clone(); + debug!(target: "dnetview", "attempting to spawn..."); + ex.clone().spawn(self2.try_connect(node.name.clone(), node.rpc_url.clone())).detach(); + } + Ok(()) + } + + async fn try_connect( + self: Arc, + node_name: String, + rpc_url: String, + ) -> DnetViewResult<()> { + debug!(target: "dnetview", "try_connect() START"); + loop { + info!("Attempting to poll {}, RPC URL: {}", node_name, rpc_url); + match RpcConnect::new(Url::parse(&rpc_url)?, node_name.clone()).await { + Ok(client) => { + self.poll(client).await?; + } + Err(e) => { + error!("{}", e); + self.parse_offline(node_name.clone()).await?; + crate::util::sleep(2000).await; + } + } + } + } + + async fn poll(&self, client: RpcConnect) -> DnetViewResult<()> { + loop { + match client.ping().await { + // TODO + Ok(_reply) => {} + Err(_e) => {} + } + match client.get_info().await { + Ok(reply) => { + if reply.as_object().is_some() && !reply.as_object().unwrap().is_empty() { + self.parse_data(reply.as_object().unwrap(), &client).await?; + } else { + return Err(DnetViewError::EmptyRpcReply) + } + } + Err(e) => { + error!("{:?}", e); + self.parse_offline(client.name.clone()).await?; + } + } + crate::util::sleep(2000).await; + } + } + async fn parse_offline(&self, node_name: String) -> DnetViewResult<()> { + let name = "Offline".to_string(); + let session_type = Session::Offline; + let node_id = make_node_id(&node_name)?; + let session_id = make_session_id(&node_id, &session_type)?; + let mut connects: Vec = Vec::new(); + let mut sessions: Vec = Vec::new(); + + // initialize with empty values + let id = make_empty_id(&node_id, &session_type, 0)?; + let addr = "Null".to_string(); + let state = "Null".to_string(); + let parent = node_id.clone(); + let msg_log = Vec::new(); + let is_empty = true; + let last_msg = "Null".to_string(); + let last_status = "Null".to_string(); + let remote_node_id = "Null".to_string(); + let connect_info = ConnectInfo::new( + id, + addr, + state.clone(), + parent.clone(), + msg_log, + is_empty, + last_msg, + last_status, + remote_node_id, + ); + connects.push(connect_info.clone()); + + let accept_addr = None; + let session_info = + SessionInfo::new(session_id, name, is_empty, parent.clone(), connects, accept_addr); + sessions.push(session_info); + + let node = NodeInfo::new( + node_id.clone(), + node_name.to_string(), + state.clone(), + sessions.clone(), + None, + true, + ); + + self.update_node(node.clone(), node_id.clone()).await; + self.update_selectable_and_ids(sessions, node.clone()).await?; + self.update_new_id().await; + Ok(()) + } + + async fn parse_data( + &self, + reply: &serde_json::Map, + client: &RpcConnect, + ) -> DnetViewResult<()> { + let addr = &reply.get("external_addr"); + let inbound = &reply["session_inbound"]; + let _manual = &reply["session_manual"]; + let outbound = &reply["session_outbound"]; + let state = &reply["state"]; + + let mut sessions: Vec = Vec::new(); + + let node_name = &client.name; + let node_id = make_node_id(node_name)?; + + let ext_addr = self.parse_external_addr(addr).await?; + let in_session = self.parse_inbound(inbound, &node_id).await?; + let out_session = self.parse_outbound(outbound, &node_id).await?; + //let man_session = self.parse_manual(manual, &node_id).await?; + + sessions.push(in_session.clone()); + sessions.push(out_session.clone()); + //sessions.push(man_session.clone()); + + let node = NodeInfo::new( + node_id.clone(), + node_name.to_string(), + state.as_str().unwrap().to_string(), + sessions.clone(), + ext_addr, + false, + ); + + self.update_node(node.clone(), node_id.clone()).await; + self.update_selectable_and_ids(sessions.clone(), node.clone()).await?; + self.update_msgs(sessions.clone()).await?; + self.update_new_id().await; + + //debug!("IDS: {:?}", self.model.ids.lock().await); + //debug!("INFOS: {:?}", self.model.nodes.lock().await); + + Ok(()) + } + + async fn update_msgs(&self, sessions: Vec) -> DnetViewResult<()> { + for session in sessions { + for connection in session.children { + if !self.model.msg_map.lock().await.contains_key(&connection.id) { + // we don't have this ID: it is a new node + self.model + .msg_map + .lock() + .await + .insert(connection.id, connection.msg_log.clone()); + } else { + // we have this id: append the msg values + match self.model.msg_map.lock().await.entry(connection.id) { + Entry::Vacant(e) => { + e.insert(connection.msg_log); + } + Entry::Occupied(mut e) => { + for msg in connection.msg_log { + e.get_mut().push(msg); + } + } + } + } + } + } + Ok(()) + } + + async fn update_ids(&self, id: String) { + self.model.ids.lock().await.insert(id); + } + + async fn update_new_id(&self) { + let ids = self.model.ids.lock().await.clone(); + + for id in ids.iter() { + self.model.new_id.lock().await.push(id.to_string()); + } + } + async fn update_node(&self, node: NodeInfo, id: String) { + self.model.nodes.lock().await.insert(id, node); + } + + async fn update_selectable_and_ids( + &self, + sessions: Vec, + node: NodeInfo, + ) -> DnetViewResult<()> { + if node.is_offline == true { + let node_obj = SelectableObject::Node(node.clone()); + self.model.selectables.lock().await.insert(node.id.clone(), node_obj); + self.update_ids(node.id.clone()).await; + } else { + let node_obj = SelectableObject::Node(node.clone()); + self.model.selectables.lock().await.insert(node.id.clone(), node_obj); + self.update_ids(node.id.clone()).await; + for session in sessions { + if !session.is_empty { + let session_obj = SelectableObject::Session(session.clone()); + self.model.selectables.lock().await.insert(session.clone().id, session_obj); + self.update_ids(session.clone().id).await; + for connect in session.children { + let connect_obj = SelectableObject::Connect(connect.clone()); + self.model.selectables.lock().await.insert(connect.clone().id, connect_obj); + self.update_ids(connect.clone().id).await; + } + } + } + } + Ok(()) + } + + async fn parse_external_addr(&self, addr: &Option<&Value>) -> DnetViewResult> { + match addr { + Some(addr) => match addr.as_str() { + Some(addr) => Ok(Some(addr.to_string())), + None => Ok(None), + }, + None => Err(DnetViewError::NoExternalAddr), + } + } + + async fn parse_inbound( + &self, + inbound: &Value, + node_id: &String, + ) -> DnetViewResult { + let name = "Inbound".to_string(); + let session_type = Session::Inbound; + let parent = node_id.to_string(); + let id = make_session_id(&parent, &session_type)?; + let mut connects: Vec = Vec::new(); + let connections = &inbound["connected"]; + let mut connect_count = 0; + let mut accept_vec = Vec::new(); + + match connections.as_object() { + Some(connect) => { + match connect.is_empty() { + true => { + connect_count += 1; + // channel is empty. initialize with empty values + let id = make_empty_id(node_id, &session_type, connect_count)?; + let addr = "Null".to_string(); + let state = "Null".to_string(); + let parent = parent.clone(); + let msg_log = Vec::new(); + let is_empty = true; + let last_msg = "Null".to_string(); + let last_status = "Null".to_string(); + let remote_node_id = "Null".to_string(); + let connect_info = ConnectInfo::new( + id, + addr, + state, + parent, + msg_log, + is_empty, + last_msg, + last_status, + remote_node_id, + ); + connects.push(connect_info); + } + false => { + // channel is not empty. initialize with whole values + for k in connect.keys() { + let node = connect.get(k); + let addr = k.to_string(); + let info = node.unwrap().as_array(); + // get the accept address + let accept_addr = info.unwrap().get(0); + let acc_addr = accept_addr + .unwrap() + .get("accept_addr") + .unwrap() + .as_str() + .unwrap() + .to_string(); + accept_vec.push(acc_addr); + let info2 = info.unwrap().get(1); + let id = info2.unwrap().get("random_id").unwrap().as_u64().unwrap(); + let id = make_connect_id(&id)?; + let state = "state".to_string(); + let parent = parent.clone(); + let msg_values = info2.unwrap().get("log").unwrap().as_array().unwrap(); + let mut msg_log: Vec<(NanoTimestamp, String, String)> = Vec::new(); + for msg in msg_values { + let msg: (NanoTimestamp, String, String) = + serde_json::from_value(msg.clone())?; + msg_log.push(msg); + } + let is_empty = false; + let last_msg = info2 + .unwrap() + .get("last_msg") + .unwrap() + .as_str() + .unwrap() + .to_string(); + let last_status = info2 + .unwrap() + .get("last_status") + .unwrap() + .as_str() + .unwrap() + .to_string(); + let remote_node_id = info2 + .unwrap() + .get("remote_node_id") + .unwrap() + .as_str() + .unwrap() + .to_string(); + let r_node_id: String = match remote_node_id.is_empty() { + true => "no remote id".to_string(), + false => remote_node_id, + }; + let connect_info = ConnectInfo::new( + id, + addr, + state, + parent, + msg_log, + is_empty, + last_msg, + last_status, + r_node_id, + ); + connects.push(connect_info.clone()); + } + } + } + let is_empty = is_empty_session(&connects); + + // TODO: clean this up + if accept_vec.is_empty() { + let accept_addr = None; + let session_info = + SessionInfo::new(id, name, is_empty, parent, connects, accept_addr); + Ok(session_info) + } else { + let accept_addr = Some(accept_vec[0].clone()); + let session_info = + SessionInfo::new(id, name, is_empty, parent, connects, accept_addr); + Ok(session_info) + } + } + None => Err(DnetViewError::ValueIsNotObject), + } + } + + // TODO: placeholder for now + async fn _parse_manual( + &self, + _manual: &Value, + node_id: &String, + ) -> DnetViewResult { + let name = "Manual".to_string(); + let session_type = Session::Manual; + let mut connects: Vec = Vec::new(); + let parent = node_id.to_string(); + + let session_id = make_session_id(&parent, &session_type)?; + //let id: u64 = 0; + let connect_id = make_empty_id(node_id, &session_type, 0)?; + //let connect_id = make_connect_id(&id)?; + let addr = "Null".to_string(); + let state = "Null".to_string(); + let msg_log = Vec::new(); + let is_empty = true; + let msg = "Null".to_string(); + let status = "Null".to_string(); + let remote_node_id = "Null".to_string(); + let connect_info = ConnectInfo::new( + connect_id.clone(), + addr, + state, + parent, + msg_log, + is_empty, + msg, + status, + remote_node_id, + ); + connects.push(connect_info); + let parent = connect_id; + let is_empty = is_empty_session(&connects); + let accept_addr = None; + let session_info = + SessionInfo::new(session_id, name, is_empty, parent, connects.clone(), accept_addr); + + Ok(session_info) + } + + async fn parse_outbound( + &self, + outbound: &Value, + node_id: &String, + ) -> DnetViewResult { + let name = "Outbound".to_string(); + let session_type = Session::Outbound; + let parent = node_id.to_string(); + let id = make_session_id(&parent, &session_type)?; + let mut connects: Vec = Vec::new(); + let slots = &outbound["slots"]; + let mut slot_count = 0; + + match slots.as_array() { + Some(slots) => { + for slot in slots { + slot_count += 1; + match slot["channel"].is_null() { + true => { + // TODO: this is not actually empty + let id = make_empty_id(node_id, &session_type, slot_count)?; + let addr = "Null".to_string(); + let state = &slot["state"]; + let state = state.as_str().unwrap().to_string(); + let parent = parent.clone(); + let msg_log = Vec::new(); + let is_empty = false; + let last_msg = "Null".to_string(); + let last_status = "Null".to_string(); + let remote_node_id = "Null".to_string(); + let connect_info = ConnectInfo::new( + id, + addr, + state, + parent, + msg_log, + is_empty, + last_msg, + last_status, + remote_node_id, + ); + connects.push(connect_info.clone()); + } + false => { + // channel is not empty. initialize with whole values + let channel = &slot["channel"]; + let id = channel["random_id"].as_u64().unwrap(); + let id = make_connect_id(&id)?; + let addr = &slot["addr"]; + let addr = addr.as_str().unwrap().to_string(); + let state = &slot["state"]; + let state = state.as_str().unwrap().to_string(); + let parent = parent.clone(); + let msg_values = channel["log"].as_array().unwrap(); + let mut msg_log: Vec<(NanoTimestamp, String, String)> = Vec::new(); + for msg in msg_values { + let msg: (NanoTimestamp, String, String) = + serde_json::from_value(msg.clone())?; + msg_log.push(msg); + } + let is_empty = false; + let last_msg = channel["last_msg"].as_str().unwrap().to_string(); + let last_status = channel["last_status"].as_str().unwrap().to_string(); + let remote_node_id = + channel["remote_node_id"].as_str().unwrap().to_string(); + let r_node_id: String = match remote_node_id.is_empty() { + true => "no remote id".to_string(), + false => remote_node_id, + }; + let connect_info = ConnectInfo::new( + id, + addr, + state, + parent, + msg_log, + is_empty, + last_msg, + last_status, + r_node_id, + ); + connects.push(connect_info.clone()); + } + } + } + + let is_empty = is_empty_session(&connects); + + let accept_addr = None; + let session_info = + SessionInfo::new(id, name, is_empty, parent, connects, accept_addr); + Ok(session_info) + } + None => Err(DnetViewError::ValueIsNotObject), + } + } +} diff --git a/bin/dnetview/src/rpc.rs b/bin/dnetview/src/rpc.rs new file mode 100644 index 000000000..eebb28959 --- /dev/null +++ b/bin/dnetview/src/rpc.rs @@ -0,0 +1,38 @@ +use darkfi::{ + error::Result, + rpc::{client::RpcClient, jsonrpc::JsonRequest}, +}; + +use serde_json::{json, Value}; +use url::Url; + +use crate::error::{DnetViewError, DnetViewResult}; + +pub struct RpcConnect { + pub name: String, + pub rpc_client: RpcClient, +} + +impl RpcConnect { + pub async fn new(url: Url, name: String) -> Result { + let rpc_client = RpcClient::new(url).await?; + Ok(Self { name, rpc_client }) + } + + // --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42} + // <-- {"jsonrpc": "2.0", "result": "pong", "id": 42} + pub async fn ping(&self) -> Result { + let req = JsonRequest::new("ping", json!([])); + self.rpc_client.request(req).await + } + + //--> {"jsonrpc": "2.0", "method": "poll", "params": [], "id": 42} + // <-- {"jsonrpc": "2.0", "result": {"nodeID": [], "nodeinfo" [], "id": 42} + pub async fn get_info(&self) -> DnetViewResult { + let req = JsonRequest::new("get_info", json!([])); + match self.rpc_client.request(req).await { + Ok(req) => Ok(req), + Err(e) => Err(DnetViewError::Darkfi(e)), + } + } +} diff --git a/bin/dnetview/src/view.rs b/bin/dnetview/src/view.rs index 40dcb3bc1..134ee4bbd 100644 --- a/bin/dnetview/src/view.rs +++ b/bin/dnetview/src/view.rs @@ -30,11 +30,12 @@ pub struct View { } impl<'a> View { - pub fn new( - id_menu: IdMenu, - msg_list: MsgList, - selectables: FxHashMap, - ) -> View { + pub fn new() -> View { + let msg_map = FxHashMap::default(); + let msg_list = MsgList::new(msg_map.clone(), 0); + let selectables = FxHashMap::default(); + let id_menu = IdMenu::new(Vec::new()); + View { id_menu, msg_list, selectables } }