diff --git a/script/evgrd/bin/evgrd.rs b/script/evgrd/bin/evgrd.rs index f5ca0e7f1..82a339520 100644 --- a/script/evgrd/bin/evgrd.rs +++ b/script/evgrd/bin/evgrd.rs @@ -43,6 +43,7 @@ use rand::rngs::OsRng; use sled_overlay::sled; use smol::{fs, lock::Mutex, stream::StreamExt, Executor}; use std::{ + collections::HashSet, path::PathBuf, sync::{Arc, Mutex as SyncMutex}, }; @@ -51,6 +52,8 @@ use url::Url; use evgrd::{FetchEventsMessage, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS}; +mod rpc; + const CONFIG_FILE: &str = "evgrd.toml"; const CONFIG_FILE_CONTENTS: &str = include_str!("../evgrd.toml"); @@ -72,7 +75,11 @@ struct Args { #[structopt(long, default_value = "tcp://127.0.0.1:5588")] /// RPC server listen address - rpc_listen: Url, + daemon_listen: Url, + + #[structopt(long, default_value = "tcp://127.0.0.1:26690")] + /// JSON-RPC server listen address + json_rpc_listen: Url, #[structopt(short, long, default_value = "~/.local/darkfi/evgrd_db")] /// Datastore (DB) path @@ -104,39 +111,39 @@ struct Args { } pub struct Daemon { - ///// P2P network pointer - //p2p: P2pPtr, + /// P2P network pointer + p2p: P2pPtr, ///// Sled DB (also used in event_graph and for RLN) //sled: sled::Db, /// Event Graph instance event_graph: EventGraphPtr, - ///// JSON-RPC connection tracker - //rpc_connections: Mutex>, - ///// dnet JSON-RPC subscriber - //dnet_sub: JsonSubscriber, - ///// deg JSON-RPC subscriber - //deg_sub: JsonSubscriber, - ///// Replay logs (DB) path - //replay_datastore: PathBuf, + /// JSON-RPC connection tracker + rpc_connections: Mutex>, + /// dnet JSON-RPC subscriber + dnet_sub: JsonSubscriber, + /// deg JSON-RPC subscriber + deg_sub: JsonSubscriber, + /// Replay logs (DB) path + replay_datastore: PathBuf, } impl Daemon { fn new( - //p2p: P2pPtr, + p2p: P2pPtr, //sled: sled::Db, event_graph: EventGraphPtr, - //dnet_sub: JsonSubscriber, - //deg_sub: JsonSubscriber, - //replay_datastore: PathBuf, + dnet_sub: JsonSubscriber, + deg_sub: JsonSubscriber, + replay_datastore: PathBuf, ) -> Self { Self { - //p2p, + p2p, //sled, event_graph, - //rpc_connections: Mutex::new(HashSet::new()), - //dnet_sub, - //deg_sub, - //replay_datastore, + rpc_connections: Mutex::new(HashSet::new()), + dnet_sub, + deg_sub, + replay_datastore, } } } @@ -299,15 +306,31 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!("Starting JSON-RPC server"); let daemon = Arc::new(Daemon::new( - //p2p.clone(), + p2p.clone(), //sled_db.clone(), event_graph.clone(), - //dnet_sub, - //deg_sub, - //replay_datastore.clone(), + dnet_sub, + deg_sub, + replay_datastore.clone(), )); - let listener = Listener::new(args.rpc_listen, None).await?; + // Used for deg and dnet + let daemon_ = daemon.clone(); + let rpc_task = StoppableTask::new(); + rpc_task.clone().start( + listen_and_serve(args.json_rpc_listen, daemon.clone(), None, ex.clone()), + |res| async move { + match res { + Ok(()) | Err(Error::RpcServerStopped) => daemon_.stop_connections().await, + Err(e) => error!("Failed stopping JSON-RPC server: {}", e), + } + }, + Error::RpcServerStopped, + ex.clone(), + ); + + info!("Starting evgrd server"); + let listener = Listener::new(args.daemon_listen, None).await?; let ptlistener = listener.listen().await?; let rpc_task = StoppableTask::new(); diff --git a/script/evgrd/bin/rpc.rs b/script/evgrd/bin/rpc.rs new file mode 100644 index 000000000..696fcc7b9 --- /dev/null +++ b/script/evgrd/bin/rpc.rs @@ -0,0 +1,177 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2024 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +use std::collections::HashSet; + +use async_trait::async_trait; +use darkfi::{ + event_graph::util::recreate_from_replayer_log, + net::P2pPtr, + rpc::{ + jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult}, + p2p_method::HandlerP2p, + server::RequestHandler, + util::JsonValue, + }, + system::StoppableTaskPtr, +}; +use log::debug; +use smol::lock::MutexGuard; + +use super::Daemon; + +#[async_trait] +impl RequestHandler for Daemon { + async fn handle_request(&self, req: JsonRequest) -> JsonResult { + debug!(target: "darkirc::rpc", "--> {}", req.stringify().unwrap()); + + match req.method.as_str() { + "ping" => self.pong(req.id, req.params).await, + "dnet.switch" => self.dnet_switch(req.id, req.params).await, + "dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await, + // TODO: Make this optional + "p2p.get_info" => self.p2p_get_info(req.id, req.params).await, + + "deg.switch" => self.deg_switch(req.id, req.params).await, + "deg.subscribe_events" => self.deg_subscribe_events(req.id, req.params).await, + "eventgraph.get_info" => self.eg_get_info(req.id, req.params).await, + "eventgraph.replay" => self.eg_rep_info(req.id, req.params).await, + + _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), + } + } + + async fn connections_mut(&self) -> MutexGuard<'_, HashSet> { + self.rpc_connections.lock().await + } +} + +impl Daemon { + // RPCAPI: + // Activate or deactivate dnet in the P2P stack. + // By sending `true`, dnet will be activated, and by sending `false` dnet + // will be deactivated. Returns `true` on success. + // + // --> {"jsonrpc": "2.0", "method": "dnet.switch", "params": [true], "id": 42} + // <-- {"jsonrpc": "2.0", "result": true, "id": 42} + async fn dnet_switch(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if params.len() != 1 || !params[0].is_bool() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + let switch = params[0].get::().unwrap(); + + if *switch { + self.p2p.dnet_enable(); + } else { + self.p2p.dnet_disable(); + } + + JsonResponse::new(JsonValue::Boolean(true), id).into() + } + + // RPCAPI: + // Initializes a subscription to p2p dnet events. + // Once a subscription is established, `darkirc` will send JSON-RPC notifications of + // new network events to the subscriber. + // + // --> {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [`event`]} + pub async fn dnet_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if !params.is_empty() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + self.dnet_sub.clone().into() + } + + // RPCAPI: + // Initializes a subscription to deg events. + // Once a subscription is established, apps using eventgraph will send JSON-RPC notifications of + // new eventgraph events to the subscriber. + // + // --> {"jsonrpc": "2.0", "method": "deg.subscribe_events", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "method": "deg.subscribe_events", "params": [`event`]} + pub async fn deg_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if !params.is_empty() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + self.deg_sub.clone().into() + } + + // RPCAPI: + // Activate or deactivate deg in the EVENTGRAPH. + // By sending `true`, deg will be activated, and by sending `false` deg + // will be deactivated. Returns `true` on success. + // + // --> {"jsonrpc": "2.0", "method": "deg.switch", "params": [true], "id": 42} + // <-- {"jsonrpc": "2.0", "result": true, "id": 42} + async fn deg_switch(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if params.len() != 1 || !params[0].is_bool() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + let switch = params[0].get::().unwrap(); + + if *switch { + self.event_graph.deg_enable().await; + } else { + self.event_graph.deg_disable().await; + } + + JsonResponse::new(JsonValue::Boolean(true), id).into() + } + + // RPCAPI: + // Get EVENTGRAPH info. + // + // --> {"jsonrpc": "2.0", "method": "deg.switch", "params": [true], "id": 42} + // <-- {"jsonrpc": "2.0", "result": true, "id": 42} + async fn eg_get_info(&self, id: u16, params: JsonValue) -> JsonResult { + let params_ = params.get::>().unwrap(); + if !params_.is_empty() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + self.event_graph.eventgraph_info(id, params).await + } + + // RPCAPI: + // Get replayed EVENTGRAPH info. + // + // --> {"jsonrpc": "2.0", "method": "eventgraph.replay", "params": ..., "id": 42} + // <-- {"jsonrpc": "2.0", "result": true, "id": 42} + async fn eg_rep_info(&self, id: u16, params: JsonValue) -> JsonResult { + let params_ = params.get::>().unwrap(); + if !params_.is_empty() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + recreate_from_replayer_log(&self.replay_datastore).await + } +} + +impl HandlerP2p for Daemon { + fn p2p(&self) -> P2pPtr { + self.p2p.clone() + } +}