diff --git a/example/dchat/src/dchatmsg.rs b/example/dchat/src/dchatmsg.rs index 835720946..135b0a2a3 100644 --- a/example/dchat/src/dchatmsg.rs +++ b/example/dchat/src/dchatmsg.rs @@ -20,7 +20,10 @@ use async_std::sync::{Arc, Mutex}; use darkfi::{impl_p2p_message, net::Message}; -use darkfi_serial::{SerialDecodable, SerialEncodable}; +use darkfi_serial::{ + async_trait, AsyncDecodable, AsyncEncodable, Decodable, Encodable, SerialDecodable, + SerialEncodable, VarInt, +}; pub type DchatMsgsBuffer = Arc>>; diff --git a/example/dchat/src/main.rs b/example/dchat/src/main.rs index 609b01168..bb4fb2cc9 100644 --- a/example/dchat/src/main.rs +++ b/example/dchat/src/main.rs @@ -19,7 +19,9 @@ use std::{error, fs::File, io::stdin}; // ANCHOR: daemon_deps + use async_std::sync::{Arc, Mutex}; +use darkfi::{async_daemonize, system::StoppableTaskPtr}; use easy_parallel::Parallel; use smol::Executor; // ANCHOR_END: daemon_deps @@ -125,19 +127,19 @@ impl Dchat { // ANCHOR: start async fn start(&mut self, ex: Arc>) -> Result<()> { - debug!(target: "dchat", "Dchat::start() [START]"); + //debug!(target: "dchat", "Dchat::start() [START]"); - let ex2 = ex.clone(); + //let ex2 = ex.clone(); - self.register_protocol(self.recv_msgs.clone()).await?; - self.p2p.clone().start(ex.clone()).await?; - ex2.spawn(self.p2p.clone().run(ex.clone())).detach(); + //self.register_protocol(self.recv_msgs.clone()).await?; + //self.p2p.clone().start(ex.clone()).await?; + //ex2.spawn(self.p2p.clone().run(ex.clone())).detach(); - self.menu().await?; + //self.menu().await?; - self.p2p.stop().await; + //self.p2p.stop().await; - debug!(target: "dchat", "Dchat::start() [STOP]"); + //debug!(target: "dchat", "Dchat::start() [STOP]"); Ok(()) } // ANCHOR_END: start @@ -220,49 +222,50 @@ fn bob() -> Result { // ANCHOR_END: bob // ANCHOR: main -#[async_std::main] -async fn main() -> Result<()> { - let settings: Result = match std::env::args().nth(1) { - Some(id) => match id.as_str() { - "a" => alice(), - "b" => bob(), - _ => Err(ErrorMissingSpecifier.into()), - }, - None => Err(ErrorMissingSpecifier.into()), - }; +async_daemonize!(realmain); +async fn realmain() -> Result<()> { + //let settings: Result = match std::env::args().nth(1) { + // Some(id) => match id.as_str() { + // "a" => alice(), + // "b" => bob(), + // _ => Err(ErrorMissingSpecifier.into()), + // }, + // None => Err(ErrorMissingSpecifier.into()), + //}; - let settings = settings?.clone(); + //let settings = settings?.clone(); - let p2p = net::P2p::new(settings.net).await; + //let p2p = net::P2p::new(settings.net).await; - let ex = Arc::new(Executor::new()); - let ex2 = ex.clone(); - let ex3 = ex2.clone(); + //let ex = Arc::new(Executor::new()); + //let ex2 = ex.clone(); + //let ex3 = ex2.clone(); - let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }])); + //let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }])); - let mut dchat = Dchat::new(p2p.clone(), msgs); + //let mut dchat = Dchat::new(p2p.clone(), msgs); - // ANCHOR: json_init - let accept_addr = settings.accept_addr.clone(); - let rpc = Arc::new(JsonRpcInterface { addr: accept_addr.clone(), p2p }); - let _ex = ex.clone(); - ex.spawn(async move { listen_and_serve(accept_addr.clone(), rpc, _ex).await }).detach(); - // ANCHOR_END: json_init + //// ANCHOR: json_init + //let accept_addr = settings.accept_addr.clone(); + //let rpc = Arc::new(JsonRpcInterface { addr: accept_addr.clone(), p2p }); + //let _ex = ex.clone(); + //ex.spawn(async move { listen_and_serve(accept_addr.clone(), rpc, _ex).await }).detach(); + //// ANCHOR_END: json_init - let nthreads = std::thread::available_parallelism().unwrap().get(); - let (signal, shutdown) = smol::channel::unbounded::<()>(); + //let nthreads = std::thread::available_parallelism().unwrap().get(); + //let (signal, shutdown) = smol::channel::unbounded::<()>(); - let (_, result) = Parallel::new() - .each(0..nthreads, |_| smol::future::block_on(ex2.run(shutdown.recv()))) - .finish(|| { - smol::future::block_on(async move { - dchat.start(ex3).await?; - drop(signal); - Ok(()) - }) - }); + //let (_, result) = Parallel::new() + // .each(0..nthreads, |_| smol::future::block_on(ex2.run(shutdown.recv()))) + // .finish(|| { + // smol::future::block_on(async move { + // dchat.start(ex3).await?; + // drop(signal); + // Ok(()) + // }) + // }); - result + //result + Ok(()) } // ANCHOR_END: main diff --git a/example/dchat/src/rpc.rs b/example/dchat/src/rpc.rs index 416cac752..ab301741b 100644 --- a/example/dchat/src/rpc.rs +++ b/example/dchat/src/rpc.rs @@ -17,8 +17,12 @@ */ use async_trait::async_trait; +use darkfi::system::StoppableTaskPtr; use log::debug; use serde_json::{json, Value}; +use std::collections::HashSet; +//use darkfi::system:: +use smol::lock::{Mutex, MutexGuard}; use url::Url; use darkfi::{ @@ -33,27 +37,43 @@ use darkfi::{ pub struct JsonRpcInterface { pub addr: Url, pub p2p: net::P2pPtr, + pub rpc_connections: Mutex>, } // ANCHOR_END: jsonrpc #[async_trait] impl RequestHandler for JsonRpcInterface { async fn handle_request(&self, req: JsonRequest) -> JsonResult { - if req.params.as_array().is_none() { - return JsonError::new(ErrorCode::InvalidRequest, None, req.id).into() + //debug!(target: "darkirc::rpc", "--> {}", req.stringify().unwrap()); + + match req.method.as_str() { + "ping" => self.pong(req.id, req.params).await, + //"dnet.switch" => self.dnet_switch(req.id, req.params).await, + //"dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await, + //// TODO: Make this optional + //"p2p.get_info" => self.p2p_get_info(req.id, req.params).await, + _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), } - debug!(target: "RPC", "--> {}", serde_json::to_string(&req).unwrap()); + //if req.params.as_array().is_none() { + // return JsonError::new(ErrorCode::InvalidRequest, None, req.id).into() + //} + + //debug!(target: "RPC", "--> {}", serde_json::to_string(&req).unwrap()); // ANCHOR: req_match - match req.method.as_str() { - Some("ping") => self.pong(req.id, req.params).await, - Some("dnet_switch") => self.dnet_switch(req.id, req.params).await, - Some("dnet_info") => self.dnet_info(req.id, req.params).await, - Some(_) | None => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), - } + // TODO + //match req.method.as_str() { + // //Some("ping") => self.pong(req.id, req.params).await, + // // Some("dnet_switch") => self.dnet_switch(req.id, req.params).await, + // // Some("dnet_info") => self.dnet_info(req.id, req.params).await, + // Some(_) | None => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), + //} // ANCHOR_END: req_match } + async fn connections_mut(&self) -> MutexGuard<'_, HashSet> { + self.rpc_connections.lock().await + } } impl JsonRpcInterface { @@ -62,9 +82,9 @@ impl JsonRpcInterface { // --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42} // <-- {"jsonrpc": "2.0", "result": "pong", "id": 42} // ANCHOR: pong - async fn pong(&self, id: Value, _params: Value) -> JsonResult { - JsonResponse::new(json!("pong"), id).into() - } + //async fn pong(&self, id: Value, _params: Value) -> JsonResult { + // JsonResponse::new(json!("pong"), id).into() + //} // ANCHOR_END: pong // RPCAPI: @@ -74,21 +94,21 @@ impl JsonRpcInterface { // // --> {"jsonrpc": "2.0", "method": "dnet_switch", "params": [true], "id": 42} // <-- {"jsonrpc": "2.0", "result": true, "id": 42} - async fn dnet_switch(&self, id: Value, params: Value) -> JsonResult { - let params = params.as_array().unwrap(); + //async fn dnet_switch(&self, id: Value, params: Value) -> JsonResult { + // let params = params.as_array().unwrap(); - if params.len() != 1 && params[0].as_bool().is_none() { - return JsonError::new(ErrorCode::InvalidParams, None, id).into() - } + // if params.len() != 1 && params[0].as_bool().is_none() { + // return JsonError::new(ErrorCode::InvalidParams, None, id).into() + // } - if params[0].as_bool().unwrap() { - self.p2p.dnet_enable().await; - } else { - self.p2p.dnet_disable().await; - } + // if params[0].as_bool().unwrap() { + // self.p2p.dnet_enable().await; + // } else { + // self.p2p.dnet_disable().await; + // } - JsonResponse::new(json!(true), id).into() - } + // JsonResponse::new(json!(true), id).into() + //} // RPCAPI: // Retrieves P2P network information. @@ -96,9 +116,9 @@ impl JsonRpcInterface { // --> {"jsonrpc": "2.0", "method": "dnet_info", "params": [], "id": 42} // <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42} // ANCHOR: dnet_info - async fn dnet_info(&self, id: Value, _params: Value) -> JsonResult { - let dnet_info = self.p2p.dnet_info().await; - JsonResponse::new(net::P2p::map_dnet_info(dnet_info), id).into() - } + //async fn dnet_info(&self, id: Value, _params: Value) -> JsonResult { + // let dnet_info = self.p2p.dnet_info().await; + // JsonResponse::new(net::P2p::map_dnet_info(dnet_info), id).into() + //} // ANCHOR_END: dnet_info }