bin/genev: add dnet task & rpc calls

This commit is contained in:
dasman
2024-02-29 04:33:15 +03:00
parent 47a319fd00
commit 103e7b64f8
2 changed files with 65 additions and 3 deletions

View File

@@ -170,6 +170,30 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
executor.clone(),
);
info!("Starting dnet subs task");
let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
let dnet_sub_ = dnet_sub.clone();
let p2p_ = p2p.clone();
let dnet_task = StoppableTask::new();
dnet_task.clone().start(
async move {
let dnet_sub = p2p_.dnet_subscribe().await;
loop {
let event = dnet_sub.receive().await;
debug!("Got dnet event: {:?}", event);
dnet_sub_.notify(vec![event.into()].into()).await;
}
},
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => panic!("{}", e),
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
info!("Starting deg subs task");
let deg_sub = JsonSubscriber::new("deg.subscribe_events");
let deg_sub_ = deg_sub.clone();
@@ -201,6 +225,7 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
"Alolymous".to_string(),
event_graph.clone(),
p2p.clone(),
dnet_sub,
deg_sub,
));
let rpc_task = StoppableTask::new();
@@ -224,6 +249,9 @@ async fn realmain(settings: Args, executor: Arc<smol::Executor<'static>>) -> Res
info!(target: "genevd", "Stopping JSON-RPC server...");
rpc_task.stop().await;
info!(target: "genevd", "Stopping Debugging tasks...");
dnet_task.stop().await;
deg_task.stop().await;
info!(target: "genevd", "Stopping sync loop task...");

View File

@@ -28,6 +28,7 @@ use darkfi::{
net,
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult, JsonSubscriber},
p2p_method::HandlerP2p,
server::RequestHandler,
},
system::StoppableTaskPtr,
@@ -42,6 +43,7 @@ pub struct JsonRpcInterface {
event_graph: EventGraphPtr,
p2p: net::P2pPtr,
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
dnet_sub: JsonSubscriber,
deg_sub: JsonSubscriber,
}
@@ -53,7 +55,9 @@ impl RequestHandler for JsonRpcInterface {
"list" => self.list(req.id, req.params).await,
"ping" => self.pong(req.id, req.params).await,
"dnet_switch" => self.dnet_switch(req.id, req.params).await,
"dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
"dnet.switch" => self.dnet_switch(req.id, req.params).await,
"p2p.get_info" => self.p2p_get_info(req.id, req.params).await,
"deg.switch" => self.deg_switch(req.id, req.params).await,
"deg.subscribe_events" => self.deg_subscribe_events(req.id, req.params).await,
@@ -69,14 +73,44 @@ impl RequestHandler for JsonRpcInterface {
}
}
impl HandlerP2p for JsonRpcInterface {
fn p2p(&self) -> net::P2pPtr {
self.p2p.clone()
}
}
impl JsonRpcInterface {
pub fn new(
_nickname: String,
event_graph: EventGraphPtr,
p2p: net::P2pPtr,
dnet_sub: JsonSubscriber,
deg_sub: JsonSubscriber,
) -> Self {
Self { _nickname, event_graph, p2p, rpc_connections: Mutex::new(HashSet::new()), deg_sub }
Self {
_nickname,
event_graph,
p2p,
rpc_connections: Mutex::new(HashSet::new()),
dnet_sub,
deg_sub,
}
}
// RPCAPI:
// Initializes a subscription to p2p dnet events.
// Once a subscription is established, `darkirc` will send JSON-RPC notifications of
// new network events to the subscriber.
//
// --> {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [`event`]}
pub async fn dnet_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if !params.is_empty() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
self.dnet_sub.clone().into()
}
// RPCAPI:
@@ -104,7 +138,7 @@ impl JsonRpcInterface {
}
// RPCAPI:
// Initializes a subscription to p2p deg events.
// Initializes a subscription to deg events.
// Once a subscription is established, apps using eventgraph will send JSON-RPC notifications of
// new eventgraph events to the subscriber.
//