dnetview: cleaned into 'rpc' and 'parser' modules. simplified main()

This commit is contained in:
lunar-mining
2022-08-10 07:08:07 +02:00
parent 554c17e4cb
commit e051bcfd2d
5 changed files with 665 additions and 618 deletions

View File

@@ -1,10 +1,12 @@
use async_std::sync::{Arc, Mutex};
use std::{collections::hash_map::Entry, fs::File, io, io::Read, path::PathBuf};
use async_std::sync::Arc;
use std::{fs::File, io, io::Read, path::PathBuf};
use darkfi::util::{
cli::{get_log_config, get_log_level, spawn_config, Config},
join_config_path,
};
use easy_parallel::Parallel;
use fxhash::{FxHashMap, FxHashSet};
use log::{error, info, trace};
use serde_json::{json, Value};
use log::{debug, info};
use simplelog::*;
use smol::Executor;
use termion::{async_stdin, event::Key, input::TermRead, raw::IntoRawMode};
@@ -12,68 +14,99 @@ use tui::{
backend::{Backend, TermionBackend},
Terminal,
};
use url::Url;
use darkfi::{
error::Result,
rpc::{client::RpcClient, jsonrpc::JsonRequest},
util::{
//async_util,
cli::{get_log_config, get_log_level, spawn_config, Config},
join_config_path,
NanoTimestamp,
},
};
pub mod config;
pub mod error;
pub mod model;
pub mod options;
pub mod parser;
pub mod rpc;
pub mod util;
pub mod view;
use crate::{
config::{DnvConfig, CONFIG_FILE_CONTENTS},
error::{DnetViewError, DnetViewResult},
model::{ConnectInfo, Model, NodeInfo, SelectableObject, Session, SessionInfo},
model::Model,
options::ProgramOptions,
util::{
is_empty_session, make_connect_id, make_empty_id, make_node_id, make_session_id, sleep,
},
view::{IdMenu, MsgList, View},
parser::DataParser,
view::View,
};
struct DnetView {
name: String,
rpc_client: RpcClient,
model: Arc<Model>,
view: View,
}
impl DnetView {
async fn new(url: Url, name: String) -> Result<Self> {
let rpc_client = RpcClient::new(url).await?;
Ok(Self { name, rpc_client })
fn new(model: Arc<Model>, view: View) -> Self {
Self { model, view }
}
// --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": "pong", "id": 42}
async fn ping(&self) -> Result<Value> {
let req = JsonRequest::new("ping", json!([]));
self.rpc_client.request(req).await
}
async fn render_view<B: Backend>(&mut self, terminal: &mut Terminal<B>) -> DnetViewResult<()> {
let mut asi = async_stdin();
//--> {"jsonrpc": "2.0", "method": "poll", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": {"nodeID": [], "nodeinfo" [], "id": 42}
async fn get_info(&self) -> DnetViewResult<Value> {
let req = JsonRequest::new("get_info", json!([]));
match self.rpc_client.request(req).await {
Ok(req) => Ok(req),
Err(e) => Err(DnetViewError::Darkfi(e)),
terminal.clear()?;
self.view.id_menu.state.select(Some(0));
self.view.msg_list.state.select(Some(0));
loop {
self.view.update(
self.model.new_id.lock().await.clone(),
self.model.msg_map.lock().await.clone(),
self.model.selectables.lock().await.clone(),
);
debug!(target: "dnetview::render_view()", "ID LIST: {:?}", self.view.id_menu.ids);
let mut err: Option<DnetViewError> = None;
terminal.draw(|f| match self.view.render(f) {
Ok(()) => {}
Err(e) => {
err = Some(e);
}
})?;
match err {
Some(e) => return Err(e),
None => {}
}
self.view.msg_list.scroll()?;
for k in asi.by_ref().keys() {
match k.unwrap() {
Key::Char('q') => {
terminal.clear()?;
return Ok(())
}
Key::Char('j') => {
self.view.id_menu.next();
}
Key::Char('k') => {
self.view.id_menu.previous();
}
Key::Char('u') => {
// TODO
//view.msg_list.next();
}
Key::Char('d') => {
// TODO
//view.msg_list.previous();
}
_ => (),
}
}
util::sleep(100).await;
}
}
}
#[async_std::main]
async fn main() -> DnetViewResult<()> {
debug!(target: "dnetview", "main() START");
let options = ProgramOptions::load()?;
let verbosity_level = options.app.occurrences_of("verbose");
@@ -97,36 +130,24 @@ async fn main() -> DnetViewResult<()> {
terminal.clear()?;
let ids = Mutex::new(FxHashSet::default());
let nodes = Mutex::new(FxHashMap::default());
let selectables = Mutex::new(FxHashMap::default());
let msg_map = Mutex::new(FxHashMap::default());
let msg_log = Mutex::new(Vec::new());
let new_id = Mutex::new(Vec::new());
let model = Arc::new(Model::new(ids, new_id, nodes, msg_map, msg_log, selectables));
let msg_map = FxHashMap::default();
let msg_list = MsgList::new(msg_map.clone(), 0);
let selectables = FxHashMap::default();
let id_menu = IdMenu::new(Vec::new());
let mut view = View::new(id_menu, msg_list, selectables);
let nthreads = num_cpus::get();
let (signal, shutdown) = async_channel::unbounded::<()>();
let (s, r) = async_channel::unbounded::<()>();
let model = Model::new();
let view = View::new();
let ex = Arc::new(Executor::new());
let ex2 = ex.clone();
let ex3 = ex.clone();
let mut dnetview = DnetView::new(model.clone(), view.clone());
let parser = DataParser::new(model.clone(), config);
let nthreads = num_cpus::get();
let (signal, shutdown) = async_channel::unbounded::<()>();
let (_, result) = Parallel::new()
.each(0..nthreads, |_| smol::future::block_on(ex.run(shutdown.recv())))
.finish(|| {
smol::future::block_on(async move {
start_connect_slots(&config, ex2.clone(), model.clone()).await?;
render_view(&mut terminal, model.clone(), &mut view.clone(), ex3).await?;
parser.start_connect_slots(ex2).await?;
dnetview.render_view(&mut terminal).await?;
drop(signal);
Ok(())
})
@@ -134,544 +155,3 @@ async fn main() -> DnetViewResult<()> {
result
}
async fn start_connect_slots(
config: &DnvConfig,
ex: Arc<Executor<'_>>,
model: Arc<Model>,
) -> DnetViewResult<()> {
for node in &config.nodes {
ex.spawn(try_connect(model.clone(), node.name.clone(), node.rpc_url.clone())).detach();
}
Ok(())
}
async fn try_connect(model: Arc<Model>, node_name: String, rpc_url: String) -> DnetViewResult<()> {
loop {
info!("Attempting to poll {}, RPC URL: {}", node_name, rpc_url);
match DnetView::new(Url::parse(&rpc_url)?, node_name.clone()).await {
Ok(client) => {
poll(client, model.clone()).await?;
}
Err(e) => {
error!("{}", e);
parse_offline(node_name.clone(), model.clone()).await?;
util::sleep(2000).await;
}
}
}
}
async fn poll(client: DnetView, model: Arc<Model>) -> DnetViewResult<()> {
loop {
match client.ping().await {
// TODO
Ok(reply) => {}
Err(e) => {}
}
match client.get_info().await {
Ok(reply) => {
if reply.as_object().is_some() && !reply.as_object().unwrap().is_empty() {
parse_data(reply.as_object().unwrap(), &client, model.clone()).await?;
} else {
return Err(DnetViewError::EmptyRpcReply)
}
}
Err(e) => {
error!("{:?}", e);
parse_offline(client.name.clone(), model.clone()).await?;
}
}
util::sleep(2000).await;
}
}
async fn parse_offline(node_name: String, model: Arc<Model>) -> DnetViewResult<()> {
let name = "Offline".to_string();
let session_type = Session::Offline;
let node_id = make_node_id(&node_name)?;
let session_id = make_session_id(&node_id, &session_type)?;
let mut connects: Vec<ConnectInfo> = Vec::new();
let mut sessions: Vec<SessionInfo> = Vec::new();
// initialize with empty values
let id = make_empty_id(&node_id, &session_type, 0)?;
let addr = "Null".to_string();
let state = "Null".to_string();
let parent = node_id.clone();
let msg_log = Vec::new();
let is_empty = true;
let last_msg = "Null".to_string();
let last_status = "Null".to_string();
let remote_node_id = "Null".to_string();
let connect_info = ConnectInfo::new(
id,
addr,
state.clone(),
parent.clone(),
msg_log,
is_empty,
last_msg,
last_status,
remote_node_id,
);
connects.push(connect_info.clone());
let accept_addr = None;
let session_info =
SessionInfo::new(session_id, name, is_empty, parent.clone(), connects, accept_addr);
sessions.push(session_info);
let node = NodeInfo::new(
node_id.clone(),
node_name.to_string(),
state.clone(),
sessions.clone(),
None,
true,
);
update_node(model.clone(), node.clone(), node_id.clone()).await;
update_selectable_and_ids(model.clone(), sessions, node.clone()).await?;
update_new_id(model.clone()).await;
Ok(())
}
async fn parse_data(
reply: &serde_json::Map<String, Value>,
client: &DnetView,
model: Arc<Model>,
) -> DnetViewResult<()> {
let addr = &reply.get("external_addr");
let inbound = &reply["session_inbound"];
let _manual = &reply["session_manual"];
let outbound = &reply["session_outbound"];
let state = &reply["state"];
let mut sessions: Vec<SessionInfo> = Vec::new();
let node_name = &client.name;
let node_id = make_node_id(node_name)?;
let ext_addr = parse_external_addr(addr).await?;
let in_session = parse_inbound(inbound, &node_id).await?;
let out_session = parse_outbound(outbound, &node_id).await?;
//let man_session = parse_manual(manual, &node_id).await?;
sessions.push(in_session.clone());
sessions.push(out_session.clone());
//sessions.push(man_session.clone());
let node = NodeInfo::new(
node_id.clone(),
node_name.to_string(),
state.as_str().unwrap().to_string(),
sessions.clone(),
ext_addr,
false,
);
update_node(model.clone(), node.clone(), node_id.clone()).await;
update_selectable_and_ids(model.clone(), sessions.clone(), node.clone()).await?;
update_msgs(model.clone(), sessions.clone()).await?;
update_new_id(model.clone()).await;
//trace!("IDS: {:?}", model.ids.lock().await);
//trace!("INFOS: {:?}", model.nodes.lock().await);
Ok(())
}
async fn update_msgs(model: Arc<Model>, sessions: Vec<SessionInfo>) -> DnetViewResult<()> {
for session in sessions {
for connection in session.children {
if !model.msg_map.lock().await.contains_key(&connection.id) {
// we don't have this ID: it is a new node
model.msg_map.lock().await.insert(connection.id, connection.msg_log.clone());
} else {
// we have this id: append the msg values
match model.msg_map.lock().await.entry(connection.id) {
Entry::Vacant(e) => {
e.insert(connection.msg_log);
}
Entry::Occupied(mut e) => {
for msg in connection.msg_log {
e.get_mut().push(msg);
}
}
}
}
}
}
Ok(())
}
async fn update_ids(model: Arc<Model>, id: String) {
model.ids.lock().await.insert(id);
}
async fn update_new_id(model: Arc<Model>) {
let ids = model.ids.lock().await.clone();
for id in ids.iter() {
model.new_id.lock().await.push(id.to_string());
}
}
async fn update_node(model: Arc<Model>, node: NodeInfo, id: String) {
model.nodes.lock().await.insert(id, node);
}
async fn update_selectable_and_ids(
model: Arc<Model>,
sessions: Vec<SessionInfo>,
node: NodeInfo,
) -> DnetViewResult<()> {
if node.is_offline == true {
let node_obj = SelectableObject::Node(node.clone());
model.selectables.lock().await.insert(node.id.clone(), node_obj);
update_ids(model.clone(), node.id.clone()).await;
} else {
let node_obj = SelectableObject::Node(node.clone());
model.selectables.lock().await.insert(node.id.clone(), node_obj);
update_ids(model.clone(), node.id.clone()).await;
for session in sessions {
if !session.is_empty {
let session_obj = SelectableObject::Session(session.clone());
model.selectables.lock().await.insert(session.clone().id, session_obj);
update_ids(model.clone(), session.clone().id).await;
for connect in session.children {
let connect_obj = SelectableObject::Connect(connect.clone());
model.selectables.lock().await.insert(connect.clone().id, connect_obj);
update_ids(model.clone(), connect.clone().id).await;
}
}
}
}
Ok(())
}
async fn parse_external_addr(addr: &Option<&Value>) -> DnetViewResult<Option<String>> {
match addr {
Some(addr) => match addr.as_str() {
Some(addr) => Ok(Some(addr.to_string())),
None => Ok(None),
},
None => Err(DnetViewError::NoExternalAddr),
}
}
async fn parse_inbound(inbound: &Value, node_id: &String) -> DnetViewResult<SessionInfo> {
let name = "Inbound".to_string();
let session_type = Session::Inbound;
let parent = node_id.to_string();
let id = make_session_id(&parent, &session_type)?;
let mut connects: Vec<ConnectInfo> = Vec::new();
let connections = &inbound["connected"];
let mut connect_count = 0;
let mut accept_vec = Vec::new();
match connections.as_object() {
Some(connect) => {
match connect.is_empty() {
true => {
connect_count += 1;
// channel is empty. initialize with empty values
let id = make_empty_id(node_id, &session_type, connect_count)?;
let addr = "Null".to_string();
let state = "Null".to_string();
let parent = parent.clone();
let msg_log = Vec::new();
let is_empty = true;
let last_msg = "Null".to_string();
let last_status = "Null".to_string();
let remote_node_id = "Null".to_string();
let connect_info = ConnectInfo::new(
id,
addr,
state,
parent,
msg_log,
is_empty,
last_msg,
last_status,
remote_node_id,
);
connects.push(connect_info);
}
false => {
// channel is not empty. initialize with whole values
for k in connect.keys() {
let node = connect.get(k);
let addr = k.to_string();
let info = node.unwrap().as_array();
// get the accept address
let accept_addr = info.unwrap().get(0);
let acc_addr = accept_addr
.unwrap()
.get("accept_addr")
.unwrap()
.as_str()
.unwrap()
.to_string();
accept_vec.push(acc_addr);
let info2 = info.unwrap().get(1);
let id = info2.unwrap().get("random_id").unwrap().as_u64().unwrap();
let id = make_connect_id(&id)?;
let state = "state".to_string();
let parent = parent.clone();
let msg_values = info2.unwrap().get("log").unwrap().as_array().unwrap();
let mut msg_log: Vec<(NanoTimestamp, String, String)> = Vec::new();
for msg in msg_values {
let msg: (NanoTimestamp, String, String) =
serde_json::from_value(msg.clone())?;
msg_log.push(msg);
}
let is_empty = false;
let last_msg =
info2.unwrap().get("last_msg").unwrap().as_str().unwrap().to_string();
let last_status = info2
.unwrap()
.get("last_status")
.unwrap()
.as_str()
.unwrap()
.to_string();
let remote_node_id = info2
.unwrap()
.get("remote_node_id")
.unwrap()
.as_str()
.unwrap()
.to_string();
let r_node_id: String = match remote_node_id.is_empty() {
true => "no remote id".to_string(),
false => remote_node_id,
};
let connect_info = ConnectInfo::new(
id,
addr,
state,
parent,
msg_log,
is_empty,
last_msg,
last_status,
r_node_id,
);
connects.push(connect_info.clone());
}
}
}
let is_empty = is_empty_session(&connects);
// TODO: clean this up
if accept_vec.is_empty() {
let accept_addr = None;
let session_info =
SessionInfo::new(id, name, is_empty, parent, connects, accept_addr);
Ok(session_info)
} else {
let accept_addr = Some(accept_vec[0].clone());
let session_info =
SessionInfo::new(id, name, is_empty, parent, connects, accept_addr);
Ok(session_info)
}
}
None => Err(DnetViewError::ValueIsNotObject),
}
}
// TODO: placeholder for now
async fn _parse_manual(_manual: &Value, node_id: &String) -> DnetViewResult<SessionInfo> {
let name = "Manual".to_string();
let session_type = Session::Manual;
let mut connects: Vec<ConnectInfo> = Vec::new();
let parent = node_id.to_string();
let session_id = make_session_id(&parent, &session_type)?;
//let id: u64 = 0;
let connect_id = make_empty_id(node_id, &session_type, 0)?;
//let connect_id = make_connect_id(&id)?;
let addr = "Null".to_string();
let state = "Null".to_string();
let msg_log = Vec::new();
let is_empty = true;
let msg = "Null".to_string();
let status = "Null".to_string();
let remote_node_id = "Null".to_string();
let connect_info = ConnectInfo::new(
connect_id.clone(),
addr,
state,
parent,
msg_log,
is_empty,
msg,
status,
remote_node_id,
);
connects.push(connect_info);
let parent = connect_id;
let is_empty = is_empty_session(&connects);
let accept_addr = None;
let session_info =
SessionInfo::new(session_id, name, is_empty, parent, connects.clone(), accept_addr);
Ok(session_info)
}
async fn parse_outbound(outbound: &Value, node_id: &String) -> DnetViewResult<SessionInfo> {
let name = "Outbound".to_string();
let session_type = Session::Outbound;
let parent = node_id.to_string();
let id = make_session_id(&parent, &session_type)?;
let mut connects: Vec<ConnectInfo> = Vec::new();
let slots = &outbound["slots"];
let mut slot_count = 0;
match slots.as_array() {
Some(slots) => {
for slot in slots {
slot_count += 1;
match slot["channel"].is_null() {
true => {
// TODO: this is not actually empty
let id = make_empty_id(node_id, &session_type, slot_count)?;
let addr = "Null".to_string();
let state = &slot["state"];
let state = state.as_str().unwrap().to_string();
let parent = parent.clone();
let msg_log = Vec::new();
let is_empty = false;
let last_msg = "Null".to_string();
let last_status = "Null".to_string();
let remote_node_id = "Null".to_string();
let connect_info = ConnectInfo::new(
id,
addr,
state,
parent,
msg_log,
is_empty,
last_msg,
last_status,
remote_node_id,
);
connects.push(connect_info.clone());
}
false => {
// channel is not empty. initialize with whole values
let channel = &slot["channel"];
let id = channel["random_id"].as_u64().unwrap();
let id = make_connect_id(&id)?;
let addr = &slot["addr"];
let addr = addr.as_str().unwrap().to_string();
let state = &slot["state"];
let state = state.as_str().unwrap().to_string();
let parent = parent.clone();
let msg_values = channel["log"].as_array().unwrap();
let mut msg_log: Vec<(NanoTimestamp, String, String)> = Vec::new();
for msg in msg_values {
let msg: (NanoTimestamp, String, String) =
serde_json::from_value(msg.clone())?;
msg_log.push(msg);
}
let is_empty = false;
let last_msg = channel["last_msg"].as_str().unwrap().to_string();
let last_status = channel["last_status"].as_str().unwrap().to_string();
let remote_node_id =
channel["remote_node_id"].as_str().unwrap().to_string();
let r_node_id: String = match remote_node_id.is_empty() {
true => "no remote id".to_string(),
false => remote_node_id,
};
let connect_info = ConnectInfo::new(
id,
addr,
state,
parent,
msg_log,
is_empty,
last_msg,
last_status,
r_node_id,
);
connects.push(connect_info.clone());
}
}
}
let is_empty = is_empty_session(&connects);
let accept_addr = None;
let session_info = SessionInfo::new(id, name, is_empty, parent, connects, accept_addr);
Ok(session_info)
}
None => Err(DnetViewError::ValueIsNotObject),
}
}
async fn render_view<B: Backend>(
terminal: &mut Terminal<B>,
model: Arc<Model>,
view: &mut View,
ex: Arc<Executor<'_>>,
) -> DnetViewResult<()> {
let mut asi = async_stdin();
terminal.clear()?;
view.id_menu.state.select(Some(0));
view.msg_list.state.select(Some(0));
loop {
view.update(
model.new_id.lock().await.clone(),
model.msg_map.lock().await.clone(),
model.selectables.lock().await.clone(),
);
trace!(target: "dnetview::render_view()", "ID LIST: {:?}", view.id_menu.ids);
let mut err: Option<DnetViewError> = None;
terminal.draw(|f| match view.render(f) {
Ok(()) => {}
Err(e) => {
err = Some(e);
}
})?;
match err {
Some(e) => return Err(e),
None => {}
}
view.msg_list.scroll()?;
for k in asi.by_ref().keys() {
match k.unwrap() {
Key::Char('q') => {
terminal.clear()?;
return Ok(())
}
Key::Char('j') => {
view.id_menu.next();
}
Key::Char('k') => {
view.id_menu.previous();
}
Key::Char('u') => {
// TODO
//view.msg_list.next();
}
Key::Char('d') => {
// TODO
//view.msg_list.previous();
}
_ => (),
}
}
util::sleep(200).await;
}
}

View File

@@ -1,4 +1,4 @@
use async_std::sync::Mutex;
use async_std::sync::{Arc, Mutex};
use fxhash::{FxHashMap, FxHashSet};
use serde::{Deserialize, Serialize};
@@ -34,15 +34,14 @@ pub struct Model {
}
impl Model {
pub fn new(
ids: Mutex<FxHashSet<String>>,
new_id: Mutex<Vec<String>>,
nodes: Mutex<FxHashMap<String, NodeInfo>>,
msg_map: MsgMap,
msg_log: Mutex<MsgLog>,
selectables: Mutex<FxHashMap<String, SelectableObject>>,
) -> Model {
Model { ids, new_id, nodes, msg_map, msg_log, selectables }
pub fn new() -> Arc<Self> {
let ids = Mutex::new(FxHashSet::default());
let nodes = Mutex::new(FxHashMap::default());
let selectables = Mutex::new(FxHashMap::default());
let msg_map = Mutex::new(FxHashMap::default());
let msg_log = Mutex::new(Vec::new());
let new_id = Mutex::new(Vec::new());
Arc::new(Model { ids, new_id, nodes, msg_map, msg_log, selectables })
}
}

529
bin/dnetview/src/parser.rs Normal file
View File

@@ -0,0 +1,529 @@
use async_std::sync::Arc;
use std::collections::hash_map::Entry;
use log::{debug, error, info};
use serde_json::Value;
use smol::Executor;
use url::Url;
use darkfi::util::NanoTimestamp;
use crate::{
config::DnvConfig,
error::{DnetViewError, DnetViewResult},
model::{ConnectInfo, Model, NodeInfo, SelectableObject, Session, SessionInfo},
rpc::RpcConnect,
util::{is_empty_session, make_connect_id, make_empty_id, make_node_id, make_session_id},
};
pub struct DataParser {
model: Arc<Model>,
config: DnvConfig,
}
impl DataParser {
pub fn new(model: Arc<Model>, config: DnvConfig) -> Arc<Self> {
Arc::new(Self { model, config })
}
pub async fn start_connect_slots(self: Arc<Self>, ex: Arc<Executor<'_>>) -> DnetViewResult<()> {
debug!(target: "dnetview", "start_connect_slots() START");
for node in &self.config.nodes {
let self2 = self.clone();
debug!(target: "dnetview", "attempting to spawn...");
ex.clone().spawn(self2.try_connect(node.name.clone(), node.rpc_url.clone())).detach();
}
Ok(())
}
async fn try_connect(
self: Arc<Self>,
node_name: String,
rpc_url: String,
) -> DnetViewResult<()> {
debug!(target: "dnetview", "try_connect() START");
loop {
info!("Attempting to poll {}, RPC URL: {}", node_name, rpc_url);
match RpcConnect::new(Url::parse(&rpc_url)?, node_name.clone()).await {
Ok(client) => {
self.poll(client).await?;
}
Err(e) => {
error!("{}", e);
self.parse_offline(node_name.clone()).await?;
crate::util::sleep(2000).await;
}
}
}
}
async fn poll(&self, client: RpcConnect) -> DnetViewResult<()> {
loop {
match client.ping().await {
// TODO
Ok(_reply) => {}
Err(_e) => {}
}
match client.get_info().await {
Ok(reply) => {
if reply.as_object().is_some() && !reply.as_object().unwrap().is_empty() {
self.parse_data(reply.as_object().unwrap(), &client).await?;
} else {
return Err(DnetViewError::EmptyRpcReply)
}
}
Err(e) => {
error!("{:?}", e);
self.parse_offline(client.name.clone()).await?;
}
}
crate::util::sleep(2000).await;
}
}
async fn parse_offline(&self, node_name: String) -> DnetViewResult<()> {
let name = "Offline".to_string();
let session_type = Session::Offline;
let node_id = make_node_id(&node_name)?;
let session_id = make_session_id(&node_id, &session_type)?;
let mut connects: Vec<ConnectInfo> = Vec::new();
let mut sessions: Vec<SessionInfo> = Vec::new();
// initialize with empty values
let id = make_empty_id(&node_id, &session_type, 0)?;
let addr = "Null".to_string();
let state = "Null".to_string();
let parent = node_id.clone();
let msg_log = Vec::new();
let is_empty = true;
let last_msg = "Null".to_string();
let last_status = "Null".to_string();
let remote_node_id = "Null".to_string();
let connect_info = ConnectInfo::new(
id,
addr,
state.clone(),
parent.clone(),
msg_log,
is_empty,
last_msg,
last_status,
remote_node_id,
);
connects.push(connect_info.clone());
let accept_addr = None;
let session_info =
SessionInfo::new(session_id, name, is_empty, parent.clone(), connects, accept_addr);
sessions.push(session_info);
let node = NodeInfo::new(
node_id.clone(),
node_name.to_string(),
state.clone(),
sessions.clone(),
None,
true,
);
self.update_node(node.clone(), node_id.clone()).await;
self.update_selectable_and_ids(sessions, node.clone()).await?;
self.update_new_id().await;
Ok(())
}
async fn parse_data(
&self,
reply: &serde_json::Map<String, Value>,
client: &RpcConnect,
) -> DnetViewResult<()> {
let addr = &reply.get("external_addr");
let inbound = &reply["session_inbound"];
let _manual = &reply["session_manual"];
let outbound = &reply["session_outbound"];
let state = &reply["state"];
let mut sessions: Vec<SessionInfo> = Vec::new();
let node_name = &client.name;
let node_id = make_node_id(node_name)?;
let ext_addr = self.parse_external_addr(addr).await?;
let in_session = self.parse_inbound(inbound, &node_id).await?;
let out_session = self.parse_outbound(outbound, &node_id).await?;
//let man_session = self.parse_manual(manual, &node_id).await?;
sessions.push(in_session.clone());
sessions.push(out_session.clone());
//sessions.push(man_session.clone());
let node = NodeInfo::new(
node_id.clone(),
node_name.to_string(),
state.as_str().unwrap().to_string(),
sessions.clone(),
ext_addr,
false,
);
self.update_node(node.clone(), node_id.clone()).await;
self.update_selectable_and_ids(sessions.clone(), node.clone()).await?;
self.update_msgs(sessions.clone()).await?;
self.update_new_id().await;
//debug!("IDS: {:?}", self.model.ids.lock().await);
//debug!("INFOS: {:?}", self.model.nodes.lock().await);
Ok(())
}
async fn update_msgs(&self, sessions: Vec<SessionInfo>) -> DnetViewResult<()> {
for session in sessions {
for connection in session.children {
if !self.model.msg_map.lock().await.contains_key(&connection.id) {
// we don't have this ID: it is a new node
self.model
.msg_map
.lock()
.await
.insert(connection.id, connection.msg_log.clone());
} else {
// we have this id: append the msg values
match self.model.msg_map.lock().await.entry(connection.id) {
Entry::Vacant(e) => {
e.insert(connection.msg_log);
}
Entry::Occupied(mut e) => {
for msg in connection.msg_log {
e.get_mut().push(msg);
}
}
}
}
}
}
Ok(())
}
async fn update_ids(&self, id: String) {
self.model.ids.lock().await.insert(id);
}
async fn update_new_id(&self) {
let ids = self.model.ids.lock().await.clone();
for id in ids.iter() {
self.model.new_id.lock().await.push(id.to_string());
}
}
async fn update_node(&self, node: NodeInfo, id: String) {
self.model.nodes.lock().await.insert(id, node);
}
async fn update_selectable_and_ids(
&self,
sessions: Vec<SessionInfo>,
node: NodeInfo,
) -> DnetViewResult<()> {
if node.is_offline == true {
let node_obj = SelectableObject::Node(node.clone());
self.model.selectables.lock().await.insert(node.id.clone(), node_obj);
self.update_ids(node.id.clone()).await;
} else {
let node_obj = SelectableObject::Node(node.clone());
self.model.selectables.lock().await.insert(node.id.clone(), node_obj);
self.update_ids(node.id.clone()).await;
for session in sessions {
if !session.is_empty {
let session_obj = SelectableObject::Session(session.clone());
self.model.selectables.lock().await.insert(session.clone().id, session_obj);
self.update_ids(session.clone().id).await;
for connect in session.children {
let connect_obj = SelectableObject::Connect(connect.clone());
self.model.selectables.lock().await.insert(connect.clone().id, connect_obj);
self.update_ids(connect.clone().id).await;
}
}
}
}
Ok(())
}
async fn parse_external_addr(&self, addr: &Option<&Value>) -> DnetViewResult<Option<String>> {
match addr {
Some(addr) => match addr.as_str() {
Some(addr) => Ok(Some(addr.to_string())),
None => Ok(None),
},
None => Err(DnetViewError::NoExternalAddr),
}
}
async fn parse_inbound(
&self,
inbound: &Value,
node_id: &String,
) -> DnetViewResult<SessionInfo> {
let name = "Inbound".to_string();
let session_type = Session::Inbound;
let parent = node_id.to_string();
let id = make_session_id(&parent, &session_type)?;
let mut connects: Vec<ConnectInfo> = Vec::new();
let connections = &inbound["connected"];
let mut connect_count = 0;
let mut accept_vec = Vec::new();
match connections.as_object() {
Some(connect) => {
match connect.is_empty() {
true => {
connect_count += 1;
// channel is empty. initialize with empty values
let id = make_empty_id(node_id, &session_type, connect_count)?;
let addr = "Null".to_string();
let state = "Null".to_string();
let parent = parent.clone();
let msg_log = Vec::new();
let is_empty = true;
let last_msg = "Null".to_string();
let last_status = "Null".to_string();
let remote_node_id = "Null".to_string();
let connect_info = ConnectInfo::new(
id,
addr,
state,
parent,
msg_log,
is_empty,
last_msg,
last_status,
remote_node_id,
);
connects.push(connect_info);
}
false => {
// channel is not empty. initialize with whole values
for k in connect.keys() {
let node = connect.get(k);
let addr = k.to_string();
let info = node.unwrap().as_array();
// get the accept address
let accept_addr = info.unwrap().get(0);
let acc_addr = accept_addr
.unwrap()
.get("accept_addr")
.unwrap()
.as_str()
.unwrap()
.to_string();
accept_vec.push(acc_addr);
let info2 = info.unwrap().get(1);
let id = info2.unwrap().get("random_id").unwrap().as_u64().unwrap();
let id = make_connect_id(&id)?;
let state = "state".to_string();
let parent = parent.clone();
let msg_values = info2.unwrap().get("log").unwrap().as_array().unwrap();
let mut msg_log: Vec<(NanoTimestamp, String, String)> = Vec::new();
for msg in msg_values {
let msg: (NanoTimestamp, String, String) =
serde_json::from_value(msg.clone())?;
msg_log.push(msg);
}
let is_empty = false;
let last_msg = info2
.unwrap()
.get("last_msg")
.unwrap()
.as_str()
.unwrap()
.to_string();
let last_status = info2
.unwrap()
.get("last_status")
.unwrap()
.as_str()
.unwrap()
.to_string();
let remote_node_id = info2
.unwrap()
.get("remote_node_id")
.unwrap()
.as_str()
.unwrap()
.to_string();
let r_node_id: String = match remote_node_id.is_empty() {
true => "no remote id".to_string(),
false => remote_node_id,
};
let connect_info = ConnectInfo::new(
id,
addr,
state,
parent,
msg_log,
is_empty,
last_msg,
last_status,
r_node_id,
);
connects.push(connect_info.clone());
}
}
}
let is_empty = is_empty_session(&connects);
// TODO: clean this up
if accept_vec.is_empty() {
let accept_addr = None;
let session_info =
SessionInfo::new(id, name, is_empty, parent, connects, accept_addr);
Ok(session_info)
} else {
let accept_addr = Some(accept_vec[0].clone());
let session_info =
SessionInfo::new(id, name, is_empty, parent, connects, accept_addr);
Ok(session_info)
}
}
None => Err(DnetViewError::ValueIsNotObject),
}
}
// TODO: placeholder for now
async fn _parse_manual(
&self,
_manual: &Value,
node_id: &String,
) -> DnetViewResult<SessionInfo> {
let name = "Manual".to_string();
let session_type = Session::Manual;
let mut connects: Vec<ConnectInfo> = Vec::new();
let parent = node_id.to_string();
let session_id = make_session_id(&parent, &session_type)?;
//let id: u64 = 0;
let connect_id = make_empty_id(node_id, &session_type, 0)?;
//let connect_id = make_connect_id(&id)?;
let addr = "Null".to_string();
let state = "Null".to_string();
let msg_log = Vec::new();
let is_empty = true;
let msg = "Null".to_string();
let status = "Null".to_string();
let remote_node_id = "Null".to_string();
let connect_info = ConnectInfo::new(
connect_id.clone(),
addr,
state,
parent,
msg_log,
is_empty,
msg,
status,
remote_node_id,
);
connects.push(connect_info);
let parent = connect_id;
let is_empty = is_empty_session(&connects);
let accept_addr = None;
let session_info =
SessionInfo::new(session_id, name, is_empty, parent, connects.clone(), accept_addr);
Ok(session_info)
}
async fn parse_outbound(
&self,
outbound: &Value,
node_id: &String,
) -> DnetViewResult<SessionInfo> {
let name = "Outbound".to_string();
let session_type = Session::Outbound;
let parent = node_id.to_string();
let id = make_session_id(&parent, &session_type)?;
let mut connects: Vec<ConnectInfo> = Vec::new();
let slots = &outbound["slots"];
let mut slot_count = 0;
match slots.as_array() {
Some(slots) => {
for slot in slots {
slot_count += 1;
match slot["channel"].is_null() {
true => {
// TODO: this is not actually empty
let id = make_empty_id(node_id, &session_type, slot_count)?;
let addr = "Null".to_string();
let state = &slot["state"];
let state = state.as_str().unwrap().to_string();
let parent = parent.clone();
let msg_log = Vec::new();
let is_empty = false;
let last_msg = "Null".to_string();
let last_status = "Null".to_string();
let remote_node_id = "Null".to_string();
let connect_info = ConnectInfo::new(
id,
addr,
state,
parent,
msg_log,
is_empty,
last_msg,
last_status,
remote_node_id,
);
connects.push(connect_info.clone());
}
false => {
// channel is not empty. initialize with whole values
let channel = &slot["channel"];
let id = channel["random_id"].as_u64().unwrap();
let id = make_connect_id(&id)?;
let addr = &slot["addr"];
let addr = addr.as_str().unwrap().to_string();
let state = &slot["state"];
let state = state.as_str().unwrap().to_string();
let parent = parent.clone();
let msg_values = channel["log"].as_array().unwrap();
let mut msg_log: Vec<(NanoTimestamp, String, String)> = Vec::new();
for msg in msg_values {
let msg: (NanoTimestamp, String, String) =
serde_json::from_value(msg.clone())?;
msg_log.push(msg);
}
let is_empty = false;
let last_msg = channel["last_msg"].as_str().unwrap().to_string();
let last_status = channel["last_status"].as_str().unwrap().to_string();
let remote_node_id =
channel["remote_node_id"].as_str().unwrap().to_string();
let r_node_id: String = match remote_node_id.is_empty() {
true => "no remote id".to_string(),
false => remote_node_id,
};
let connect_info = ConnectInfo::new(
id,
addr,
state,
parent,
msg_log,
is_empty,
last_msg,
last_status,
r_node_id,
);
connects.push(connect_info.clone());
}
}
}
let is_empty = is_empty_session(&connects);
let accept_addr = None;
let session_info =
SessionInfo::new(id, name, is_empty, parent, connects, accept_addr);
Ok(session_info)
}
None => Err(DnetViewError::ValueIsNotObject),
}
}
}

38
bin/dnetview/src/rpc.rs Normal file
View File

@@ -0,0 +1,38 @@
use darkfi::{
error::Result,
rpc::{client::RpcClient, jsonrpc::JsonRequest},
};
use serde_json::{json, Value};
use url::Url;
use crate::error::{DnetViewError, DnetViewResult};
pub struct RpcConnect {
pub name: String,
pub rpc_client: RpcClient,
}
impl RpcConnect {
pub async fn new(url: Url, name: String) -> Result<Self> {
let rpc_client = RpcClient::new(url).await?;
Ok(Self { name, rpc_client })
}
// --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": "pong", "id": 42}
pub async fn ping(&self) -> Result<Value> {
let req = JsonRequest::new("ping", json!([]));
self.rpc_client.request(req).await
}
//--> {"jsonrpc": "2.0", "method": "poll", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": {"nodeID": [], "nodeinfo" [], "id": 42}
pub async fn get_info(&self) -> DnetViewResult<Value> {
let req = JsonRequest::new("get_info", json!([]));
match self.rpc_client.request(req).await {
Ok(req) => Ok(req),
Err(e) => Err(DnetViewError::Darkfi(e)),
}
}
}

View File

@@ -30,11 +30,12 @@ pub struct View {
}
impl<'a> View {
pub fn new(
id_menu: IdMenu,
msg_list: MsgList,
selectables: FxHashMap<String, SelectableObject>,
) -> View {
pub fn new() -> View {
let msg_map = FxHashMap::default();
let msg_list = MsgList::new(msg_map.clone(), 0);
let selectables = FxHashMap::default();
let id_menu = IdMenu::new(Vec::new());
View { id_menu, msg_list, selectables }
}