mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-01-08 22:28:12 -05:00
chore: dropped obselete dhtd
This commit is contained in:
@@ -1,24 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "dhtd"
|
|
||||||
version = "0.5.0"
|
|
||||||
homepage = "https://dark.fi"
|
|
||||||
description = "DHT daemon"
|
|
||||||
authors = ["Dyne.org foundation <foundation@dyne.org>"]
|
|
||||||
repository = "https://codeberg.org/darkrenaissance/darkfi"
|
|
||||||
license = "AGPL-3.0-only"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
async-trait = "0.1.88"
|
|
||||||
blake3 = "1.8.2"
|
|
||||||
darkfi = {path = "../../../", features = ["dht"]}
|
|
||||||
darkfi-serial = {version = "0.5.0", features = ["derive", "crypto"]}
|
|
||||||
easy-parallel = "3.3.1"
|
|
||||||
log = "0.4.27"
|
|
||||||
rand = "0.8.5"
|
|
||||||
simplelog = "0.12.2"
|
|
||||||
smol = "2.0.2"
|
|
||||||
url = "2.5.4"
|
|
||||||
|
|
||||||
[lints]
|
|
||||||
workspace = true
|
|
||||||
@@ -1,40 +0,0 @@
|
|||||||
/* This file is part of DarkFi (https://dark.fi)
|
|
||||||
*
|
|
||||||
* Copyright (C) 2020-2025 Dyne.org foundation
|
|
||||||
*
|
|
||||||
* This program is free software: you can redistribute it and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License as
|
|
||||||
* published by the Free Software Foundation, either version 3 of the
|
|
||||||
* License, or (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU Affero General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
use std::collections::{HashMap, HashSet};
|
|
||||||
|
|
||||||
use async_std::sync::{Arc, RwLock};
|
|
||||||
use darkfi::{dht2::Dht, Result};
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
/// Protocol implementations
|
|
||||||
mod proto;
|
|
||||||
|
|
||||||
//#[cfg(test)]
|
|
||||||
mod tests;
|
|
||||||
|
|
||||||
pub type DhtdPtr = Arc<RwLock<Dhtd>>;
|
|
||||||
|
|
||||||
pub struct Dhtd {
|
|
||||||
pub dht: Dht,
|
|
||||||
pub routing_table: HashMap<blake3::Hash, HashSet<Url>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() -> Result<()> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
@@ -1,198 +0,0 @@
|
|||||||
/* This file is part of DarkFi (https://dark.fi)
|
|
||||||
*
|
|
||||||
* Copyright (C) 2020-2025 Dyne.org foundation
|
|
||||||
*
|
|
||||||
* This program is free software: you can redistribute it and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License as
|
|
||||||
* published by the Free Software Foundation, either version 3 of the
|
|
||||||
* License, or (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU Affero General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
use std::collections::HashSet;
|
|
||||||
|
|
||||||
use async_std::sync::Arc;
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use darkfi::{
|
|
||||||
dht2::net_hashmap::{NetHashMapInsert, NetHashMapRemove},
|
|
||||||
impl_p2p_message,
|
|
||||||
net::{
|
|
||||||
metering::{DEFAULT_METERING_CONFIGURATION, MeteringConfiguration},
|
|
||||||
ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
|
|
||||||
ProtocolJobsManager, ProtocolJobsManagerPtr,
|
|
||||||
},
|
|
||||||
Result,
|
|
||||||
};
|
|
||||||
use darkfi_serial::{SerialDecodable, SerialEncodable};
|
|
||||||
use log::debug;
|
|
||||||
use smol::Executor;
|
|
||||||
|
|
||||||
use super::DhtdPtr;
|
|
||||||
|
|
||||||
pub struct ProtocolDht {
|
|
||||||
jobsman: ProtocolJobsManagerPtr,
|
|
||||||
channel: ChannelPtr,
|
|
||||||
_p2p: P2pPtr,
|
|
||||||
state: DhtdPtr,
|
|
||||||
insert_sub: MessageSubscription<NetHashMapInsert<blake3::Hash, Vec<blake3::Hash>>>,
|
|
||||||
remove_sub: MessageSubscription<NetHashMapRemove<blake3::Hash>>,
|
|
||||||
chunk_request_sub: MessageSubscription<ChunkRequest>,
|
|
||||||
chunk_reply_sub: MessageSubscription<ChunkReply>,
|
|
||||||
file_request_sub: MessageSubscription<FileRequest>,
|
|
||||||
file_reply_sub: MessageSubscription<FileReply>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
|
|
||||||
pub struct ChunkRequest {
|
|
||||||
pub hash: blake3::Hash,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl_p2p_message!(ChunkRequest, "dhtchunkrequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
|
|
||||||
pub struct ChunkReply {
|
|
||||||
pub hash: blake3::Hash,
|
|
||||||
pub data: Vec<u8>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl_p2p_message!(ChunkReply, "dhtchunkreply", 0, 0, DEFAULT_METERING_CONFIGURATION);
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
|
|
||||||
pub struct FileRequest {
|
|
||||||
pub hash: blake3::Hash,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl_p2p_message!(FileRequest, "dhtfilerequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
|
|
||||||
pub struct FileReply {
|
|
||||||
pub hash: blake3::Hash,
|
|
||||||
pub chunks: Vec<blake3::Hash>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl_p2p_message!(FileReply, "dhtfilereply", 0, 0, DEFAULT_METERING_CONFIGURATION);
|
|
||||||
|
|
||||||
impl ProtocolDht {
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub async fn init(channel: ChannelPtr, p2p: P2pPtr, state: DhtdPtr) -> Result<ProtocolBasePtr> {
|
|
||||||
let msg_subsystem = channel.message_subsystem();
|
|
||||||
msg_subsystem.add_dispatch::<NetHashMapInsert<blake3::Hash, Vec<blake3::Hash>>>().await;
|
|
||||||
msg_subsystem.add_dispatch::<NetHashMapRemove<blake3::Hash>>().await;
|
|
||||||
msg_subsystem.add_dispatch::<ChunkRequest>().await;
|
|
||||||
msg_subsystem.add_dispatch::<ChunkReply>().await;
|
|
||||||
msg_subsystem.add_dispatch::<FileRequest>().await;
|
|
||||||
msg_subsystem.add_dispatch::<FileReply>().await;
|
|
||||||
|
|
||||||
let insert_sub = channel.subscribe_msg().await?;
|
|
||||||
let remove_sub = channel.subscribe_msg().await?;
|
|
||||||
let chunk_request_sub = channel.subscribe_msg().await?;
|
|
||||||
let chunk_reply_sub = channel.subscribe_msg().await?;
|
|
||||||
let file_request_sub = channel.subscribe_msg().await?;
|
|
||||||
let file_reply_sub = channel.subscribe_msg().await?;
|
|
||||||
|
|
||||||
Ok(Arc::new(Self {
|
|
||||||
jobsman: ProtocolJobsManager::new("DHTProto", channel.clone()),
|
|
||||||
channel,
|
|
||||||
_p2p: p2p,
|
|
||||||
state,
|
|
||||||
insert_sub,
|
|
||||||
remove_sub,
|
|
||||||
chunk_request_sub,
|
|
||||||
chunk_reply_sub,
|
|
||||||
file_request_sub,
|
|
||||||
file_reply_sub,
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_insert(self: Arc<Self>) -> Result<()> {
|
|
||||||
debug!("ProtocolDht::handle_insert START");
|
|
||||||
loop {
|
|
||||||
let Ok(msg) = self.insert_sub.receive().await else { continue };
|
|
||||||
|
|
||||||
let mut state = self.state.write().await;
|
|
||||||
|
|
||||||
state.routing_table.entry(msg.k).or_insert_with(HashSet::new);
|
|
||||||
|
|
||||||
let hashset = state.routing_table.get_mut(&msg.k).unwrap();
|
|
||||||
hashset.insert(self.channel.address().clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_remove(self: Arc<Self>) -> Result<()> {
|
|
||||||
debug!("ProtocolDht::handle_remove START");
|
|
||||||
loop {
|
|
||||||
let Ok(msg) = self.remove_sub.receive().await else { continue };
|
|
||||||
|
|
||||||
let mut state = self.state.write().await;
|
|
||||||
|
|
||||||
if !state.routing_table.contains_key(&msg.k) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
let hashset = state.routing_table.get_mut(&msg.k).unwrap();
|
|
||||||
hashset.remove(self.channel.address());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_chunk_request(self: Arc<Self>) -> Result<()> {
|
|
||||||
debug!("ProtocolDht::handle_chunk_request START");
|
|
||||||
loop {
|
|
||||||
let Ok(msg) = self.chunk_request_sub.receive().await else { continue };
|
|
||||||
|
|
||||||
println!("{:?}", msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_chunk_reply(self: Arc<Self>) -> Result<()> {
|
|
||||||
debug!("ProtocolDht::handle_chunk_reply START");
|
|
||||||
loop {
|
|
||||||
let Ok(msg) = self.chunk_reply_sub.receive().await else { continue };
|
|
||||||
|
|
||||||
println!("{:?}", msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_file_request(self: Arc<Self>) -> Result<()> {
|
|
||||||
debug!("ProtocolDht::handle_file_request START");
|
|
||||||
loop {
|
|
||||||
let Ok(msg) = self.file_request_sub.receive().await else { continue };
|
|
||||||
|
|
||||||
println!("{:?}", msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_file_reply(self: Arc<Self>) -> Result<()> {
|
|
||||||
debug!("ProtocolDht::handle_file_reply START");
|
|
||||||
loop {
|
|
||||||
let Ok(msg) = self.file_reply_sub.receive().await else { continue };
|
|
||||||
|
|
||||||
println!("{:?}", msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl ProtocolBase for ProtocolDht {
|
|
||||||
async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
|
|
||||||
debug!("ProtocolDht::start()");
|
|
||||||
self.jobsman.clone().start(ex.clone());
|
|
||||||
self.jobsman.clone().spawn(self.clone().handle_insert(), ex.clone()).await;
|
|
||||||
self.jobsman.clone().spawn(self.clone().handle_remove(), ex.clone()).await;
|
|
||||||
self.jobsman.clone().spawn(self.clone().handle_chunk_request(), ex.clone()).await;
|
|
||||||
self.jobsman.clone().spawn(self.clone().handle_chunk_reply(), ex.clone()).await;
|
|
||||||
self.jobsman.clone().spawn(self.clone().handle_file_request(), ex.clone()).await;
|
|
||||||
self.jobsman.clone().spawn(self.clone().handle_file_reply(), ex.clone()).await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn name(&self) -> &'static str {
|
|
||||||
"ProtoDHT"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,175 +0,0 @@
|
|||||||
/* This file is part of DarkFi (https://dark.fi)
|
|
||||||
*
|
|
||||||
* Copyright (C) 2020-2025 Dyne.org foundation
|
|
||||||
*
|
|
||||||
* This program is free software: you can redistribute it and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License as
|
|
||||||
* published by the Free Software Foundation, either version 3 of the
|
|
||||||
* License, or (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU Affero General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use async_std::{
|
|
||||||
fs,
|
|
||||||
net::TcpListener,
|
|
||||||
sync::{Arc, RwLock},
|
|
||||||
};
|
|
||||||
use darkfi::{
|
|
||||||
dht2::{Dht, MAX_CHUNK_SIZE},
|
|
||||||
net::{self, P2p},
|
|
||||||
util::async_util::{msleep, sleep},
|
|
||||||
system::StoppableTask,
|
|
||||||
Error, Result,
|
|
||||||
};
|
|
||||||
use rand::{rngs::OsRng, RngCore};
|
|
||||||
use smol::Executor;
|
|
||||||
use url::Url;
|
|
||||||
use log::{error, warn};
|
|
||||||
|
|
||||||
use super::{proto::ProtocolDht, Dhtd};
|
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
async fn dht_remote_get_insert_real(ex: Arc<Executor<'_>>) -> Result<()> {
|
|
||||||
const NET_SIZE: usize = 5;
|
|
||||||
|
|
||||||
let mut dhtds = vec![];
|
|
||||||
let mut base_path = std::env::temp_dir();
|
|
||||||
base_path.push("dht");
|
|
||||||
|
|
||||||
let mut addrs = vec![];
|
|
||||||
for i in 0..NET_SIZE {
|
|
||||||
// Find an available port
|
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
|
||||||
let sockaddr = listener.local_addr()?;
|
|
||||||
let url = Url::parse(&format!("tcp://127.0.0.1:{}", sockaddr.port()))?;
|
|
||||||
drop(listener);
|
|
||||||
|
|
||||||
let settings = net::Settings {
|
|
||||||
inbound_addrs: vec![url.clone()],
|
|
||||||
peers: addrs.clone(),
|
|
||||||
allowed_transports: vec!["tcp".into()],
|
|
||||||
localnet: true,
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
addrs.push(url);
|
|
||||||
|
|
||||||
let p2p = P2p::new(settings).await;
|
|
||||||
let mut node_path = base_path.clone();
|
|
||||||
node_path.push(format!("node_{}", i));
|
|
||||||
let dht = Dht::new(&node_path.into(), p2p.clone()).await?;
|
|
||||||
let dhtd = Arc::new(RwLock::new(Dhtd { dht, routing_table: HashMap::new() }));
|
|
||||||
|
|
||||||
// Register P2P protocol
|
|
||||||
let registry = p2p.protocol_registry();
|
|
||||||
|
|
||||||
let _dhtd = dhtd.clone();
|
|
||||||
registry
|
|
||||||
.register(net::SESSION_NET, move |channel, p2p| {
|
|
||||||
let dhtd = _dhtd.clone();
|
|
||||||
async move { ProtocolDht::init(channel, p2p, dhtd).await.unwrap() }
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
p2p.clone().start(ex.clone()).await?;
|
|
||||||
StoppableTask::new().start(
|
|
||||||
p2p.run(ex.clone()),
|
|
||||||
|res| async {
|
|
||||||
match res {
|
|
||||||
Ok(()) | Err(Error::P2PNetworkStopped) => { /* Do nothing */ }
|
|
||||||
Err(e) => error!("Failed starting P2P network: {}", e),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Error::P2PNetworkStopped,
|
|
||||||
ex,
|
|
||||||
);
|
|
||||||
|
|
||||||
dhtds.push(dhtd);
|
|
||||||
|
|
||||||
sleep(1).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now the P2P network is set up. Try some stuff.
|
|
||||||
for dhtd in dhtds.iter_mut() {
|
|
||||||
dhtd.write().await.dht.garbage_collect().await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let dhtd = &mut dhtds[NET_SIZE - 1];
|
|
||||||
let rng = &mut OsRng;
|
|
||||||
let mut data = vec![0u8; MAX_CHUNK_SIZE];
|
|
||||||
rng.fill_bytes(&mut data);
|
|
||||||
let (file_hash, chunk_hashes) = dhtd.write().await.dht.insert(&data).await?;
|
|
||||||
msleep(1000).await;
|
|
||||||
|
|
||||||
for (i, node) in dhtds.iter().enumerate() {
|
|
||||||
if i == NET_SIZE - 1 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
assert!(node.read().await.routing_table.contains_key(&file_hash));
|
|
||||||
}
|
|
||||||
|
|
||||||
let dhtd = &mut dhtds[NET_SIZE - 1];
|
|
||||||
let mut chunk_path = dhtd.read().await.dht.chunks_path();
|
|
||||||
chunk_path.push(chunk_hashes[0].to_hex().as_str());
|
|
||||||
fs::remove_file(chunk_path).await?;
|
|
||||||
dhtd.write().await.dht.garbage_collect().await?;
|
|
||||||
msleep(1000).await;
|
|
||||||
|
|
||||||
for (i, node) in dhtds.iter().enumerate() {
|
|
||||||
if i == NET_SIZE - 1 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
let peers = node.read().await.routing_table.get(&file_hash).unwrap().clone();
|
|
||||||
assert!(peers.is_empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
fs::remove_dir_all(base_path).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn dht_remote_get_insert() -> Result<()> {
|
|
||||||
let mut cfg = simplelog::ConfigBuilder::new();
|
|
||||||
cfg.add_filter_ignore("net::protocol_version".to_string());
|
|
||||||
cfg.add_filter_ignore("net::protocol_ping".to_string());
|
|
||||||
|
|
||||||
// We check this error so we can execute same file tests in parallel,
|
|
||||||
// otherwise second one fails to init logger here.
|
|
||||||
if simplelog::TermLogger::init(
|
|
||||||
simplelog::LevelFilter::Info,
|
|
||||||
//simplelog::LevelFilter::Debug,
|
|
||||||
//simplelog::LevelFilter::Trace,
|
|
||||||
cfg.build(),
|
|
||||||
simplelog::TerminalMode::Mixed,
|
|
||||||
simplelog::ColorChoice::Auto,
|
|
||||||
)
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
warn!(target: "test_harness", "Logger already initialized");
|
|
||||||
}
|
|
||||||
|
|
||||||
let ex = Arc::new(Executor::new());
|
|
||||||
let (signal, shutdown) = async_std::channel::unbounded::<()>();
|
|
||||||
|
|
||||||
easy_parallel::Parallel::new().each(0..4, |_| smol::block_on(ex.run(shutdown.recv()))).finish(
|
|
||||||
|| {
|
|
||||||
smol::block_on(async {
|
|
||||||
dht_remote_get_insert_real(ex.clone()).await.unwrap();
|
|
||||||
drop(signal);
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
2
script/research/dhtd/.gitignore
vendored
2
script/research/dhtd/.gitignore
vendored
@@ -1,2 +0,0 @@
|
|||||||
/target
|
|
||||||
Cargo.lock
|
|
||||||
@@ -1,36 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "dhtd"
|
|
||||||
version = "0.4.1"
|
|
||||||
authors = ["Dyne.org foundation <foundation@dyne.org>"]
|
|
||||||
license = "AGPL-3.0-only"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[dependencies.darkfi]
|
|
||||||
path = "../../../"
|
|
||||||
features = ["dht"]
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
async-channel = "2.5.0"
|
|
||||||
async-executor = "1.13.2"
|
|
||||||
async-std = "1.13.1"
|
|
||||||
async-trait = "0.1.88"
|
|
||||||
blake3 = "1.8.2"
|
|
||||||
ctrlc = { version = "3.4.7", features = ["termination"] }
|
|
||||||
easy-parallel = "3.3.1"
|
|
||||||
futures-lite = "2.6.1"
|
|
||||||
log = "0.4.27"
|
|
||||||
serde_json = "1.0.142"
|
|
||||||
simplelog = "0.12.2"
|
|
||||||
url = "2.5.4"
|
|
||||||
|
|
||||||
# Argument parsing
|
|
||||||
serde = "1.0.219"
|
|
||||||
serde_derive = "1.0.219"
|
|
||||||
structopt = "0.3.26"
|
|
||||||
structopt-toml = "0.5.1"
|
|
||||||
|
|
||||||
[workspace]
|
|
||||||
|
|
||||||
[lints]
|
|
||||||
workspace = true
|
|
||||||
|
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
## dht configuration file
|
|
||||||
##
|
|
||||||
## Please make sure you go through all the settings so you can configure
|
|
||||||
## your daemon properly.
|
|
||||||
##
|
|
||||||
## The default values are left commented. They can be overridden either by
|
|
||||||
## uncommenting, or by using the command-line.
|
|
||||||
|
|
||||||
# P2P accept addresses
|
|
||||||
#p2p_accept = ["tls://127.0.0.1:9541"]
|
|
||||||
|
|
||||||
# P2P external addresses
|
|
||||||
#p2p_external = ["tls://127.0.0.1:9541"]
|
|
||||||
|
|
||||||
# Connection slots
|
|
||||||
#slots = 8
|
|
||||||
|
|
||||||
# Seed nodes to connect to
|
|
||||||
#seed = []
|
|
||||||
|
|
||||||
# Peers to connect to
|
|
||||||
#peer = []
|
|
||||||
|
|
||||||
## JSON-RPC settings
|
|
||||||
[rpc]
|
|
||||||
# JSON-RPC listen URL
|
|
||||||
rpc_listen = "tcp://127.0.0.1:9540"
|
|
||||||
|
|
||||||
# Disabled RPC methods
|
|
||||||
#rpc_disabled_methods = []
|
|
||||||
@@ -1,46 +0,0 @@
|
|||||||
/* This file is part of DarkFi (https://dark.fi)
|
|
||||||
*
|
|
||||||
* Copyright (C) 2020-2025 Dyne.org foundation
|
|
||||||
*
|
|
||||||
* This program is free software: you can redistribute it and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License as
|
|
||||||
* published by the Free Software Foundation, either version 3 of the
|
|
||||||
* License, or (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU Affero General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
use darkfi::rpc::jsonrpc::{ErrorCode::ServerError, JsonError, JsonResult};
|
|
||||||
|
|
||||||
pub enum RpcError {
|
|
||||||
UnknownKey = -35107,
|
|
||||||
QueryFailed = -35108,
|
|
||||||
KeyInsertFail = -35110,
|
|
||||||
KeyRemoveFail = -35111,
|
|
||||||
WaitingNetworkError = -35112,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn to_tuple(e: RpcError) -> (i64, String) {
|
|
||||||
let msg = match e {
|
|
||||||
RpcError::UnknownKey => "Did not find key",
|
|
||||||
RpcError::QueryFailed => "Failed to query key",
|
|
||||||
RpcError::KeyInsertFail => "Failed to insert key",
|
|
||||||
RpcError::KeyRemoveFail => "Failed to remove key",
|
|
||||||
RpcError::WaitingNetworkError => "Error while waiting network response.",
|
|
||||||
};
|
|
||||||
|
|
||||||
(e as i64, msg.to_string())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn server_error(e: RpcError, id: Value) -> JsonResult {
|
|
||||||
let (code, msg) = to_tuple(e);
|
|
||||||
JsonError::new(ServerError(code), Some(msg), id).into()
|
|
||||||
}
|
|
||||||
@@ -1,323 +0,0 @@
|
|||||||
/* This file is part of DarkFi (https://dark.fi)
|
|
||||||
*
|
|
||||||
* Copyright (C) 2020-2025 Dyne.org foundation
|
|
||||||
*
|
|
||||||
* This program is free software: you can redistribute it and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License as
|
|
||||||
* published by the Free Software Foundation, either version 3 of the
|
|
||||||
* License, or (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU Affero General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
use async_executor::Executor;
|
|
||||||
use async_std::sync::Arc;
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use futures_lite::future;
|
|
||||||
use log::{error, info};
|
|
||||||
use serde_derive::Deserialize;
|
|
||||||
use serde_json::{json, Value};
|
|
||||||
use structopt::StructOpt;
|
|
||||||
use structopt_toml::StructOptToml;
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
use darkfi::{
|
|
||||||
async_daemonize, cli_desc,
|
|
||||||
dht::{waiting_for_response, Dht, DhtPtr},
|
|
||||||
net,
|
|
||||||
rpc::{
|
|
||||||
jsonrpc::{
|
|
||||||
ErrorCode::{InvalidParams, MethodNotFound},
|
|
||||||
JsonError, JsonRequest, JsonResponse, JsonResult,
|
|
||||||
},
|
|
||||||
server::{listen_and_serve, RequestHandler},
|
|
||||||
settings::RpcSettingsOpt
|
|
||||||
},
|
|
||||||
util::{
|
|
||||||
cli::{get_log_config, get_log_level, spawn_config},
|
|
||||||
path::get_config_path,
|
|
||||||
serial::serialize,
|
|
||||||
expand_path,
|
|
||||||
},
|
|
||||||
Result,
|
|
||||||
};
|
|
||||||
|
|
||||||
mod error;
|
|
||||||
use error::{server_error, RpcError};
|
|
||||||
const CONFIG_FILE: &str = "dhtd_config.toml";
|
|
||||||
const CONFIG_FILE_CONTENTS: &str = include_str!("../dhtd_config.toml");
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
|
|
||||||
#[serde(default)]
|
|
||||||
#[structopt(name = "dhtd", about = cli_desc!())]
|
|
||||||
struct Args {
|
|
||||||
#[structopt(short, long)]
|
|
||||||
/// Configuration file to use
|
|
||||||
config: Option<String>,
|
|
||||||
|
|
||||||
#[structopt(flatten)]
|
|
||||||
/// JSON-RPC settings
|
|
||||||
rpc: RpcSettingsOpt,
|
|
||||||
|
|
||||||
#[structopt(long)]
|
|
||||||
/// P2P accept addresses (repeatable flag)
|
|
||||||
p2p_accept: Vec<Url>,
|
|
||||||
|
|
||||||
#[structopt(long)]
|
|
||||||
/// P2P external addresses (repeatable flag)
|
|
||||||
p2p_external: Vec<Url>,
|
|
||||||
|
|
||||||
#[structopt(long, default_value = "8")]
|
|
||||||
/// Connection slots
|
|
||||||
slots: u32,
|
|
||||||
|
|
||||||
#[structopt(long)]
|
|
||||||
/// Connect to seed (repeatable flag)
|
|
||||||
p2p_seed: Vec<Url>,
|
|
||||||
|
|
||||||
#[structopt(long)]
|
|
||||||
/// Connect to peer (repeatable flag)
|
|
||||||
p2p_peer: Vec<Url>,
|
|
||||||
|
|
||||||
#[structopt(short, parse(from_occurrences))]
|
|
||||||
/// Increase verbosity (-vvv supported)
|
|
||||||
verbose: u8,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Struct representing DHT daemon.
|
|
||||||
/// This example/temp-impl stores String data.
|
|
||||||
/// In final version everything will be in bytes (Vec<u8).
|
|
||||||
pub struct Dhtd {
|
|
||||||
/// Daemon dht state
|
|
||||||
dht: DhtPtr,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Dhtd {
|
|
||||||
pub async fn new(dht: DhtPtr) -> Result<Self> {
|
|
||||||
Ok(Self { dht })
|
|
||||||
}
|
|
||||||
|
|
||||||
// RPCAPI:
|
|
||||||
// Checks if provided key exists and retrieve it from the local map or queries the network.
|
|
||||||
// Returns key value or not found message.
|
|
||||||
// --> {"jsonrpc": "2.0", "method": "get", "params": ["key"], "id": 1}
|
|
||||||
// <-- {"jsonrpc": "2.0", "result": "value", "id": 1}
|
|
||||||
async fn get(&self, id: Value, params: &[Value]) -> JsonResult {
|
|
||||||
if params.len() != 1 || !params[0].is_string() {
|
|
||||||
return JsonError::new(InvalidParams, None, id).into()
|
|
||||||
}
|
|
||||||
|
|
||||||
let key = params[0].to_string();
|
|
||||||
let key_hash = blake3::hash(&serialize(&key));
|
|
||||||
|
|
||||||
// We execute this sequence to prevent lock races between threads
|
|
||||||
// Verify key exists
|
|
||||||
let exists = self.dht.read().await.contains_key(key_hash.clone());
|
|
||||||
if let None = exists {
|
|
||||||
info!("Did not find key: {}", key);
|
|
||||||
return server_error(RpcError::UnknownKey, id).into()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if key is local or shoud query network
|
|
||||||
let local = exists.unwrap();
|
|
||||||
if local {
|
|
||||||
return match self.dht.read().await.get(key_hash.clone()) {
|
|
||||||
Some(value) => {
|
|
||||||
let string = std::str::from_utf8(&value).unwrap().to_string();
|
|
||||||
JsonResponse::new(json!((key, string)), id).into()
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
info!("Did not find key: {}", key);
|
|
||||||
server_error(RpcError::UnknownKey, id).into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Key doesn't exist locally, querring network...");
|
|
||||||
if let Err(e) = self.dht.read().await.request_key(key_hash).await {
|
|
||||||
error!("Failed to query key: {}", e);
|
|
||||||
return server_error(RpcError::QueryFailed, id).into()
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Waiting response...");
|
|
||||||
match waiting_for_response(self.dht.clone()).await {
|
|
||||||
Ok(response) => {
|
|
||||||
match response {
|
|
||||||
Some(resp) => {
|
|
||||||
info!("Key found!");
|
|
||||||
// Optionally, we insert the key to our local map
|
|
||||||
if let Err(e) =
|
|
||||||
self.dht.write().await.insert(resp.key, resp.value.clone()).await
|
|
||||||
{
|
|
||||||
error!("Failed to insert key: {}", e);
|
|
||||||
return server_error(RpcError::KeyInsertFail, id)
|
|
||||||
}
|
|
||||||
let string = std::str::from_utf8(&resp.value).unwrap().to_string();
|
|
||||||
JsonResponse::new(json!((key, string)), id).into()
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
info!("Did not find key: {}", key);
|
|
||||||
server_error(RpcError::UnknownKey, id).into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Error while waiting network response: {}", e);
|
|
||||||
server_error(RpcError::WaitingNetworkError, id).into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RPCAPI:
|
|
||||||
// Insert key value pair in dht.
|
|
||||||
// --> {"jsonrpc": "2.0", "method": "insert", "params": ["key", "value"], "id": 1}
|
|
||||||
// <-- {"jsonrpc": "2.0", "result": "(key, value)", "id": 1}
|
|
||||||
async fn insert(&self, id: Value, params: &[Value]) -> JsonResult {
|
|
||||||
if params.len() != 2 || !params[0].is_string() || !params[1].is_string() {
|
|
||||||
return JsonError::new(InvalidParams, None, id).into()
|
|
||||||
}
|
|
||||||
|
|
||||||
let key = params[0].to_string();
|
|
||||||
let key_hash = blake3::hash(&serialize(&key));
|
|
||||||
let value = params[1].to_string();
|
|
||||||
|
|
||||||
if let Err(e) = self.dht.write().await.insert(key_hash, value.as_bytes().to_vec()).await {
|
|
||||||
error!("Failed to insert key: {}", e);
|
|
||||||
return server_error(RpcError::KeyInsertFail, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
JsonResponse::new(json!((key, value)), id).into()
|
|
||||||
}
|
|
||||||
|
|
||||||
// RPCAPI:
|
|
||||||
// Remove key value pair from local map.
|
|
||||||
// --> {"jsonrpc": "2.0", "method": "remove", "params": ["key"], "id": 1}
|
|
||||||
// <-- {"jsonrpc": "2.0", "result": "key", "id": 1}
|
|
||||||
async fn remove(&self, id: Value, params: &[Value]) -> JsonResult {
|
|
||||||
if params.len() != 1 || !params[0].is_string() {
|
|
||||||
return JsonError::new(InvalidParams, None, id).into()
|
|
||||||
}
|
|
||||||
|
|
||||||
let key = params[0].to_string();
|
|
||||||
let key_hash = blake3::hash(&serialize(&key));
|
|
||||||
|
|
||||||
// Check if key value pair existed and act accordingly
|
|
||||||
let result = self.dht.write().await.remove(key_hash).await;
|
|
||||||
match result {
|
|
||||||
Ok(option) => match option {
|
|
||||||
Some(k) => {
|
|
||||||
info!("Hash key removed: {}", k);
|
|
||||||
JsonResponse::new(json!(k.to_string()), id).into()
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
info!("Did not find key: {}", key);
|
|
||||||
server_error(RpcError::UnknownKey, id).into()
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to remove key: {}", e);
|
|
||||||
server_error(RpcError::KeyRemoveFail, id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RPCAPI:
|
|
||||||
// Returns current local map.
|
|
||||||
// --> {"jsonrpc": "2.0", "method": "map", "params": [], "id": 1}
|
|
||||||
// <-- {"jsonrpc": "2.0", "result": "map", "id": 1}
|
|
||||||
pub async fn map(&self, id: Value, _params: &[Value]) -> JsonResult {
|
|
||||||
let map = self.dht.read().await.map.clone();
|
|
||||||
let map_string = format!("{:#?}", map);
|
|
||||||
JsonResponse::new(json!(map_string), id).into()
|
|
||||||
}
|
|
||||||
|
|
||||||
// RPCAPI:
|
|
||||||
// Returns current lookup map.
|
|
||||||
// --> {"jsonrpc": "2.0", "method": "lookup", "params": [], "id": 1}
|
|
||||||
// <-- {"jsonrpc": "2.0", "result": "lookup", "id": 1}
|
|
||||||
pub async fn lookup(&self, id: Value, _params: &[Value]) -> JsonResult {
|
|
||||||
let lookup = self.dht.read().await.lookup.clone();
|
|
||||||
let lookup_string = format!("{:#?}", lookup);
|
|
||||||
JsonResponse::new(json!(lookup_string), id).into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl RequestHandler<()> for Dhtd {
|
|
||||||
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
|
|
||||||
if !req.params.is_array() {
|
|
||||||
return JsonError::new(InvalidParams, None, req.id).into()
|
|
||||||
}
|
|
||||||
|
|
||||||
let params = req.params.as_array().unwrap();
|
|
||||||
|
|
||||||
return match req.method.as_str() {
|
|
||||||
Some("get") => self.get(req.id, params).await,
|
|
||||||
Some("insert") => self.insert(req.id, params).await,
|
|
||||||
Some("remove") => self.remove(req.id, params).await,
|
|
||||||
Some("map") => self.map(req.id, params).await,
|
|
||||||
Some("lookup") => self.lookup(req.id, params).await,
|
|
||||||
Some(_) | None => JsonError::new(MethodNotFound, None, req.id).into(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async_daemonize!(realmain);
|
|
||||||
async fn realmain(args: Args, ex: Arc<Executor<'_>>) -> Result<()> {
|
|
||||||
// We use this handler to block this function after detaching all
|
|
||||||
// tasks, and to catch a shutdown signal, where we can clean up and
|
|
||||||
// exit gracefully.
|
|
||||||
let (signal, shutdown) = async_channel::bounded::<()>(1);
|
|
||||||
ctrlc::set_handler(move || {
|
|
||||||
async_std::task::block_on(signal.send(())).unwrap();
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// P2P network
|
|
||||||
let network_settings = net::Settings {
|
|
||||||
inbound: args.p2p_accept,
|
|
||||||
outbound_connections: args.slots,
|
|
||||||
external_addr: args.p2p_external,
|
|
||||||
peers: args.p2p_seed.clone(),
|
|
||||||
seeds: args.p2p_seed.clone(),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
let p2p = net::P2p::new(network_settings).await;
|
|
||||||
|
|
||||||
// Initialize daemon dht
|
|
||||||
let dht = Dht::new(None, p2p.clone(), shutdown.clone(), ex.clone()).await?;
|
|
||||||
|
|
||||||
// Initialize daemon
|
|
||||||
let dhtd = Dhtd::new(dht.clone()).await?;
|
|
||||||
let dhtd = Arc::new(dhtd);
|
|
||||||
|
|
||||||
// JSON-RPC server
|
|
||||||
info!("Starting JSON-RPC server");
|
|
||||||
let _ex = ex.clone();
|
|
||||||
ex.spawn(listen_and_serve(args.rpc.into(), dhtd.clone(), _ex)).detach();
|
|
||||||
|
|
||||||
info!("Starting sync P2P network");
|
|
||||||
p2p.clone().start(ex.clone()).await?;
|
|
||||||
let _ex = ex.clone();
|
|
||||||
let _p2p = p2p.clone();
|
|
||||||
ex.spawn(async move {
|
|
||||||
if let Err(e) = _p2p.run(_ex).await {
|
|
||||||
error!("Failed starting P2P network: {}", e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.detach();
|
|
||||||
|
|
||||||
// Wait for SIGINT
|
|
||||||
shutdown.recv().await?;
|
|
||||||
print!("\r");
|
|
||||||
info!("Caught termination signal, cleaning up and exiting...");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user