mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-01-08 22:28:12 -05:00
fud, dht: move the dht out of fud
This commit is contained in:
@@ -292,6 +292,11 @@ zkas = [
|
||||
"darkfi-serial",
|
||||
]
|
||||
|
||||
dht = [
|
||||
"async-trait",
|
||||
"num-bigint",
|
||||
]
|
||||
|
||||
# Could not get this to work. Complains manifest-key is ignored.
|
||||
#[target.'cfg(target_family = "unix")'.features]
|
||||
#net = ["net-defaults", "p2p-unix"]
|
||||
|
||||
@@ -9,7 +9,7 @@ homepage = "https://dark.fi"
|
||||
repository = "https://codeberg.org/darkrenaissance/darkfi"
|
||||
|
||||
[dependencies]
|
||||
darkfi = {path = "../../../", features = ["async-daemonize", "geode", "rpc"]}
|
||||
darkfi = {path = "../../../", features = ["async-daemonize", "geode", "rpc", "dht"]}
|
||||
darkfi-serial = {version = "0.4.2", features = ["hash"]}
|
||||
|
||||
# Misc
|
||||
|
||||
@@ -16,13 +16,6 @@
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
io::ErrorKind,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{future::FutureExt, pin_mut, select};
|
||||
use log::{debug, error, info, warn};
|
||||
@@ -36,14 +29,17 @@ use smol::{
|
||||
stream::StreamExt,
|
||||
Executor,
|
||||
};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
io::ErrorKind,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
};
|
||||
use structopt_toml::{structopt::StructOpt, StructOptToml};
|
||||
|
||||
use crate::{
|
||||
dht::{DhtSettings, DhtSettingsOpt},
|
||||
rpc::FudEvent,
|
||||
};
|
||||
use darkfi::{
|
||||
async_daemonize, cli_desc,
|
||||
dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr, DhtSettings, DhtSettingsOpt},
|
||||
geode::{hash_to_string, ChunkedFile, Geode},
|
||||
net::{
|
||||
session::SESSION_DEFAULT, settings::SettingsOpt, ChannelPtr, P2p, P2pPtr,
|
||||
@@ -59,9 +55,9 @@ use darkfi::{
|
||||
util::path::expand_path,
|
||||
Error, Result,
|
||||
};
|
||||
use dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr};
|
||||
|
||||
use resource::{Resource, ResourceStatus};
|
||||
use rpc::{ChunkDownloadCompleted, ChunkNotFound};
|
||||
use rpc::{ChunkDownloadCompleted, ChunkNotFound, FudEvent};
|
||||
use tasks::FetchReply;
|
||||
|
||||
/// P2P protocols
|
||||
@@ -72,7 +68,6 @@ use proto::{
|
||||
FudPingRequest, ProtocolFud,
|
||||
};
|
||||
|
||||
mod dht;
|
||||
mod resource;
|
||||
mod rpc;
|
||||
mod tasks;
|
||||
@@ -175,7 +170,7 @@ impl DhtHandler for Fud {
|
||||
self.dht.clone()
|
||||
}
|
||||
|
||||
async fn ping(&self, channel: ChannelPtr) -> Result<dht::DhtNode> {
|
||||
async fn ping(&self, channel: ChannelPtr) -> Result<DhtNode> {
|
||||
debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id);
|
||||
let msg_subsystem = channel.message_subsystem();
|
||||
msg_subsystem.add_dispatch::<FudPingReply>().await;
|
||||
|
||||
@@ -16,10 +16,13 @@
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, error, info};
|
||||
use smol::{fs::File, Executor};
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use darkfi::{
|
||||
dht::{DhtHandler, DhtNode, DhtRouterItem},
|
||||
geode::{hash_to_string, read_until_filled, MAX_CHUNK_SIZE},
|
||||
impl_p2p_message,
|
||||
net::{
|
||||
@@ -30,11 +33,8 @@ use darkfi::{
|
||||
Error, Result,
|
||||
};
|
||||
use darkfi_serial::{SerialDecodable, SerialEncodable};
|
||||
use log::{debug, error, info};
|
||||
use smol::{fs::File, Executor};
|
||||
|
||||
use super::Fud;
|
||||
use crate::dht::{DhtHandler, DhtNode, DhtRouterItem};
|
||||
|
||||
/// Message representing a file reply from the network
|
||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
|
||||
|
||||
@@ -16,19 +16,20 @@
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{error, info};
|
||||
use smol::{
|
||||
fs::{self, File},
|
||||
lock::MutexGuard,
|
||||
};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
path::PathBuf,
|
||||
};
|
||||
use tinyjson::JsonValue;
|
||||
|
||||
use crate::{
|
||||
dht::DhtHandler,
|
||||
proto::FudAnnounce,
|
||||
resource::{Resource, ResourceStatus},
|
||||
Fud,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use darkfi::{
|
||||
dht::DhtHandler,
|
||||
geode::hash_to_string,
|
||||
rpc::{
|
||||
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
|
||||
@@ -40,12 +41,12 @@ use darkfi::{
|
||||
util::path::expand_path,
|
||||
Error, Result,
|
||||
};
|
||||
use log::{error, info};
|
||||
use smol::{
|
||||
fs::{self, File},
|
||||
lock::MutexGuard,
|
||||
|
||||
use crate::{
|
||||
proto::FudAnnounce,
|
||||
resource::{Resource, ResourceStatus},
|
||||
Fud,
|
||||
};
|
||||
use tinyjson::JsonValue;
|
||||
|
||||
#[async_trait]
|
||||
impl RequestHandler<()> for Fud {
|
||||
|
||||
@@ -16,16 +16,15 @@
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use log::{error, info};
|
||||
use std::sync::Arc;
|
||||
|
||||
use darkfi::{geode::hash_to_string, system::sleep, Error, Result};
|
||||
use darkfi::{dht::DhtHandler, geode::hash_to_string, system::sleep, Error, Result};
|
||||
|
||||
use crate::{
|
||||
dht::DhtHandler,
|
||||
proto::{FudAnnounce, FudChunkReply, FudFileReply},
|
||||
Fud,
|
||||
};
|
||||
use log::{error, info};
|
||||
|
||||
/// Triggered when calling the `get` RPC method
|
||||
pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
|
||||
|
||||
@@ -16,312 +16,34 @@
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
hash::{Hash, Hasher},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use darkfi::{
|
||||
net::{connector::Connector, session::Session, ChannelPtr, Message, P2pPtr},
|
||||
system::{sleep, timeout::timeout, ExecutorPtr},
|
||||
util::time::Timestamp,
|
||||
Error, Result,
|
||||
};
|
||||
use darkfi_serial::{SerialDecodable, SerialEncodable};
|
||||
use futures::stream::FuturesUnordered;
|
||||
use log::{debug, info, warn};
|
||||
use num_bigint::BigUint;
|
||||
use smol::{
|
||||
lock::{RwLock, Semaphore},
|
||||
stream::StreamExt,
|
||||
use smol::{lock::Semaphore, stream::StreamExt};
|
||||
use std::{collections::HashSet, sync::Arc, time::Duration};
|
||||
|
||||
use super::{Dht, DhtNode, DhtRouterItem, DhtRouterPtr};
|
||||
use crate::{
|
||||
net::{connector::Connector, session::Session, ChannelPtr, Message},
|
||||
system::{sleep, timeout::timeout},
|
||||
Error, Result,
|
||||
};
|
||||
use structopt::StructOpt;
|
||||
use url::Url;
|
||||
|
||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
|
||||
pub struct DhtNode {
|
||||
pub id: blake3::Hash,
|
||||
pub addresses: Vec<Url>,
|
||||
}
|
||||
|
||||
impl Hash for DhtNode {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.id.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for DhtNode {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.id == other.id
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DhtBucket {
|
||||
pub nodes: Vec<DhtNode>,
|
||||
}
|
||||
|
||||
/// "Router" means: Key -> Set of nodes (+ additional data for each node)
|
||||
pub type DhtRouterPtr = Arc<RwLock<HashMap<blake3::Hash, HashSet<DhtRouterItem>>>>;
|
||||
|
||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
|
||||
pub struct DhtRouterItem {
|
||||
pub node: DhtNode,
|
||||
pub timestamp: u64,
|
||||
}
|
||||
|
||||
impl Hash for DhtRouterItem {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.node.id.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for DhtRouterItem {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.node.id == other.node.id
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DhtNode> for DhtRouterItem {
|
||||
fn from(node: DhtNode) -> Self {
|
||||
DhtRouterItem { node, timestamp: Timestamp::current_time().inner() }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DhtSettings {
|
||||
/// Number of nodes in a bucket
|
||||
pub k: usize,
|
||||
/// Number of lookup requests in a burst
|
||||
pub alpha: usize,
|
||||
/// Maximum number of parallel lookup requests
|
||||
pub concurrency: usize,
|
||||
/// Timeout in seconds
|
||||
pub timeout: u64,
|
||||
}
|
||||
|
||||
impl Default for DhtSettings {
|
||||
fn default() -> Self {
|
||||
Self { k: 16, alpha: 4, concurrency: 10, timeout: 5 }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, structopt::StructOpt, structopt_toml::StructOptToml)]
|
||||
#[structopt()]
|
||||
#[serde(rename = "dht")]
|
||||
pub struct DhtSettingsOpt {
|
||||
/// Number of nodes in a DHT bucket
|
||||
#[structopt(long)]
|
||||
pub dht_k: Option<usize>,
|
||||
|
||||
/// Number of DHT lookup requests in a burst
|
||||
#[structopt(long)]
|
||||
pub dht_alpha: Option<usize>,
|
||||
|
||||
/// Maximum number of parallel DHT lookup requests
|
||||
#[structopt(long)]
|
||||
pub dht_concurrency: Option<usize>,
|
||||
|
||||
/// Timeout in seconds
|
||||
#[structopt(long)]
|
||||
pub dht_timeout: Option<u64>,
|
||||
}
|
||||
|
||||
impl From<DhtSettingsOpt> for DhtSettings {
|
||||
fn from(opt: DhtSettingsOpt) -> Self {
|
||||
let def = DhtSettings::default();
|
||||
|
||||
Self {
|
||||
k: opt.dht_k.unwrap_or(def.k),
|
||||
alpha: opt.dht_alpha.unwrap_or(def.alpha),
|
||||
concurrency: opt.dht_concurrency.unwrap_or(def.concurrency),
|
||||
timeout: opt.dht_timeout.unwrap_or(def.timeout),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Dht {
|
||||
/// Our own node id
|
||||
pub node_id: blake3::Hash,
|
||||
/// Are we bootstrapped?
|
||||
pub bootstrapped: Arc<RwLock<bool>>,
|
||||
/// Vec of buckets
|
||||
pub buckets: Arc<RwLock<Vec<DhtBucket>>>,
|
||||
/// Number of buckets
|
||||
pub n_buckets: usize,
|
||||
/// Channel ID -> Node ID
|
||||
pub node_cache: Arc<RwLock<HashMap<u32, DhtNode>>>,
|
||||
/// Node ID -> Channel ID
|
||||
pub channel_cache: Arc<RwLock<HashMap<blake3::Hash, u32>>>,
|
||||
/// Node ID -> Set of keys
|
||||
pub router_cache: Arc<RwLock<HashMap<blake3::Hash, HashSet<blake3::Hash>>>>,
|
||||
|
||||
pub settings: DhtSettings,
|
||||
|
||||
pub p2p: P2pPtr,
|
||||
pub executor: ExecutorPtr,
|
||||
}
|
||||
impl Dht {
|
||||
pub async fn new(
|
||||
node_id: &blake3::Hash,
|
||||
settings: &DhtSettings,
|
||||
p2p: P2pPtr,
|
||||
ex: ExecutorPtr,
|
||||
) -> Self {
|
||||
// Create empty buckets
|
||||
let mut buckets = vec![];
|
||||
for _ in 0..256 {
|
||||
buckets.push(DhtBucket { nodes: vec![] })
|
||||
}
|
||||
|
||||
Self {
|
||||
node_id: *node_id,
|
||||
buckets: Arc::new(RwLock::new(buckets)),
|
||||
n_buckets: 256,
|
||||
bootstrapped: Arc::new(RwLock::new(false)),
|
||||
node_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
channel_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
router_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
|
||||
settings: settings.clone(),
|
||||
|
||||
p2p: p2p.clone(),
|
||||
executor: ex,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn is_bootstrapped(&self) -> bool {
|
||||
let bootstrapped = self.bootstrapped.read().await;
|
||||
*bootstrapped
|
||||
}
|
||||
|
||||
pub async fn set_bootstrapped(&self) {
|
||||
let mut bootstrapped = self.bootstrapped.write().await;
|
||||
*bootstrapped = true;
|
||||
}
|
||||
|
||||
/// Get own node
|
||||
pub async fn node(&self) -> DhtNode {
|
||||
DhtNode {
|
||||
id: self.node_id,
|
||||
addresses: self
|
||||
.p2p
|
||||
.clone()
|
||||
.hosts()
|
||||
.external_addrs()
|
||||
.await
|
||||
.iter()
|
||||
.filter(|addr| !addr.to_string().contains("[::]"))
|
||||
.cloned()
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
// Get the distance between `key_1` and `key_2`
|
||||
pub fn distance(&self, key_1: &blake3::Hash, key_2: &blake3::Hash) -> [u8; 32] {
|
||||
let bytes1 = key_1.as_bytes();
|
||||
let bytes2 = key_2.as_bytes();
|
||||
|
||||
let mut result_bytes = [0u8; 32];
|
||||
|
||||
for i in 0..32 {
|
||||
result_bytes[i] = bytes1[i] ^ bytes2[i];
|
||||
}
|
||||
|
||||
result_bytes
|
||||
}
|
||||
|
||||
// Sort `nodes` by distance from `key`
|
||||
pub fn sort_by_distance(&self, nodes: &mut [DhtNode], key: &blake3::Hash) {
|
||||
nodes.sort_by(|a, b| {
|
||||
let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id));
|
||||
let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id));
|
||||
distance_a.cmp(&distance_b)
|
||||
});
|
||||
}
|
||||
|
||||
// key -> bucket index
|
||||
pub async fn get_bucket_index(&self, key: &blake3::Hash) -> usize {
|
||||
if key == &self.node_id {
|
||||
return 0
|
||||
}
|
||||
let distance = self.distance(&self.node_id, key);
|
||||
let mut leading_zeros = 0;
|
||||
|
||||
for &byte in &distance {
|
||||
if byte == 0 {
|
||||
leading_zeros += 8;
|
||||
} else {
|
||||
leading_zeros += byte.leading_zeros() as usize;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let bucket_index = self.n_buckets - leading_zeros;
|
||||
std::cmp::min(bucket_index, self.n_buckets - 1)
|
||||
}
|
||||
|
||||
// Get `n` closest known nodes to a key
|
||||
// TODO: Can be optimized
|
||||
pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec<DhtNode> {
|
||||
let buckets_lock = self.buckets.clone();
|
||||
let buckets = buckets_lock.read().await;
|
||||
|
||||
let mut neighbors = Vec::new();
|
||||
|
||||
for i in 0..self.n_buckets {
|
||||
if let Some(bucket) = buckets.get(i) {
|
||||
neighbors.extend(bucket.nodes.iter().cloned());
|
||||
}
|
||||
}
|
||||
|
||||
self.sort_by_distance(&mut neighbors, key);
|
||||
|
||||
neighbors.truncate(n);
|
||||
|
||||
neighbors
|
||||
}
|
||||
|
||||
// Channel ID -> DhtNode
|
||||
pub async fn get_node_from_channel(&self, channel_id: u32) -> Option<DhtNode> {
|
||||
let node_cache_lock = self.node_cache.clone();
|
||||
let node_cache = node_cache_lock.read().await;
|
||||
node_cache.get(&channel_id).cloned()
|
||||
}
|
||||
|
||||
// Remove nodes in router that are older than expiry_secs
|
||||
pub async fn prune_router(&self, router: DhtRouterPtr, expiry_secs: u32) {
|
||||
let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64);
|
||||
let mut router_write = router.write().await;
|
||||
|
||||
let keys: Vec<_> = router_write.keys().cloned().collect();
|
||||
|
||||
for key in keys {
|
||||
let items = router_write.get_mut(&key).unwrap();
|
||||
items.retain(|item| item.timestamp > expiry_timestamp);
|
||||
if items.is_empty() {
|
||||
router_write.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait DhtHandler {
|
||||
fn dht(&self) -> Arc<Dht>;
|
||||
|
||||
// Send a DHT ping request
|
||||
/// Send a DHT ping request
|
||||
async fn ping(&self, channel: ChannelPtr) -> Result<DhtNode>;
|
||||
|
||||
// Triggered when we find a new node
|
||||
/// Triggered when we find a new node
|
||||
async fn on_new_node(&self, node: &DhtNode) -> Result<()>;
|
||||
|
||||
// Send FIND NODES request to a peer to get nodes close to `key`
|
||||
/// Send FIND NODES request to a peer to get nodes close to `key`
|
||||
async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result<Vec<DhtNode>>;
|
||||
|
||||
// Announce message `m` for a key, and add ourselves to router
|
||||
/// Announce message `m` for a key, and add ourselves to router
|
||||
async fn announce<M: Message>(
|
||||
&self,
|
||||
key: &blake3::Hash,
|
||||
@@ -346,8 +68,8 @@ pub trait DhtHandler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Send a DHT ping request when there is a new channel, to know the node id of the new peer,
|
||||
// Then fill the channel cache and the buckets
|
||||
/// Send a DHT ping request when there is a new channel, to know the node id of the new peer,
|
||||
/// Then fill the channel cache and the buckets
|
||||
async fn channel_task<M: Message>(&self) -> Result<()> {
|
||||
loop {
|
||||
let channel_sub = self.dht().p2p.hosts().subscribe_channel().await;
|
||||
@@ -380,7 +102,7 @@ pub trait DhtHandler {
|
||||
}
|
||||
}
|
||||
|
||||
// Remove disconnected nodes from the channel cache
|
||||
/// Remove disconnected nodes from the channel cache
|
||||
async fn disconnect_task(&self) -> Result<()> {
|
||||
loop {
|
||||
sleep(15).await;
|
||||
@@ -396,7 +118,7 @@ pub trait DhtHandler {
|
||||
}
|
||||
}
|
||||
|
||||
// Add a node in the correct bucket
|
||||
/// Add a node in the correct bucket
|
||||
async fn add_node(&self, node: DhtNode) {
|
||||
// Do not add ourselves to the buckets
|
||||
if node.id == self.dht().node_id {
|
||||
@@ -569,7 +291,7 @@ pub trait DhtHandler {
|
||||
return Ok(result.to_vec())
|
||||
}
|
||||
|
||||
// Get an existing channel, or create a new one
|
||||
/// Get an existing channel, or create a new one
|
||||
async fn get_channel(&self, node: &DhtNode) -> Result<ChannelPtr> {
|
||||
let channel_cache_lock = self.dht().channel_cache.clone();
|
||||
let channel_cache = channel_cache_lock.read().await;
|
||||
@@ -613,7 +335,7 @@ pub trait DhtHandler {
|
||||
Err(Error::Custom("Could not create channel".to_string()))
|
||||
}
|
||||
|
||||
// Add nodes as a provider for a key
|
||||
/// Add nodes as a provider for a key
|
||||
async fn add_to_router(
|
||||
&self,
|
||||
router: DhtRouterPtr,
|
||||
253
src/dht/mod.rs
Normal file
253
src/dht/mod.rs
Normal file
@@ -0,0 +1,253 @@
|
||||
/* This file is part of DarkFi (https://dark.fi)
|
||||
*
|
||||
* Copyright (C) 2020-2025 Dyne.org foundation
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use async_trait::async_trait;
|
||||
use num_bigint::BigUint;
|
||||
use smol::lock::RwLock;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
hash::{Hash, Hasher},
|
||||
sync::Arc,
|
||||
};
|
||||
use url::Url;
|
||||
|
||||
use darkfi_serial::{SerialDecodable, SerialEncodable};
|
||||
|
||||
use crate::{net::P2pPtr, system::ExecutorPtr, util::time::Timestamp};
|
||||
|
||||
pub mod settings;
|
||||
pub use settings::{DhtSettings, DhtSettingsOpt};
|
||||
|
||||
pub mod handler;
|
||||
pub use handler::DhtHandler;
|
||||
|
||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
|
||||
pub struct DhtNode {
|
||||
pub id: blake3::Hash,
|
||||
pub addresses: Vec<Url>,
|
||||
}
|
||||
|
||||
impl Hash for DhtNode {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.id.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for DhtNode {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.id == other.id
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DhtBucket {
|
||||
pub nodes: Vec<DhtNode>,
|
||||
}
|
||||
|
||||
/// "Router" means: Key -> Set of nodes (+ additional data for each node)
|
||||
pub type DhtRouterPtr = Arc<RwLock<HashMap<blake3::Hash, HashSet<DhtRouterItem>>>>;
|
||||
|
||||
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
|
||||
pub struct DhtRouterItem {
|
||||
pub node: DhtNode,
|
||||
pub timestamp: u64,
|
||||
}
|
||||
|
||||
impl Hash for DhtRouterItem {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.node.id.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for DhtRouterItem {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.node.id == other.node.id
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DhtNode> for DhtRouterItem {
|
||||
fn from(node: DhtNode) -> Self {
|
||||
DhtRouterItem { node, timestamp: Timestamp::current_time().inner() }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Dht {
|
||||
/// Our own node id
|
||||
pub node_id: blake3::Hash,
|
||||
/// Are we bootstrapped?
|
||||
pub bootstrapped: Arc<RwLock<bool>>,
|
||||
/// Vec of buckets
|
||||
pub buckets: Arc<RwLock<Vec<DhtBucket>>>,
|
||||
/// Number of buckets
|
||||
pub n_buckets: usize,
|
||||
/// Channel ID -> Node ID
|
||||
pub node_cache: Arc<RwLock<HashMap<u32, DhtNode>>>,
|
||||
/// Node ID -> Channel ID
|
||||
pub channel_cache: Arc<RwLock<HashMap<blake3::Hash, u32>>>,
|
||||
/// Node ID -> Set of keys
|
||||
pub router_cache: Arc<RwLock<HashMap<blake3::Hash, HashSet<blake3::Hash>>>>,
|
||||
|
||||
pub settings: DhtSettings,
|
||||
|
||||
pub p2p: P2pPtr,
|
||||
pub executor: ExecutorPtr,
|
||||
}
|
||||
|
||||
impl Dht {
|
||||
pub async fn new(
|
||||
node_id: &blake3::Hash,
|
||||
settings: &DhtSettings,
|
||||
p2p: P2pPtr,
|
||||
ex: ExecutorPtr,
|
||||
) -> Self {
|
||||
// Create empty buckets
|
||||
let mut buckets = vec![];
|
||||
for _ in 0..256 {
|
||||
buckets.push(DhtBucket { nodes: vec![] })
|
||||
}
|
||||
|
||||
Self {
|
||||
node_id: *node_id,
|
||||
buckets: Arc::new(RwLock::new(buckets)),
|
||||
n_buckets: 256,
|
||||
bootstrapped: Arc::new(RwLock::new(false)),
|
||||
node_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
channel_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
router_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
|
||||
settings: settings.clone(),
|
||||
|
||||
p2p: p2p.clone(),
|
||||
executor: ex,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn is_bootstrapped(&self) -> bool {
|
||||
let bootstrapped = self.bootstrapped.read().await;
|
||||
*bootstrapped
|
||||
}
|
||||
|
||||
pub async fn set_bootstrapped(&self) {
|
||||
let mut bootstrapped = self.bootstrapped.write().await;
|
||||
*bootstrapped = true;
|
||||
}
|
||||
|
||||
/// Get own node
|
||||
pub async fn node(&self) -> DhtNode {
|
||||
DhtNode {
|
||||
id: self.node_id,
|
||||
addresses: self
|
||||
.p2p
|
||||
.clone()
|
||||
.hosts()
|
||||
.external_addrs()
|
||||
.await
|
||||
.iter()
|
||||
.filter(|addr| !addr.to_string().contains("[::]"))
|
||||
.cloned()
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the distance between `key_1` and `key_2`
|
||||
pub fn distance(&self, key_1: &blake3::Hash, key_2: &blake3::Hash) -> [u8; 32] {
|
||||
let bytes1 = key_1.as_bytes();
|
||||
let bytes2 = key_2.as_bytes();
|
||||
|
||||
let mut result_bytes = [0u8; 32];
|
||||
|
||||
for i in 0..32 {
|
||||
result_bytes[i] = bytes1[i] ^ bytes2[i];
|
||||
}
|
||||
|
||||
result_bytes
|
||||
}
|
||||
|
||||
/// Sort `nodes` by distance from `key`
|
||||
pub fn sort_by_distance(&self, nodes: &mut [DhtNode], key: &blake3::Hash) {
|
||||
nodes.sort_by(|a, b| {
|
||||
let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id));
|
||||
let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id));
|
||||
distance_a.cmp(&distance_b)
|
||||
});
|
||||
}
|
||||
|
||||
/// `key` -> bucket index
|
||||
pub async fn get_bucket_index(&self, key: &blake3::Hash) -> usize {
|
||||
if key == &self.node_id {
|
||||
return 0
|
||||
}
|
||||
let distance = self.distance(&self.node_id, key);
|
||||
let mut leading_zeros = 0;
|
||||
|
||||
for &byte in &distance {
|
||||
if byte == 0 {
|
||||
leading_zeros += 8;
|
||||
} else {
|
||||
leading_zeros += byte.leading_zeros() as usize;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let bucket_index = self.n_buckets - leading_zeros;
|
||||
std::cmp::min(bucket_index, self.n_buckets - 1)
|
||||
}
|
||||
|
||||
/// Get `n` closest known nodes to a key
|
||||
/// TODO: Can be optimized
|
||||
pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec<DhtNode> {
|
||||
let buckets_lock = self.buckets.clone();
|
||||
let buckets = buckets_lock.read().await;
|
||||
|
||||
let mut neighbors = Vec::new();
|
||||
|
||||
for i in 0..self.n_buckets {
|
||||
if let Some(bucket) = buckets.get(i) {
|
||||
neighbors.extend(bucket.nodes.iter().cloned());
|
||||
}
|
||||
}
|
||||
|
||||
self.sort_by_distance(&mut neighbors, key);
|
||||
|
||||
neighbors.truncate(n);
|
||||
|
||||
neighbors
|
||||
}
|
||||
|
||||
/// Channel ID -> DhtNode
|
||||
pub async fn get_node_from_channel(&self, channel_id: u32) -> Option<DhtNode> {
|
||||
let node_cache_lock = self.node_cache.clone();
|
||||
let node_cache = node_cache_lock.read().await;
|
||||
node_cache.get(&channel_id).cloned()
|
||||
}
|
||||
|
||||
/// Remove nodes in router that are older than expiry_secs
|
||||
pub async fn prune_router(&self, router: DhtRouterPtr, expiry_secs: u32) {
|
||||
let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64);
|
||||
let mut router_write = router.write().await;
|
||||
|
||||
let keys: Vec<_> = router_write.keys().cloned().collect();
|
||||
|
||||
for key in keys {
|
||||
let items = router_write.get_mut(&key).unwrap();
|
||||
items.retain(|item| item.timestamp > expiry_timestamp);
|
||||
if items.is_empty() {
|
||||
router_write.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
71
src/dht/settings.rs
Normal file
71
src/dht/settings.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
/* This file is part of DarkFi (https://dark.fi)
|
||||
*
|
||||
* Copyright (C) 2020-2025 Dyne.org foundation
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use structopt::StructOpt;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DhtSettings {
|
||||
/// Number of nodes in a bucket
|
||||
pub k: usize,
|
||||
/// Number of lookup requests in a burst
|
||||
pub alpha: usize,
|
||||
/// Maximum number of parallel lookup requests
|
||||
pub concurrency: usize,
|
||||
/// Timeout in seconds
|
||||
pub timeout: u64,
|
||||
}
|
||||
|
||||
impl Default for DhtSettings {
|
||||
fn default() -> Self {
|
||||
Self { k: 16, alpha: 4, concurrency: 10, timeout: 5 }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, structopt::StructOpt, structopt_toml::StructOptToml)]
|
||||
#[structopt()]
|
||||
#[serde(rename = "dht")]
|
||||
pub struct DhtSettingsOpt {
|
||||
/// Number of nodes in a DHT bucket
|
||||
#[structopt(long)]
|
||||
pub dht_k: Option<usize>,
|
||||
|
||||
/// Number of DHT lookup requests in a burst
|
||||
#[structopt(long)]
|
||||
pub dht_alpha: Option<usize>,
|
||||
|
||||
/// Maximum number of parallel DHT lookup requests
|
||||
#[structopt(long)]
|
||||
pub dht_concurrency: Option<usize>,
|
||||
|
||||
/// Timeout in seconds
|
||||
#[structopt(long)]
|
||||
pub dht_timeout: Option<u64>,
|
||||
}
|
||||
|
||||
impl From<DhtSettingsOpt> for DhtSettings {
|
||||
fn from(opt: DhtSettingsOpt) -> Self {
|
||||
let def = DhtSettings::default();
|
||||
|
||||
Self {
|
||||
k: opt.dht_k.unwrap_or(def.k),
|
||||
alpha: opt.dht_alpha.unwrap_or(def.alpha),
|
||||
concurrency: opt.dht_concurrency.unwrap_or(def.concurrency),
|
||||
timeout: opt.dht_timeout.unwrap_or(def.timeout),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -55,6 +55,9 @@ pub mod zk;
|
||||
#[cfg(feature = "zkas")]
|
||||
pub mod zkas;
|
||||
|
||||
#[cfg(feature = "dht")]
|
||||
pub mod dht;
|
||||
|
||||
pub const ANSI_LOGO: &str = include_str!("../contrib/darkfi.ansi");
|
||||
|
||||
#[macro_export]
|
||||
|
||||
Reference in New Issue
Block a user