diff --git a/Cargo.toml b/Cargo.toml
index dd5551d72..c8f55604d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -292,6 +292,11 @@ zkas = [
"darkfi-serial",
]
+dht = [
+ "async-trait",
+ "num-bigint",
+]
+
# Could not get this to work. Complains manifest-key is ignored.
#[target.'cfg(target_family = "unix")'.features]
#net = ["net-defaults", "p2p-unix"]
diff --git a/bin/fud/fud/Cargo.toml b/bin/fud/fud/Cargo.toml
index a5c32d64c..3cef92e09 100644
--- a/bin/fud/fud/Cargo.toml
+++ b/bin/fud/fud/Cargo.toml
@@ -9,7 +9,7 @@ homepage = "https://dark.fi"
repository = "https://codeberg.org/darkrenaissance/darkfi"
[dependencies]
-darkfi = {path = "../../../", features = ["async-daemonize", "geode", "rpc"]}
+darkfi = {path = "../../../", features = ["async-daemonize", "geode", "rpc", "dht"]}
darkfi-serial = {version = "0.4.2", features = ["hash"]}
# Misc
diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs
index 2f8e8968f..a2f5541c6 100644
--- a/bin/fud/fud/src/main.rs
+++ b/bin/fud/fud/src/main.rs
@@ -16,13 +16,6 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
-use std::{
- collections::{HashMap, HashSet},
- io::ErrorKind,
- path::PathBuf,
- sync::Arc,
-};
-
use async_trait::async_trait;
use futures::{future::FutureExt, pin_mut, select};
use log::{debug, error, info, warn};
@@ -36,14 +29,17 @@ use smol::{
stream::StreamExt,
Executor,
};
+use std::{
+ collections::{HashMap, HashSet},
+ io::ErrorKind,
+ path::PathBuf,
+ sync::Arc,
+};
use structopt_toml::{structopt::StructOpt, StructOptToml};
-use crate::{
- dht::{DhtSettings, DhtSettingsOpt},
- rpc::FudEvent,
-};
use darkfi::{
async_daemonize, cli_desc,
+ dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr, DhtSettings, DhtSettingsOpt},
geode::{hash_to_string, ChunkedFile, Geode},
net::{
session::SESSION_DEFAULT, settings::SettingsOpt, ChannelPtr, P2p, P2pPtr,
@@ -59,9 +55,9 @@ use darkfi::{
util::path::expand_path,
Error, Result,
};
-use dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr};
+
use resource::{Resource, ResourceStatus};
-use rpc::{ChunkDownloadCompleted, ChunkNotFound};
+use rpc::{ChunkDownloadCompleted, ChunkNotFound, FudEvent};
use tasks::FetchReply;
/// P2P protocols
@@ -72,7 +68,6 @@ use proto::{
FudPingRequest, ProtocolFud,
};
-mod dht;
mod resource;
mod rpc;
mod tasks;
@@ -175,7 +170,7 @@ impl DhtHandler for Fud {
self.dht.clone()
}
- async fn ping(&self, channel: ChannelPtr) -> Result {
+ async fn ping(&self, channel: ChannelPtr) -> Result {
debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::().await;
diff --git a/bin/fud/fud/src/proto.rs b/bin/fud/fud/src/proto.rs
index 3ccfd8c5d..c46ad7ff7 100644
--- a/bin/fud/fud/src/proto.rs
+++ b/bin/fud/fud/src/proto.rs
@@ -16,10 +16,13 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
+use async_trait::async_trait;
+use log::{debug, error, info};
+use smol::{fs::File, Executor};
use std::sync::Arc;
-use async_trait::async_trait;
use darkfi::{
+ dht::{DhtHandler, DhtNode, DhtRouterItem},
geode::{hash_to_string, read_until_filled, MAX_CHUNK_SIZE},
impl_p2p_message,
net::{
@@ -30,11 +33,8 @@ use darkfi::{
Error, Result,
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
-use log::{debug, error, info};
-use smol::{fs::File, Executor};
use super::Fud;
-use crate::dht::{DhtHandler, DhtNode, DhtRouterItem};
/// Message representing a file reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
diff --git a/bin/fud/fud/src/rpc.rs b/bin/fud/fud/src/rpc.rs
index de5c9f6c8..b1669d3a3 100644
--- a/bin/fud/fud/src/rpc.rs
+++ b/bin/fud/fud/src/rpc.rs
@@ -16,19 +16,20 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
+use async_trait::async_trait;
+use log::{error, info};
+use smol::{
+ fs::{self, File},
+ lock::MutexGuard,
+};
use std::{
collections::{HashMap, HashSet},
path::PathBuf,
};
+use tinyjson::JsonValue;
-use crate::{
- dht::DhtHandler,
- proto::FudAnnounce,
- resource::{Resource, ResourceStatus},
- Fud,
-};
-use async_trait::async_trait;
use darkfi::{
+ dht::DhtHandler,
geode::hash_to_string,
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
@@ -40,12 +41,12 @@ use darkfi::{
util::path::expand_path,
Error, Result,
};
-use log::{error, info};
-use smol::{
- fs::{self, File},
- lock::MutexGuard,
+
+use crate::{
+ proto::FudAnnounce,
+ resource::{Resource, ResourceStatus},
+ Fud,
};
-use tinyjson::JsonValue;
#[async_trait]
impl RequestHandler<()> for Fud {
diff --git a/bin/fud/fud/src/tasks.rs b/bin/fud/fud/src/tasks.rs
index e204d9499..f1f28d43d 100644
--- a/bin/fud/fud/src/tasks.rs
+++ b/bin/fud/fud/src/tasks.rs
@@ -16,16 +16,15 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
+use log::{error, info};
use std::sync::Arc;
-use darkfi::{geode::hash_to_string, system::sleep, Error, Result};
+use darkfi::{dht::DhtHandler, geode::hash_to_string, system::sleep, Error, Result};
use crate::{
- dht::DhtHandler,
proto::{FudAnnounce, FudChunkReply, FudFileReply},
Fud,
};
-use log::{error, info};
/// Triggered when calling the `get` RPC method
pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
diff --git a/bin/fud/fud/src/dht.rs b/src/dht/handler.rs
similarity index 62%
rename from bin/fud/fud/src/dht.rs
rename to src/dht/handler.rs
index bef755aeb..671cb9bde 100644
--- a/bin/fud/fud/src/dht.rs
+++ b/src/dht/handler.rs
@@ -16,312 +16,34 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
-use std::{
- collections::{HashMap, HashSet},
- hash::{Hash, Hasher},
- sync::Arc,
- time::Duration,
-};
-
use async_trait::async_trait;
-use darkfi::{
- net::{connector::Connector, session::Session, ChannelPtr, Message, P2pPtr},
- system::{sleep, timeout::timeout, ExecutorPtr},
- util::time::Timestamp,
- Error, Result,
-};
-use darkfi_serial::{SerialDecodable, SerialEncodable};
use futures::stream::FuturesUnordered;
use log::{debug, info, warn};
use num_bigint::BigUint;
-use smol::{
- lock::{RwLock, Semaphore},
- stream::StreamExt,
+use smol::{lock::Semaphore, stream::StreamExt};
+use std::{collections::HashSet, sync::Arc, time::Duration};
+
+use super::{Dht, DhtNode, DhtRouterItem, DhtRouterPtr};
+use crate::{
+ net::{connector::Connector, session::Session, ChannelPtr, Message},
+ system::{sleep, timeout::timeout},
+ Error, Result,
};
-use structopt::StructOpt;
-use url::Url;
-
-#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
-pub struct DhtNode {
- pub id: blake3::Hash,
- pub addresses: Vec,
-}
-
-impl Hash for DhtNode {
- fn hash(&self, state: &mut H) {
- self.id.hash(state);
- }
-}
-
-impl PartialEq for DhtNode {
- fn eq(&self, other: &Self) -> bool {
- self.id == other.id
- }
-}
-
-pub struct DhtBucket {
- pub nodes: Vec,
-}
-
-/// "Router" means: Key -> Set of nodes (+ additional data for each node)
-pub type DhtRouterPtr = Arc>>>;
-
-#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
-pub struct DhtRouterItem {
- pub node: DhtNode,
- pub timestamp: u64,
-}
-
-impl Hash for DhtRouterItem {
- fn hash(&self, state: &mut H) {
- self.node.id.hash(state);
- }
-}
-
-impl PartialEq for DhtRouterItem {
- fn eq(&self, other: &Self) -> bool {
- self.node.id == other.node.id
- }
-}
-
-impl From for DhtRouterItem {
- fn from(node: DhtNode) -> Self {
- DhtRouterItem { node, timestamp: Timestamp::current_time().inner() }
- }
-}
-
-#[derive(Clone, Debug)]
-pub struct DhtSettings {
- /// Number of nodes in a bucket
- pub k: usize,
- /// Number of lookup requests in a burst
- pub alpha: usize,
- /// Maximum number of parallel lookup requests
- pub concurrency: usize,
- /// Timeout in seconds
- pub timeout: u64,
-}
-
-impl Default for DhtSettings {
- fn default() -> Self {
- Self { k: 16, alpha: 4, concurrency: 10, timeout: 5 }
- }
-}
-
-#[derive(Clone, Debug, serde::Deserialize, structopt::StructOpt, structopt_toml::StructOptToml)]
-#[structopt()]
-#[serde(rename = "dht")]
-pub struct DhtSettingsOpt {
- /// Number of nodes in a DHT bucket
- #[structopt(long)]
- pub dht_k: Option,
-
- /// Number of DHT lookup requests in a burst
- #[structopt(long)]
- pub dht_alpha: Option,
-
- /// Maximum number of parallel DHT lookup requests
- #[structopt(long)]
- pub dht_concurrency: Option,
-
- /// Timeout in seconds
- #[structopt(long)]
- pub dht_timeout: Option,
-}
-
-impl From for DhtSettings {
- fn from(opt: DhtSettingsOpt) -> Self {
- let def = DhtSettings::default();
-
- Self {
- k: opt.dht_k.unwrap_or(def.k),
- alpha: opt.dht_alpha.unwrap_or(def.alpha),
- concurrency: opt.dht_concurrency.unwrap_or(def.concurrency),
- timeout: opt.dht_timeout.unwrap_or(def.timeout),
- }
- }
-}
-
-pub struct Dht {
- /// Our own node id
- pub node_id: blake3::Hash,
- /// Are we bootstrapped?
- pub bootstrapped: Arc>,
- /// Vec of buckets
- pub buckets: Arc>>,
- /// Number of buckets
- pub n_buckets: usize,
- /// Channel ID -> Node ID
- pub node_cache: Arc>>,
- /// Node ID -> Channel ID
- pub channel_cache: Arc>>,
- /// Node ID -> Set of keys
- pub router_cache: Arc>>>,
-
- pub settings: DhtSettings,
-
- pub p2p: P2pPtr,
- pub executor: ExecutorPtr,
-}
-impl Dht {
- pub async fn new(
- node_id: &blake3::Hash,
- settings: &DhtSettings,
- p2p: P2pPtr,
- ex: ExecutorPtr,
- ) -> Self {
- // Create empty buckets
- let mut buckets = vec![];
- for _ in 0..256 {
- buckets.push(DhtBucket { nodes: vec![] })
- }
-
- Self {
- node_id: *node_id,
- buckets: Arc::new(RwLock::new(buckets)),
- n_buckets: 256,
- bootstrapped: Arc::new(RwLock::new(false)),
- node_cache: Arc::new(RwLock::new(HashMap::new())),
- channel_cache: Arc::new(RwLock::new(HashMap::new())),
- router_cache: Arc::new(RwLock::new(HashMap::new())),
-
- settings: settings.clone(),
-
- p2p: p2p.clone(),
- executor: ex,
- }
- }
-
- pub async fn is_bootstrapped(&self) -> bool {
- let bootstrapped = self.bootstrapped.read().await;
- *bootstrapped
- }
-
- pub async fn set_bootstrapped(&self) {
- let mut bootstrapped = self.bootstrapped.write().await;
- *bootstrapped = true;
- }
-
- /// Get own node
- pub async fn node(&self) -> DhtNode {
- DhtNode {
- id: self.node_id,
- addresses: self
- .p2p
- .clone()
- .hosts()
- .external_addrs()
- .await
- .iter()
- .filter(|addr| !addr.to_string().contains("[::]"))
- .cloned()
- .collect(),
- }
- }
-
- // Get the distance between `key_1` and `key_2`
- pub fn distance(&self, key_1: &blake3::Hash, key_2: &blake3::Hash) -> [u8; 32] {
- let bytes1 = key_1.as_bytes();
- let bytes2 = key_2.as_bytes();
-
- let mut result_bytes = [0u8; 32];
-
- for i in 0..32 {
- result_bytes[i] = bytes1[i] ^ bytes2[i];
- }
-
- result_bytes
- }
-
- // Sort `nodes` by distance from `key`
- pub fn sort_by_distance(&self, nodes: &mut [DhtNode], key: &blake3::Hash) {
- nodes.sort_by(|a, b| {
- let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id));
- let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id));
- distance_a.cmp(&distance_b)
- });
- }
-
- // key -> bucket index
- pub async fn get_bucket_index(&self, key: &blake3::Hash) -> usize {
- if key == &self.node_id {
- return 0
- }
- let distance = self.distance(&self.node_id, key);
- let mut leading_zeros = 0;
-
- for &byte in &distance {
- if byte == 0 {
- leading_zeros += 8;
- } else {
- leading_zeros += byte.leading_zeros() as usize;
- break;
- }
- }
-
- let bucket_index = self.n_buckets - leading_zeros;
- std::cmp::min(bucket_index, self.n_buckets - 1)
- }
-
- // Get `n` closest known nodes to a key
- // TODO: Can be optimized
- pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec {
- let buckets_lock = self.buckets.clone();
- let buckets = buckets_lock.read().await;
-
- let mut neighbors = Vec::new();
-
- for i in 0..self.n_buckets {
- if let Some(bucket) = buckets.get(i) {
- neighbors.extend(bucket.nodes.iter().cloned());
- }
- }
-
- self.sort_by_distance(&mut neighbors, key);
-
- neighbors.truncate(n);
-
- neighbors
- }
-
- // Channel ID -> DhtNode
- pub async fn get_node_from_channel(&self, channel_id: u32) -> Option {
- let node_cache_lock = self.node_cache.clone();
- let node_cache = node_cache_lock.read().await;
- node_cache.get(&channel_id).cloned()
- }
-
- // Remove nodes in router that are older than expiry_secs
- pub async fn prune_router(&self, router: DhtRouterPtr, expiry_secs: u32) {
- let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64);
- let mut router_write = router.write().await;
-
- let keys: Vec<_> = router_write.keys().cloned().collect();
-
- for key in keys {
- let items = router_write.get_mut(&key).unwrap();
- items.retain(|item| item.timestamp > expiry_timestamp);
- if items.is_empty() {
- router_write.remove(&key);
- }
- }
- }
-}
#[async_trait]
pub trait DhtHandler {
fn dht(&self) -> Arc<Dht>;
- // Send a DHT ping request
+ /// Send a DHT ping request
async fn ping(&self, channel: ChannelPtr) -> Result;
- // Triggered when we find a new node
+ /// Triggered when we find a new node
async fn on_new_node(&self, node: &DhtNode) -> Result<()>;
- // Send FIND NODES request to a peer to get nodes close to `key`
+ /// Send FIND NODES request to a peer to get nodes close to `key`
async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result<Vec<DhtNode>>;
- // Announce message `m` for a key, and add ourselves to router
+ /// Announce message `m` for a key, and add ourselves to router
async fn announce(
&self,
key: &blake3::Hash,
@@ -346,8 +68,8 @@ pub trait DhtHandler {
Ok(())
}
- // Send a DHT ping request when there is a new channel, to know the node id of the new peer,
- // Then fill the channel cache and the buckets
+ /// Send a DHT ping request when there is a new channel, to know the node id of the new peer,
+ /// Then fill the channel cache and the buckets
async fn channel_task(&self) -> Result<()> {
loop {
let channel_sub = self.dht().p2p.hosts().subscribe_channel().await;
@@ -380,7 +102,7 @@ pub trait DhtHandler {
}
}
- // Remove disconnected nodes from the channel cache
+ /// Remove disconnected nodes from the channel cache
async fn disconnect_task(&self) -> Result<()> {
loop {
sleep(15).await;
@@ -396,7 +118,7 @@ pub trait DhtHandler {
}
}
- // Add a node in the correct bucket
+ /// Add a node in the correct bucket
async fn add_node(&self, node: DhtNode) {
// Do not add ourselves to the buckets
if node.id == self.dht().node_id {
@@ -569,7 +291,7 @@ pub trait DhtHandler {
return Ok(result.to_vec())
}
- // Get an existing channel, or create a new one
+ /// Get an existing channel, or create a new one
async fn get_channel(&self, node: &DhtNode) -> Result<ChannelPtr> {
let channel_cache_lock = self.dht().channel_cache.clone();
let channel_cache = channel_cache_lock.read().await;
@@ -613,7 +335,7 @@ pub trait DhtHandler {
Err(Error::Custom("Could not create channel".to_string()))
}
- // Add nodes as a provider for a key
+ /// Add nodes as a provider for a key
async fn add_to_router(
&self,
router: DhtRouterPtr,
diff --git a/src/dht/mod.rs b/src/dht/mod.rs
new file mode 100644
index 000000000..165919049
--- /dev/null
+++ b/src/dht/mod.rs
@@ -0,0 +1,253 @@
+/* This file is part of DarkFi (https://dark.fi)
+ *
+ * Copyright (C) 2020-2025 Dyne.org foundation
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+use async_trait::async_trait;
+use num_bigint::BigUint;
+use smol::lock::RwLock;
+use std::{
+ collections::{HashMap, HashSet},
+ hash::{Hash, Hasher},
+ sync::Arc,
+};
+use url::Url;
+
+use darkfi_serial::{SerialDecodable, SerialEncodable};
+
+use crate::{net::P2pPtr, system::ExecutorPtr, util::time::Timestamp};
+
+pub mod settings;
+pub use settings::{DhtSettings, DhtSettingsOpt};
+
+pub mod handler;
+pub use handler::DhtHandler;
+
+#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
+pub struct DhtNode {
+ pub id: blake3::Hash,
+ pub addresses: Vec<Url>,
+}
+
+impl Hash for DhtNode {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.id.hash(state);
+ }
+}
+
+impl PartialEq for DhtNode {
+ fn eq(&self, other: &Self) -> bool {
+ self.id == other.id
+ }
+}
+
+pub struct DhtBucket {
+ pub nodes: Vec<DhtNode>,
+}
+
+/// "Router" means: Key -> Set of nodes (+ additional data for each node)
+pub type DhtRouterPtr = Arc<RwLock<HashMap<blake3::Hash, HashSet<DhtRouterItem>>>>;
+
+#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
+pub struct DhtRouterItem {
+ pub node: DhtNode,
+ pub timestamp: u64,
+}
+
+impl Hash for DhtRouterItem {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.node.id.hash(state);
+ }
+}
+
+impl PartialEq for DhtRouterItem {
+ fn eq(&self, other: &Self) -> bool {
+ self.node.id == other.node.id
+ }
+}
+
+impl From<DhtNode> for DhtRouterItem {
+ fn from(node: DhtNode) -> Self {
+ DhtRouterItem { node, timestamp: Timestamp::current_time().inner() }
+ }
+}
+
+pub struct Dht {
+ /// Our own node id
+ pub node_id: blake3::Hash,
+ /// Are we bootstrapped?
+ pub bootstrapped: Arc<RwLock<bool>>,
+ /// Vec of buckets
+ pub buckets: Arc<RwLock<Vec<DhtBucket>>>,
+ /// Number of buckets
+ pub n_buckets: usize,
+ /// Channel ID -> Node ID
+ pub node_cache: Arc<RwLock<HashMap<u32, DhtNode>>>,
+ /// Node ID -> Channel ID
+ pub channel_cache: Arc<RwLock<HashMap<blake3::Hash, u32>>>,
+ /// Node ID -> Set of keys
+ pub router_cache: Arc<RwLock<HashMap<blake3::Hash, HashSet<blake3::Hash>>>>,
+
+ pub settings: DhtSettings,
+
+ pub p2p: P2pPtr,
+ pub executor: ExecutorPtr,
+}
+
+impl Dht {
+ pub async fn new(
+ node_id: &blake3::Hash,
+ settings: &DhtSettings,
+ p2p: P2pPtr,
+ ex: ExecutorPtr,
+ ) -> Self {
+ // Create empty buckets
+ let mut buckets = vec![];
+ for _ in 0..256 {
+ buckets.push(DhtBucket { nodes: vec![] })
+ }
+
+ Self {
+ node_id: *node_id,
+ buckets: Arc::new(RwLock::new(buckets)),
+ n_buckets: 256,
+ bootstrapped: Arc::new(RwLock::new(false)),
+ node_cache: Arc::new(RwLock::new(HashMap::new())),
+ channel_cache: Arc::new(RwLock::new(HashMap::new())),
+ router_cache: Arc::new(RwLock::new(HashMap::new())),
+
+ settings: settings.clone(),
+
+ p2p: p2p.clone(),
+ executor: ex,
+ }
+ }
+
+ pub async fn is_bootstrapped(&self) -> bool {
+ let bootstrapped = self.bootstrapped.read().await;
+ *bootstrapped
+ }
+
+ pub async fn set_bootstrapped(&self) {
+ let mut bootstrapped = self.bootstrapped.write().await;
+ *bootstrapped = true;
+ }
+
+ /// Get own node
+ pub async fn node(&self) -> DhtNode {
+ DhtNode {
+ id: self.node_id,
+ addresses: self
+ .p2p
+ .clone()
+ .hosts()
+ .external_addrs()
+ .await
+ .iter()
+ .filter(|addr| !addr.to_string().contains("[::]"))
+ .cloned()
+ .collect(),
+ }
+ }
+
+ /// Get the distance between `key_1` and `key_2`
+ pub fn distance(&self, key_1: &blake3::Hash, key_2: &blake3::Hash) -> [u8; 32] {
+ let bytes1 = key_1.as_bytes();
+ let bytes2 = key_2.as_bytes();
+
+ let mut result_bytes = [0u8; 32];
+
+ for i in 0..32 {
+ result_bytes[i] = bytes1[i] ^ bytes2[i];
+ }
+
+ result_bytes
+ }
+
+ /// Sort `nodes` by distance from `key`
+ pub fn sort_by_distance(&self, nodes: &mut [DhtNode], key: &blake3::Hash) {
+ nodes.sort_by(|a, b| {
+ let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id));
+ let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id));
+ distance_a.cmp(&distance_b)
+ });
+ }
+
+ /// `key` -> bucket index
+ pub async fn get_bucket_index(&self, key: &blake3::Hash) -> usize {
+ if key == &self.node_id {
+ return 0
+ }
+ let distance = self.distance(&self.node_id, key);
+ let mut leading_zeros = 0;
+
+ for &byte in &distance {
+ if byte == 0 {
+ leading_zeros += 8;
+ } else {
+ leading_zeros += byte.leading_zeros() as usize;
+ break;
+ }
+ }
+
+ let bucket_index = self.n_buckets - leading_zeros;
+ std::cmp::min(bucket_index, self.n_buckets - 1)
+ }
+
+ /// Get `n` closest known nodes to a key
+ /// TODO: Can be optimized
+ pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec<DhtNode> {
+ let buckets_lock = self.buckets.clone();
+ let buckets = buckets_lock.read().await;
+
+ let mut neighbors = Vec::new();
+
+ for i in 0..self.n_buckets {
+ if let Some(bucket) = buckets.get(i) {
+ neighbors.extend(bucket.nodes.iter().cloned());
+ }
+ }
+
+ self.sort_by_distance(&mut neighbors, key);
+
+ neighbors.truncate(n);
+
+ neighbors
+ }
+
+ /// Channel ID -> DhtNode
+ pub async fn get_node_from_channel(&self, channel_id: u32) -> Option<DhtNode> {
+ let node_cache_lock = self.node_cache.clone();
+ let node_cache = node_cache_lock.read().await;
+ node_cache.get(&channel_id).cloned()
+ }
+
+ /// Remove nodes in router that are older than expiry_secs
+ pub async fn prune_router(&self, router: DhtRouterPtr, expiry_secs: u32) {
+ let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64);
+ let mut router_write = router.write().await;
+
+ let keys: Vec<_> = router_write.keys().cloned().collect();
+
+ for key in keys {
+ let items = router_write.get_mut(&key).unwrap();
+ items.retain(|item| item.timestamp > expiry_timestamp);
+ if items.is_empty() {
+ router_write.remove(&key);
+ }
+ }
+ }
+}
diff --git a/src/dht/settings.rs b/src/dht/settings.rs
new file mode 100644
index 000000000..e1fabc363
--- /dev/null
+++ b/src/dht/settings.rs
@@ -0,0 +1,71 @@
+/* This file is part of DarkFi (https://dark.fi)
+ *
+ * Copyright (C) 2020-2025 Dyne.org foundation
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+use structopt::StructOpt;
+
+#[derive(Clone, Debug)]
+pub struct DhtSettings {
+ /// Number of nodes in a bucket
+ pub k: usize,
+ /// Number of lookup requests in a burst
+ pub alpha: usize,
+ /// Maximum number of parallel lookup requests
+ pub concurrency: usize,
+ /// Timeout in seconds
+ pub timeout: u64,
+}
+
+impl Default for DhtSettings {
+ fn default() -> Self {
+ Self { k: 16, alpha: 4, concurrency: 10, timeout: 5 }
+ }
+}
+
+#[derive(Clone, Debug, serde::Deserialize, structopt::StructOpt, structopt_toml::StructOptToml)]
+#[structopt()]
+#[serde(rename = "dht")]
+pub struct DhtSettingsOpt {
+ /// Number of nodes in a DHT bucket
+ #[structopt(long)]
+ pub dht_k: Option<usize>,
+
+ /// Number of DHT lookup requests in a burst
+ #[structopt(long)]
+ pub dht_alpha: Option<usize>,
+
+ /// Maximum number of parallel DHT lookup requests
+ #[structopt(long)]
+ pub dht_concurrency: Option<usize>,
+
+ /// Timeout in seconds
+ #[structopt(long)]
+ pub dht_timeout: Option<u64>,
+}
+
+impl From<DhtSettingsOpt> for DhtSettings {
+ fn from(opt: DhtSettingsOpt) -> Self {
+ let def = DhtSettings::default();
+
+ Self {
+ k: opt.dht_k.unwrap_or(def.k),
+ alpha: opt.dht_alpha.unwrap_or(def.alpha),
+ concurrency: opt.dht_concurrency.unwrap_or(def.concurrency),
+ timeout: opt.dht_timeout.unwrap_or(def.timeout),
+ }
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index ef0bf5d51..87df9c3c5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -55,6 +55,9 @@ pub mod zk;
#[cfg(feature = "zkas")]
pub mod zkas;
+#[cfg(feature = "dht")]
+pub mod dht;
+
pub const ANSI_LOGO: &str = include_str!("../contrib/darkfi.ansi");
#[macro_export]