fud, dht: add a PoW to fud for node ID generation

- The node ID is no longer stored on disk; it is generated when fud starts.

- PoW uses Equi-X (https://spec.torproject.org/hspow-spec/v1-equix.html). The challenge contains a recent BTC block hash because it is 1) unpredictable, 2) known by all fud nodes, 3) periodically updated, 4) stable. Once the block you chose for your node ID becomes too old, you must generate a new node ID with a new keypair and block hash (or other nodes will exclude you from their DHT).

- BTC block hashes are fetched from the Electrum nodes defined in the config file (you can use Tor/I2P nodes if you want).

- The Equi-X challenge is `(public_key || btc_block_hash || nonce)`, and a node ID is `Hash(equix_challenge || equix_solution)`. You must solve the PoW to get a valid node ID (see the sketch after this list).

- The keypair is random and regenerated whenever a new node ID is created; it exists only so that you can sign messages.

- `Dht` and `DhtHandler` now take a generic parameter: any struct implementing the `DhtNode` trait. This was not needed before because fud did not use any fud-specific fields in its node struct. All the PoW logic lives in `fud`, NOT in `dht`.

- `fud/src/equix.rs` does not depend on fud, so we can move it elsewhere if we want to use Equi-X in another binary.

- `FudPingRequest` now includes a random u64; the other node must provide a signature of that u64 (along with other data, including the data making up its node ID) in its `FudPingReply`.
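
A minimal sketch of the node ID scheme described above, following `Challenge::new` in `fud/src/equix.rs` and `VerifiableNodeData::id` in `fud/src/pow.rs` from this diff (the helper names `challenge_bytes` and `node_id` are illustrative, not part of the code):

fn challenge_bytes(public_key: &[u8; 32], btc_block_hash: &[u8; 32], nonce: &[u8; 16]) -> Vec<u8> {
    // Personalization prefix || public_key || btc_block_hash || nonce
    let mut c = Vec::new();
    c.extend_from_slice(b"DarkFi Equi-X\0");
    c.extend_from_slice(public_key);
    c.extend_from_slice(btc_block_hash);
    c.extend_from_slice(nonce);
    c
}

// Node ID = blake3(challenge || equix_solution); the ID therefore commits to
// the public key, a recent BTC block hash, and a valid Equi-X solution.
fn node_id(challenge: &[u8], solution_bytes: &[u8]) -> blake3::Hash {
    blake3::hash(&[challenge, solution_bytes].concat())
}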
commit d34113e8a5 (parent 1052333fd4)
Author: epiphany
Date: 2025-08-21 23:05:06 +02:00
14 changed files with 1243 additions and 321 deletions

Cargo.lock (generated, 47 lines changed)

@@ -783,6 +783,15 @@ dependencies = [
"wyz",
]
[[package]]
name = "blake2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe"
dependencies = [
"digest",
]
[[package]]
name = "blake2b_simd"
version = "1.0.3"
@@ -2680,6 +2689,19 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]]
name = "equix"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89178c5241f5cc0c8f2b5ac5008f3c7a32caad341b1ec747a6e1e51d2e877110"
dependencies = [
"arrayvec",
"hashx",
"num-traits",
"thiserror 2.0.12",
"visibility",
]
[[package]]
name = "errno"
version = "0.3.13"
@@ -2823,6 +2845,12 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "fixed-capacity-vec"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b31a14f5ee08ed1a40e1252b35af18bed062e3f39b69aab34decde36bc43e40"
[[package]]
name = "fixed-hash"
version = "0.8.0"
@@ -3036,16 +3064,20 @@ name = "fud"
version = "0.5.0"
dependencies = [
"async-trait",
"blake2",
"blake3",
"bs58",
"darkfi",
"darkfi-sdk",
"darkfi-serial",
"easy-parallel",
"equix",
"futures",
"log",
"num-bigint",
"rand 0.8.5",
"serde",
"sha2",
"signal-hook",
"signal-hook-async-std",
"simplelog",
@@ -3437,6 +3469,21 @@ dependencies = [
"hashbrown 0.15.4",
]
[[package]]
name = "hashx"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cb639748a589a17df2126f8015897ab416e81113afb82f56df5d47fa1486ab1"
dependencies = [
"arrayvec",
"blake2",
"dynasmrt",
"fixed-capacity-vec",
"hex",
"rand_core 0.9.3",
"thiserror 2.0.12",
]
[[package]]
name = "heck"
version = "0.3.3"


@@ -18,12 +18,16 @@ path = "src/main.rs"
[dependencies]
darkfi = {path = "../../../", features = ["async-daemonize", "geode", "rpc", "dht", "sled-overlay"]}
darkfi-sdk = {path = "../../../src/sdk"}
darkfi-serial = {version = "0.5.0", features = ["hash"]}
# Encoding
bs58 = "0.5.1"
sha2 = "0.10.9"
# Misc
async-trait = "0.1.88"
blake3 = "1.8.2"
bs58 = "0.5.1"
rand = "0.8.5"
log = "0.4.27"
tinyjson = "2.5.1"
@@ -38,6 +42,10 @@ signal-hook = "0.3.18"
simplelog = "0.12.2"
smol = "2.0.2"
# Equi-X
blake2 = "0.10.6"
equix = "0.2.5"
# Database
sled-overlay = "0.1.9"


@@ -12,6 +12,31 @@ base_dir = "~/.local/share/darkfi/fud"
## Chunk transfer timeout in seconds
#chunk_timeout = 60
# PoW settings (to generate a valid node id)
[pow]
## Equi-X effort value
#equix_value = 10000
## Number of latest BTC block hashes that are valid for fud's PoW
#btc_hash_count = 144
## Electrum nodes timeout in seconds
#btc_timeout = 15
# Electrum nodes used to fetch the latest BTC block hashes (used in fud's PoW)
btc_electrum_nodes = [
"tcp://ax102.blockeng.ch:50001",
"tcp://fulcrum-core.1209k.com:50001",
"tcp://electrum.blockstream.info:50001",
"tcp://bitcoin.aranguren.org:50001",
"tcp://bitcoin.grey.pw:50001",
#"tor://4vrz2q62yxlfmcntnotzdjahpqh2joirp2vrcdsayyioxthffimbp2ad.onion:50001",
#"tor://k23xxwk6xipyfdqey4ylsfeetmcajjro63odwihzbmx5m6xabbwzp4yd.onion:50001",
#"tor://sysaw6aecffum4ghlbukauf6g7l3hzh3rffafmfak5bxnfowrynd56ad.onion:50001",
#"tor://udfpzbte2hommnvag5f3qlouqkhvp3xybhlus2yvfeqdwlhjroe4bbyd.onion:60001",
#"tor://lukebtcygzrosjtcklev2fhlvpbyu25saienzorhbf3vwc2fpa475qyd.onion:50001",
]
# DHT settings
[dht]
## Number of nodes in a bucket

bin/fud/fud/src/bitcoin.rs (new file, 203 lines)

@@ -0,0 +1,203 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{
collections::HashMap,
io::{Cursor, Read},
sync::Arc,
time::Duration,
};
use log::{error, info, warn};
use rand::{prelude::IteratorRandom, rngs::OsRng};
use sha2::{Digest, Sha256};
use smol::lock::RwLock;
use tinyjson::JsonValue;
use url::Url;
use darkfi::{
rpc::{client::RpcClient, jsonrpc::JsonRequest},
system::{timeout::timeout, ExecutorPtr},
Error, Result,
};
use darkfi_sdk::{hex::decode_hex, GenericResult};
use crate::pow::PowSettings;
pub type BitcoinBlockHash = [u8; 32];
/// A struct that can fetch and store recent Bitcoin block hashes, using Electrum nodes.
/// This is only used to evaluate and verify fud's Equi-X PoW.
/// Bitcoin block hashes are used in the challenge, to make Equi-X solution
/// expirable and unpredictable.
/// It's meant to be swapped for DarkFi block hashes once the DarkFi chain is stable enough.
/// TODO: It should ask for new Electrum nodes, and build a local database of them
/// instead of relying only on the list defined in the settings.
pub struct BitcoinHashCache {
/// PoW settings which includes BTC/Electrum settings
settings: Arc<RwLock<PowSettings>>,
/// Current list of block hashes, the most recent block is at the end of the list
pub block_hashes: Vec<BitcoinBlockHash>,
/// Global multithreaded executor reference
ex: ExecutorPtr,
}
impl BitcoinHashCache {
pub fn new(settings: Arc<RwLock<PowSettings>>, ex: ExecutorPtr) -> Self {
Self { settings, block_hashes: vec![], ex }
}
/// Fetch block hashes from Electrum nodes, and update [`BitcoinHashCache::block_hashes`].
pub async fn update(&mut self) -> Result<Vec<BitcoinBlockHash>> {
info!(target: "fud::BitcoinHashCache::update()", "[BTC] Updating block hashes...");
let mut block_hashes = vec![];
let btc_electrum_nodes = self.settings.read().await.btc_electrum_nodes.clone();
let mut rng = OsRng;
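// `choose_multiple` over the full list just shuffles the configured nodes;
// we then try them in random order until one returns usable headers.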
let random_nodes: Vec<_> =
btc_electrum_nodes.iter().choose_multiple(&mut rng, btc_electrum_nodes.len());
for addr in random_nodes {
// Connect to the Electrum node
let client = match self.create_rpc_client(addr).await {
Ok(client) => client,
Err(e) => {
warn!(target: "fud::BitcoinHashCache::update()", "[BTC] Error while creating RPC client for Electrum node {addr}: {e}");
continue
}
};
info!(target: "fud::BitcoinHashCache::update()", "[BTC] Connected to {addr}");
// Fetch the current BTC height
let current_height = match self.fetch_current_height(&client).await {
Ok(height) => height,
Err(e) => {
warn!(target: "fud::BitcoinHashCache::update()", "[BTC] Error while fetching current height: {e}");
client.stop().await;
continue
}
};
info!(target: "fud::BitcoinHashCache::update()", "[BTC] Found current height {current_height}");
// Fetch the latest block hashes
match self.fetch_hashes(current_height, &client).await {
Ok(hashes) => {
client.stop().await;
if !hashes.is_empty() {
block_hashes = hashes;
break
}
warn!(target: "fud::BitcoinHashCache::update()", "[BTC] The Electrum node replied with an empty list of block headers");
continue
}
Err(e) => {
warn!(target: "fud::BitcoinHashCache::update()", "[BTC] Error while fetching block hashes: {e}");
client.stop().await;
continue
}
};
}
if block_hashes.is_empty() {
let err_str = "Could not find any block hash";
error!(target: "fud::BitcoinHashCache::update()", "[BTC] {err_str}");
return Err(Error::Custom(err_str.to_string()))
}
info!(target: "fud::BitcoinHashCache::update()", "[BTC] Found {} block hashes", block_hashes.len());
self.block_hashes = block_hashes.clone();
Ok(block_hashes)
}
async fn create_rpc_client(&self, addr: &Url) -> Result<RpcClient> {
let btc_timeout = Duration::from_secs(self.settings.read().await.btc_timeout);
let client = timeout(btc_timeout, RpcClient::new(addr.clone(), self.ex.clone())).await??;
Ok(client)
}
/// Fetch the current BTC height using an Electrum node RPC.
async fn fetch_current_height(&self, client: &RpcClient) -> Result<u64> {
let btc_timeout = Duration::from_secs(self.settings.read().await.btc_timeout);
let req = JsonRequest::new("blockchain.headers.subscribe", vec![].into());
let rep = timeout(btc_timeout, client.request(req)).await??;
rep.get::<HashMap<String, JsonValue>>()
.and_then(|res| res.get("height"))
.and_then(|h| h.get::<f64>())
.map(|h| *h as u64)
.ok_or_else(|| {
Error::JsonParseError(
"Failed to parse `blockchain.headers.subscribe` response".into(),
)
})
}
/// Fetch the `btc_hash_count` most recent BTC block hashes below `height` using an Electrum node RPC.
async fn fetch_hashes(&self, height: u64, client: &RpcClient) -> Result<Vec<BitcoinBlockHash>> {
let count = self.settings.read().await.btc_hash_count;
let btc_timeout = Duration::from_secs(self.settings.read().await.btc_timeout);
let req = JsonRequest::new(
"blockchain.block.headers",
vec![
JsonValue::Number((height as f64) - (count as f64)),
JsonValue::Number(count as f64),
]
.into(),
);
let rep = timeout(btc_timeout, client.request(req)).await??;
let hex: &String = rep
.get::<HashMap<String, JsonValue>>()
.and_then(|res| res.get("hex"))
.and_then(|h| h.get::<String>())
.ok_or_else(|| {
Error::JsonParseError("Failed to parse `blockchain.block.headers` response".into())
})?;
let decoded_bytes = decode_hex(hex.as_str()).collect::<GenericResult<Vec<_>>>()?;
Self::decode_block_hashes(decoded_bytes)
}
/// Convert concatenated BTC block headers to a list of block hashes.
fn decode_block_hashes(data: Vec<u8>) -> Result<Vec<BitcoinBlockHash>> {
let mut cursor = Cursor::new(&data);
let count = data.len() / 80;
let mut hashes = Vec::with_capacity(count);
for _ in 0..count {
// Read the 80-byte header
let mut header = [0u8; 80];
cursor.read_exact(&mut header)?;
// Compute double SHA-256
let first_hash = Sha256::digest(header);
let second_hash = Sha256::digest(first_hash);
// Convert to big-endian hash
let mut be_hash = [0u8; 32];
be_hash.copy_from_slice(&second_hash);
be_hash.reverse();
hashes.push(be_hash);
}
Ok(hashes)
}
}

bin/fud/fud/src/equix.rs (new file, 151 lines)

@@ -0,0 +1,151 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
//! Proof-of-Work using Equi-X
//!
//! <https://spec.torproject.org/hspow-spec/v1-equix.html>
//! <https://github.com/tevador/equix/blob/master/devlog.md>
//! <https://github.com/tevador/equix>
use std::convert::AsRef;
use blake2::{digest::consts::U4, Blake2b, Digest};
pub use equix::{EquiXBuilder, HashError, RuntimeOption, Solution, SolverMemory};
use darkfi::{Error, Result};
/// Algorithm personalization string
const P_STRING: &[u8] = b"DarkFi Equi-X\0";
/// Length of the personalization string, in bytes
const P_STRING_LEN: usize = 14;
/// Length of the nonce value generated by clients and included in the solution
pub const NONCE_LEN: usize = 16;
/// A challenge string
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Challenge(Vec<u8>);
impl Challenge {
/// Build a new [`Challenge`].
///
/// Copies `input` and `nonce` values into
/// a new byte vector.
pub fn new(input: &[u8], nonce: &[u8; NONCE_LEN]) -> Self {
let mut result = Vec::<u8>::new();
result.extend_from_slice(P_STRING);
result.extend_from_slice(input.as_ref());
result.extend_from_slice(nonce.as_ref());
Self(result)
}
pub fn to_bytes(&self) -> Vec<u8> {
self.0.clone()
}
/// Clone the input portion of this challenge.
pub fn input(&self) -> Vec<u8> {
self.0[P_STRING_LEN..(self.0.len() - NONCE_LEN)].into()
}
/// Clone the nonce portion of this challenge.
pub fn nonce(&self) -> [u8; NONCE_LEN] {
self.0[(self.0.len() - NONCE_LEN)..].try_into().expect("slice length correct")
}
/// Increment the nonce value inside this challenge.
pub fn increment_nonce(&mut self) {
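// Treat the nonce bytes as a single little-endian integer and add 1,
// carrying across bytes (wrapping on overflow).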
fn inc_le_bytes(slice: &mut [u8]) {
for byte in slice {
let (value, overflow) = (*byte).overflowing_add(1);
*byte = value;
if !overflow {
break;
}
}
}
let len = self.0.len();
inc_le_bytes(&mut self.0[(len - NONCE_LEN)..]);
}
/// Verify that a solution proof passes the effort test.
pub fn check_effort(&self, proof: &equix::SolutionByteArray, effort: u32) -> bool {
let mut hasher = Blake2b::<U4>::new();
hasher.update(self.as_ref());
hasher.update(proof.as_ref());
let value = u32::from_be_bytes(hasher.finalize().into());
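// Effort test (as in Tor's hs-pow v1): the proof passes iff value * effort
// fits in a u32, i.e. value <= u32::MAX / effort. With the default effort of
// 10000, only values up to ~429,496 pass: about a 1-in-10000 chance per
// candidate solution.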
value.checked_mul(effort).is_some()
}
}
impl AsRef<[u8]> for Challenge {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}
pub struct EquiXPow {
/// Target effort
pub effort: u32,
/// The next [`Challenge`] to try
pub challenge: Challenge,
/// Configuration settings for Equi-X
pub equix: EquiXBuilder,
/// Temporary memory for Equi-X to use
pub mem: SolverMemory,
}
impl EquiXPow {
pub fn run(&mut self) -> Result<Solution> {
loop {
if let Some(solution) = self.run_step()? {
return Ok(solution);
}
}
}
pub fn run_step(&mut self) -> Result<Option<Solution>> {
match self.equix.build(self.challenge.as_ref()) {
Ok(equix) => {
for candidate in equix.solve_with_memory(&mut self.mem) {
if self.challenge.check_effort(&candidate.to_bytes(), self.effort) {
return Ok(Some(candidate))
}
}
}
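// Rarely, a challenge cannot seed a valid HashX program; fall through
// and try the next nonce.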
Err(equix::Error::Hash(HashError::ProgramConstraints)) => (),
Err(e) => {
return Err(Error::Custom(e.to_string()));
}
};
self.challenge.increment_nonce();
Ok(None)
}
pub fn verify(&self, challenge: &Challenge, solution: &Solution) -> Result<()> {
if challenge.check_effort(&solution.to_bytes(), self.effort) {
return self
.equix
.verify(challenge.as_ref(), solution)
.map_err(|e| Error::Custom(e.to_string()))
}
Err(Error::Custom("Equi-X solution has insufficient effort".into()))
}
}


@@ -16,18 +16,6 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_trait::async_trait;
use futures::{future::FutureExt, pin_mut, select};
use log::{debug, error, info, warn};
use num_bigint::BigUint;
use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, RngCore};
use sled_overlay::sled;
use smol::{
channel,
fs::{self, File, OpenOptions},
io::{AsyncReadExt, AsyncWriteExt},
lock::RwLock,
};
use std::{
collections::{HashMap, HashSet},
io::ErrorKind,
@@ -36,15 +24,31 @@ use std::{
time::Instant,
};
use async_trait::async_trait;
use futures::{future::FutureExt, pin_mut, select};
use log::{debug, error, info, warn};
use num_bigint::BigUint;
use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, Rng};
use sled_overlay::sled;
use smol::{
channel,
fs::{self, File, OpenOptions},
lock::RwLock,
};
use url::Url;
use darkfi::{
dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr},
dht::{
impl_dht_node_defaults, Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr, DhtSettings,
},
geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
net::{ChannelPtr, P2pPtr},
system::{PublisherPtr, StoppableTask},
system::{ExecutorPtr, PublisherPtr, StoppableTask},
util::path::expand_path,
Error, Result,
};
use darkfi_serial::{deserialize_async, serialize_async};
use darkfi_sdk::crypto::{schnorr::SchnorrPublic, SecretKey};
use darkfi_serial::{deserialize_async, serialize_async, SerialDecodable, SerialEncodable};
/// P2P protocols
pub mod proto;
@@ -73,6 +77,20 @@ pub mod rpc;
pub mod tasks;
use tasks::FetchReply;
/// Bitcoin
pub mod bitcoin;
/// PoW
pub mod pow;
use pow::{FudPow, VerifiableNodeData};
/// Equi-X
pub mod equix;
/// Settings and args
pub mod settings;
use settings::Args;
/// Utils
pub mod util;
use util::{get_all_files, FileSelection};
@@ -81,41 +99,31 @@ const SLED_PATH_TREE: &[u8] = b"_fud_paths";
const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections";
const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps";
// TODO: This is not Sybil-resistant
fn generate_node_id() -> Result<blake3::Hash> {
let mut rng = OsRng;
let mut random_data = [0u8; 32];
rng.fill_bytes(&mut random_data);
let node_id = blake3::Hash::from_bytes(random_data);
Ok(node_id)
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudNode {
data: VerifiableNodeData,
addresses: Vec<Url>,
}
impl_dht_node_defaults!(FudNode);
/// Get or generate the node id.
/// Fetches and saves the node id from/to a file.
pub async fn get_node_id(node_id_path: &Path) -> Result<blake3::Hash> {
match File::open(node_id_path).await {
Ok(mut file) => {
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
let mut out_buf = [0u8; 32];
bs58::decode(buffer).onto(&mut out_buf)?;
let node_id = blake3::Hash::from_bytes(out_buf);
Ok(node_id)
impl DhtNode for FudNode {
fn id(&self) -> blake3::Hash {
self.data.id()
}
Err(e) if e.kind() == ErrorKind::NotFound => {
let node_id = generate_node_id()?;
let mut file = OpenOptions::new().write(true).create(true).open(node_id_path).await?;
file.write_all(&bs58::encode(node_id.as_bytes()).into_vec()).await?;
file.flush().await?;
Ok(node_id)
}
Err(e) => Err(e.into()),
fn addresses(&self) -> Vec<Url> {
self.addresses.clone()
}
}
pub struct Fud {
/// Our own [`VerifiableNodeData`]
pub node_data: Arc<RwLock<VerifiableNodeData>>,
/// Our secret key (the public key is in `node_data`)
pub secret_key: Arc<RwLock<SecretKey>>,
/// Key -> Seeders
seeders_router: DhtRouterPtr,
pub seeders_router: DhtRouterPtr<FudNode>,
/// Pointer to the P2P network instance
p2p: P2pPtr,
@@ -129,8 +137,11 @@ pub struct Fud {
/// Chunk transfer timeout in seconds
chunk_timeout: u64,
/// The [`FudPow`] instance
pub pow: Arc<RwLock<FudPow>>,
/// The DHT instance
dht: Arc<Dht>,
dht: Arc<Dht<FudNode>>,
/// Resources (current status of all downloads/seeds)
resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
@@ -152,7 +163,7 @@ pub struct Fud {
get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>,
get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>,
/// Currently active downloading tasks (running the `fud.fetch_resource()` method)
fetch_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
/// Used to send events to fud clients
@@ -160,45 +171,71 @@ pub struct Fud {
}
#[async_trait]
impl DhtHandler for Fud {
fn dht(&self) -> Arc<Dht> {
impl DhtHandler<FudNode> for Fud {
fn dht(&self) -> Arc<Dht<FudNode>> {
self.dht.clone()
}
async fn ping(&self, channel: ChannelPtr) -> Result<DhtNode> {
async fn node(&self) -> FudNode {
FudNode {
data: self.node_data.read().await.clone(),
addresses: self
.p2p
.clone()
.hosts()
.external_addrs()
.await
.iter()
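// Drop unspecified `[::]` addresses: peers cannot reach us on them.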
.filter(|addr| !addr.to_string().contains("[::]"))
.cloned()
.collect(),
}
}
async fn ping(&self, channel: ChannelPtr) -> Result<FudNode> {
debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id);
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudPingReply>().await;
let msg_subscriber = channel.subscribe_msg::<FudPingReply>().await.unwrap();
let request = FudPingRequest {};
// Send `FudPingRequest`
let mut rng = OsRng;
let request = FudPingRequest { random: rng.gen() };
channel.send(&request).await?;
// Wait for `FudPingReply`
let reply = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await?;
msg_subscriber.unsubscribe().await;
// Verify the signature
if !reply.node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) {
channel.ban().await;
return Err(Error::InvalidSignature)
}
// Verify PoW
if let Err(e) = self.pow.write().await.verify_node(&reply.node.data).await {
channel.ban().await;
return Err(e)
}
Ok(reply.node.clone())
}
// TODO: Optimize this
async fn on_new_node(&self, node: &DhtNode) -> Result<()> {
debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id));
async fn on_new_node(&self, node: &FudNode) -> Result<()> {
debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id()));
// If this is the first node we know about, then bootstrap
// If this is the first node we know about, then bootstrap and announce our files
if !self.dht().is_bootstrapped().await {
self.dht().set_bootstrapped().await;
// Lookup our own node id
debug!(target: "fud::DhtHandler::on_new_node()", "DHT bootstrapping {}", hash_to_string(&self.dht().node_id));
let _ = self.lookup_nodes(&self.dht().node_id).await;
let _ = self.init().await;
}
// Send keys that are closer to this node than we are
let self_id = self.dht().node_id;
let self_id = self.node_data.read().await.id();
let channel = self.get_channel(node, None).await?;
for (key, seeders) in self.seeders_router.read().await.iter() {
let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id));
let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id()));
let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id));
if node_distance <= self_distance {
let _ = channel
@@ -214,8 +251,8 @@ impl DhtHandler for Fud {
Ok(())
}
async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result<Vec<DhtNode>> {
debug!(target: "fud::DhtHandler::fetch_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id));
async fn fetch_nodes(&self, node: &FudNode, key: &blake3::Hash) -> Result<Vec<FudNode>> {
debug!(target: "fud::DhtHandler::fetch_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id()));
let channel = self.get_channel(node, None).await?;
let msg_subsystem = channel.message_subsystem();
@@ -236,30 +273,43 @@ impl DhtHandler for Fud {
impl Fud {
pub async fn new(
settings: Args,
p2p: P2pPtr,
basedir: PathBuf,
downloads_path: PathBuf,
chunk_timeout: u64,
dht: Arc<Dht>,
sled_db: &sled::Db,
event_publisher: PublisherPtr<FudEvent>,
executor: ExecutorPtr,
) -> Result<Self> {
let (get_tx, get_rx) = smol::channel::unbounded();
let basedir = expand_path(&settings.base_dir)?;
let downloads_path = match settings.downloads_path {
Some(downloads_path) => expand_path(&downloads_path)?,
None => basedir.join("downloads"),
};
// Hashmap used for routing
let seeders_router = Arc::new(RwLock::new(HashMap::new()));
// Run the PoW and generate a `VerifiableNodeData`
let mut pow = FudPow::new(settings.pow.into(), executor.clone());
pow.bitcoin_hash_cache.update().await?; // Fetch BTC block hashes
let (node_data, secret_key) = pow.generate_node().await?;
info!(target: "fud", "Your node ID: {}", hash_to_string(&node_data.id()));
// Geode
info!("Instantiating Geode instance");
let geode = Geode::new(&basedir).await?;
info!("Instantiating DHT instance");
// DHT
let dht_settings: DhtSettings = settings.dht.into();
let dht: Arc<Dht<FudNode>> =
Arc::new(Dht::<FudNode>::new(&dht_settings, p2p.clone(), executor.clone()).await);
let (get_tx, get_rx) = smol::channel::unbounded();
let fud = Self {
seeders_router,
node_data: Arc::new(RwLock::new(node_data)),
secret_key: Arc::new(RwLock::new(secret_key)),
seeders_router: Arc::new(RwLock::new(HashMap::new())),
p2p,
geode,
downloads_path,
chunk_timeout,
chunk_timeout: settings.chunk_timeout,
pow: Arc::new(RwLock::new(pow)),
dht,
path_tree: sled_db.open_tree(SLED_PATH_TREE)?,
file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?,
@@ -271,14 +321,15 @@ impl Fud {
event_publisher,
};
fud.init().await?;
Ok(fud)
}
/// Add ourselves to `seeders_router` for the files we already have.
/// Skipped if we have no external address.
/// Bootstrap the DHT, verify our resources, add ourselves to
/// `seeders_router` for the resources we already have, announce our files.
async fn init(&self) -> Result<()> {
info!(target: "fud::init()", "Bootstrapping the DHT...");
self.bootstrap().await;
info!(target: "fud::init()", "Finding resources...");
let mut resources_write = self.resources.write().await;
for result in self.path_tree.iter() {
@@ -339,16 +390,16 @@ impl Fud {
info!(target: "fud::init()", "Verifying resources...");
let resources = self.verify_resources(None).await?;
let self_node = self.dht().node().await;
let self_node = self.node().await;
// Stop here if we have no external address
if self_node.addresses.is_empty() {
return Ok(());
}
info!(target: "fud::init()", "Start seeding...");
let self_router_items: Vec<DhtRouterItem> = vec![self_node.into()];
for resource in resources {
// Add our own node as a seeder for the resources we are seeding
let self_router_items: Vec<DhtRouterItem<FudNode>> = vec![self_node.into()];
for resource in &resources {
self.add_to_router(
self.seeders_router.clone(),
&resource.hash,
@@ -357,6 +408,18 @@ impl Fud {
.await;
}
info!(target: "fud::init()", "Announcing resources...");
let seeders = vec![self.node().await.into()];
for resource in resources {
let _ = self
.announce(
&resource.hash,
&FudAnnounce { key: resource.hash, seeders: seeders.clone() },
self.seeders_router.clone(),
)
.await;
}
Ok(())
}
@@ -530,17 +593,17 @@ impl Fud {
/// Query `nodes` to find the seeders for `key`
async fn fetch_seeders(
&self,
nodes: &Vec<DhtNode>,
nodes: &Vec<FudNode>,
key: &blake3::Hash,
) -> HashSet<DhtRouterItem> {
let self_node = self.dht().node().await;
let mut seeders: HashSet<DhtRouterItem> = HashSet::new();
) -> HashSet<DhtRouterItem<FudNode>> {
let self_node = self.node().await;
let mut seeders: HashSet<DhtRouterItem<FudNode>> = HashSet::new();
for node in nodes {
let channel = match self.get_channel(node, None).await {
Ok(channel) => channel,
Err(e) => {
warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id));
warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id()));
continue;
}
};
@@ -581,7 +644,8 @@ impl Fud {
seeders.extend(reply.seeders.clone());
}
seeders = seeders.iter().filter(|seeder| seeder.node.id != self_node.id).cloned().collect();
seeders =
seeders.iter().filter(|seeder| seeder.node.id() != self_node.id()).cloned().collect();
info!(target: "fud::fetch_seeders()", "Found {} seeders for {}", seeders.len(), hash_to_string(key));
seeders
@@ -592,7 +656,7 @@ impl Fud {
&self,
hash: &blake3::Hash,
chunked: &mut ChunkedStorage,
seeders: &HashSet<DhtRouterItem>,
seeders: &HashSet<DhtRouterItem<FudNode>>,
chunks: &HashSet<blake3::Hash>,
) -> Result<()> {
let mut remaining_chunks = chunks.clone();
@@ -606,12 +670,12 @@ impl Fud {
let channel = match self.get_channel(&seeder.node, Some(*hash)).await {
Ok(channel) => channel,
Err(e) => {
warn!(target: "fud::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id));
warn!(target: "fud::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
continue;
}
};
let mut chunks_to_query = remaining_chunks.clone();
info!("Requesting chunks from seeder {}", hash_to_string(&seeder.node.id));
info!("Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));
loop {
let start_time = Instant::now();
let msg_subsystem = channel.message_subsystem();
@@ -665,7 +729,7 @@ impl Fud {
continue; // Skip to next chunk, will retry this chunk later
}
info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id));
info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
// If we did not write the whole chunk to the filesystem,
// save the chunk in the scraps.
@@ -726,7 +790,7 @@ impl Fud {
msg_subscriber_notfound.unsubscribe().await;
break; // Switch to another seeder
}
info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id));
info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
notify_event!(self, ChunkNotFound, { hash: *hash, chunk_hash });
}
};
@@ -755,7 +819,7 @@ impl Fud {
pub async fn fetch_metadata(
&self,
hash: &blake3::Hash,
nodes: &Vec<DhtNode>,
nodes: &Vec<FudNode>,
path: &Path,
) -> Result<()> {
let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
@@ -766,7 +830,7 @@ impl Fud {
let channel = match self.get_channel(node, Some(*hash)).await {
Ok(channel) => channel,
Err(e) => {
warn!(target: "fud::fetch_metadata()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id));
warn!(target: "fud::fetch_metadata()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id()));
continue;
}
};
@@ -801,7 +865,7 @@ impl Fud {
};
let mut seeders = reply.seeders.clone();
info!(target: "fud::fetch_metadata()", "Found {} seeders for {} (from {})", seeders.len(), hash_to_string(hash), hash_to_string(&node.id));
info!(target: "fud::fetch_metadata()", "Found {} seeders for {} (from {})", seeders.len(), hash_to_string(hash), hash_to_string(&node.id()));
msg_subscriber.unsubscribe().await;
self.cleanup_channel(channel).await;
@@ -809,10 +873,10 @@ impl Fud {
// 2. Request the file/chunk from the seeders
while let Some(seeder) = seeders.pop() {
// Only query a seeder once
if queried_seeders.iter().any(|s| *s == seeder.node.id) {
if queried_seeders.iter().any(|s| *s == seeder.node.id()) {
continue;
}
queried_seeders.insert(seeder.node.id);
queried_seeders.insert(seeder.node.id());
if let Ok(channel) = self.get_channel(&seeder.node, Some(*hash)).await {
let msg_subsystem = channel.message_subsystem();
@@ -876,7 +940,7 @@ impl Fud {
warn!(target: "fud::fetch_metadata()", "Received a chunk while fetching a file, the chunk did not match the file hash");
continue;
}
info!(target: "fud::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id));
info!(target: "fud::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
result = Some(FetchReply::Chunk((*reply).clone()));
break;
}
@@ -891,7 +955,7 @@ impl Fud {
warn!(target: "fud::fetch_metadata()", "Received invalid file metadata");
continue;
}
info!(target: "fud::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id));
info!(target: "fud::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
result = Some(FetchReply::File((*reply).clone()));
break;
}
@@ -912,7 +976,7 @@ impl Fud {
warn!(target: "fud::fetch_metadata()", "Received invalid directory metadata");
continue;
}
info!(target: "fud::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id));
info!(target: "fud::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
result = Some(FetchReply::Directory((*reply).clone()));
break;
}
@@ -922,7 +986,7 @@ impl Fud {
warn!(target: "fud::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
continue;
}
info!(target: "fud::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id));
info!(target: "fud::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
}
};
}
@@ -1005,7 +1069,7 @@ impl Fud {
&self,
hash: &blake3::Hash,
path: &Path,
) -> Result<(ChunkedStorage, Vec<DhtNode>)> {
) -> Result<(ChunkedStorage, Vec<FudNode>)> {
match self.geode.get(hash, path).await {
// We already know the metadata
Ok(v) => Ok((v, vec![])),
@@ -1041,7 +1105,7 @@ impl Fud {
path: &Path,
files: &FileSelection,
) -> Result<()> {
let self_node = self.dht().node().await;
let self_node = self.node().await;
let hash_bytes = hash.as_bytes();
let path_string = path.to_string_lossy().to_string();
@@ -1456,7 +1520,7 @@ impl Fud {
/// Add a resource from the file system.
pub async fn put(&self, path: &PathBuf) -> Result<blake3::Hash> {
let self_node = self.dht.node().await;
let self_node = self.node().await;
if self_node.addresses.is_empty() {
return Err(Error::Custom(


@@ -20,96 +20,46 @@ use log::{debug, error, info, warn};
use sled_overlay::sled;
use smol::{stream::StreamExt, Executor};
use std::sync::Arc;
use structopt_toml::{structopt::StructOpt, StructOptToml};
use structopt_toml::StructOptToml;
use darkfi::{
async_daemonize, cli_desc,
dht::{Dht, DhtHandler, DhtSettings, DhtSettingsOpt},
geode::hash_to_string,
net::{session::SESSION_DEFAULT, settings::SettingsOpt, P2p, Settings as NetSettings},
async_daemonize,
dht::DhtHandler,
net::{session::SESSION_DEFAULT, P2p, Settings as NetSettings},
rpc::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
settings::{RpcSettings, RpcSettingsOpt},
settings::RpcSettings,
},
system::{Publisher, StoppableTask},
util::path::expand_path,
Error, Result,
};
use fud::{
get_node_id,
proto::{FudFindNodesReply, ProtocolFud},
rpc::JsonRpcInterface,
tasks::{announce_seed_task, get_task},
settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS},
tasks::{announce_seed_task, get_task, node_id_task},
Fud,
};
const CONFIG_FILE: &str = "fud_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../fud_config.toml");
const NODE_ID_PATH: &str = "node_id";
#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "fud", about = cli_desc!())]
struct Args {
#[structopt(short, parse(from_occurrences))]
/// Increase verbosity (-vvv supported)
verbose: u8,
#[structopt(short, long)]
/// Configuration file to use
config: Option<String>,
#[structopt(long)]
/// Set log file path to output daemon logs into
log: Option<String>,
#[structopt(long, default_value = "~/.local/share/darkfi/fud")]
/// Base directory for filesystem storage
base_dir: String,
#[structopt(short, long)]
/// Default path to store downloaded files (defaults to <base_dir>/downloads)
downloads_path: Option<String>,
#[structopt(long, default_value = "60")]
/// Chunk transfer timeout in seconds
chunk_timeout: u64,
#[structopt(flatten)]
/// Network settings
net: SettingsOpt,
#[structopt(flatten)]
/// JSON-RPC settings
rpc: RpcSettingsOpt,
#[structopt(flatten)]
/// DHT settings
dht: DhtSettingsOpt,
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
// The working directory for this daemon and geode.
let basedir = expand_path(&args.base_dir)?;
// The directory to store the downloaded files
let downloads_path = match args.downloads_path {
Some(downloads_path) => expand_path(&downloads_path)?,
None => basedir.join("downloads"),
};
// Cloned args
let args_ = args.clone();
// Sled database init
info!("Instantiating database");
info!(target: "fud", "Instantiating database");
let sled_db = sled::open(basedir.join("db"))?;
info!("Instantiating P2P network");
info!(target: "fud", "Instantiating P2P network");
let net_settings: NetSettings = args.net.into();
let p2p = P2p::new(net_settings.clone(), ex.clone()).await?;
info!("Starting dnet subs task");
info!(target: "fud", "Starting dnet subs task");
let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
let dnet_sub_ = dnet_sub.clone();
let p2p_ = p2p.clone();
@@ -133,28 +83,11 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);
let mut node_id_path = basedir.to_path_buf();
node_id_path.push(NODE_ID_PATH);
let node_id = get_node_id(&node_id_path).await?;
info!(target: "fud", "Your node ID: {}", hash_to_string(&node_id));
// Daemon instantiation
let event_pub = Publisher::new();
let dht_settings: DhtSettings = args.dht.into();
let dht: Arc<Dht> = Arc::new(Dht::new(&node_id, &dht_settings, p2p.clone(), ex.clone()).await);
let fud: Arc<Fud> = Arc::new(
Fud::new(
p2p.clone(),
basedir,
downloads_path,
args.chunk_timeout,
dht.clone(),
&sled_db,
event_pub.clone(),
)
.await?,
);
let fud: Arc<Fud> =
Arc::new(Fud::new(args_, p2p.clone(), &sled_db, event_pub.clone(), ex.clone()).await?);
info!(target: "fud", "Starting download subs task");
let event_sub = JsonSubscriber::new("event");
@@ -210,7 +143,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);
info!("Starting P2P protocols");
info!(target: "fud", "Starting P2P protocols");
let registry = p2p.protocol_registry();
let fud_ = fud.clone();
registry
@@ -256,6 +189,21 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);
info!(target: "fud", "Starting node ID task");
let node_task = StoppableTask::new();
let fud_ = fud.clone();
node_task.clone().start(
async move { node_id_task(fud_.clone()).await },
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "fud", "Failed starting node ID task: {e}"),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
@@ -273,10 +221,13 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!(target: "fud", "Stopping P2P network...");
p2p.stop().await;
info!(target: "fud", "Stopping DHT tasks");
info!(target: "fud", "Stopping DHT tasks...");
dht_channel_task.stop().await;
announce_task.stop().await;
info!(target: "fud", "Stopping node ID task...");
node_task.stop().await;
info!(target: "fud", "Flushing sled database...");
let flushed_bytes = sled_db.flush_async().await?;
info!(target: "fud", "Flushed {flushed_bytes} bytes");

bin/fud/fud/src/pow.rs (new file, 265 lines)

@@ -0,0 +1,265 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2024 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{
io::{Error as IoError, Read, Result as IoResult, Write},
sync::Arc,
};
use log::info;
use rand::rngs::OsRng;
use smol::lock::RwLock;
use structopt::StructOpt;
use url::Url;
use darkfi::{system::ExecutorPtr, Error, Result};
use darkfi_sdk::crypto::{Keypair, PublicKey, SecretKey};
use darkfi_serial::{
async_trait, AsyncDecodable, AsyncEncodable, AsyncRead, AsyncWrite, Decodable, Encodable,
};
use crate::{
bitcoin::{BitcoinBlockHash, BitcoinHashCache},
equix::{Challenge, EquiXBuilder, EquiXPow, Solution, SolverMemory, NONCE_LEN},
};
#[derive(Clone, Debug)]
pub struct PowSettings {
/// Equi-X effort value
pub equix_effort: u32,
/// Number of latest BTC block hashes that are valid for fud's PoW
pub btc_hash_count: usize,
/// Electrum nodes timeout in seconds
pub btc_timeout: u64,
/// Electrum nodes used to fetch the latest block hashes (used in fud's PoW)
pub btc_electrum_nodes: Vec<Url>,
}
impl Default for PowSettings {
fn default() -> Self {
Self {
equix_effort: 10000,
btc_hash_count: 144,
btc_timeout: 15,
btc_electrum_nodes: vec![],
}
}
}
#[derive(Clone, Debug, serde::Deserialize, structopt::StructOpt, structopt_toml::StructOptToml)]
#[structopt()]
#[serde(rename = "pow")]
pub struct PowSettingsOpt {
/// Equi-X effort value
#[structopt(long)]
pub equix_effort: Option<u32>,
/// Number of latest BTC block hashes that are valid for fud's PoW
#[structopt(long)]
pub btc_hash_count: Option<usize>,
/// Electrum nodes timeout in seconds
#[structopt(long)]
pub btc_timeout: Option<u64>,
/// Electrum nodes used to fetch the latest block hashes (used in fud's PoW)
#[structopt(long, use_delimiter = true)]
pub btc_electrum_nodes: Vec<Url>,
}
impl From<PowSettingsOpt> for PowSettings {
fn from(opt: PowSettingsOpt) -> Self {
let def = PowSettings::default();
Self {
equix_effort: opt.equix_effort.unwrap_or(def.equix_effort),
btc_hash_count: opt.btc_hash_count.unwrap_or(def.btc_hash_count),
btc_timeout: opt.btc_timeout.unwrap_or(def.btc_timeout),
btc_electrum_nodes: opt.btc_electrum_nodes,
}
}
}
/// Struct handling a [`EquiXPow`] instance to generate and verify [`VerifiableNodeData`].
pub struct FudPow {
pub settings: Arc<RwLock<PowSettings>>,
pub bitcoin_hash_cache: BitcoinHashCache,
equix_pow: EquiXPow,
}
impl FudPow {
pub fn new(settings: PowSettings, ex: ExecutorPtr) -> Self {
let pow_settings: Arc<RwLock<PowSettings>> = Arc::new(RwLock::new(settings));
let bitcoin_hash_cache = BitcoinHashCache::new(pow_settings.clone(), ex.clone());
Self {
settings: pow_settings,
bitcoin_hash_cache,
equix_pow: EquiXPow {
effort: 0, // will be set when we call `generate_node()`
challenge: Challenge::new(&[], &[0u8; NONCE_LEN]),
equix: EquiXBuilder::default(),
mem: SolverMemory::default(),
},
}
}
/// Generate a random keypair and run the PoW to get a [`VerifiableNodeData`].
pub async fn generate_node(&mut self) -> Result<(VerifiableNodeData, SecretKey)> {
info!(target: "fud::FudPow::generate_node()", "Generating a new node id...");
// Generate a random keypair
let keypair = Keypair::random(&mut OsRng);
// Get a recent Bitcoin block hash
let n = 3;
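// (an assumption on intent: staying a few blocks behind the tip keeps the
// chosen hash known to peers and safe from shallow reorgs)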
let btc_block_hash = {
let block_hashes = &self.bitcoin_hash_cache.block_hashes;
if block_hashes.is_empty() {
return Err(Error::Custom(
"Can't generate a node id without BTC block hashes".into(),
));
}
let block_hash = if n > block_hashes.len() {
block_hashes.last()
} else {
block_hashes.get(block_hashes.len() - 1 - n)
};
if block_hash.is_none() {
return Err(Error::Custom("Could not find a recent BTC block hash".into()));
}
*block_hash.unwrap()
};
// Update the effort using the value from `self.settings`
self.equix_pow.effort = self.settings.read().await.equix_effort;
// Construct Equi-X challenge
self.equix_pow.challenge = Challenge::new(
&[keypair.public.to_bytes(), btc_block_hash].concat(),
&[0u8; NONCE_LEN],
);
// Evaluate PoW
info!(target: "fud::FudPow::generate_node()", "Equi-X Proof-of-Work starts...");
let solution =
self.equix_pow.run().map_err(|e| Error::Custom(format!("Equi-X error: {e}")))?;
info!(target: "fud::FudPow::generate_node()", "Equi-X Proof-of-Work is done");
// Create the VerifiableNodeData
Ok((
VerifiableNodeData {
public_key: keypair.public,
btc_block_hash,
nonce: self.equix_pow.challenge.nonce(),
solution,
},
keypair.secret,
))
}
/// Check if the Equi-X solution in a [`VerifiableNodeData`] is valid and has enough effort.
pub async fn verify_node(&mut self, node_data: &VerifiableNodeData) -> Result<()> {
// Update the effort using the value from `self.settings`
self.equix_pow.effort = self.settings.read().await.equix_effort;
// Verify if the Bitcoin block hash is known
if !self.bitcoin_hash_cache.block_hashes.contains(&node_data.btc_block_hash) {
return Err(Error::Custom(
"Error verifying node data: the BTC block hash is unknown".into(),
))
}
// Verify the solution
self.equix_pow
.verify(&node_data.challenge(), &node_data.solution)
.map_err(|e| Error::Custom(format!("Error verifying Equi-X solution: {e}")))
}
}
/// The data needed to verify a fud PoW.
#[derive(Debug, Clone)]
pub struct VerifiableNodeData {
pub public_key: PublicKey,
pub btc_block_hash: BitcoinBlockHash,
pub nonce: [u8; NONCE_LEN],
pub solution: Solution,
}
impl VerifiableNodeData {
/// The node id on the DHT.
pub fn id(&self) -> blake3::Hash {
blake3::hash(&[self.challenge().to_bytes(), self.solution.to_bytes().to_vec()].concat())
}
/// The Equi-X challenge.
pub fn challenge(&self) -> Challenge {
Challenge::new(&[self.public_key.to_bytes(), self.btc_block_hash].concat(), &self.nonce)
}
}
impl Encodable for VerifiableNodeData {
fn encode<S: Write>(&self, s: &mut S) -> IoResult<usize> {
let mut len = 0;
len += self.public_key.encode(s)?;
len += self.btc_block_hash.encode(s)?;
len += self.nonce.encode(s)?;
len += self.solution.to_bytes().encode(s)?;
Ok(len)
}
}
#[async_trait]
impl AsyncEncodable for VerifiableNodeData {
async fn encode_async<S: AsyncWrite + Unpin + Send>(&self, s: &mut S) -> IoResult<usize> {
let mut len = 0;
len += self.public_key.encode_async(s).await?;
len += self.btc_block_hash.encode_async(s).await?;
len += self.nonce.encode_async(s).await?;
len += self.solution.to_bytes().encode_async(s).await?;
Ok(len)
}
}
impl Decodable for VerifiableNodeData {
fn decode<D: Read>(d: &mut D) -> IoResult<Self> {
Ok(Self {
public_key: PublicKey::decode(d)?,
btc_block_hash: BitcoinBlockHash::decode(d)?,
nonce: <[u8; NONCE_LEN]>::decode(d)?,
solution: Solution::try_from_bytes(&<[u8; Solution::NUM_BYTES]>::decode(d)?)
.map_err(|e| IoError::other(format!("Error parsing Equi-X solution: {e}")))?,
})
}
}
#[async_trait]
impl AsyncDecodable for VerifiableNodeData {
async fn decode_async<D: AsyncRead + Unpin + Send>(d: &mut D) -> IoResult<Self> {
Ok(Self {
public_key: PublicKey::decode_async(d).await?,
btc_block_hash: BitcoinBlockHash::decode_async(d).await?,
nonce: <[u8; NONCE_LEN]>::decode_async(d).await?,
solution: Solution::try_from_bytes(
&<[u8; Solution::NUM_BYTES]>::decode_async(d).await?,
)
.map_err(|e| IoError::other(format!("Error parsing Equi-X solution: {e}")))?,
})
}
}


@@ -22,7 +22,7 @@ use smol::Executor;
use std::{path::StripPrefixError, sync::Arc};
use darkfi::{
dht::{DhtHandler, DhtNode, DhtRouterItem},
dht::{DhtHandler, DhtRouterItem},
geode::hash_to_string,
impl_p2p_message,
net::{
@@ -32,9 +32,10 @@ use darkfi::{
},
Error, Result,
};
use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature};
use darkfi_serial::{SerialDecodable, SerialEncodable};
use super::Fud;
use super::{Fud, FudNode};
/// Message representing a file reply from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
@@ -55,7 +56,7 @@ impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudAnnounce {
pub key: blake3::Hash,
pub seeders: Vec<DhtRouterItem>,
pub seeders: Vec<DhtRouterItem<FudNode>>,
}
impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
@@ -74,13 +75,17 @@ impl_p2p_message!(FudNotFound, "FudNotFound", 0, 0, DEFAULT_METERING_CONFIGURATI
/// Message representing a ping request on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudPingRequest;
pub struct FudPingRequest {
pub random: u64,
}
impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
/// Message representing a ping reply on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudPingReply {
pub node: DhtNode,
pub node: FudNode,
/// Signature of the random u64 from the ping request
pub sig: Signature,
}
impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
@@ -102,7 +107,7 @@ impl_p2p_message!(FudFindNodesRequest, "FudFindNodesRequest", 0, 0, DEFAULT_METE
/// Message representing a find nodes reply on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindNodesReply {
pub nodes: Vec<DhtNode>,
pub nodes: Vec<FudNode>,
}
impl_p2p_message!(FudFindNodesReply, "FudFindNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
@@ -122,7 +127,7 @@ impl_p2p_message!(
/// Message representing a find seeders reply on the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindSeedersReply {
pub seeders: Vec<DhtRouterItem>,
pub seeders: Vec<DhtRouterItem<FudNode>>,
}
impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
@@ -174,7 +179,7 @@ impl ProtocolFud {
debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
loop {
let _ = match self.ping_request_sub.receive().await {
let ping_req = match self.ping_request_sub.receive().await {
Ok(v) => v,
Err(Error::ChannelStopped) => continue,
Err(e) => {
@@ -182,9 +187,12 @@ impl ProtocolFud {
continue
}
};
info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING");
info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST");
let reply = FudPingReply { node: self.fud.dht.node().await };
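// Sign the request's random u64 so the peer can check it against the
// public key inside our VerifiableNodeData.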
let reply = FudPingReply {
node: self.fud.node().await,
sig: self.fud.secret_key.read().await.sign(&ping_req.random.to_be_bytes()),
};
match self.channel.send(&reply).await {
Ok(()) => continue,
Err(_e) => continue,
@@ -253,7 +261,7 @@ impl ProtocolFud {
return false;
}
let reply = FudChunkReply { chunk };
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending chunk {}", hash_to_string(&request.key));
info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.key));
let _ = self.channel.send(&reply).await;
return true;
}


@@ -27,6 +27,7 @@ use std::{
use tinyjson::JsonValue;
use darkfi::{
dht::DhtNode,
geode::hash_to_string,
net::P2pPtr,
rpc::{
@@ -265,11 +266,11 @@ impl JsonRpcInterface {
let mut nodes = vec![];
for node in bucket.nodes.clone() {
let mut addresses = vec![];
for addr in node.addresses {
for addr in &node.addresses {
addresses.push(JsonValue::String(addr.to_string()));
}
nodes.push(JsonValue::Array(vec![
JsonValue::String(hash_to_string(&node.id)),
JsonValue::String(hash_to_string(&node.id())),
JsonValue::Array(addresses),
]));
}
@@ -293,7 +294,7 @@ impl JsonRpcInterface {
for (hash, items) in self.fud.seeders_router.read().await.iter() {
let mut node_ids = vec![];
for item in items {
node_ids.push(JsonValue::String(hash_to_string(&item.node.id)));
node_ids.push(JsonValue::String(hash_to_string(&item.node.id())));
}
seeders_router.insert(hash_to_string(hash), JsonValue::Array(node_ids));
}


@@ -0,0 +1,74 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use structopt::StructOpt;
use structopt_toml::{serde::Deserialize, StructOptToml};
use darkfi::{
cli_desc, dht::DhtSettingsOpt, net::settings::SettingsOpt, rpc::settings::RpcSettingsOpt,
};
use crate::pow::PowSettingsOpt;
pub const CONFIG_FILE: &str = "fud_config.toml";
pub const CONFIG_FILE_CONTENTS: &str = include_str!("../fud_config.toml");
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "fud", about = cli_desc!())]
pub struct Args {
#[structopt(short, parse(from_occurrences))]
/// Increase verbosity (-vvv supported)
pub verbose: u8,
#[structopt(short, long)]
/// Configuration file to use
pub config: Option<String>,
#[structopt(long)]
/// Set log file path to output daemon logs into
pub log: Option<String>,
#[structopt(long, default_value = "~/.local/share/darkfi/fud")]
/// Base directory for filesystem storage
pub base_dir: String,
#[structopt(short, long)]
/// Default path to store downloaded files (defaults to <base_dir>/downloads)
pub downloads_path: Option<String>,
#[structopt(long, default_value = "60")]
/// Chunk transfer timeout in seconds
pub chunk_timeout: u64,
#[structopt(flatten)]
/// Network settings
pub net: SettingsOpt,
#[structopt(flatten)]
/// JSON-RPC settings
pub rpc: RpcSettingsOpt,
#[structopt(flatten)]
/// DHT settings
pub dht: DhtSettingsOpt,
#[structopt(flatten)]
/// PoW settings
pub pow: PowSettingsOpt,
}


@@ -16,11 +16,13 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use log::{error, info};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use log::{error, info, warn};
use darkfi::{
dht::DhtHandler,
dht::{DhtHandler, DhtNode},
geode::hash_to_string,
system::{sleep, ExecutorPtr, StoppableTask},
Error, Result,
};
@@ -89,7 +91,7 @@ pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
loop {
sleep(interval).await;
let seeders = vec![fud.dht().node().await.into()];
let seeders = vec![fud.node().await.into()];
info!(target: "fud::announce_seed_task()", "Verifying seeds...");
let seeding_resources = match fud.verify_resources(None).await {
@@ -115,3 +117,90 @@ pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
fud.dht().prune_router(fud.seeders_router.clone(), interval.try_into().unwrap()).await;
}
}
/// Background task that:
/// 1. Updates the [`crate::bitcoin::BitcoinHashCache`]
/// 2. Removes old nodes from the DHT
/// 3. Removes old nodes from the seeders router
/// 4. If the Bitcoin block hash we currently use in our `fud.node_data` is too old, we update it and reset our DHT
pub async fn node_id_task(fud: Arc<Fud>) -> Result<()> {
let interval = 600; // TODO: Make a setting
loop {
sleep(interval).await;
let mut pow = fud.pow.write().await;
let btc = &mut pow.bitcoin_hash_cache;
if btc.update().await.is_err() {
continue
}
let block = fud.node_data.read().await.btc_block_hash;
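// The newest hash sits at the end of `block_hashes`, so a position below 6
// means our block is among the oldest cached hashes and about to expire.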
let needs_dht_reset = match btc.block_hashes.iter().position(|b| *b == block) {
Some(i) => i < 6,
None => true,
};
if !needs_dht_reset {
// Removes nodes in the DHT with unknown BTC block hashes.
let dht = fud.dht();
let mut buckets = dht.buckets.write().await;
for bucket in buckets.iter_mut() {
for (i, node) in bucket.nodes.clone().iter().enumerate().rev() {
// If this node's BTC block hash is unknown, remove it from the bucket
if !btc.block_hashes.contains(&node.data.btc_block_hash) {
bucket.nodes.remove(i);
info!(target: "fud::node_id_task()", "Removed node {} from the DHT (BTC block hash too old or unknown)", hash_to_string(&node.id()));
}
}
}
drop(buckets);
// Removes nodes in the seeders router with unknown BTC block hashes
let mut seeders_router = fud.seeders_router.write().await;
for (key, seeders) in seeders_router.iter_mut() {
for seeder in seeders.clone().iter() {
if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) {
seeders.remove(seeder);
info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key));
}
}
}
continue
}
info!(target: "fud::node_id_task()", "Creating a new node id...");
let (node_data, secret_key) = match pow.generate_node().await {
Ok(res) => res,
Err(e) => {
warn!(target: "fud::node_id_task()", "Error creating a new node id: {e}");
continue
}
};
drop(pow);
info!(target: "fud::node_id_task()", "New node id: {}", hash_to_string(&node_data.id()));
// Close all channels
let dht = fud.dht();
let mut channel_cache = dht.channel_cache.write().await;
for channel in dht.p2p.hosts().channels().clone() {
channel.stop().await;
channel_cache.remove(&channel.info.id);
}
drop(channel_cache);
// Reset the DHT
dht.reset().await;
// Reset the seeders router
*fud.seeders_router.write().await = HashMap::new();
// Update our node data and our secret key
*fud.node_data.write().await = node_data;
*fud.secret_key.write().await = secret_key;
// DHT will be bootstrapped on the next channel connection
}
}
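
For reference, the node-ID derivation that `generate_node()` performs reduces to the sketch below. It is a minimal illustration, assuming the `equix::solve` / `Solution::to_bytes` API of the `equix` crate, a little-endian nonce, and BLAKE3 as the outer hash; fud's exact byte framing may differ.

```rust
/// Sketch: derive a node ID from (public_key || btc_block_hash || nonce).
/// Returns None if Equi-X yields no solution for this nonce (try the next one).
fn derive_node_id(
    public_key: &[u8; 32],
    btc_block_hash: &[u8; 32],
    nonce: u64,
) -> Option<blake3::Hash> {
    // Equi-X challenge: (public_key || btc_block_hash || nonce)
    let mut challenge = Vec::with_capacity(72);
    challenge.extend_from_slice(public_key);
    challenge.extend_from_slice(btc_block_hash);
    challenge.extend_from_slice(&nonce.to_le_bytes());

    // Evaluate the PoW; a given challenge may have zero solutions
    let solutions = equix::solve(&challenge).ok()?;
    let solution = solutions.first()?;

    // Node ID = Hash(equix_challenge || equix_solution)
    let mut hasher = blake3::Hasher::new();
    hasher.update(&challenge);
    hasher.update(&solution.to_bytes());
    Some(hasher.finalize())
}
```

A verifier recomputes the same hash after checking the solution (again assuming the crate's `verify_bytes` entry point), so a node ID cannot be claimed without evaluating the PoW.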

View File

@@ -40,27 +40,33 @@ use crate::{
};
#[async_trait]
pub trait DhtHandler {
fn dht(&self) -> Arc<Dht>;
pub trait DhtHandler<N: DhtNode> {
fn dht(&self) -> Arc<Dht<N>>;
/// Get our own node
async fn node(&self) -> N;
/// Send a DHT ping request
async fn ping(&self, channel: ChannelPtr) -> Result<DhtNode>;
async fn ping(&self, channel: ChannelPtr) -> Result<N>;
/// Triggered when we find a new node
async fn on_new_node(&self, node: &DhtNode) -> Result<()>;
async fn on_new_node(&self, node: &N) -> Result<()>;
/// Send FIND NODES request to a peer to get nodes close to `key`
async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result<Vec<DhtNode>>;
async fn fetch_nodes(&self, node: &N, key: &blake3::Hash) -> Result<Vec<N>>;
/// Announce message for a key, and add ourselves to router
async fn announce<M: Message>(
&self,
key: &blake3::Hash,
message: &M,
router: DhtRouterPtr,
) -> Result<()> {
let self_node = self.dht().node().await;
if self_node.addresses.is_empty() {
router: DhtRouterPtr<N>,
) -> Result<()>
where
N: 'async_trait,
{
let self_node = self.node().await;
if self_node.addresses().is_empty() {
return Err(().into()); // TODO
}
@@ -79,6 +85,19 @@ pub trait DhtHandler {
Ok(())
}
/// Lookup our own node id to bootstrap our DHT
async fn bootstrap(&self) {
self.dht().set_bootstrapped(true).await;
let self_node_id = self.node().await.id();
debug!(target: "dht::DhtHandler::bootstrap()", "DHT bootstrapping {}", hash_to_string(&self_node_id));
let nodes = self.lookup_nodes(&self_node_id).await;
if nodes.is_err() || nodes.map_or(true, |v| v.is_empty()) {
self.dht().set_bootstrapped(false).await;
}
}
/// Send a DHT ping request when there is a new channel, to learn the node id of the new peer,
/// then fill the channel cache and the buckets
async fn channel_task<M: Message>(&self) -> Result<()> {
@@ -106,7 +125,7 @@ pub trait DhtHandler {
if let Err(e) = ping_res {
warn!(target: "dht::DhtHandler::channel_task()", "Error while pinging (requesting node id) {}: {e}", channel.address());
channel.stop().await;
// channel.stop().await;
continue;
}
@@ -119,7 +138,7 @@ pub trait DhtHandler {
});
drop(channel_cache);
if !node.addresses.is_empty() {
if !node.addresses().is_empty() {
self.add_node(node.clone()).await;
let _ = self.on_new_node(&node.clone()).await;
}
@@ -127,24 +146,35 @@ pub trait DhtHandler {
}
/// Add a node to the correct bucket
async fn add_node(&self, node: DhtNode) {
async fn add_node(&self, node: N)
where
N: 'async_trait,
{
let self_node = self.node().await;
// Do not add ourselves to the buckets
if node.id == self.dht().node_id {
if node.id() == self_node.id() {
return;
}
// Don't add this node if it has any external address that is the same as one of ours
let node_addresses = node.addresses();
if self_node.addresses().iter().any(|addr| node_addresses.contains(addr)) {
return;
}
// Do not add a node to the buckets if it does not have an address
if node.addresses.is_empty() {
if node.addresses().is_empty() {
return;
}
let bucket_index = self.dht().get_bucket_index(&node.id).await;
let bucket_index = self.dht().get_bucket_index(&self.node().await.id(), &node.id()).await;
let buckets_lock = self.dht().buckets.clone();
let mut buckets = buckets_lock.write().await;
let bucket = &mut buckets[bucket_index];
// Node is already in the bucket
if bucket.nodes.iter().any(|n| n.id == node.id) {
if bucket.nodes.iter().any(|n| n.id() == node.id()) {
return;
}
@@ -175,14 +205,15 @@ pub trait DhtHandler {
/// Move a node to the tail of its bucket,
/// marking it as the most recently seen node in that bucket.
/// If the node is not in any bucket, it is added using `add_node`
async fn update_node(&self, node: &DhtNode) {
let bucket_index = self.dht().get_bucket_index(&node.id).await;
async fn update_node(&self, node: &N) {
let bucket_index = self.dht().get_bucket_index(&self.node().await.id(), &node.id()).await;
let buckets_lock = self.dht().buckets.clone();
let mut buckets = buckets_lock.write().await;
let bucket = &mut buckets[bucket_index];
let node_index = bucket.nodes.iter().position(|n| n.id == node.id);
let node_index = bucket.nodes.iter().position(|n| n.id() == node.id());
if node_index.is_none() {
drop(buckets);
self.add_node(node.clone()).await;
return;
}
@@ -196,17 +227,21 @@ pub trait DhtHandler {
async fn fetch_nodes_sp(
&self,
semaphore: Arc<Semaphore>,
node: DhtNode,
node: N,
key: &blake3::Hash,
) -> (DhtNode, Result<Vec<DhtNode>>) {
) -> (N, Result<Vec<N>>)
where
N: 'async_trait,
{
let _permit = semaphore.acquire().await;
(node.clone(), self.fetch_nodes(&node, key).await)
}
/// Find `k` nodes closest to a key
async fn lookup_nodes(&self, key: &blake3::Hash) -> Result<Vec<DhtNode>> {
async fn lookup_nodes(&self, key: &blake3::Hash) -> Result<Vec<N>> {
info!(target: "dht::DhtHandler::lookup_nodes()", "Starting node lookup for key {}", bs58::encode(key.as_bytes()).into_string());
let self_node_id = self.node().await.id();
let k = self.dht().settings.k;
let a = self.dht().settings.alpha;
let semaphore = Arc::new(Semaphore::new(self.dht().settings.concurrency));
@@ -217,13 +252,13 @@ pub trait DhtHandler {
// Nodes with a pending or completed request
let mut visited_nodes = HashSet::<blake3::Hash>::new();
// Nodes that responded to our request, sorted by distance from `key`
let mut result = Vec::<DhtNode>::new();
let mut result = Vec::<N>::new();
// Create the first `alpha` tasks
for _ in 0..a {
match nodes_to_visit.pop() {
Some(node) => {
visited_nodes.insert(node.id);
visited_nodes.insert(node.id());
futures.push(self.fetch_nodes_sp(semaphore.clone(), node, key));
}
None => {
@@ -235,13 +270,13 @@ pub trait DhtHandler {
while let Some((queried_node, value_result)) = futures.next().await {
match value_result {
Ok(mut nodes) => {
info!(target: "dht::DhtHandler::lookup_nodes", "Queried {}, got {} nodes", bs58::encode(queried_node.id.as_bytes()).into_string(), nodes.len());
info!(target: "dht::DhtHandler::lookup_nodes", "Queried {}, got {} nodes", bs58::encode(queried_node.id().as_bytes()).into_string(), nodes.len());
// Remove ourselves and already known nodes from the new nodes
nodes.retain(|node| {
node.id != self.dht().node_id &&
!visited_nodes.contains(&node.id) &&
!nodes_to_visit.iter().any(|n| n.id == node.id)
node.id() != self_node_id &&
!visited_nodes.contains(&node.id()) &&
!nodes_to_visit.iter().any(|n| n.id() == node.id())
});
// Add new nodes to our buckets
@@ -263,10 +298,11 @@ pub trait DhtHandler {
if result.len() >= k {
if let Some(furthest) = result.last() {
if let Some(next_node) = nodes_to_visit.first() {
let furthest_dist =
BigUint::from_bytes_be(&self.dht().distance(key, &furthest.id));
let furthest_dist = BigUint::from_bytes_be(
&self.dht().distance(key, &furthest.id()),
);
let next_dist = BigUint::from_bytes_be(
&self.dht().distance(key, &next_node.id),
&self.dht().distance(key, &next_node.id()),
);
if furthest_dist < next_dist {
info!(target: "dht::DhtHandler::lookup_nodes", "Early termination for lookup nodes");
@@ -280,7 +316,7 @@ pub trait DhtHandler {
for _ in 0..a {
match nodes_to_visit.pop() {
Some(node) => {
visited_nodes.insert(node.id);
visited_nodes.insert(node.id());
futures.push(self.fetch_nodes_sp(semaphore.clone(), node, key));
}
None => {
@@ -301,12 +337,12 @@ pub trait DhtHandler {
/// Get a channel to `node` about `topic`, reusing an existing one or creating a new one.
/// Don't forget to call `cleanup_channel()` once you are done with it.
async fn get_channel(&self, node: &DhtNode, topic: Option<blake3::Hash>) -> Result<ChannelPtr> {
async fn get_channel(&self, node: &N, topic: Option<blake3::Hash>) -> Result<ChannelPtr> {
let channel_cache_lock = self.dht().channel_cache.clone();
let mut channel_cache = channel_cache_lock.write().await;
// Get existing channels for this node, regardless of topic
let channels: HashMap<u32, ChannelCacheItem> = channel_cache
let channels: HashMap<u32, ChannelCacheItem<N>> = channel_cache
.iter()
.filter(|&(_, item)| item.node == *node)
.map(|(&key, item)| (key, item.clone()))
@@ -358,7 +394,7 @@ pub trait DhtHandler {
drop(channel_cache);
// Create a channel
for addr in node.addresses.clone() {
for addr in node.addresses().clone() {
let session_out = self.dht().p2p.session_outbound();
let session_weak = Arc::downgrade(&self.dht().p2p.session_outbound());
@@ -422,12 +458,14 @@ pub trait DhtHandler {
/// Add nodes as providers for a key
async fn add_to_router(
&self,
router: DhtRouterPtr,
router: DhtRouterPtr<N>,
key: &blake3::Hash,
router_items: Vec<DhtRouterItem>,
) {
router_items: Vec<DhtRouterItem<N>>,
) where
N: 'async_trait,
{
let mut router_items = router_items.clone();
router_items.retain(|item| !item.node.addresses.is_empty());
router_items.retain(|item| !item.node.addresses().is_empty());
debug!(target: "dht::DhtHandler::add_to_router()", "Inserting {} nodes to key {}", router_items.len(), bs58::encode(key.as_bytes()).into_string());
@@ -449,13 +487,13 @@ pub trait DhtHandler {
// Add to router_cache
for router_item in router_items {
let keys = router_cache.get_mut(&router_item.node.id);
let keys = router_cache.get_mut(&router_item.node.id());
if let Some(k) = keys {
k.insert(*key);
} else {
let mut keys = HashSet::new();
keys.insert(*key);
router_cache.insert(router_item.node.id, keys);
router_cache.insert(router_item.node.id(), keys);
}
}
}

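With `DhtHandler` now generic over the node type, an implementor wires things up roughly as follows. This is a minimal sketch: `MyHandler`, `MyNode`, and the field/method bodies are illustrative stand-ins, not fud's actual implementation.

```rust
#[async_trait]
impl DhtHandler<MyNode> for MyHandler {
    fn dht(&self) -> Arc<Dht<MyNode>> {
        self.dht.clone()
    }

    // The handler, not `Dht`, now decides what "our own node" is;
    // fud builds it from its PoW-derived node data.
    async fn node(&self) -> MyNode {
        self.node_data.read().await.clone()
    }

    async fn ping(&self, channel: ChannelPtr) -> Result<MyNode> {
        // Send a ping request and verify the reply (signature + PoW) here
        todo!()
    }

    async fn on_new_node(&self, node: &MyNode) -> Result<()> {
        Ok(())
    }

    async fn fetch_nodes(&self, node: &MyNode, key: &blake3::Hash) -> Result<Vec<MyNode>> {
        todo!()
    }
}
```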
View File

@@ -16,14 +16,18 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{
cmp::Eq,
collections::{HashMap, HashSet},
fmt::Debug,
hash::{Hash, Hasher},
marker::{Send, Sync},
sync::Arc,
};
use async_trait::async_trait;
use num_bigint::BigUint;
use smol::lock::RwLock;
use std::{
collections::{HashMap, HashSet},
hash::{Hash, Hasher},
sync::Arc,
};
use url::Url;
use darkfi_serial::{SerialDecodable, SerialEncodable};
@@ -36,59 +40,65 @@ pub use settings::{DhtSettings, DhtSettingsOpt};
pub mod handler;
pub use handler::DhtHandler;
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
pub struct DhtNode {
pub id: blake3::Hash,
pub addresses: Vec<Url>,
pub trait DhtNode: Debug + Clone + Send + Sync + PartialEq + Eq + Hash {
fn id(&self) -> blake3::Hash;
fn addresses(&self) -> Vec<Url>;
}
impl Hash for DhtNode {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
/// Implements `Hash`, `PartialEq`, and `Eq` (keyed on the node ID) for a struct implementing [`DhtNode`]
#[macro_export]
macro_rules! impl_dht_node_defaults {
($t:ty) => {
impl std::hash::Hash for $t {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id().hash(state);
}
}
impl PartialEq for DhtNode {
}
impl std::cmp::PartialEq for $t {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
self.id() == other.id()
}
}
impl std::cmp::Eq for $t {}
};
}
pub use impl_dht_node_defaults;
pub struct DhtBucket {
pub nodes: Vec<DhtNode>,
pub struct DhtBucket<N: DhtNode> {
pub nodes: Vec<N>,
}
/// "Router" means: Key -> Set of nodes (+ additional data for each node)
pub type DhtRouterPtr = Arc<RwLock<HashMap<blake3::Hash, HashSet<DhtRouterItem>>>>;
pub type DhtRouterPtr<N> = Arc<RwLock<HashMap<blake3::Hash, HashSet<DhtRouterItem<N>>>>>;
#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
pub struct DhtRouterItem {
pub node: DhtNode,
pub struct DhtRouterItem<N: DhtNode> {
pub node: N,
pub timestamp: u64,
}
impl Hash for DhtRouterItem {
impl<N: DhtNode> Hash for DhtRouterItem<N> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.node.id.hash(state);
self.node.id().hash(state);
}
}
impl PartialEq for DhtRouterItem {
impl<N: DhtNode> PartialEq for DhtRouterItem<N> {
fn eq(&self, other: &Self) -> bool {
self.node.id == other.node.id
self.node.id() == other.node.id()
}
}
impl From<DhtNode> for DhtRouterItem {
fn from(node: DhtNode) -> Self {
impl<N: DhtNode> From<N> for DhtRouterItem<N> {
fn from(node: N) -> Self {
DhtRouterItem { node, timestamp: Timestamp::current_time().inner() }
}
}
#[derive(Clone)]
pub struct ChannelCacheItem {
pub struct ChannelCacheItem<N: DhtNode> {
/// The DHT node the channel is connected to.
node: DhtNode,
pub node: N,
/// Topic is a hash that you set to remember what the channel is about;
/// it's not shared with the peer. If you ask for a channel (with
@@ -103,17 +113,15 @@ pub struct ChannelCacheItem {
usage_count: u32,
}
pub struct Dht {
/// Our own node id
pub node_id: blake3::Hash,
pub struct Dht<N: DhtNode> {
/// Are we bootstrapped?
pub bootstrapped: Arc<RwLock<bool>>,
/// Vec of buckets
pub buckets: Arc<RwLock<Vec<DhtBucket>>>,
pub buckets: Arc<RwLock<Vec<DhtBucket<N>>>>,
/// Number of buckets
pub n_buckets: usize,
/// Channel ID -> ChannelCacheItem
pub channel_cache: Arc<RwLock<HashMap<u32, ChannelCacheItem>>>,
pub channel_cache: Arc<RwLock<HashMap<u32, ChannelCacheItem<N>>>>,
/// Node ID -> Set of keys
pub router_cache: Arc<RwLock<HashMap<blake3::Hash, HashSet<blake3::Hash>>>>,
@@ -123,13 +131,8 @@ pub struct Dht {
pub executor: ExecutorPtr,
}
impl Dht {
pub async fn new(
node_id: &blake3::Hash,
settings: &DhtSettings,
p2p: P2pPtr,
ex: ExecutorPtr,
) -> Self {
impl<N: DhtNode> Dht<N> {
pub async fn new(settings: &DhtSettings, p2p: P2pPtr, ex: ExecutorPtr) -> Self {
// Create empty buckets
let mut buckets = vec![];
for _ in 0..256 {
@@ -137,7 +140,6 @@ impl Dht {
}
Self {
node_id: *node_id,
buckets: Arc::new(RwLock::new(buckets)),
n_buckets: 256,
bootstrapped: Arc::new(RwLock::new(false)),
@@ -156,26 +158,9 @@ impl Dht {
*bootstrapped
}
pub async fn set_bootstrapped(&self) {
pub async fn set_bootstrapped(&self, value: bool) {
let mut bootstrapped = self.bootstrapped.write().await;
*bootstrapped = true;
}
/// Get own node
pub async fn node(&self) -> DhtNode {
DhtNode {
id: self.node_id,
addresses: self
.p2p
.clone()
.hosts()
.external_addrs()
.await
.iter()
.filter(|addr| !addr.to_string().contains("[::]"))
.cloned()
.collect(),
}
*bootstrapped = value;
}
/// Get the distance between `key_1` and `key_2`
@@ -193,20 +178,20 @@ impl Dht {
}
/// Sort `nodes` by distance from `key`
pub fn sort_by_distance(&self, nodes: &mut [DhtNode], key: &blake3::Hash) {
pub fn sort_by_distance(&self, nodes: &mut [N], key: &blake3::Hash) {
nodes.sort_by(|a, b| {
let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id));
let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id));
let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id()));
let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id()));
distance_a.cmp(&distance_b)
});
}
/// `key` -> bucket index
pub async fn get_bucket_index(&self, key: &blake3::Hash) -> usize {
if key == &self.node_id {
pub async fn get_bucket_index(&self, self_node_id: &blake3::Hash, key: &blake3::Hash) -> usize {
if key == self_node_id {
return 0
}
let distance = self.distance(&self.node_id, key);
let distance = self.distance(self_node_id, key);
let mut leading_zeros = 0;
for &byte in &distance {
@@ -224,7 +209,7 @@ impl Dht {
/// Get `n` closest known nodes to a key
/// TODO: Can be optimized
pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec<DhtNode> {
pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec<N> {
let buckets_lock = self.buckets.clone();
let buckets = buckets_lock.read().await;
@@ -244,7 +229,7 @@ impl Dht {
}
/// Channel ID -> DhtNode
pub async fn get_node_from_channel(&self, channel_id: u32) -> Option<DhtNode> {
pub async fn get_node_from_channel(&self, channel_id: u32) -> Option<N> {
let channel_cache_lock = self.channel_cache.clone();
let channel_cache = channel_cache_lock.read().await;
if let Some(cached) = channel_cache.get(&channel_id).cloned() {
@@ -255,7 +240,7 @@ impl Dht {
}
/// Remove nodes from the router that are older than `expiry_secs`
pub async fn prune_router(&self, router: DhtRouterPtr, expiry_secs: u32) {
pub async fn prune_router(&self, router: DhtRouterPtr<N>, expiry_secs: u32) {
let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64);
let mut router_write = router.write().await;
@@ -269,4 +254,17 @@ impl Dht {
}
}
}
/// Reset the DHT state
pub async fn reset(&self) {
let mut bootstrapped = self.bootstrapped.write().await;
*bootstrapped = false;
let mut buckets = vec![];
for _ in 0..256 {
buckets.push(DhtBucket { nodes: vec![] })
}
*self.buckets.write().await = buckets;
}
}
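
For completeness, a minimal `DhtNode` implementation using the new trait and helper macro. Field names are illustrative, not fud's actual node struct:

```rust
use url::Url;

#[derive(Debug, Clone)]
pub struct ExampleNode {
    pub node_id: blake3::Hash,
    pub external_addrs: Vec<Url>,
}

impl DhtNode for ExampleNode {
    fn id(&self) -> blake3::Hash {
        self.node_id
    }

    fn addresses(&self) -> Vec<Url> {
        self.external_addrs.clone()
    }
}

// Supplies the id()-based Hash/PartialEq/Eq impls required by the trait bounds
impl_dht_node_defaults!(ExampleNode);
```

With that in place, `Dht::<ExampleNode>::new(&settings, p2p, executor).await` builds a DHT keyed by this node type.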