mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-01-09 14:48:08 -05:00
evgrd: add missing JSON-RPC modules so dnet and deg work.
This commit is contained in:
@@ -43,6 +43,7 @@ use rand::rngs::OsRng;
|
||||
use sled_overlay::sled;
|
||||
use smol::{fs, lock::Mutex, stream::StreamExt, Executor};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
path::PathBuf,
|
||||
sync::{Arc, Mutex as SyncMutex},
|
||||
};
|
||||
@@ -51,6 +52,8 @@ use url::Url;
|
||||
|
||||
use evgrd::{FetchEventsMessage, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS};
|
||||
|
||||
mod rpc;
|
||||
|
||||
const CONFIG_FILE: &str = "evgrd.toml";
|
||||
const CONFIG_FILE_CONTENTS: &str = include_str!("../evgrd.toml");
|
||||
|
||||
@@ -72,7 +75,11 @@ struct Args {
|
||||
|
||||
#[structopt(long, default_value = "tcp://127.0.0.1:5588")]
|
||||
/// RPC server listen address
|
||||
rpc_listen: Url,
|
||||
daemon_listen: Url,
|
||||
|
||||
#[structopt(long, default_value = "tcp://127.0.0.1:26690")]
|
||||
/// JSON-RPC server listen address
|
||||
json_rpc_listen: Url,
|
||||
|
||||
#[structopt(short, long, default_value = "~/.local/darkfi/evgrd_db")]
|
||||
/// Datastore (DB) path
|
||||
@@ -104,39 +111,39 @@ struct Args {
|
||||
}
|
||||
|
||||
pub struct Daemon {
|
||||
///// P2P network pointer
|
||||
//p2p: P2pPtr,
|
||||
/// P2P network pointer
|
||||
p2p: P2pPtr,
|
||||
///// Sled DB (also used in event_graph and for RLN)
|
||||
//sled: sled::Db,
|
||||
/// Event Graph instance
|
||||
event_graph: EventGraphPtr,
|
||||
///// JSON-RPC connection tracker
|
||||
//rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
|
||||
///// dnet JSON-RPC subscriber
|
||||
//dnet_sub: JsonSubscriber,
|
||||
///// deg JSON-RPC subscriber
|
||||
//deg_sub: JsonSubscriber,
|
||||
///// Replay logs (DB) path
|
||||
//replay_datastore: PathBuf,
|
||||
/// JSON-RPC connection tracker
|
||||
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
|
||||
/// dnet JSON-RPC subscriber
|
||||
dnet_sub: JsonSubscriber,
|
||||
/// deg JSON-RPC subscriber
|
||||
deg_sub: JsonSubscriber,
|
||||
/// Replay logs (DB) path
|
||||
replay_datastore: PathBuf,
|
||||
}
|
||||
|
||||
impl Daemon {
|
||||
fn new(
|
||||
//p2p: P2pPtr,
|
||||
p2p: P2pPtr,
|
||||
//sled: sled::Db,
|
||||
event_graph: EventGraphPtr,
|
||||
//dnet_sub: JsonSubscriber,
|
||||
//deg_sub: JsonSubscriber,
|
||||
//replay_datastore: PathBuf,
|
||||
dnet_sub: JsonSubscriber,
|
||||
deg_sub: JsonSubscriber,
|
||||
replay_datastore: PathBuf,
|
||||
) -> Self {
|
||||
Self {
|
||||
//p2p,
|
||||
p2p,
|
||||
//sled,
|
||||
event_graph,
|
||||
//rpc_connections: Mutex::new(HashSet::new()),
|
||||
//dnet_sub,
|
||||
//deg_sub,
|
||||
//replay_datastore,
|
||||
rpc_connections: Mutex::new(HashSet::new()),
|
||||
dnet_sub,
|
||||
deg_sub,
|
||||
replay_datastore,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -299,15 +306,31 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
|
||||
|
||||
info!("Starting JSON-RPC server");
|
||||
let daemon = Arc::new(Daemon::new(
|
||||
//p2p.clone(),
|
||||
p2p.clone(),
|
||||
//sled_db.clone(),
|
||||
event_graph.clone(),
|
||||
//dnet_sub,
|
||||
//deg_sub,
|
||||
//replay_datastore.clone(),
|
||||
dnet_sub,
|
||||
deg_sub,
|
||||
replay_datastore.clone(),
|
||||
));
|
||||
|
||||
let listener = Listener::new(args.rpc_listen, None).await?;
|
||||
// Used for deg and dnet
|
||||
let daemon_ = daemon.clone();
|
||||
let rpc_task = StoppableTask::new();
|
||||
rpc_task.clone().start(
|
||||
listen_and_serve(args.json_rpc_listen, daemon.clone(), None, ex.clone()),
|
||||
|res| async move {
|
||||
match res {
|
||||
Ok(()) | Err(Error::RpcServerStopped) => daemon_.stop_connections().await,
|
||||
Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
|
||||
}
|
||||
},
|
||||
Error::RpcServerStopped,
|
||||
ex.clone(),
|
||||
);
|
||||
|
||||
info!("Starting evgrd server");
|
||||
let listener = Listener::new(args.daemon_listen, None).await?;
|
||||
let ptlistener = listener.listen().await?;
|
||||
|
||||
let rpc_task = StoppableTask::new();
|
||||
|
||||
177
script/evgrd/bin/rpc.rs
Normal file
177
script/evgrd/bin/rpc.rs
Normal file
@@ -0,0 +1,177 @@
|
||||
/* This file is part of DarkFi (https://dark.fi)
|
||||
*
|
||||
* Copyright (C) 2020-2024 Dyne.org foundation
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
use std::collections::HashSet;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use darkfi::{
|
||||
event_graph::util::recreate_from_replayer_log,
|
||||
net::P2pPtr,
|
||||
rpc::{
|
||||
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
|
||||
p2p_method::HandlerP2p,
|
||||
server::RequestHandler,
|
||||
util::JsonValue,
|
||||
},
|
||||
system::StoppableTaskPtr,
|
||||
};
|
||||
use log::debug;
|
||||
use smol::lock::MutexGuard;
|
||||
|
||||
use super::Daemon;
|
||||
|
||||
#[async_trait]
|
||||
impl RequestHandler for Daemon {
|
||||
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
|
||||
debug!(target: "darkirc::rpc", "--> {}", req.stringify().unwrap());
|
||||
|
||||
match req.method.as_str() {
|
||||
"ping" => self.pong(req.id, req.params).await,
|
||||
"dnet.switch" => self.dnet_switch(req.id, req.params).await,
|
||||
"dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
|
||||
// TODO: Make this optional
|
||||
"p2p.get_info" => self.p2p_get_info(req.id, req.params).await,
|
||||
|
||||
"deg.switch" => self.deg_switch(req.id, req.params).await,
|
||||
"deg.subscribe_events" => self.deg_subscribe_events(req.id, req.params).await,
|
||||
"eventgraph.get_info" => self.eg_get_info(req.id, req.params).await,
|
||||
"eventgraph.replay" => self.eg_rep_info(req.id, req.params).await,
|
||||
|
||||
_ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn connections_mut(&self) -> MutexGuard<'_, HashSet<StoppableTaskPtr>> {
|
||||
self.rpc_connections.lock().await
|
||||
}
|
||||
}
|
||||
|
||||
impl Daemon {
|
||||
// RPCAPI:
|
||||
// Activate or deactivate dnet in the P2P stack.
|
||||
// By sending `true`, dnet will be activated, and by sending `false` dnet
|
||||
// will be deactivated. Returns `true` on success.
|
||||
//
|
||||
// --> {"jsonrpc": "2.0", "method": "dnet.switch", "params": [true], "id": 42}
|
||||
// <-- {"jsonrpc": "2.0", "result": true, "id": 42}
|
||||
async fn dnet_switch(&self, id: u16, params: JsonValue) -> JsonResult {
|
||||
let params = params.get::<Vec<JsonValue>>().unwrap();
|
||||
if params.len() != 1 || !params[0].is_bool() {
|
||||
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
|
||||
}
|
||||
|
||||
let switch = params[0].get::<bool>().unwrap();
|
||||
|
||||
if *switch {
|
||||
self.p2p.dnet_enable();
|
||||
} else {
|
||||
self.p2p.dnet_disable();
|
||||
}
|
||||
|
||||
JsonResponse::new(JsonValue::Boolean(true), id).into()
|
||||
}
|
||||
|
||||
// RPCAPI:
|
||||
// Initializes a subscription to p2p dnet events.
|
||||
// Once a subscription is established, `darkirc` will send JSON-RPC notifications of
|
||||
// new network events to the subscriber.
|
||||
//
|
||||
// --> {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [], "id": 1}
|
||||
// <-- {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [`event`]}
|
||||
pub async fn dnet_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult {
|
||||
let params = params.get::<Vec<JsonValue>>().unwrap();
|
||||
if !params.is_empty() {
|
||||
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
|
||||
}
|
||||
|
||||
self.dnet_sub.clone().into()
|
||||
}
|
||||
|
||||
// RPCAPI:
|
||||
// Initializes a subscription to deg events.
|
||||
// Once a subscription is established, apps using eventgraph will send JSON-RPC notifications of
|
||||
// new eventgraph events to the subscriber.
|
||||
//
|
||||
// --> {"jsonrpc": "2.0", "method": "deg.subscribe_events", "params": [], "id": 1}
|
||||
// <-- {"jsonrpc": "2.0", "method": "deg.subscribe_events", "params": [`event`]}
|
||||
pub async fn deg_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult {
|
||||
let params = params.get::<Vec<JsonValue>>().unwrap();
|
||||
if !params.is_empty() {
|
||||
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
|
||||
}
|
||||
|
||||
self.deg_sub.clone().into()
|
||||
}
|
||||
|
||||
// RPCAPI:
|
||||
// Activate or deactivate deg in the EVENTGRAPH.
|
||||
// By sending `true`, deg will be activated, and by sending `false` deg
|
||||
// will be deactivated. Returns `true` on success.
|
||||
//
|
||||
// --> {"jsonrpc": "2.0", "method": "deg.switch", "params": [true], "id": 42}
|
||||
// <-- {"jsonrpc": "2.0", "result": true, "id": 42}
|
||||
async fn deg_switch(&self, id: u16, params: JsonValue) -> JsonResult {
|
||||
let params = params.get::<Vec<JsonValue>>().unwrap();
|
||||
if params.len() != 1 || !params[0].is_bool() {
|
||||
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
|
||||
}
|
||||
|
||||
let switch = params[0].get::<bool>().unwrap();
|
||||
|
||||
if *switch {
|
||||
self.event_graph.deg_enable().await;
|
||||
} else {
|
||||
self.event_graph.deg_disable().await;
|
||||
}
|
||||
|
||||
JsonResponse::new(JsonValue::Boolean(true), id).into()
|
||||
}
|
||||
|
||||
// RPCAPI:
|
||||
// Get EVENTGRAPH info.
|
||||
//
|
||||
// --> {"jsonrpc": "2.0", "method": "deg.switch", "params": [true], "id": 42}
|
||||
// <-- {"jsonrpc": "2.0", "result": true, "id": 42}
|
||||
async fn eg_get_info(&self, id: u16, params: JsonValue) -> JsonResult {
|
||||
let params_ = params.get::<Vec<JsonValue>>().unwrap();
|
||||
if !params_.is_empty() {
|
||||
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
|
||||
}
|
||||
|
||||
self.event_graph.eventgraph_info(id, params).await
|
||||
}
|
||||
|
||||
// RPCAPI:
|
||||
// Get replayed EVENTGRAPH info.
|
||||
//
|
||||
// --> {"jsonrpc": "2.0", "method": "eventgraph.replay", "params": ..., "id": 42}
|
||||
// <-- {"jsonrpc": "2.0", "result": true, "id": 42}
|
||||
async fn eg_rep_info(&self, id: u16, params: JsonValue) -> JsonResult {
|
||||
let params_ = params.get::<Vec<JsonValue>>().unwrap();
|
||||
if !params_.is_empty() {
|
||||
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
|
||||
}
|
||||
|
||||
recreate_from_replayer_log(&self.replay_datastore).await
|
||||
}
|
||||
}
|
||||
|
||||
impl HandlerP2p for Daemon {
|
||||
fn p2p(&self) -> P2pPtr {
|
||||
self.p2p.clone()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user