dchat: update to new RPC + p2p API and delete redundant dependecies

This commit is contained in:
lunar-mining
2023-12-06 12:15:04 +01:00
parent 247a87e608
commit 914e8037aa
5 changed files with 141 additions and 122 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "dchat"
version = "0.4.1"
version = "0.4.2"
homepage = "https://dark.fi"
description = "Demo chat app used to document DarkFi networking code"
authors = ["Dyne.org foundation <foundation@dyne.org>"]
@@ -10,16 +10,14 @@ edition = "2021"
# ANCHOR: darkfi
[dependencies]
darkfi = {path = "../../", features = ["net", "rpc", "async_daemonize"]}
darkfi = {path = "../../", features = ["net", "rpc"]}
darkfi-serial = {path = "../../src/serial"}
# ANCHOR_END: darkfi
# ANCHOR: dependencies
async-std = "1.12.0"
async-trait = "0.1.74"
easy-parallel = "3.3.1"
smol = "1.3.0"
log = "0.4.20"
simplelog = "0.12.1"
url = "2.5.0"

View File

@@ -17,13 +17,11 @@
*/
// ANCHOR: msg
use async_std::sync::{Arc, Mutex};
use smol::lock::Mutex;
use std::sync::Arc;
use darkfi::{impl_p2p_message, net::Message};
use darkfi_serial::{
async_trait, AsyncDecodable, AsyncEncodable, Decodable, Encodable, SerialDecodable,
SerialEncodable, VarInt,
};
use darkfi_serial::{async_trait, SerialDecodable, SerialEncodable};
pub type DchatMsgsBuffer = Arc<Mutex<Vec<DchatMsg>>>;

View File

@@ -1,4 +1,4 @@
/* This file is part of DarkFi (https://dark.fi)
/* This file is part of Darkfi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
@@ -19,17 +19,24 @@
use std::{error, fs::File, io::stdin};
// ANCHOR: daemon_deps
use async_std::sync::{Arc, Mutex};
use darkfi::{async_daemonize, system::StoppableTaskPtr};
use darkfi::system::StoppableTask;
use easy_parallel::Parallel;
use smol::Executor;
use smol::{lock::Mutex, Executor};
use std::{collections::HashSet, sync::Arc};
// ANCHOR_END: daemon_deps
use log::debug;
use log::{debug, error};
use simplelog::WriteLogger;
use url::Url;
use darkfi::{net, net::Settings, rpc::server::listen_and_serve};
use darkfi::{
net,
net::Settings,
rpc::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
};
use crate::{
dchat_error::ErrorMissingSpecifier,
@@ -44,6 +51,7 @@ pub mod protocol_dchat;
pub mod rpc;
// ANCHOR: error
pub type DarkfiError = darkfi::Error;
pub type Error = Box<dyn error::Error>;
pub type Result<T> = std::result::Result<T, Error>;
// ANCHOR_END: error
@@ -126,20 +134,17 @@ impl Dchat {
// ANCHOR_END: register_protocol
// ANCHOR: start
async fn start(&mut self, ex: Arc<Executor<'_>>) -> Result<()> {
//debug!(target: "dchat", "Dchat::start() [START]");
async fn start(&mut self) -> Result<()> {
debug!(target: "dchat", "Dchat::start() [START]");
//let ex2 = ex.clone();
self.register_protocol(self.recv_msgs.clone()).await?;
self.p2p.clone().start().await?;
//self.register_protocol(self.recv_msgs.clone()).await?;
//self.p2p.clone().start(ex.clone()).await?;
//ex2.spawn(self.p2p.clone().run(ex.clone())).detach();
self.menu().await?;
//self.menu().await?;
self.p2p.stop().await;
//self.p2p.stop().await;
//debug!(target: "dchat", "Dchat::start() [STOP]");
debug!(target: "dchat", "Dchat::start() [STOP]");
Ok(())
}
// ANCHOR_END: start
@@ -222,50 +227,88 @@ fn bob() -> Result<AppSettings> {
// ANCHOR_END: bob
// ANCHOR: main
async_daemonize!(realmain);
async fn realmain() -> Result<()> {
//let settings: Result<AppSettings> = match std::env::args().nth(1) {
// Some(id) => match id.as_str() {
// "a" => alice(),
// "b" => bob(),
// _ => Err(ErrorMissingSpecifier.into()),
// },
// None => Err(ErrorMissingSpecifier.into()),
//};
fn main() -> Result<()> {
smol::block_on(async {
let settings: Result<AppSettings> = match std::env::args().nth(1) {
Some(id) => match id.as_str() {
"a" => alice(),
"b" => bob(),
_ => Err(ErrorMissingSpecifier.into()),
},
None => Err(ErrorMissingSpecifier.into()),
};
//let settings = settings?.clone();
let settings = settings?.clone();
let ex = Arc::new(Executor::new());
let p2p = net::P2p::new(settings.net, ex.clone()).await;
let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
let mut dchat = Dchat::new(p2p.clone(), msgs);
//let p2p = net::P2p::new(settings.net).await;
// ANCHOR: dnet_sub
let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
let dnet_sub_ = dnet_sub.clone();
let p2p_ = p2p.clone();
let dnet_task = StoppableTask::new();
dnet_task.clone().start(
async move {
let dnet_sub = p2p_.dnet_subscribe().await;
loop {
let event = dnet_sub.receive().await;
//debug!("Got dnet event: {:?}", event);
dnet_sub_.notify(vec![event.into()].into()).await;
}
},
|res| async {
match res {
Ok(()) | Err(DarkfiError::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => panic!("{}", e),
}
},
DarkfiError::DetachedTaskStopped,
ex.clone(),
);
// ANCHOR_END: dnet_sub
//let ex = Arc::new(Executor::new());
//let ex2 = ex.clone();
//let ex3 = ex2.clone();
// ANCHOR: json_init
let accept_addr = settings.accept_addr.clone();
//let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
let rpc_connections = Mutex::new(HashSet::new());
let rpc = Arc::new(JsonRpcInterface {
addr: accept_addr.clone(),
p2p,
rpc_connections,
dnet_sub,
});
let _ex = ex.clone();
//let mut dchat = Dchat::new(p2p.clone(), msgs);
let rpc_task = StoppableTask::new();
rpc_task.clone().start(
listen_and_serve(accept_addr.clone(), rpc.clone(), None, ex.clone()),
|res| async move {
match res {
Ok(()) | Err(DarkfiError::RpcServerStopped) => rpc.stop_connections().await,
Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
}
},
DarkfiError::RpcServerStopped,
ex.clone(),
);
// ANCHOR_END: json_init
//// ANCHOR: json_init
//let accept_addr = settings.accept_addr.clone();
//let rpc = Arc::new(JsonRpcInterface { addr: accept_addr.clone(), p2p });
//let _ex = ex.clone();
//ex.spawn(async move { listen_and_serve(accept_addr.clone(), rpc, _ex).await }).detach();
//// ANCHOR_END: json_init
let nthreads = std::thread::available_parallelism().unwrap().get();
let (signal, shutdown) = smol::channel::unbounded::<()>();
//let nthreads = std::thread::available_parallelism().unwrap().get();
//let (signal, shutdown) = smol::channel::unbounded::<()>();
let (_, result) = Parallel::new()
.each(0..nthreads, |_| smol::future::block_on(ex.run(shutdown.recv())))
.finish(|| {
smol::future::block_on(async move {
dchat.start().await?;
drop(signal);
Ok(())
})
});
//let (_, result) = Parallel::new()
// .each(0..nthreads, |_| smol::future::block_on(ex2.run(shutdown.recv())))
// .finish(|| {
// smol::future::block_on(async move {
// dchat.start(ex3).await?;
// drop(signal);
// Ok(())
// })
// });
//result
Ok(())
result
})
}
// ANCHOR_END: main

View File

@@ -17,11 +17,11 @@
*/
// ANCHOR: protocol_dchat
use async_std::sync::Arc;
use async_trait::async_trait;
use darkfi::{net, Result};
use log::debug;
use smol::Executor;
use std::sync::Arc;
use crate::dchatmsg::{DchatMsg, DchatMsgsBuffer};

View File

@@ -19,7 +19,6 @@
use async_trait::async_trait;
use darkfi::system::StoppableTaskPtr;
use log::debug;
use serde_json::{json, Value};
use std::collections::HashSet;
//use darkfi::system::
use smol::lock::{Mutex, MutexGuard};
@@ -28,8 +27,9 @@ use url::Url;
use darkfi::{
net,
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult, JsonSubscriber},
server::RequestHandler,
util::JsonValue,
},
};
@@ -38,37 +38,22 @@ pub struct JsonRpcInterface {
pub addr: Url,
pub p2p: net::P2pPtr,
pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
pub dnet_sub: JsonSubscriber,
}
// ANCHOR_END: jsonrpc
#[async_trait]
impl RequestHandler for JsonRpcInterface {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
//debug!(target: "darkirc::rpc", "--> {}", req.stringify().unwrap());
match req.method.as_str() {
"ping" => self.pong(req.id, req.params).await,
//"dnet.switch" => self.dnet_switch(req.id, req.params).await,
//"dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
//// TODO: Make this optional
//"p2p.get_info" => self.p2p_get_info(req.id, req.params).await,
_ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
//if req.params.as_array().is_none() {
// return JsonError::new(ErrorCode::InvalidRequest, None, req.id).into()
//}
//debug!(target: "RPC", "--> {}", serde_json::to_string(&req).unwrap());
debug!(target: "dchat::rpc", "--> {}", req.stringify().unwrap());
// ANCHOR: req_match
// TODO
//match req.method.as_str() {
// //Some("ping") => self.pong(req.id, req.params).await,
// // Some("dnet_switch") => self.dnet_switch(req.id, req.params).await,
// // Some("dnet_info") => self.dnet_info(req.id, req.params).await,
// Some(_) | None => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
//}
match req.method.as_str() {
"ping" => self.pong(req.id, req.params).await,
"dnet.switch" => self.dnet_switch(req.id, req.params).await,
"dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
_ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
// ANCHOR_END: req_match
}
async fn connections_mut(&self) -> MutexGuard<'_, HashSet<StoppableTaskPtr>> {
@@ -77,16 +62,6 @@ impl RequestHandler for JsonRpcInterface {
}
impl JsonRpcInterface {
// RPCAPI:
// Replies to a ping method.
// --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": "pong", "id": 42}
// ANCHOR: pong
//async fn pong(&self, id: Value, _params: Value) -> JsonResult {
// JsonResponse::new(json!("pong"), id).into()
//}
// ANCHOR_END: pong
// RPCAPI:
// Activate or deactivate dnet in the P2P stack.
// By sending `true`, dnet will be activated, and by sending `false` dnet will
@@ -94,31 +69,36 @@ impl JsonRpcInterface {
//
// --> {"jsonrpc": "2.0", "method": "dnet_switch", "params": [true], "id": 42}
// <-- {"jsonrpc": "2.0", "result": true, "id": 42}
//async fn dnet_switch(&self, id: Value, params: Value) -> JsonResult {
// let params = params.as_array().unwrap();
async fn dnet_switch(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if params.len() != 1 || !params[0].is_bool() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
// if params.len() != 1 && params[0].as_bool().is_none() {
// return JsonError::new(ErrorCode::InvalidParams, None, id).into()
// }
let switch = params[0].get::<bool>().unwrap();
// if params[0].as_bool().unwrap() {
// self.p2p.dnet_enable().await;
// } else {
// self.p2p.dnet_disable().await;
// }
if *switch {
self.p2p.dnet_enable().await;
} else {
self.p2p.dnet_disable().await;
}
// JsonResponse::new(json!(true), id).into()
//}
// RPCAPI:
// Retrieves P2P network information.
JsonResponse::new(JsonValue::Boolean(true), id).into()
}
//
// --> {"jsonrpc": "2.0", "method": "dnet_info", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42}
// ANCHOR: dnet_info
//async fn dnet_info(&self, id: Value, _params: Value) -> JsonResult {
// let dnet_info = self.p2p.dnet_info().await;
// JsonResponse::new(net::P2p::map_dnet_info(dnet_info), id).into()
//}
// ANCHOR_END: dnet_info
// RPCAPI:
// Initializes a subscription to p2p dnet events.
// Once a subscription is established, `darkirc` will send JSON-RPC notifications of
// new network events to the subscriber.
//
// --> {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [`event`]}
pub async fn dnet_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if !params.is_empty() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
self.dnet_sub.clone().into()
}
}