script/research/rlnd: daemon for RLN state management added

This commit is contained in:
skoupidi
2025-02-05 17:43:38 +02:00
parent 868306637b
commit 53b2993f38
13 changed files with 1224 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
/target
Cargo.lock
rustfmt.toml

View File

@@ -0,0 +1,25 @@
[package]
name = "rlnd-cli"
version = "0.4.1"
description = "CLI-utility to control an rlnd daemon."
authors = ["Dyne.org foundation <foundation@dyne.org>"]
repository = "https://codeberg.org/darkrenaissance/darkfi"
license = "AGPL-3.0-only"
edition = "2021"
[workspace]
[dependencies]
# Darkfi
darkfi = {path = "../../../../", features = ["async-sdk", "rpc"]}
darkfi-sdk = {path = "../../../../src/sdk"}
darkfi-serial = "0.4.2"
rlnd = {path = "../rlnd"}
# Misc
bs58 = "0.5.1"
clap = {version = "4.4.11", features = ["derive"]}
prettytable-rs = "0.10.0"
rand = "0.8.5"
smol = "2.0.2"
url = "2.5.4"

View File

@@ -0,0 +1,42 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use darkfi::{rpc::client::RpcClient, Result};
use smol::Executor;
use url::Url;
/// rlnd JSON-RPC related methods
pub mod rpc;
/// CLI-util structure
pub struct RlndCli {
/// JSON-RPC client to execute requests to darkfid daemon
pub rpc_client: RpcClient,
}
impl RlndCli {
pub async fn new(endpoint: &str, ex: Arc<Executor<'static>>) -> Result<Self> {
// Initialize rpc client
let endpoint = Url::parse(endpoint)?;
let rpc_client = RpcClient::new(endpoint, ex).await?;
Ok(Self { rpc_client })
}
}

View File

@@ -0,0 +1,118 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use clap::{Parser, Subcommand};
use darkfi::{cli_desc, Result};
use prettytable::{format, row, Table};
use smol::Executor;
use rlnd_cli::RlndCli;
#[derive(Parser)]
#[command(about = cli_desc!())]
struct Args {
#[arg(short, long, default_value = "tcp://127.0.0.1:25637")]
/// rldn JSON-RPC endpoint
endpoint: String,
#[command(subcommand)]
/// Sub command to execute
command: Subcmd,
}
#[derive(Subcommand)]
enum Subcmd {
/// Send a ping request to the rlnd RPC endpoint
Ping,
/// List all memberships
List,
/// Register a membership
Register {
/// Stake of this membership
stake: u64,
},
/// Slash a membership
Slash {
/// Membership id to slash
id: String,
},
}
fn main() -> Result<()> {
// Initialize an executor
let executor = Arc::new(Executor::new());
let ex = executor.clone();
smol::block_on(executor.run(async {
// Parse arguments
let args = Args::parse();
// Execute a subcommand
let rlnd_cli = RlndCli::new(&args.endpoint, ex).await?;
match args.command {
Subcmd::Ping => {
rlnd_cli.ping().await?;
}
Subcmd::List => {
match rlnd_cli.get_all_memberships().await {
Ok(memberships) => {
// Create a prettytable with the memberships:
let mut table = Table::new();
table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR);
table.set_titles(row!["ID", "Leaf Position", "Stake"]);
for (id, membership) in memberships.iter() {
table.add_row(row![
id,
format!("{:?}", membership.leaf_position),
membership.stake
]);
}
if table.is_empty() {
println!("No memberships found");
} else {
println!("{table}");
}
}
Err(e) => println!("Membership registration failed: {e}"),
}
}
Subcmd::Register { stake } => match rlnd_cli.register_membership(stake).await {
Ok((id, membership)) => println!("Registered membership {id:?}: {membership:?}"),
Err(e) => println!("Membership registration failed: {e}"),
},
Subcmd::Slash { id } => {
println!("Slashing membership: {id}");
match rlnd_cli.slash_membership(&id).await {
Ok(membership) => println!("Slashed membership {id}: {membership:?}"),
Err(e) => println!("Membership slashing failed: {e}"),
}
}
}
rlnd_cli.rpc_client.stop().await;
Ok(())
}))
}

View File

@@ -0,0 +1,119 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::time::Instant;
use darkfi::{
rpc::{jsonrpc::JsonRequest, util::JsonValue},
Error, Result,
};
use darkfi_sdk::{
crypto::pasta_prelude::{Field, PrimeField},
pasta::pallas,
};
use darkfi_serial::deserialize;
use rand::rngs::OsRng;
use rlnd::database::Membership;
use crate::RlndCli;
impl RlndCli {
/// Auxiliary function to ping configured rlnd daemon for liveness.
pub async fn ping(&self) -> Result<()> {
println!("Executing ping request to rlnd...");
let latency = Instant::now();
let rep = self.rlnd_daemon_request("ping", &JsonValue::Array(vec![])).await?;
let latency = latency.elapsed();
println!("Got reply: {rep:?}");
println!("Latency: {latency:?}");
Ok(())
}
/// Auxiliary function to execute a request towards the configured rlnd daemon JSON-RPC endpoint.
pub async fn rlnd_daemon_request(&self, method: &str, params: &JsonValue) -> Result<JsonValue> {
let req = JsonRequest::new(method, params.clone());
let rep = self.rpc_client.request(req).await?;
Ok(rep)
}
/// Queries rlnd to register a new membership for given stake.
pub async fn register_membership(&self, stake: u64) -> Result<(String, Membership)> {
// Generate a new random membership identity
let id = pallas::Base::random(&mut OsRng);
let id = bs58::encode(&id.to_repr()).into_string();
// Generate request params
let params = JsonValue::Array(vec![
JsonValue::String(id.clone()),
JsonValue::String(stake.to_string()),
]);
// Execute request
let rep = self.rlnd_daemon_request("add_membership", &params).await?;
// Parse response
let membership = parse_membership(rep.get::<String>().unwrap())?;
Ok((id, membership))
}
/// Queries rlnd to retrieve all memberships.
pub async fn get_all_memberships(&self) -> Result<Vec<(String, Membership)>> {
// Execute request
let rep = self.rlnd_daemon_request("get_memberships", &JsonValue::Array(vec![])).await?;
// Parse response
let params = rep.get::<Vec<JsonValue>>().unwrap();
let mut ret = Vec::with_capacity(params.len() / 2);
for (index, param) in params.iter().enumerate() {
// Skip second half of pair
if index % 2 == 1 {
continue
}
let id = param.get::<String>().unwrap().clone();
let membership = parse_membership(params[index + 1].get::<String>().unwrap())?;
ret.push((id, membership));
}
Ok(ret)
}
/// Queries rlnd to slash a membership.
pub async fn slash_membership(&self, id: &str) -> Result<Membership> {
// Generate request params
let params = JsonValue::Array(vec![JsonValue::String(id.to_string())]);
// Execute request
let rep = self.rlnd_daemon_request("slash_membership", &params).await?;
// Parse response
let membership = parse_membership(rep.get::<String>().unwrap())?;
Ok(membership)
}
}
/// Auxiliary function to parse a `Membership` from a `JsonValue::String`.
pub fn parse_membership(membership: &str) -> Result<Membership> {
let Ok(decoded_bytes) = bs58::decode(membership).into_vec() else {
return Err(Error::ParseFailed("Invalid Membership"))
};
Ok(deserialize(&decoded_bytes)?)
}

3
script/research/rlnd/rlnd/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
/target
Cargo.lock
rustfmt.toml

View File

@@ -0,0 +1,39 @@
[package]
name = "rlnd"
version = "0.4.1"
description = "DarkFi RLN state management daemon."
authors = ["Dyne.org foundation <foundation@dyne.org>"]
repository = "https://codeberg.org/darkrenaissance/darkfi"
license = "AGPL-3.0-only"
edition = "2021"
[workspace]
[dependencies]
# Darkfi
darkfi = {path = "../../../../", features = ["async-daemonize", "async-sdk", "rpc", "sled-overlay"]}
darkfi-sdk = {path = "../../../../src/sdk"}
darkfi-serial = "0.4.2"
# Misc
bs58 = "0.5.1"
log = "0.4.25"
sled-overlay = "0.1.6"
# JSON-RPC
async-trait = "0.1.85"
tinyjson = "2.5.1"
url = "2.5.4"
# Daemon
async-std = {version = "1.13.0", features = ["attributes"]}
easy-parallel = "3.3.1"
signal-hook-async-std = "0.2.2"
signal-hook = "0.3.17"
simplelog = "0.12.2"
smol = "2.0.2"
# Argument parsing
serde = {version = "1.0.217", features = ["derive"]}
structopt = "0.3.26"
structopt-toml = "0.5.1"

View File

@@ -0,0 +1,19 @@
## rlnd configuration file
##
## Please make sure you go through all the settings so you can configure
## your daemon properly.
##
## The default values are left commented. They can be overridden either by
## uncommenting, or by using the command-line.
# Path to the database directory
database = "~/.local/share/darkfi/rlnd"
# Private JSON-RPC listen URL
private_rpc_listen = "tcp://127.0.0.1:25637"
# Publicly exposed JSON-RPC listen URL
public_rpc_listen = "tcp://127.0.0.1:25638"
# darkirc JSON-RPC endpoint
endpoint = "tcp://127.0.0.1:26660"

View File

@@ -0,0 +1,195 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi::{util::path::expand_path, Error, Result};
use darkfi_sdk::{
bridgetree::Position,
crypto::{pasta_prelude::PrimeField, MerkleNode, MerkleTree},
pasta::pallas,
};
use darkfi_serial::{async_trait, deserialize, serialize, SerialDecodable, SerialEncodable};
use sled_overlay::sled;
/// This struct represents a tuple of the form (id, stake).
#[derive(Debug, Clone, PartialEq, Eq, SerialEncodable, SerialDecodable)]
pub struct Membership {
/// Membership leaf position in the memberships Merkle tree
pub leaf_position: Position,
/// Member stake value
pub stake: u64,
}
impl Membership {
/// Generate a new `Membership` in the memberships tree for provided id and stake.
pub fn new(memberships_tree: &mut MerkleTree, id: pallas::Base, stake: u64) -> Self {
memberships_tree.append(MerkleNode::from(id));
let leaf_position = memberships_tree.mark().unwrap();
Self { leaf_position, stake }
}
}
pub const SLED_MAIN_TREE: &[u8] = b"_main";
pub const SLED_MAIN_TREE_MEMBERSHIPS_TREE_KEY: &[u8] = b"_memberships_tree";
pub const SLED_MEMBERSHIP_TREE: &[u8] = b"_memberships";
/// Structure holding all sled trees for DarkFi RLN state management.
#[derive(Clone)]
pub struct RlndDatabase {
/// Main pointer to the sled db connection
pub sled_db: sled::Db,
/// Main `sled` tree, storing arbitrary data,
/// like the memberships Merkle tree.
pub main: sled::Tree,
/// The `sled` tree storing all the memberships information,
/// where the key is the membership id, and the value is the serialized
/// structure itself.
pub membership: sled::Tree,
}
impl RlndDatabase {
/// Instantiate a new `RlndDatabase`.
pub fn new(db_path: &str) -> Result<Self> {
// Initialize or open sled database
let db_path = expand_path(db_path)?;
let sled_db = sled_overlay::sled::open(&db_path)?;
// Open the database trees
let main = sled_db.open_tree(SLED_MAIN_TREE)?;
let membership = sled_db.open_tree(SLED_MEMBERSHIP_TREE)?;
// Check if memberships Merkle tree is initialized
if main.get(SLED_MAIN_TREE_MEMBERSHIPS_TREE_KEY)?.is_none() {
main.insert(SLED_MAIN_TREE_MEMBERSHIPS_TREE_KEY, serialize(&MerkleTree::new(1)))?;
}
Ok(Self { sled_db, main, membership })
}
/// Retrieve memberships Merkle tree record from the main tree.
pub fn get_memberships_tree(&self) -> Result<MerkleTree> {
let merkle_tree_bytes = self.main.get(SLED_MAIN_TREE_MEMBERSHIPS_TREE_KEY)?;
let merkle_tree = deserialize(&merkle_tree_bytes.unwrap())?;
Ok(merkle_tree)
}
/// Generate a new `Membership` record for provided id and stake
/// and instert it into the database.
pub fn add_membership(&self, id: pallas::Base, stake: u64) -> Result<Membership> {
// Grab the memberships Merkle tree
let mut memberships_merkle_tree = self.get_memberships_tree()?;
// Generate the new `Membership`
let membership = Membership::new(&mut memberships_merkle_tree, id, stake);
// Update the memberships Merkle tree record in the database
self.main
.insert(SLED_MAIN_TREE_MEMBERSHIPS_TREE_KEY, serialize(&memberships_merkle_tree))?;
// Store the new membership
self.membership.insert(id.to_repr(), serialize(&membership))?;
Ok(membership)
}
/// Retrieve `Membership` by given id.
pub fn get_membership_by_id(&self, id: &pallas::Base) -> Result<Membership> {
let Some(found) = self.membership.get(id.to_repr())? else {
return Err(Error::DatabaseError(format!("Membership was not found for id: {id:?}")))
};
let membership = deserialize(&found)?;
Ok(membership)
}
/// Retrieve all membership records contained in the database.
/// Be careful as this will try to load everything in memory.
pub fn get_all(&self) -> Result<Vec<(pallas::Base, Membership)>> {
let mut memberships = vec![];
for record in self.membership.iter() {
let (key, membership) = record?;
let id = convert_pallas_key(&key)?;
let membership = deserialize(&membership)?;
memberships.push((id, membership));
}
Ok(memberships)
}
/// Remove `Membership` record of given id and rebuild the memberships Merkle tree.
pub fn remove_membership_by_id(&self, id: &pallas::Base) -> Result<Membership> {
// TODO: add a mutex guard here
// Remove membership record
let Some(found) = self.membership.remove(id.to_repr())? else {
return Err(Error::DatabaseError(format!("Membership was not found for id: {id:?}")))
};
// Rebuild the memberships Merkle tree
self.rebuild_memberships_merkle_tree()?;
Ok(deserialize(&found)?)
}
/// Auxiliary function to rebuild the memberships Merkle tree in the database.
pub fn rebuild_memberships_merkle_tree(&self) -> Result<()> {
// Create a new Merkle tree
let mut memberships_merkle_tree = MerkleTree::new(1);
// Iterate over keys and generate the new memberships
let mut memberships = vec![];
for record in self.membership.iter() {
let (key, membership) = record?;
let id = convert_pallas_key(&key)?;
let membership: Membership = deserialize(&membership)?;
let membership = Membership::new(&mut memberships_merkle_tree, id, membership.stake);
memberships.push((id, membership));
}
// Update the memberships Merkle tree record in the database
self.main
.insert(SLED_MAIN_TREE_MEMBERSHIPS_TREE_KEY, serialize(&memberships_merkle_tree))?;
// Store the updated memberships
for (id, membership) in memberships {
self.membership.insert(id.to_repr(), serialize(&membership))?;
}
Ok(())
}
/// Retrieve stored memberships count.
pub fn len(&self) -> usize {
self.membership.len()
}
/// Check if database contains any memberships.
pub fn is_empty(&self) -> bool {
self.membership.is_empty()
}
}
/// Auxiliary function to convert a `pallas::Base` key from an `IVec`.
fn convert_pallas_key(key: &sled::IVec) -> Result<pallas::Base> {
let mut repr = [0; 32];
repr.copy_from_slice(key);
match pallas::Base::from_repr(repr).into_option() {
Some(key) => Ok(key),
None => Err(Error::DatabaseError(format!(
"Key could not be converted into pallas::Base: {key:?}"
))),
}
}

View File

@@ -0,0 +1,45 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi::rpc::jsonrpc::{ErrorCode::ServerError, JsonError, JsonResult};
/// Custom RPC errors available for rlnd.
/// Please sort them sensefully.
pub enum RpcError {
// Misc errors
PingFailed = -32300,
}
fn to_tuple(e: RpcError) -> (i32, String) {
let msg = match e {
// Misc errors
RpcError::PingFailed => "Darkirc daemon ping error",
};
(e as i32, msg.to_string())
}
pub fn server_error(e: RpcError, id: u16, msg: Option<&str>) -> JsonResult {
let (code, default_msg) = to_tuple(e);
if let Some(message) = msg {
return JsonError::new(ServerError(code), Some(message.to_string()), id).into()
}
JsonError::new(ServerError(code), Some(default_msg), id).into()
}

View File

@@ -0,0 +1,184 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{collections::HashSet, sync::Arc};
use log::{error, info};
use smol::lock::Mutex;
use url::Url;
use darkfi::{
rpc::server::{listen_and_serve, RequestHandler},
system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
Error, Result,
};
pub mod error;
/// DarkFi RLN state management database
pub mod database;
use database::RlndDatabase;
/// rlnd JSON-RPC related methods
pub mod rpc;
use rpc::{DarkircRpcClient, PrivateRpcHandler, PublicRpcHandler};
/// Atomic pointer to the DarkFi RLN state management node
pub type RlnNodePtr = Arc<RlnNode>;
/// Structure representing a DarkFi RLN state management node
pub struct RlnNode {
/// Main pointer to the sled db connection
database: RlndDatabase,
/// Private JSON-RPC connection tracker
private_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
/// Publicly exposed JSON-RPC connection tracker
public_rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
/// JSON-RPC client to execute requests to the darkirc daemon
rpc_client: Mutex<DarkircRpcClient>,
}
impl RlnNode {
pub async fn new(database: RlndDatabase, rpc_client: Mutex<DarkircRpcClient>) -> RlnNodePtr {
Arc::new(Self {
database,
private_rpc_connections: Mutex::new(HashSet::new()),
public_rpc_connections: Mutex::new(HashSet::new()),
rpc_client,
})
}
}
/// Atomic pointer to the DarkFi RLN state management daemon
pub type RlndPtr = Arc<Rlnd>;
/// Structure representing a DarkFi RLN state management daemon
pub struct Rlnd {
/// Darkfi RLN state management node instance
node: RlnNodePtr,
/// Private JSON-RPC background task
private_rpc_task: StoppableTaskPtr,
/// Publicly exposed JSON-RPC background task
public_rpc_task: StoppableTaskPtr,
}
impl Rlnd {
/// Initialize a DarkFi RLN state management daemon.
///
/// Generates a new `RlnNode` for provided configuration,
/// along with all the corresponding background tasks.
pub async fn init(db_path: &str, endpoint: &Url, ex: &ExecutorPtr) -> Result<RlndPtr> {
info!(target: "rlnd::Rlnd::init", "Initializing a Darkfi RLN state management daemon...");
// Initialize database
let database = RlndDatabase::new(db_path)?;
// Initialize JSON-RPC client to perform requests to darkirc
let Ok(rpc_client) = DarkircRpcClient::new(endpoint.clone(), ex.clone()).await else {
error!(target: "rlnd::Rlnd::init", "Failed to initialize darkirc daemon rpc client, check if darkirc is running");
return Err(Error::RpcClientStopped)
};
let rpc_client = Mutex::new(rpc_client);
// Initialize node
let node = RlnNode::new(database, rpc_client).await;
// Generate the background tasks
let private_rpc_task = StoppableTask::new();
let public_rpc_task = StoppableTask::new();
info!(target: "rlnd::Rlnd::init", "Darkfi RLN state management daemon initialized successfully!");
Ok(Arc::new(Self { node, private_rpc_task, public_rpc_task }))
}
/// Start the DarkFi RLN state management daemon in the given executor, using the provided
/// JSON-RPC listen urls.
pub async fn start(
&self,
executor: &ExecutorPtr,
private_rpc_listen: &Url,
public_rpc_listen: &Url,
) -> Result<()> {
info!(target: "rlnd::Rlnd::start", "Starting Darkfi RLN state management daemon...");
// Pinging darkirc daemon to verify it listens
if let Err(e) = self.node.ping_darkirc_daemon().await {
error!(target: "rlnd::Rlnd::start", "Failed to ping darkirc daemon: {}", e);
return Err(Error::RpcClientStopped)
}
// Start the private JSON-RPC task
info!(target: "rlnd::Rlnd::start", "Starting private JSON-RPC server");
let node_ = self.node.clone();
self.private_rpc_task.clone().start(
listen_and_serve::<PrivateRpcHandler>(private_rpc_listen.clone(), self.node.clone(), None, executor.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => <RlnNode as RequestHandler<PrivateRpcHandler>>::stop_connections(&node_).await,
Err(e) => error!(target: "rlnd::Rlnd::start", "Failed starting private JSON-RPC server: {}", e),
}
},
Error::RpcServerStopped,
executor.clone(),
);
// Start the publicly exposed JSON-RPC task
info!(target: "rlnd::Rlnd::start", "Starting publicly exposed JSON-RPC server");
let node_ = self.node.clone();
self.public_rpc_task.clone().start(
listen_and_serve::<PublicRpcHandler>(public_rpc_listen.clone(), self.node.clone(), None, executor.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => <RlnNode as RequestHandler<PublicRpcHandler>>::stop_connections(&node_).await,
Err(e) => error!(target: "rlnd::Rlnd::start", "Failed starting publicly exposed JSON-RPC server: {}", e),
}
},
Error::RpcServerStopped,
executor.clone(),
);
info!(target: "rlnd::Rlnd::start", "Darkfi RLN state management daemon started successfully!");
Ok(())
}
/// Stop the DarkFi RLN state management daemon.
pub async fn stop(&self) -> Result<()> {
info!(target: "rlnd::Rlnd::stop", "Terminating Darkfi RLN state management daemon...");
// Stop the JSON-RPC task
info!(target: "rlnd::Rlnd::stop", "Stopping private JSON-RPC server...");
self.private_rpc_task.stop().await;
// Stop the JSON-RPC task
info!(target: "rlnd::Rlnd::stop", "Stopping publicly exposed JSON-RPC server...");
self.public_rpc_task.stop().await;
// Flush sled database data
info!(target: "rlnd::Rlnd::stop", "Flushing sled database...");
let flushed_bytes = self.node.database.sled_db.flush_async().await?;
info!(target: "rlnd::Rlnd::stop", "Flushed {} bytes", flushed_bytes);
// Close the JSON-RPC client, if it was initialized
info!(target: "rlnd::Rlnd::stop", "Stopping JSON-RPC client...");
self.node.rpc_client.lock().await.stop().await;
info!(target: "rlnd::Rlnd::stop", "Darkfi RLN state management daemon terminated successfully!");
Ok(())
}
}

View File

@@ -0,0 +1,86 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use async_std::prelude::StreamExt;
use log::info;
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use url::Url;
use darkfi::{async_daemonize, cli_desc, Result};
use rlnd::Rlnd;
const CONFIG_FILE: &str = "rlnd_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../rlnd_config.toml");
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "rlnd", about = cli_desc!())]
struct Args {
#[structopt(short, long)]
/// Configuration file to use
config: Option<String>,
#[structopt(long, default_value = "~/.local/share/darkfi/rlnd")]
/// Path to the database directory
database: String,
#[structopt(short, long, default_value = "tcp://127.0.0.1:25637")]
/// Private JSON-RPC listen URL
private_rpc_listen: Url,
#[structopt(short, long, default_value = "tcp://127.0.0.1:25638")]
/// Publicly exposed JSON-RPC listen URL
public_rpc_listen: Url,
#[structopt(short, long, default_value = "tcp://127.0.0.1:26660")]
/// darkirc JSON-RPC endpoint
endpoint: Url,
#[structopt(short, long)]
/// Set log file to ouput into
log: Option<String>,
#[structopt(short, parse(from_occurrences))]
/// Increase verbosity (-vvv supported)
verbose: u8,
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
info!(target: "rlnd", "Initializing DarkFi RLN state management node...");
// Generate the daemon
let daemon = Rlnd::init(&args.database, &args.endpoint, &ex).await?;
// Start the daemon
daemon.start(&ex, &args.private_rpc_listen, &args.public_rpc_listen).await?;
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
info!(target: "rlnd", "Caught termination signal, cleaning up and exiting...");
daemon.stop().await?;
info!(target: "rlnd", "Shut down successfully");
Ok(())
}

View File

@@ -0,0 +1,346 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{collections::HashSet, time::Instant};
use async_trait::async_trait;
use darkfi::{
rpc::{
client::RpcClient,
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
server::RequestHandler,
},
system::{sleep, ExecutorPtr, StoppableTaskPtr},
Error, Result,
};
use darkfi_sdk::{crypto::pasta_prelude::PrimeField, pasta::pallas};
use darkfi_serial::serialize;
use log::{debug, error, info};
use smol::lock::MutexGuard;
use tinyjson::JsonValue;
use url::Url;
use crate::{
error::{server_error, RpcError},
RlnNode,
};
/// Private JSON-RPC `RequestHandler` type
pub struct PrivateRpcHandler;
/// Publicly exposed JSON-RPC `RequestHandler` type
pub struct PublicRpcHandler;
/// Structure to hold a JSON-RPC client and its config,
/// so we can recreate it in case of an error.
pub struct DarkircRpcClient {
endpoint: Url,
ex: ExecutorPtr,
client: RpcClient,
}
impl DarkircRpcClient {
pub async fn new(endpoint: Url, ex: ExecutorPtr) -> Result<Self> {
let client = RpcClient::new(endpoint.clone(), ex.clone()).await?;
Ok(Self { endpoint, ex, client })
}
/// Stop the client.
pub async fn stop(&self) {
self.client.stop().await
}
}
#[async_trait]
impl RequestHandler<PrivateRpcHandler> for RlnNode {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
debug!(target: "rlnd::private_rpc", "--> {}", req.stringify().unwrap());
match req.method.as_str() {
// =====================
// Miscellaneous methods
// =====================
"ping" => {
<RlnNode as RequestHandler<PrivateRpcHandler>>::pong(self, req.id, req.params).await
}
"ping_darkirc" => self.ping_darkirc(req.id, req.params).await,
// ================
// Database methods
// ================
"add_membership" => self.add_membership(req.id, req.params).await,
"get_memberships" => self.get_memberships(req.id, req.params).await,
"slash_membership" => self.slash_membership(req.id, req.params).await,
// ==============
// Invalid method
// ==============
_ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
self.private_rpc_connections.lock().await
}
}
#[async_trait]
impl RequestHandler<PublicRpcHandler> for RlnNode {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
debug!(target: "rlnd::public_rpc", "--> {}", req.stringify().unwrap());
match req.method.as_str() {
// =====================
// Miscellaneous methods
// =====================
"ping" => {
<RlnNode as RequestHandler<PublicRpcHandler>>::pong(self, req.id, req.params).await
}
// ================
// Database methods
// ================
"slash_membership" => self.slash_membership(req.id, req.params).await,
// ==============
// Invalid method
// ==============
_ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
self.public_rpc_connections.lock().await
}
}
impl RlnNode {
// RPCAPI:
// Pings configured darkirc daemon for liveness.
// Returns `true` on success.
//
// --> {"jsonrpc": "2.0", "method": "ping_darkirc", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "true", "id": 1}
async fn ping_darkirc(&self, id: u16, _params: JsonValue) -> JsonResult {
if let Err(e) = self.ping_darkirc_daemon().await {
error!(target: "rlnd::rpc::ping_darkirc", "Failed to ping darkirc daemon: {}", e);
return server_error(RpcError::PingFailed, id, None)
}
JsonResponse::new(JsonValue::Boolean(true), id).into()
}
/// Ping configured darkirc daemon JSON-RPC endpoint.
pub async fn ping_darkirc_daemon(&self) -> Result<()> {
debug!(target: "rlnd::ping_darkirc_daemon", "Pinging darkirc daemon...");
self.darkirc_daemon_request("ping", &JsonValue::Array(vec![])).await?;
Ok(())
}
/// Auxiliary function to execute a request towards the configured darkirc daemon JSON-RPC endpoint.
pub async fn darkirc_daemon_request(
&self,
method: &str,
params: &JsonValue,
) -> Result<JsonValue> {
debug!(target: "rlnd::rpc::darkirc_daemon_request", "Executing request {} with params: {:?}", method, params);
let latency = Instant::now();
let req = JsonRequest::new(method, params.clone());
let lock = self.rpc_client.lock().await;
let rep = lock.client.request(req).await?;
drop(lock);
let latency = latency.elapsed();
debug!(target: "rlnd::rpc::darkirc_daemon_request", "Got reply: {:?}", rep);
debug!(target: "rlnd::rpc::darkirc_daemon_request", "Latency: {:?}", latency);
Ok(rep)
}
/// Auxiliary function to execute a request towards the configured darkirc daemon JSON-RPC endpoint,
/// but in case of failure, sleep and retry until connection is re-established.
pub async fn darkirc_daemon_request_with_retry(
&self,
method: &str,
params: &JsonValue,
) -> JsonValue {
loop {
// Try to execute the request using current client
match self.darkirc_daemon_request(method, params).await {
Ok(v) => return v,
Err(e) => {
error!(target: "rlnd::rpc::darkirc_daemon_request_with_retry", "Failed to execute darkirc daemon request: {}", e);
}
}
loop {
// Sleep a bit before retrying
info!(target: "rlnd::rpc::darkirc_daemon_request_with_retry", "Sleeping so we can retry later");
sleep(10).await;
// Create a new client
let mut rpc_client = self.rpc_client.lock().await;
let Ok(client) =
RpcClient::new(rpc_client.endpoint.clone(), rpc_client.ex.clone()).await
else {
error!(target: "rlnd::rpc::darkirc_daemon_request_with_retry", "Failed to initialize darkirc daemon rpc client, check if darkirc is running");
drop(rpc_client);
continue
};
info!(target: "rlnd::rpc::darkirc_daemon_request_with_retry", "Connection re-established!");
// Set the new client as the daemon one
rpc_client.client = client;
break;
}
}
}
// RPCAPI:
// Generate a new membership for given identity and stake.
// Returns a readable membership upon success.
//
// **Params:**
// * `array[0]`: base58-encoded `pallas::Base` string
// * `array[1]`: `u64` Membership stake (as string)
//
// **Returns:**
// * `String`: `Membership` struct serialized into base58.
//
// --> {"jsonrpc": "2.0", "method": "add_membership", "params": ["3px89oUYY7nzA43vNCBkbvJbsQjNeuH5XRxJ9C2oGnmV", "42"], "id": 1}
// <-- {"jsonrpc": "2.0", "result": {...}, "id": 1}
async fn add_membership(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if params.len() != 2 || !params[0].is_string() || !params[1].is_string() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let membership_id = match parse_pallas_base(params[0].get::<String>().unwrap()) {
Ok(v) => v,
Err(e) => {
error!(target: "rlnd::rpc::add_membership", "Error parsing membership id: {e}");
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
};
let stake = match params[1].get::<String>().unwrap().parse::<u64>() {
Ok(v) => v,
Err(_) => return JsonError::new(ErrorCode::ParseError, None, id).into(),
};
let membership = match self.database.add_membership(membership_id, stake) {
Ok(v) => v,
Err(e) => {
error!(target: "rlnd::rpc::add_membership", "Failed generating membership: {e}");
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
};
let membership = bs58::encode(&serialize(&membership)).into_string();
JsonResponse::new(JsonValue::String(membership), id).into()
}
// RPCAPI:
// Returns all database memberships upon success.
//
// **Params:**
// * `None`
//
// **Returns:**
// * `array[N]`: Pairs of `pallas::Base` and `Membership` serialized into base58
//
// --> {"jsonrpc": "2.0", "method": "get_memberships", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": {...}, "id": 1}
async fn get_memberships(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if !params.is_empty() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let memberships = match self.database.get_all() {
Ok(v) => v,
Err(e) => {
error!(target: "rlnd::rpc::get_memberships", "Error retrieving memberships: {e}");
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
};
let mut ret = vec![];
for (id, membership) in memberships {
ret.push(JsonValue::String(bs58::encode(&id.to_repr()).into_string()));
ret.push(JsonValue::String(bs58::encode(&serialize(&membership)).into_string()));
}
JsonResponse::new(JsonValue::Array(ret), id).into()
}
// RPCAPI:
// Slash the membership of given identity.
// Returns the membership information upon success.
//
// **Params:**
// * `array[0]`: base58-encoded `pallas::Base` string
//
// **Returns:**
// * `String`: `Membership` struct serialized into base58.
//
// --> {"jsonrpc": "2.0", "method": "slash_membership", "params": ["3px89oUYY7nzA43vNCBkbvJbsQjNeuH5XRxJ9C2oGnmV"], "id": 1}
// <-- {"jsonrpc": "2.0", "result": {...}, "id": 1}
async fn slash_membership(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if params.len() != 1 || !params[0].is_string() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let membership_id = match parse_pallas_base(params[0].get::<String>().unwrap()) {
Ok(v) => v,
Err(e) => {
error!(target: "rlnd::rpc::slash_membership", "Error parsing membership id: {e}");
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
};
let membership = match self.database.remove_membership_by_id(&membership_id) {
Ok(v) => v,
Err(e) => {
error!(target: "rlnd::rpc::slash_membership", "Failed removing membership: {e}");
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
};
let membership = bs58::encode(&serialize(&membership)).into_string();
JsonResponse::new(JsonValue::String(membership), id).into()
}
}
/// Auxiliary function to parse a `pallas::Base` membership id from a `JsonValue::String`.
pub fn parse_pallas_base(id: &str) -> Result<pallas::Base> {
let Ok(decoded_bytes) = bs58::decode(id).into_vec() else {
error!(target: "rlnd::rpc::parse_pallas_base", "Error decoding string: {id}");
return Err(Error::ParseFailed("Invalid pallas::Base"))
};
let bytes: [u8; 32] = match decoded_bytes.try_into() {
Ok(b) => b,
Err(e) => {
error!(target: "rlnd::rpc::parse_pallas_base", "Error decoding string bytes: {e:?}");
return Err(Error::ParseFailed("Invalid pallas::Base"))
}
};
match pallas::Base::from_repr(bytes).into() {
Some(id) => Ok(id),
None => {
error!(target: "rlnd::rpc::parse_pallas_base", "Error converting bytes to pallas::Base");
Err(Error::ParseFailed("Invalid pallas::Base"))
}
}
}