diff --git a/bin/genev/genevd/src/main.rs b/bin/genev/genevd/src/main.rs index 0f6d196fa..35003ceb3 100644 --- a/bin/genev/genevd/src/main.rs +++ b/bin/genev/genevd/src/main.rs @@ -170,6 +170,30 @@ async fn realmain(settings: Args, executor: Arc>) -> Res executor.clone(), ); + info!("Starting dnet subs task"); + let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); + let dnet_sub_ = dnet_sub.clone(); + let p2p_ = p2p.clone(); + let dnet_task = StoppableTask::new(); + dnet_task.clone().start( + async move { + let dnet_sub = p2p_.dnet_subscribe().await; + loop { + let event = dnet_sub.receive().await; + debug!("Got dnet event: {:?}", event); + dnet_sub_.notify(vec![event.into()].into()).await; + } + }, + |res| async { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => panic!("{}", e), + } + }, + Error::DetachedTaskStopped, + executor.clone(), + ); + info!("Starting deg subs task"); let deg_sub = JsonSubscriber::new("deg.subscribe_events"); let deg_sub_ = deg_sub.clone(); @@ -201,6 +225,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Res "Alolymous".to_string(), event_graph.clone(), p2p.clone(), + dnet_sub, deg_sub, )); let rpc_task = StoppableTask::new(); @@ -224,6 +249,9 @@ async fn realmain(settings: Args, executor: Arc>) -> Res info!(target: "genevd", "Stopping JSON-RPC server..."); rpc_task.stop().await; + + info!(target: "genevd", "Stopping Debugging tasks..."); + dnet_task.stop().await; deg_task.stop().await; info!(target: "genevd", "Stopping sync loop task..."); diff --git a/bin/genev/genevd/src/rpc.rs b/bin/genev/genevd/src/rpc.rs index 28ecce405..d603e1cb1 100644 --- a/bin/genev/genevd/src/rpc.rs +++ b/bin/genev/genevd/src/rpc.rs @@ -28,6 +28,7 @@ use darkfi::{ net, rpc::{ jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult, JsonSubscriber}, + p2p_method::HandlerP2p, server::RequestHandler, }, system::StoppableTaskPtr, @@ -42,6 +43,7 @@ pub struct JsonRpcInterface { event_graph: EventGraphPtr, p2p: net::P2pPtr, rpc_connections: Mutex>, + dnet_sub: JsonSubscriber, deg_sub: JsonSubscriber, } @@ -53,7 +55,9 @@ impl RequestHandler for JsonRpcInterface { "list" => self.list(req.id, req.params).await, "ping" => self.pong(req.id, req.params).await, - "dnet_switch" => self.dnet_switch(req.id, req.params).await, + "dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await, + "dnet.switch" => self.dnet_switch(req.id, req.params).await, + "p2p.get_info" => self.p2p_get_info(req.id, req.params).await, "deg.switch" => self.deg_switch(req.id, req.params).await, "deg.subscribe_events" => self.deg_subscribe_events(req.id, req.params).await, @@ -69,14 +73,44 @@ impl RequestHandler for JsonRpcInterface { } } +impl HandlerP2p for JsonRpcInterface { + fn p2p(&self) -> net::P2pPtr { + self.p2p.clone() + } +} + impl JsonRpcInterface { pub fn new( _nickname: String, event_graph: EventGraphPtr, p2p: net::P2pPtr, + dnet_sub: JsonSubscriber, deg_sub: JsonSubscriber, ) -> Self { - Self { _nickname, event_graph, p2p, rpc_connections: Mutex::new(HashSet::new()), deg_sub } + Self { + _nickname, + event_graph, + p2p, + rpc_connections: Mutex::new(HashSet::new()), + dnet_sub, + deg_sub, + } + } + + // RPCAPI: + // Initializes a subscription to p2p dnet events. + // Once a subscription is established, `darkirc` will send JSON-RPC notifications of + // new network events to the subscriber. + // + // --> {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [`event`]} + pub async fn dnet_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if !params.is_empty() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + self.dnet_sub.clone().into() } // RPCAPI: @@ -104,7 +138,7 @@ impl JsonRpcInterface { } // RPCAPI: - // Initializes a subscription to p2p deg events. + // Initializes a subscription to deg events. // Once a subscription is established, apps using eventgraph will send JSON-RPC notifications of // new eventgraph events to the subscriber. //