diff --git a/Cargo.toml b/Cargo.toml
index 86a59d8f9..62d4073f5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -307,7 +307,6 @@ dht = [
     "smol",
     "darkfi-serial",
-    "geode",
     "net",
 ]
diff --git a/bin/fud/fud/src/dht.rs b/bin/fud/fud/src/dht.rs
new file mode 100644
index 000000000..ae7eaed5c
--- /dev/null
+++ b/bin/fud/fud/src/dht.rs
@@ -0,0 +1,235 @@
+/* This file is part of DarkFi (https://dark.fi)
+ *
+ * Copyright (C) 2020-2025 Dyne.org foundation
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use log::debug;
+use num_bigint::BigUint;
+use rand::{rngs::OsRng, Rng};
+use url::Url;
+
+use darkfi::{
+    dht::{impl_dht_node_defaults, Dht, DhtHandler, DhtLookupReply, DhtNode},
+    geode::hash_to_string,
+    net::ChannelPtr,
+    util::time::Timestamp,
+    Error, Result,
+};
+use darkfi_sdk::crypto::schnorr::SchnorrPublic;
+use darkfi_serial::{SerialDecodable, SerialEncodable};
+
+use crate::{
+    pow::VerifiableNodeData,
+    proto::{
+        FudAnnounce, FudFindNodesReply, FudFindNodesRequest, FudFindSeedersReply,
+        FudFindSeedersRequest, FudPingReply, FudPingRequest,
+    },
+    Fud,
+};
+
+#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
+pub struct FudNode {
+    pub data: VerifiableNodeData,
+    pub addresses: Vec<Url>,
+}
+impl_dht_node_defaults!(FudNode);
+
+impl DhtNode for FudNode {
+    fn id(&self) -> blake3::Hash {
+        self.data.id()
+    }
+    fn addresses(&self) -> Vec<Url> {
+        self.addresses.clone()
+    }
+}
+
+/// The values of the DHT are `Vec<FudSeeder>`, mapping resource hashes to lists of [`FudSeeder`]s
+#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)]
+pub struct FudSeeder {
+    /// Resource that this seeder provides
+    pub key: blake3::Hash,
+    /// Seeder's node data
+    pub node: FudNode,
+    /// When this [`FudSeeder`] was added to our hash table.
+    /// This is not sent to other nodes.
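+    /// Deserialized seeders therefore arrive with a zeroed timestamp and are
+    /// re-stamped with the local time in `DhtHandler::add_value()`.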
+    #[skip_serialize]
+    pub timestamp: u64,
+}
+
+impl PartialEq for FudSeeder {
+    fn eq(&self, other: &Self) -> bool {
+        self.key == other.key && self.node.id() == other.node.id()
+    }
+}
+
+/// [`DhtHandler`] implementation for fud
+#[async_trait]
+impl DhtHandler for Fud {
+    type Value = Vec<FudSeeder>;
+    type Node = FudNode;
+
+    fn dht(&self) -> Arc<Dht<Self>> {
+        self.dht.clone()
+    }
+
+    async fn node(&self) -> FudNode {
+        FudNode {
+            data: self.node_data.read().await.clone(),
+            addresses: self
+                .p2p
+                .clone()
+                .hosts()
+                .external_addrs()
+                .await
+                .iter()
+                .filter(|addr| !addr.to_string().contains("[::]"))
+                .cloned()
+                .collect(),
+        }
+    }
+
+    async fn ping(&self, channel: ChannelPtr) -> Result<FudNode> {
+        debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id);
+        let msg_subsystem = channel.message_subsystem();
+        msg_subsystem.add_dispatch::<FudPingReply>().await;
+        let msg_subscriber = channel.subscribe_msg::<FudPingReply>().await.unwrap();
+
+        // Send `FudPingRequest`
+        let mut rng = OsRng;
+        let request = FudPingRequest { random: rng.gen() };
+        channel.send(&request).await?;
+
+        // Wait for `FudPingReply`
+        let reply = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await;
+        msg_subscriber.unsubscribe().await;
+        let reply = reply?;
+
+        // Verify the signature
+        if !reply.node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) {
+            channel.ban().await;
+            return Err(Error::InvalidSignature)
+        }
+
+        // Verify PoW
+        if let Err(e) = self.pow.write().await.verify_node(&reply.node.data).await {
+            channel.ban().await;
+            return Err(e)
+        }
+
+        Ok(reply.node.clone())
+    }
+
+    // TODO: Optimize this
+    async fn on_new_node(&self, node: &FudNode) -> Result<()> {
+        debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id()));
+
+        // If this is the first node we know about, then bootstrap and announce our files
+        if !self.dht.is_bootstrapped().await {
+            let _ = self.init().await;
+        }
+
+        // Send keys that are closer to this node than we are
+        let self_id = self.node_data.read().await.id();
+        let channel = self.dht.get_channel(node, None).await?;
+        for (key, seeders) in self.dht.hash_table.read().await.iter() {
+            let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id()));
+            let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id));
+            if node_distance <= self_distance {
+                let _ = channel.send(&FudAnnounce { key: *key, seeders: seeders.clone() }).await;
+            }
+        }
+        self.dht.cleanup_channel(channel).await;
+
+        Ok(())
+    }
+
+    async fn find_nodes(&self, node: &FudNode, key: &blake3::Hash) -> Result<Vec<FudNode>> {
+        debug!(target: "fud::DhtHandler::find_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id()));
+
+        let channel = self.dht.get_channel(node, None).await?;
+        let msg_subsystem = channel.message_subsystem();
+        msg_subsystem.add_dispatch::<FudFindNodesReply>().await;
+        let msg_subscriber_nodes = channel.subscribe_msg::<FudFindNodesReply>().await.unwrap();
+
+        let request = FudFindNodesRequest { key: *key };
+        channel.send(&request).await?;
+
+        let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().settings.timeout).await;
+
+        msg_subscriber_nodes.unsubscribe().await;
+        self.dht.cleanup_channel(channel).await;
+
+        Ok(reply?.nodes.clone())
+    }
+
+    async fn find_value(
+        &self,
+        node: &FudNode,
+        key: &blake3::Hash,
+    ) -> Result<DhtLookupReply<FudNode, Vec<FudSeeder>>> {
+        debug!(target: "fud::DhtHandler::find_value()", "Fetching value {} from node {}", hash_to_string(key), hash_to_string(&node.id()));
+
+        let channel = self.dht.get_channel(node, None).await?;
+        let msg_subsystem = channel.message_subsystem();
+        msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
+        let msg_subscriber = channel.subscribe_msg::<FudFindSeedersReply>().await.unwrap();
+
+        let request = FudFindSeedersRequest { key: *key };
+        channel.send(&request).await?;
+
+        let recv = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await;
+
+        msg_subscriber.unsubscribe().await;
+        self.dht.cleanup_channel(channel).await;
+
+        let rep = recv?;
+        Ok(DhtLookupReply::NodesAndValue(rep.nodes.clone(), rep.seeders.clone()))
+    }
+
+    async fn add_value(&self, key: &blake3::Hash, value: &Vec<FudSeeder>) {
+        let mut seeders = value.clone();
+
+        // Remove seeders with no external addresses
+        seeders.retain(|item| !item.node.addresses().is_empty());
+
+        // Set all seeders' timestamps. They are not sent to other nodes so they default to 0.
+        let timestamp = Timestamp::current_time().inner();
+        for seeder in &mut seeders {
+            seeder.timestamp = timestamp;
+        }
+
+        debug!(target: "fud::DhtHandler::add_value()", "Inserting {} seeders for resource {}", seeders.len(), hash_to_string(key));
+
+        let mut seeders_write = self.dht.hash_table.write().await;
+        let existing_seeders = seeders_write.get_mut(key);
+
+        if let Some(existing_seeders) = existing_seeders {
+            existing_seeders.retain(|it| !seeders.contains(it));
+            existing_seeders.extend(seeders.clone());
+        } else {
+            let mut vec = Vec::new();
+            vec.extend(seeders.clone());
+            seeders_write.insert(*key, vec);
+        }
+    }
+
+    fn key_to_string(key: &blake3::Hash) -> String {
+        hash_to_string(key)
+    }
+}
diff --git a/bin/fud/fud/src/download.rs b/bin/fud/fud/src/download.rs
new file mode 100644
index 000000000..bef9edd89
--- /dev/null
+++ b/bin/fud/fud/src/download.rs
@@ -0,0 +1,481 @@
+/* This file is part of DarkFi (https://dark.fi)
+ *
+ * Copyright (C) 2020-2025 Dyne.org foundation
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+use std::{
+    collections::HashSet,
+    path::{Path, PathBuf},
+    time::Instant,
+};
+
+use futures::{future::FutureExt, pin_mut, select};
+use log::{error, info, warn};
+use rand::{
+    prelude::{IteratorRandom, SliceRandom},
+    rngs::OsRng,
+};
+
+use darkfi::{
+    dht::DhtNode,
+    geode::{hash_to_string, ChunkedStorage},
+    net::ChannelPtr,
+    system::Subscription,
+    Error, Result,
+};
+use darkfi_serial::serialize_async;
+
+use crate::{
+    event::{self, notify_event, FudEvent},
+    proto::{FudChunkReply, FudDirectoryReply, FudFileReply, FudFindRequest, FudNotFound},
+    util::create_all_files,
+    Fud, FudSeeder, ResourceStatus, ResourceType, Scrap,
+};
+
+/// Receive seeders from a subscription, and execute an async expression for
+/// each deduplicated seeder once (seeder order is random).
+/// It will keep going until the expression returns `Ok(())`, or there are
+/// no more seeders.
+/// It has an optional `favored_seeder` argument that will be tried first if
+/// specified.
+macro_rules! seeders_loop {
+    ($seeders_sub:expr, $favored_seeder:expr, $code:expr) => {
+        let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
+        let mut is_done = false;
+
+        // Try favored seeder
+        let favored_seeder: Option<FudSeeder> = $favored_seeder;
+        if let Some(seeder) = favored_seeder {
+            queried_seeders.insert(seeder.node.id());
+            if $code(seeder).await.is_ok() {
+                is_done = true;
+            }
+        }
+
+        // Try other seeders using the subscription
+        while !is_done {
+            let rep = $seeders_sub.receive().await;
+            if rep.is_none() {
+                break; // None means the lookup is done
+            }
+            let seeders = rep.unwrap().clone();
+            let mut shuffled_seeders = {
+                let mut vec: Vec<_> = seeders.iter().cloned().collect();
+                vec.shuffle(&mut OsRng);
+                vec
+            };
+            // Loop over seeders
+            while let Some(seeder) = shuffled_seeders.pop() {
+                // Only use a seeder once
+                if queried_seeders.iter().any(|s| *s == seeder.node.id()) {
+                    continue;
+                }
+                queried_seeders.insert(seeder.node.id());
+
+                if $code(seeder).await.is_err() {
+                    continue;
+                }
+
+                is_done = true;
+                break;
+            }
+        }
+    };
+    ($seeders_sub:expr, $code:expr) => {
+        seeders_loop!($seeders_sub, None, $code)
+    };
+}
+
+enum ChunkFetchControl {
+    NextChunk,
+    NextSeeder,
+    Abort,
+}
+
+struct ChunkFetchContext<'a> {
+    fud: &'a Fud,
+    hash: &'a blake3::Hash,
+    chunked: &'a mut ChunkedStorage,
+    chunks: &'a mut HashSet<blake3::Hash>,
+}
+
+/// Fetch `chunks` for `chunked` (file or directory) from seeders in `seeders_sub`.
+pub async fn fetch_chunks(
+    fud: &Fud,
+    hash: &blake3::Hash,
+    chunked: &mut ChunkedStorage,
+    seeders_sub: &Subscription<Option<Vec<FudSeeder>>>,
+    favored_seeder: Option<FudSeeder>,
+    chunks: &mut HashSet<blake3::Hash>,
+) -> Result<()> {
+    let mut ctx = ChunkFetchContext { fud, hash, chunked, chunks };
+
+    seeders_loop!(seeders_sub, favored_seeder, async |seeder: FudSeeder| -> Result<()> {
+        let channel = match fud.dht.get_channel(&seeder.node, Some(*hash)).await {
+            Ok(channel) => channel,
+            Err(e) => {
+                warn!(target: "fud::download::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
+                return Err(e)
+            }
+        };
+        let mut chunks_to_query = ctx.chunks.clone();
+        info!(target: "fud::download::fetch_chunks()", "Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));
+
+        loop {
+            // Loop over chunks
+            match fetch_chunk(&mut ctx, &channel, &seeder, &mut chunks_to_query).await {
+                ChunkFetchControl::NextChunk => continue,
+                ChunkFetchControl::NextSeeder => break,
+                ChunkFetchControl::Abort => {
+                    fud.dht.cleanup_channel(channel).await;
+                    return Ok(())
+                }
+            };
+        }
+
+        fud.dht.cleanup_channel(channel).await;
+
+        // Stop when there are no missing chunks
+        if ctx.chunks.is_empty() {
+            return Ok(())
+        }
+
+        Err(().into())
+    });
+
+    Ok(())
+}
+
+/// Fetch a single chunk and return what should be done next
+async fn fetch_chunk(
+    ctx: &mut ChunkFetchContext<'_>,
+    channel: &ChannelPtr,
+    seeder: &FudSeeder,
+    chunks_to_query: &mut HashSet<blake3::Hash>,
+) -> ChunkFetchControl {
+    // Select a chunk to request
+    let mut chunk = None;
+    if let Some(random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
+        chunk = Some(*random_chunk);
+    }
+
+    if chunk.is_none() {
+        // No more chunks to request from this seeder
+        return ChunkFetchControl::NextSeeder;
+    }
+
+    let chunk_hash = chunk.unwrap();
+    chunks_to_query.remove(&chunk_hash);
+
+    let start_time = Instant::now();
+    let msg_subsystem = channel.message_subsystem();
+    msg_subsystem.add_dispatch::<FudChunkReply>().await;
+    msg_subsystem.add_dispatch::<FudNotFound>().await;
+    let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
+    let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();
+
+    let send_res = channel.send(&FudFindRequest { info: Some(*ctx.hash), key: chunk_hash }).await;
+    if let Err(e) = send_res {
+        warn!(target: "fud::download::fetch_chunk()", "Error while sending FudFindRequest: {e}");
+        return ChunkFetchControl::NextSeeder;
+    }
+
+    let chunk_recv = msg_subscriber_chunk.receive_with_timeout(ctx.fud.chunk_timeout).fuse();
+    let notfound_recv = msg_subscriber_notfound.receive_with_timeout(ctx.fud.chunk_timeout).fuse();
+
+    pin_mut!(chunk_recv, notfound_recv);
+
+    // Wait for a FudChunkReply or FudNotFound
+    select! {
+        chunk_reply = chunk_recv => {
+            msg_subscriber_chunk.unsubscribe().await;
+            msg_subscriber_notfound.unsubscribe().await;
+            if let Err(e) = chunk_reply {
+                warn!(target: "fud::download::fetch_chunk()", "Error waiting for chunk reply: {e}");
+                return ChunkFetchControl::NextSeeder;
+            }
+            let reply = chunk_reply.unwrap();
+            handle_chunk_reply(ctx, &chunk_hash, &reply, seeder, &start_time).await
+        }
+        notfound_reply = notfound_recv => {
+            msg_subscriber_chunk.unsubscribe().await;
+            msg_subscriber_notfound.unsubscribe().await;
+            if let Err(e) = notfound_reply {
+                warn!(target: "fud::download::fetch_chunk()", "Error waiting for NOTFOUND reply: {e}");
+                return ChunkFetchControl::NextSeeder;
+            }
+            info!(target: "fud::download::fetch_chunk()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
+            notify_event!(ctx.fud, ChunkNotFound, { hash: *ctx.hash, chunk_hash });
+            ChunkFetchControl::NextChunk
+        }
+    }
+}
+
+/// Processes an incoming chunk
+async fn handle_chunk_reply(
+    ctx: &mut ChunkFetchContext<'_>,
+    chunk_hash: &blake3::Hash,
+    reply: &FudChunkReply,
+    seeder: &FudSeeder,
+    start_time: &Instant,
+) -> ChunkFetchControl {
+    let write_res = ctx.fud.geode.write_chunk(ctx.chunked, &reply.chunk).await;
+    if let Err(e) = write_res {
+        error!(target: "fud::download::handle_chunk_reply()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(chunk_hash));
+        return ChunkFetchControl::NextChunk;
+    }
+    let (inserted_hash, bytes_written) = write_res.unwrap();
+    if inserted_hash != *chunk_hash {
+        warn!(target: "fud::download::handle_chunk_reply()", "Received chunk does not match requested chunk");
+        return ChunkFetchControl::NextChunk;
+    }
+
+    info!(target: "fud::download::handle_chunk_reply()", "Received chunk {} from seeder {}", hash_to_string(chunk_hash), hash_to_string(&seeder.node.id()));
+
+    // If we did not write the whole chunk to the filesystem,
+    // save the chunk in the scraps.
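+    // (A scrap keeps the full verified chunk alongside the hash of the bytes
+    // actually written, so integrity can still be checked even though the
+    // files on disk only contain part of the chunk.)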
+    if bytes_written < reply.chunk.len() {
+        info!(target: "fud::download::handle_chunk_reply()", "Saving chunk {} as a scrap", hash_to_string(chunk_hash));
+        let chunk_written = ctx.fud.geode.get_chunk(ctx.chunked, chunk_hash).await;
+        if let Err(e) = chunk_written {
+            error!(target: "fud::download::handle_chunk_reply()", "Error getting chunk: {e}");
+            return ChunkFetchControl::NextChunk;
+        }
+        let scrap = Scrap {
+            chunk: reply.chunk.clone(),
+            hash_written: blake3::hash(&chunk_written.unwrap()),
+        };
+        if let Err(e) =
+            ctx.fud.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&scrap).await)
+        {
+            error!(target: "fud::download::handle_chunk_reply()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(chunk_hash));
+            return ChunkFetchControl::NextChunk;
+        }
+    }
+
+    // Update the resource
+    let mut resources_write = ctx.fud.resources.write().await;
+    let resource = resources_write.get_mut(ctx.hash);
+    if resource.is_none() {
+        return ChunkFetchControl::Abort // Resource was removed
+    }
+    let resource = resource.unwrap();
+    resource.status = ResourceStatus::Downloading;
+    resource.total_chunks_downloaded += 1;
+    resource.target_chunks_downloaded += 1;
+
+    resource.total_bytes_downloaded += reply.chunk.len() as u64;
+    resource.target_bytes_downloaded +=
+        resource.get_selected_bytes(ctx.chunked, &reply.chunk) as u64;
+    resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64());
+    if resource.speeds.len() > 12 {
+        resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12); // Only keep the last few speeds
+    }
+
+    // If we just fetched the last chunk of a file, compute
+    // `total_bytes_size` (and `target_bytes_size`) again,
+    // as `geode.write_chunk()` updated the FileSequence
+    // to the exact file size.
+    if let Some((last_chunk_hash, _)) = ctx.chunked.iter().last() {
+        if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == *chunk_hash {
+            resource.total_bytes_size = ctx.chunked.get_fileseq().len();
+            resource.target_bytes_size = resource.total_bytes_size;
+        }
+    }
+    let resource = resource.clone();
+    drop(resources_write);
+
+    notify_event!(ctx.fud, ChunkDownloadCompleted, { hash: *ctx.hash, chunk_hash: *chunk_hash, resource });
+    ctx.chunks.remove(chunk_hash);
+    ChunkFetchControl::NextChunk
+}
+
+enum MetadataFetchReply {
+    Directory(FudDirectoryReply),
+    File(FudFileReply),
+    Chunk(FudChunkReply),
+}
+
+/// Fetch a single resource's metadata from seeders received from `seeders_sub`.
+/// If the resource is a file smaller than a single chunk, then the seeder can send
+/// the chunk directly, and we will create the file from it on path `path`.
+/// 1. Wait for seeders from the subscription
+/// 2. Request the metadata from the seeders
+/// 3. Insert metadata to geode using the reply
+pub async fn fetch_metadata(
+    fud: &Fud,
+    hash: &blake3::Hash,
+    seeders_sub: &Subscription<Option<Vec<FudSeeder>>>,
+    path: &Path,
+) -> Result<FudSeeder> {
+    let mut result: Option<(FudSeeder, MetadataFetchReply)> = None;
+
+    seeders_loop!(seeders_sub, async |seeder: FudSeeder| -> Result<()> {
+        let channel = fud.dht.get_channel(&seeder.node, Some(*hash)).await?;
+        let msg_subsystem = channel.message_subsystem();
+        msg_subsystem.add_dispatch::<FudChunkReply>().await;
+        msg_subsystem.add_dispatch::<FudFileReply>().await;
+        msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
+        msg_subsystem.add_dispatch::<FudNotFound>().await;
+        let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
+        let msg_subscriber_file = channel.subscribe_msg::<FudFileReply>().await.unwrap();
+        let msg_subscriber_dir = channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
+        let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();
+
+        let send_res = channel.send(&FudFindRequest { info: None, key: *hash }).await;
+        if let Err(e) = send_res {
+            warn!(target: "fud::download::fetch_metadata()", "Error while sending FudFindRequest: {e}");
+            msg_subscriber_chunk.unsubscribe().await;
+            msg_subscriber_file.unsubscribe().await;
+            msg_subscriber_dir.unsubscribe().await;
+            msg_subscriber_notfound.unsubscribe().await;
+            fud.dht.cleanup_channel(channel).await;
+            return Err(e)
+        }
+
+        let chunk_recv = msg_subscriber_chunk.receive_with_timeout(fud.chunk_timeout).fuse();
+        let file_recv = msg_subscriber_file.receive_with_timeout(fud.chunk_timeout).fuse();
+        let dir_recv = msg_subscriber_dir.receive_with_timeout(fud.chunk_timeout).fuse();
+        let notfound_recv = msg_subscriber_notfound.receive_with_timeout(fud.chunk_timeout).fuse();
+
+        pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);
+
+        let cleanup = async || {
+            msg_subscriber_chunk.unsubscribe().await;
+            msg_subscriber_file.unsubscribe().await;
+            msg_subscriber_dir.unsubscribe().await;
+            msg_subscriber_notfound.unsubscribe().await;
+            fud.dht.cleanup_channel(channel).await;
+        };
+
+        // Wait for a FudChunkReply, FudFileReply, FudDirectoryReply, or FudNotFound
+        select! {
+            // Received a chunk while requesting metadata, this is allowed to
+            // optimize fetching files smaller than a single chunk
+            chunk_reply = chunk_recv => {
+                cleanup().await;
+                if let Err(e) = chunk_reply {
+                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for chunk reply: {e}");
+                    return Err(e)
+                }
+                let reply = chunk_reply.unwrap();
+                let chunk_hash = blake3::hash(&reply.chunk);
+                // Check that this is the only chunk in the file
+                if !fud.geode.verify_metadata(hash, &[chunk_hash], &[]) {
+                    warn!(target: "fud::download::fetch_metadata()", "Received a chunk while fetching metadata, but the chunk did not match the file hash");
+                    return Err(().into())
+                }
+                info!(target: "fud::download::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
+                result = Some((seeder, MetadataFetchReply::Chunk((*reply).clone())));
+                Ok(())
+            }
+            file_reply = file_recv => {
+                cleanup().await;
+                if let Err(e) = file_reply {
+                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for file reply: {e}");
+                    return Err(e)
+                }
+                let reply = file_reply.unwrap();
+                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
+                    warn!(target: "fud::download::fetch_metadata()", "Received invalid file metadata");
+                    return Err(().into())
+                }
+                info!(target: "fud::download::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
+                result = Some((seeder, MetadataFetchReply::File((*reply).clone())));
+                Ok(())
+            }
+            dir_reply = dir_recv => {
+                cleanup().await;
+                if let Err(e) = dir_reply {
+                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for directory reply: {e}");
+                    return Err(e)
+                }
+                let reply = dir_reply.unwrap();
+
+                // Convert all file paths from String to PathBuf
+                let files: Vec<_> = reply.files.clone().into_iter()
+                    .map(|(path_str, size)| (PathBuf::from(path_str), size))
+                    .collect();
+
+                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
+                    warn!(target: "fud::download::fetch_metadata()", "Received invalid directory metadata");
+                    return Err(().into())
+                }
+                info!(target: "fud::download::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
+                result = Some((seeder, MetadataFetchReply::Directory((*reply).clone())));
+                Ok(())
+            }
+            notfound_reply = notfound_recv => {
+                cleanup().await;
+                if let Err(e) = notfound_reply {
+                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
+                    return Err(e)
+                }
+                info!(target: "fud::download::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
+                Err(().into())
+            }
+        }
+    });
+
+    // We did not find the resource
+    if result.is_none() {
+        return Err(Error::GeodeFileRouteNotFound)
+    }
+
+    // Insert metadata to geode using the reply
+    // At this point the reply content is already verified
+    let (seeder, reply) = result.unwrap();
+    match reply {
+        MetadataFetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => {
+            // Convert all file paths from String to PathBuf
+            let mut files: Vec<_> =
+                files.into_iter().map(|(path_str, size)| (PathBuf::from(path_str), size)).collect();
+
+            fud.geode.sort_files(&mut files);
+            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &files).await {
+                error!(target: "fud::download::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
+                return Err(e)
+            }
+        }
+        MetadataFetchReply::File(FudFileReply { chunk_hashes }) => {
+            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
+                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
+                return Err(e)
+            }
+        }
+        // Looked for a file but got a chunk: the entire file fits in a single chunk
+        MetadataFetchReply::Chunk(FudChunkReply { chunk }) => {
+            info!(target: "fud::download::fetch_metadata()", "File fits in a single chunk");
+            let chunk_hash = blake3::hash(&chunk);
+            if let Err(e) = fud.geode.insert_metadata(hash, &[chunk_hash], &[]).await {
+                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode (from single chunk): {e}", hash_to_string(hash));
+                return Err(e)
+            }
+            create_all_files(&[path.to_path_buf()]).await?;
+            let mut chunked_file = ChunkedStorage::new(
+                &[chunk_hash],
+                &[(path.to_path_buf(), chunk.len() as u64)],
+                false,
+            );
+            if let Err(e) = fud.geode.write_chunk(&mut chunked_file, &chunk).await {
+                error!(target: "fud::download::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
+                return Err(e)
+            };
+        }
+    };
+
+    Ok(seeder)
+}
diff --git a/bin/fud/fud/src/event.rs b/bin/fud/fud/src/event.rs
index 66fd50eca..48f3418c8 100644
--- a/bin/fud/fud/src/event.rs
+++ b/bin/fud/fud/src/event.rs
@@ -244,7 +244,7 @@ impl From<FudEvent> for JsonValue {
 /// Macro calling `fud.event_publisher.notify()`
 macro_rules! notify_event {
     // This is for any `FudEvent`
-    ($fud:ident, $event:ident, { $($fields:tt)* }) => {
+    ($fud:expr, $event:ident, { $($fields:tt)* }) => {
         $fud
             .event_publisher
             .notify(FudEvent::$event(event::$event {
@@ -253,7 +253,7 @@ macro_rules! notify_event {
         .await;
     };
     // This is for `FudEvent`s that only have a hash and resource
-    ($fud:ident, $event:ident, $resource:expr) => {
+    ($fud:expr, $event:ident, $resource:expr) => {
         $fud
             .event_publisher
             .notify(FudEvent::$event(event::$event {
diff --git a/bin/fud/fud/src/lib.rs b/bin/fud/fud/src/lib.rs
index 19675b514..5481855ef 100644
--- a/bin/fud/fud/src/lib.rs
+++ b/bin/fud/fud/src/lib.rs
@@ -21,43 +21,30 @@ use std::{
     io::ErrorKind,
     path::{Path, PathBuf},
     sync::Arc,
-    time::Instant,
 };
 
-use async_trait::async_trait;
-use futures::{future::FutureExt, pin_mut, select};
-use log::{debug, error, info, warn};
-use num_bigint::BigUint;
-use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, Rng};
+use log::{error, info, warn};
 use sled_overlay::sled;
 use smol::{
     channel,
-    fs::{self, File, OpenOptions},
+    fs::{self, OpenOptions},
     lock::RwLock,
 };
-use url::Url;
 
 use darkfi::{
-    dht::{
-        impl_dht_node_defaults, tasks as dht_tasks, Dht, DhtHandler, DhtNode, DhtRouterItem,
-        DhtRouterPtr, DhtSettings,
-    },
+    dht::{tasks as dht_tasks, Dht, DhtHandler, DhtSettings},
     geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
-    net::{ChannelPtr, P2pPtr},
-    system::{ExecutorPtr, PublisherPtr, StoppableTask},
-    util::path::expand_path,
+    net::P2pPtr,
+    system::{ExecutorPtr, Publisher, PublisherPtr, StoppableTask},
+    util::{path::expand_path, time::Timestamp},
     Error, Result,
 };
-use darkfi_sdk::crypto::{schnorr::SchnorrPublic, SecretKey};
-use darkfi_serial::{deserialize_async, serialize_async, SerialDecodable, SerialEncodable};
+use darkfi_sdk::crypto::SecretKey;
+use darkfi_serial::{deserialize_async, serialize_async};
 
 /// P2P protocols
 pub mod proto;
-use proto::{
-    FudAnnounce, FudChunkReply, FudDirectoryReply, FudFileReply, FudFindNodesReply,
-    FudFindNodesRequest, FudFindRequest, FudFindSeedersReply, FudFindSeedersRequest, FudNotFound,
-    FudPingReply, FudPingRequest,
-};
+use proto::FudAnnounce;
 
 /// FudEvent
 pub mod event;
@@ -76,7 +63,7 @@ pub mod rpc;
 /// Background tasks
 pub mod tasks;
-use tasks::{start_task, FetchReply};
+use tasks::start_task;
 
 /// Bitcoin
 pub mod bitcoin;
@@ -94,196 +81,78 @@ use settings::Args;
 /// Utils
 pub mod util;
-use util::{get_all_files, FileSelection};
+use util::{create_all_files, get_all_files, FileSelection};
+
+/// Download methods
+mod download;
+use download::{fetch_chunks, fetch_metadata};
+
+/// [`DhtHandler`] implementation and fud-specific DHT structs
+pub mod dht;
+use dht::FudSeeder;
 
 const SLED_PATH_TREE: &[u8] = b"_fud_paths";
 const SLED_FILE_SELECTION_TREE: &[u8] = b"_fud_file_selections";
 const SLED_SCRAP_TREE: &[u8] = b"_fud_scraps";
 
-#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
-pub struct FudNode {
-    data: VerifiableNodeData,
-    addresses: Vec<Url>,
-}
-impl_dht_node_defaults!(FudNode);
-
-impl DhtNode for FudNode {
-    fn id(&self) -> blake3::Hash {
-        self.data.id()
-    }
-    fn addresses(&self) -> Vec<Url> {
-        self.addresses.clone()
-    }
-}
-
 pub struct Fud {
     /// Our own [`VerifiableNodeData`]
     pub node_data: Arc<RwLock<VerifiableNodeData>>,
-
     /// Our secret key (the public key is in `node_data`)
     pub secret_key: Arc<RwLock<SecretKey>>,
-
-    /// Key -> Seeders
-    pub seeders_router: DhtRouterPtr<FudNode>,
-
-    /// Pointer to the P2P network instance
-    p2p: P2pPtr,
-
     /// The Geode instance
     geode: Geode,
-
     /// Default download directory
     downloads_path: PathBuf,
-
     /// Chunk transfer timeout in seconds
     chunk_timeout: u64,
-
     /// The [`FudPow`] instance
     pub pow: Arc<RwLock<FudPow>>,
-
     /// The DHT instance
-    dht: Arc<Dht<FudNode>>,
-
+    dht: Arc<Dht<Self>>,
     /// Resources (current status of all downloads/seeds)
     resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
-
     /// Sled tree containing "resource hash -> path on the filesystem"
     path_tree: sled::Tree,
-
     /// Sled tree containing "resource hash -> file selection". If the file
     /// selection is all files of the resource (or if the resource is not a
     /// directory), the resource does not store its file selection in the tree.
     file_selection_tree: sled::Tree,
-
     /// Sled tree containing scraps which are chunks containing data the user
     /// did not want to save to files. They also contain data the user wanted
     /// otherwise we would not have downloaded the chunk at all.
+    /// We save scraps to be able to verify integrity even if part of the chunk
+    /// is not saved to the filesystem in the downloaded files.
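+    /// A scrap stores both the original chunk and the hash of the bytes that
+    /// were actually written to disk (see [`Scrap`]).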
/// "chunk/scrap hash -> chunk content" scrap_tree: sled::Tree, - + /// Get requests sender get_tx: channel::Sender<(blake3::Hash, PathBuf, FileSelection)>, + /// Get requests receiver get_rx: channel::Receiver<(blake3::Hash, PathBuf, FileSelection)>, - + /// Put requests sender put_tx: channel::Sender, + /// Put requests receiver put_rx: channel::Receiver, - + /// Lookup requests sender + lookup_tx: channel::Sender<(blake3::Hash, PublisherPtr>>)>, + /// Lookup requests receiver + lookup_rx: channel::Receiver<(blake3::Hash, PublisherPtr>>)>, /// Currently active downloading tasks (running the `fud.fetch_resource()` method) fetch_tasks: Arc>>>, - /// Currently active put tasks (running the `fud.insert_resource()` method) put_tasks: Arc>>>, - + /// Currently active lookup tasks (running the `fud.lookup_value()` method) + lookup_tasks: Arc>>>, /// Currently active tasks (defined in `tasks`, started with the `start_task` macro) tasks: Arc>>>, - /// Used to send events to fud clients event_publisher: PublisherPtr, - + /// Pointer to the P2P network instance + p2p: P2pPtr, /// Global multithreaded executor reference pub executor: ExecutorPtr, } -#[async_trait] -impl DhtHandler for Fud { - fn dht(&self) -> Arc> { - self.dht.clone() - } - - async fn node(&self) -> FudNode { - FudNode { - data: self.node_data.read().await.clone(), - addresses: self - .p2p - .clone() - .hosts() - .external_addrs() - .await - .iter() - .filter(|addr| !addr.to_string().contains("[::]")) - .cloned() - .collect(), - } - } - - async fn ping(&self, channel: ChannelPtr) -> Result { - debug!(target: "fud::DhtHandler::ping()", "Sending ping to channel {}", channel.info.id); - let msg_subsystem = channel.message_subsystem(); - msg_subsystem.add_dispatch::().await; - let msg_subscriber = channel.subscribe_msg::().await.unwrap(); - - // Send `FudPingRequest` - let mut rng = OsRng; - let request = FudPingRequest { random: rng.gen() }; - channel.send(&request).await?; - - // Wait for `FudPingReply` - let reply = msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await?; - msg_subscriber.unsubscribe().await; - - // Verify the signature - if !reply.node.data.public_key.verify(&request.random.to_be_bytes(), &reply.sig) { - channel.ban().await; - return Err(Error::InvalidSignature) - } - - // Verify PoW - if let Err(e) = self.pow.write().await.verify_node(&reply.node.data).await { - channel.ban().await; - return Err(e) - } - - Ok(reply.node.clone()) - } - - // TODO: Optimize this - async fn on_new_node(&self, node: &FudNode) -> Result<()> { - debug!(target: "fud::DhtHandler::on_new_node()", "New node {}", hash_to_string(&node.id())); - - // If this is the first node we know about, then bootstrap and announce our files - if !self.dht().is_bootstrapped().await { - let _ = self.init().await; - } - - // Send keys that are closer to this node than we are - let self_id = self.node_data.read().await.id(); - let channel = self.get_channel(node, None).await?; - for (key, seeders) in self.seeders_router.read().await.iter() { - let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id())); - let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id)); - if node_distance <= self_distance { - let _ = channel - .send(&FudAnnounce { - key: *key, - seeders: seeders.clone().into_iter().collect(), - }) - .await; - } - } - self.cleanup_channel(channel).await; - - Ok(()) - } - - async fn fetch_nodes(&self, node: &FudNode, key: &blake3::Hash) -> Result> { - debug!(target: 
"fud::DhtHandler::fetch_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id())); - - let channel = self.get_channel(node, None).await?; - let msg_subsystem = channel.message_subsystem(); - msg_subsystem.add_dispatch::().await; - let msg_subscriber_nodes = channel.subscribe_msg::().await.unwrap(); - - let request = FudFindNodesRequest { key: *key }; - channel.send(&request).await?; - - let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().settings.timeout).await; - - msg_subscriber_nodes.unsubscribe().await; - self.cleanup_channel(channel).await; - - Ok(reply?.nodes.clone()) - } -} - impl Fud { pub async fn new( settings: Args, @@ -291,7 +160,7 @@ impl Fud { sled_db: &sled::Db, event_publisher: PublisherPtr, executor: ExecutorPtr, - ) -> Result { + ) -> Result> { let basedir = expand_path(&settings.base_dir)?; let downloads_path = match settings.downloads_path { Some(downloads_path) => expand_path(&downloads_path)?, @@ -302,29 +171,28 @@ impl Fud { let mut pow = FudPow::new(settings.pow.into(), executor.clone()); pow.bitcoin_hash_cache.update().await?; // Fetch BTC block hashes let (node_data, secret_key) = pow.generate_node().await?; - info!(target: "fud", "Your node ID: {}", hash_to_string(&node_data.id())); + info!(target: "fud::new()", "Your node ID: {}", hash_to_string(&node_data.id())); // Geode - info!("Instantiating Geode instance"); + info!(target: "fud::new()", "Instantiating Geode instance"); let geode = Geode::new(&basedir).await?; // DHT let dht_settings: DhtSettings = settings.dht.into(); - let dht: Arc> = - Arc::new(Dht::::new(&dht_settings, p2p.clone(), executor.clone()).await); + let dht: Arc> = + Arc::new(Dht::::new(&dht_settings, p2p.clone(), executor.clone()).await); let (get_tx, get_rx) = smol::channel::unbounded(); let (put_tx, put_rx) = smol::channel::unbounded(); - let fud = Self { + let (lookup_tx, lookup_rx) = smol::channel::unbounded(); + let fud = Arc::new(Self { node_data: Arc::new(RwLock::new(node_data)), secret_key: Arc::new(RwLock::new(secret_key)), - seeders_router: Arc::new(RwLock::new(HashMap::new())), - p2p, geode, downloads_path, chunk_timeout: settings.chunk_timeout, pow: Arc::new(RwLock::new(pow)), - dht, + dht: dht.clone(), path_tree: sled_db.open_tree(SLED_PATH_TREE)?, file_selection_tree: sled_db.open_tree(SLED_FILE_SELECTION_TREE)?, scrap_tree: sled_db.open_tree(SLED_SCRAP_TREE)?, @@ -333,12 +201,17 @@ impl Fud { get_rx, put_tx, put_rx, + lookup_tx, + lookup_rx, fetch_tasks: Arc::new(RwLock::new(HashMap::new())), put_tasks: Arc::new(RwLock::new(HashMap::new())), + lookup_tasks: Arc::new(RwLock::new(HashMap::new())), tasks: Arc::new(RwLock::new(HashMap::new())), event_publisher, + p2p, executor, - }; + }); + *dht.handler.write().await = Arc::downgrade(&fud); Ok(fud) } @@ -347,16 +220,18 @@ impl Fud { let mut tasks = self.tasks.write().await; start_task!(self, "get", tasks::get_task, tasks); start_task!(self, "put", tasks::put_task, tasks); - start_task!(self, "DHT channel", dht_tasks::channel_task::, tasks); + start_task!(self, "DHT channel", dht_tasks::channel_task::, tasks); + start_task!(self, "lookup", tasks::lookup_task, tasks); start_task!(self, "announce", tasks::announce_seed_task, tasks); start_task!(self, "node ID", tasks::node_id_task, tasks); } /// Bootstrap the DHT, verify our resources, add ourselves to - /// `seeders_router` for the resources we already have, announce our files. + /// the seeders (`dht.hash_table`) for the resources we already have, + /// announce our files. 
     async fn init(&self) -> Result<()> {
         info!(target: "fud::init()", "Bootstrapping the DHT...");
-        self.bootstrap().await;
+        self.dht.bootstrap().await;
 
         info!(target: "fud::init()", "Finding resources...");
         let mut resources_write = self.resources.write().await;
@@ -426,24 +301,20 @@ impl Fud {
         }
 
         // Add our own node as a seeder for the resources we are seeding
-        let self_router_items: Vec<DhtRouterItem<FudNode>> = vec![self_node.into()];
         for resource in &resources {
-            self.add_to_router(
-                self.seeders_router.clone(),
-                &resource.hash,
-                self_router_items.clone(),
-            )
-            .await;
+            let self_router_items = vec![self.new_seeder(&resource.hash).await];
+            self.add_value(&resource.hash, &self_router_items).await;
         }
 
         info!(target: "fud::init()", "Announcing resources...");
-        let seeders = vec![self.node().await.into()];
         for resource in resources {
+            let seeders = vec![self.new_seeder(&resource.hash).await];
             let _ = self
+                .dht
                 .announce(
                     &resource.hash,
-                    &FudAnnounce { key: resource.hash, seeders: seeders.clone() },
-                    self.seeders_router.clone(),
+                    &seeders.clone(),
+                    &FudAnnounce { key: resource.hash, seeders },
                 )
                 .await;
         }
@@ -484,6 +355,15 @@ impl Fud {
         Ok(None)
     }
 
+    /// Create a new [`dht::FudSeeder`] for our own node
+    pub async fn new_seeder(&self, key: &blake3::Hash) -> FudSeeder {
+        FudSeeder {
+            key: *key,
+            node: self.node().await,
+            timestamp: Timestamp::current_time().inner(),
+        }
+    }
+
     /// Verify if resources are complete and uncorrupted.
     /// If a resource is incomplete or corrupted, its status is changed to Incomplete.
     /// If a resource is complete, its status is changed to Seeding.
@@ -618,461 +498,6 @@ impl Fud {
         Ok(seeding_resources)
     }
 
-    /// Query `nodes` to find the seeders for `key`
-    async fn fetch_seeders(
-        &self,
-        nodes: &Vec<FudNode>,
-        key: &blake3::Hash,
-    ) -> HashSet<DhtRouterItem<FudNode>> {
-        let self_node = self.node().await;
-        let mut seeders: HashSet<DhtRouterItem<FudNode>> = HashSet::new();
-
-        for node in nodes {
-            let channel = match self.get_channel(node, None).await {
-                Ok(channel) => channel,
-                Err(e) => {
-                    warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id()));
-                    continue;
-                }
-            };
-            let msg_subsystem = channel.message_subsystem();
-            msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
-
-            let msg_subscriber = match channel.subscribe_msg::<FudFindSeedersReply>().await {
-                Ok(msg_subscriber) => msg_subscriber,
-                Err(e) => {
-                    warn!(target: "fud::fetch_seeders()", "Error subscribing to msg: {e}");
-                    self.cleanup_channel(channel).await;
-                    continue;
-                }
-            };
-
-            let send_res = channel.send(&FudFindSeedersRequest { key: *key }).await;
-            if let Err(e) = send_res {
-                warn!(target: "fud::fetch_seeders()", "Error while sending FudFindSeedersRequest: {e}");
-                msg_subscriber.unsubscribe().await;
-                self.cleanup_channel(channel).await;
-                continue;
-            }
-
-            let reply = match msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await
-            {
-                Ok(reply) => reply,
-                Err(e) => {
-                    warn!(target: "fud::fetch_seeders()", "Error waiting for reply: {e}");
-                    msg_subscriber.unsubscribe().await;
-                    self.cleanup_channel(channel).await;
-                    continue;
-                }
-            };
-
-            msg_subscriber.unsubscribe().await;
-            self.cleanup_channel(channel).await;
-
-            seeders.extend(reply.seeders.clone());
-        }
-
-        seeders =
-            seeders.iter().filter(|seeder| seeder.node.id() != self_node.id()).cloned().collect();
-
-        info!(target: "fud::fetch_seeders()", "Found {} seeders for {}", seeders.len(), hash_to_string(key));
-        seeders
-    }
-
-    /// Fetch `chunks` for `chunked` (file or directory) from `seeders`.
-    async fn fetch_chunks(
-        &self,
-        hash: &blake3::Hash,
-        chunked: &mut ChunkedStorage,
-        seeders: &HashSet<DhtRouterItem<FudNode>>,
-        chunks: &HashSet<blake3::Hash>,
-    ) -> Result<()> {
-        let mut remaining_chunks = chunks.clone();
-        let mut shuffled_seeders = {
-            let mut vec: Vec<_> = seeders.iter().cloned().collect();
-            vec.shuffle(&mut OsRng);
-            vec
-        };
-
-        while let Some(seeder) = shuffled_seeders.pop() {
-            let channel = match self.get_channel(&seeder.node, Some(*hash)).await {
-                Ok(channel) => channel,
-                Err(e) => {
-                    warn!(target: "fud::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
-                    continue;
-                }
-            };
-            let mut chunks_to_query = remaining_chunks.clone();
-            info!("Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));
-            loop {
-                let start_time = Instant::now();
-                let msg_subsystem = channel.message_subsystem();
-                msg_subsystem.add_dispatch::<FudChunkReply>().await;
-                msg_subsystem.add_dispatch::<FudNotFound>().await;
-                let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
-                let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();
-
-                // Select a chunk to request
-                let mut chunk = None;
-                if let Some(random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
-                    chunk = Some(*random_chunk);
-                }
-
-                if chunk.is_none() {
-                    // No more chunks to request from this seeder
-                    break; // Switch to another seeder
-                }
-                let chunk_hash = chunk.unwrap();
-                chunks_to_query.remove(&chunk_hash);
-
-                let send_res =
-                    channel.send(&FudFindRequest { info: Some(*hash), key: chunk_hash }).await;
-                if let Err(e) = send_res {
-                    warn!(target: "fud::fetch_chunks()", "Error while sending FudFindRequest: {e}");
-                    break; // Switch to another seeder
-                }
-
-                let chunk_recv =
-                    msg_subscriber_chunk.receive_with_timeout(self.chunk_timeout).fuse();
-                let notfound_recv =
-                    msg_subscriber_notfound.receive_with_timeout(self.chunk_timeout).fuse();
-
-                pin_mut!(chunk_recv, notfound_recv);
-
-                // Wait for a FudChunkReply or FudNotFound
-                select! {
-                    chunk_reply = chunk_recv => {
-                        if let Err(e) = chunk_reply {
-                            warn!(target: "fud::fetch_chunks()", "Error waiting for chunk reply: {e}");
-                            break; // Switch to another seeder
-                        }
-                        let reply = chunk_reply.unwrap();
-
-                        match self.geode.write_chunk(chunked, &reply.chunk).await {
-                            Ok((inserted_hash, bytes_written)) => {
-                                if inserted_hash != chunk_hash {
-                                    warn!(target: "fud::fetch_chunks()", "Received chunk does not match requested chunk");
-                                    msg_subscriber_chunk.unsubscribe().await;
-                                    msg_subscriber_notfound.unsubscribe().await;
-                                    continue; // Skip to next chunk, will retry this chunk later
-                                }
-
-                                info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
-
-                                // If we did not write the whole chunk to the filesystem,
-                                // save the chunk in the scraps.
-                                if bytes_written < reply.chunk.len() {
-                                    info!(target: "fud::fetch_chunks()", "Saving chunk {} as a scrap", hash_to_string(&chunk_hash));
-                                    let chunk_written = self.geode.get_chunk(chunked, &chunk_hash).await?;
-                                    if let Err(e) = self.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&Scrap {
-                                        chunk: reply.chunk.clone(),
-                                        hash_written: blake3::hash(&chunk_written),
-                                    }).await) {
-                                        error!(target: "fud::fetch_chunks()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(&chunk_hash))
-                                    }
-                                }
-
-                                // Update resource `chunks_downloaded` and `bytes_downloaded`
-                                let mut resources_write = self.resources.write().await;
-                                let resource = match resources_write.get_mut(hash) {
-                                    Some(resource) => {
-                                        resource.status = ResourceStatus::Downloading;
-                                        resource.total_chunks_downloaded += 1;
-                                        resource.target_chunks_downloaded += 1;
-
-                                        resource.total_bytes_downloaded += reply.chunk.len() as u64;
-                                        resource.target_bytes_downloaded += resource.get_selected_bytes(chunked, &reply.chunk) as u64;
-                                        resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64());
-                                        if resource.speeds.len() > 12 {
-                                            resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12); // Only keep the last 6 speeds
-                                        }
-
-                                        // If we just fetched the last chunk of a file, compute
-                                        // `total_bytes_size` (and `target_bytes_size`) again,
-                                        // as `geode.write_chunk()` updated the FileSequence
-                                        // to the exact file size.
-                                        if let Some((last_chunk_hash, _)) = chunked.iter().last() {
-                                            if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == chunk_hash {
-                                                resource.total_bytes_size = chunked.get_fileseq().len();
-                                                resource.target_bytes_size = resource.total_bytes_size;
-                                            }
-                                        }
-                                        resource.clone()
-                                    }
-                                    None => return Ok(()) // Resource was removed, abort
-                                };
-                                drop(resources_write);
-
-                                notify_event!(self, ChunkDownloadCompleted, { hash: *hash, chunk_hash, resource });
-                                remaining_chunks.remove(&chunk_hash);
-                            }
-                            Err(e) => {
-                                error!(target: "fud::fetch_chunks()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
-                            }
-                        };
-                    }
-                    notfound_reply = notfound_recv => {
-                        if let Err(e) = notfound_reply {
-                            warn!(target: "fud::fetch_chunks()", "Error waiting for NOTFOUND reply: {e}");
-                            msg_subscriber_chunk.unsubscribe().await;
-                            msg_subscriber_notfound.unsubscribe().await;
-                            break; // Switch to another seeder
-                        }
-                        info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
-                        notify_event!(self, ChunkNotFound, { hash: *hash, chunk_hash });
-                    }
-                };
-
-                msg_subscriber_chunk.unsubscribe().await;
-                msg_subscriber_notfound.unsubscribe().await;
-            }
-
-            self.cleanup_channel(channel).await;
-
-            // Stop when there are no missing chunks
-            if remaining_chunks.is_empty() {
-                break;
-            }
-        }
-
-        Ok(())
-    }
-
-    /// Fetch a single resource metadata from `nodes`.
-    /// If the resource is a file smaller than a single chunk then seeder can send the
-    /// chunk directly, and we will create the file from it on path `path`.
-    /// 1. Request seeders from those nodes
-    /// 2. Request the metadata from the seeders
-    /// 3. Insert metadata to geode using the reply
-    pub async fn fetch_metadata(
-        &self,
-        hash: &blake3::Hash,
-        nodes: &Vec<FudNode>,
-        path: &Path,
-    ) -> Result<()> {
-        let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
-        let mut result: Option<FetchReply> = None;
-
-        for node in nodes {
-            // 1. Request list of seeders
-            let channel = match self.get_channel(node, Some(*hash)).await {
-                Ok(channel) => channel,
-                Err(e) => {
-                    warn!(target: "fud::fetch_metadata()", "Could not get a channel for node {}: {e}", hash_to_string(&node.id()));
-                    continue;
-                }
-            };
-            let msg_subsystem = channel.message_subsystem();
-            msg_subsystem.add_dispatch::<FudFindSeedersReply>().await;
-
-            let msg_subscriber = match channel.subscribe_msg::<FudFindSeedersReply>().await {
-                Ok(msg_subscriber) => msg_subscriber,
-                Err(e) => {
-                    warn!(target: "fud::fetch_metadata()", "Error subscribing to msg: {e}");
-                    continue;
-                }
-            };
-
-            let send_res = channel.send(&FudFindSeedersRequest { key: *hash }).await;
-            if let Err(e) = send_res {
-                warn!(target: "fud::fetch_metadata()", "Error while sending FudFindSeedersRequest: {e}");
-                msg_subscriber.unsubscribe().await;
-                self.cleanup_channel(channel).await;
-                continue;
-            }
-
-            let reply = match msg_subscriber.receive_with_timeout(self.dht().settings.timeout).await
-            {
-                Ok(reply) => reply,
-                Err(e) => {
-                    warn!(target: "fud::fetch_metadata()", "Error waiting for reply: {e}");
-                    msg_subscriber.unsubscribe().await;
-                    self.cleanup_channel(channel).await;
-                    continue;
-                }
-            };
-
-            let mut seeders = reply.seeders.clone();
-            info!(target: "fud::fetch_metadata()", "Found {} seeders for {} (from {})", seeders.len(), hash_to_string(hash), hash_to_string(&node.id()));
-
-            msg_subscriber.unsubscribe().await;
-            self.cleanup_channel(channel).await;
-
-            // 2. Request the file/chunk from the seeders
-            while let Some(seeder) = seeders.pop() {
-                // Only query a seeder once
-                if queried_seeders.iter().any(|s| *s == seeder.node.id()) {
-                    continue;
-                }
-                queried_seeders.insert(seeder.node.id());
-
-                let channel = self.get_channel(&seeder.node, Some(*hash)).await;
-                if let Ok(channel) = channel {
-                    let msg_subsystem = channel.message_subsystem();
-                    msg_subsystem.add_dispatch::<FudChunkReply>().await;
-                    msg_subsystem.add_dispatch::<FudFileReply>().await;
-                    msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
-                    msg_subsystem.add_dispatch::<FudNotFound>().await;
-                    let msg_subscriber_chunk =
-                        channel.subscribe_msg::<FudChunkReply>().await.unwrap();
-                    let msg_subscriber_file =
-                        channel.subscribe_msg::<FudFileReply>().await.unwrap();
-                    let msg_subscriber_dir =
-                        channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
-                    let msg_subscriber_notfound =
-                        channel.subscribe_msg::<FudNotFound>().await.unwrap();
-
-                    let send_res = channel.send(&FudFindRequest { info: None, key: *hash }).await;
-                    if let Err(e) = send_res {
-                        warn!(target: "fud::fetch_metadata()", "Error while sending FudFindRequest: {e}");
-                        msg_subscriber_chunk.unsubscribe().await;
-                        msg_subscriber_file.unsubscribe().await;
-                        msg_subscriber_dir.unsubscribe().await;
-                        msg_subscriber_notfound.unsubscribe().await;
-                        self.cleanup_channel(channel).await;
-                        continue;
-                    }
-
-                    let chunk_recv =
-                        msg_subscriber_chunk.receive_with_timeout(self.chunk_timeout).fuse();
-                    let file_recv =
-                        msg_subscriber_file.receive_with_timeout(self.chunk_timeout).fuse();
-                    let dir_recv =
-                        msg_subscriber_dir.receive_with_timeout(self.chunk_timeout).fuse();
-                    let notfound_recv =
-                        msg_subscriber_notfound.receive_with_timeout(self.chunk_timeout).fuse();
-
-                    pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);
-
-                    let cleanup = async || {
-                        msg_subscriber_chunk.unsubscribe().await;
-                        msg_subscriber_file.unsubscribe().await;
-                        msg_subscriber_dir.unsubscribe().await;
-                        msg_subscriber_notfound.unsubscribe().await;
-                        self.cleanup_channel(channel).await;
-                    };
-
-                    // Wait for a FudChunkReply, FudFileReply, FudDirectoryReply, or FudNotFound
-                    select! {
-                        chunk_reply = chunk_recv => {
-                            cleanup().await;
-                            if let Err(e) = chunk_reply {
-                                warn!(target: "fud::fetch_metadata()", "Error waiting for chunk reply: {e}");
-                                continue;
-                            }
-                            let reply = chunk_reply.unwrap();
-                            let chunk_hash = blake3::hash(&reply.chunk);
-                            // Check that this is the only chunk in the file
-                            if !self.geode.verify_metadata(hash, &[chunk_hash], &[]) {
-                                warn!(target: "fud::fetch_metadata()", "Received a chunk while fetching a file, the chunk did not match the file hash");
-                                continue;
-                            }
-                            info!(target: "fud::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
-                            result = Some(FetchReply::Chunk((*reply).clone()));
-                            break;
-                        }
-                        file_reply = file_recv => {
-                            cleanup().await;
-                            if let Err(e) = file_reply {
-                                warn!(target: "fud::fetch_metadata()", "Error waiting for file reply: {e}");
-                                continue;
-                            }
-                            let reply = file_reply.unwrap();
-                            if !self.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
-                                warn!(target: "fud::fetch_metadata()", "Received invalid file metadata");
-                                continue;
-                            }
-                            info!(target: "fud::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
-                            result = Some(FetchReply::File((*reply).clone()));
-                            break;
-                        }
-                        dir_reply = dir_recv => {
-                            cleanup().await;
-                            if let Err(e) = dir_reply {
-                                warn!(target: "fud::fetch_metadata()", "Error waiting for directory reply: {e}");
-                                continue;
-                            }
-                            let reply = dir_reply.unwrap();
-
-                            // Convert all file paths from String to PathBuf
-                            let files: Vec<_> = reply.files.clone().into_iter()
-                                .map(|(path_str, size)| (PathBuf::from(path_str), size))
-                                .collect();
-
-                            if !self.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
-                                warn!(target: "fud::fetch_metadata()", "Received invalid directory metadata");
-                                continue;
-                            }
-                            info!(target: "fud::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
-                            result = Some(FetchReply::Directory((*reply).clone()));
-                            break;
-                        }
-                        notfound_reply = notfound_recv => {
-                            cleanup().await;
-                            if let Err(e) = notfound_reply {
-                                warn!(target: "fud::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
-                                continue;
-                            }
-                            info!(target: "fud::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
-                        }
-                    };
-                }
-            }
-
-            if result.is_some() {
-                break;
-            }
-        }
-
-        // We did not find the resource
-        if result.is_none() {
-            return Err(Error::GeodeFileRouteNotFound)
-        }
-
-        // 3. Insert metadata to geode using the reply
-        // At this point the reply content is already verified
-        match result.unwrap() {
-            FetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => {
-                // Convert all file paths from String to PathBuf
-                let mut files: Vec<_> = files
-                    .into_iter()
-                    .map(|(path_str, size)| (PathBuf::from(path_str), size))
-                    .collect();
-
-                self.geode.sort_files(&mut files);
-                if let Err(e) = self.geode.insert_metadata(hash, &chunk_hashes, &files).await {
-                    error!(target: "fud::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
-                    return Err(e)
-                }
-            }
-            FetchReply::File(FudFileReply { chunk_hashes }) => {
-                if let Err(e) = self.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
-                    error!(target: "fud::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
-                    return Err(e)
-                }
-            }
-            // Looked for a file but got a chunk: the entire file fits in a single chunk
-            FetchReply::Chunk(FudChunkReply { chunk }) => {
-                info!(target: "fud::fetch_metadata()", "File fits in a single chunk");
-                let chunk_hash = blake3::hash(&chunk);
-                let _ = self.geode.insert_metadata(hash, &[chunk_hash], &[]).await;
-                let mut chunked_file = ChunkedStorage::new(
-                    &[chunk_hash],
-                    &[(path.to_path_buf(), chunk.len() as u64)],
-                    false,
-                );
-                if let Err(e) = self.geode.write_chunk(&mut chunked_file, &chunk).await {
-                    error!(target: "fud::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
-                    return Err(e)
-                };
-            }
-        };
-
-        Ok(())
-    }
-
     /// Start downloading a file or directory from the network to `path`.
     /// This creates a new task in `fetch_tasks` calling `fetch_resource()`.
     /// `files` is the list of files (relative paths) you want to download
@@ -1094,35 +519,35 @@ impl Fud {
     /// Try to get the chunked file or directory from geode, if we don't have it
     /// then it is fetched from the network using `fetch_metadata()`.
+    /// If we need to fetch from the network, the seeders we find are sent to
+    /// `seeders_pub`.
+    /// The seeder in the returned result is only defined if we fetched from
+    /// the network.
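+    ///
+    /// Illustrative usage (a sketch; the `fud`, `hash`, and `path` names are
+    /// only examples):
+    /// ```ignore
+    /// let seeders_pub = Publisher::new();
+    /// let (chunked, seeder) = fud.get_metadata(&hash, &path, seeders_pub).await?;
+    /// // `seeder` is `None` when the metadata was already present in Geode.
+    /// ```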
     pub async fn get_metadata(
         &self,
         hash: &blake3::Hash,
         path: &Path,
-    ) -> Result<(ChunkedStorage, Vec<FudNode>)> {
+        seeders_pub: PublisherPtr<Option<Vec<FudSeeder>>>,
+    ) -> Result<(ChunkedStorage, Option<FudSeeder>)> {
         match self.geode.get(hash, path).await {
             // We already know the metadata
-            Ok(v) => Ok((v, vec![])),
+            Ok(v) => Ok((v, None)),
 
             // The metadata in geode is invalid or corrupted
             Err(Error::GeodeNeedsGc) => todo!(),
 
             // If we could not find the metadata in geode, get it from the network
             Err(Error::GeodeFileNotFound) => {
                 // Find nodes close to the file hash
                 info!(target: "fud::get_metadata()", "Requested metadata {} not found in Geode, triggering fetch", hash_to_string(hash));
-                let closest_nodes = self.lookup_nodes(hash).await.unwrap_or_default();
+                let metadata_sub = seeders_pub.clone().subscribe().await;
+                self.lookup_tx.send((*hash, seeders_pub.clone())).await?;
 
-                // Fetch file or directory metadata
-                match self.fetch_metadata(hash, &closest_nodes, path).await {
-                    // The file metadata was found and inserted into geode
-                    Ok(()) => Ok((self.geode.get(hash, path).await?, closest_nodes)),
-                    // We could not find the metadata, or any other error occured
-                    Err(e) => Err(e),
-                }
-            }
-
-            Err(e) => {
-                error!(target: "fud::get_metadata()", "{e}");
-                Err(e)
+                // Fetch resource metadata
+                let fetch_res = fetch_metadata(self, hash, &metadata_sub, path).await;
+                metadata_sub.unsubscribe().await;
+                let seeder = fetch_res?;
+                Ok((self.geode.get(hash, path).await?, Some(seeder)))
             }
+            Err(e) => Err(e),
         }
     }
 
@@ -1134,8 +559,6 @@ impl Fud {
         path: &Path,
         files: &FileSelection,
     ) -> Result<()> {
-        let self_node = self.node().await;
-
         let hash_bytes = hash.as_bytes();
         let path_string = path.to_string_lossy().to_string();
         let path_bytes = path_string.as_bytes();
@@ -1196,16 +619,19 @@ impl Fud {
         // Send a DownloadStarted event
         notify_event!(self, DownloadStarted, resource);
 
-        // Try to get the chunked file or directory from geode or the network
-        let (mut chunked, mut closest_nodes) = match self.get_metadata(hash, path).await {
-            Ok(chunked) => chunked,
-            Err(e) => {
-                // Set resource status to `Incomplete` and send a `MetadataNotFound` event
-                let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });
-                notify_event!(self, MetadataNotFound, resource);
-                return Err(e);
-            }
-        };
+        let seeders_pub = Publisher::new();
+        let seeders_sub = seeders_pub.clone().subscribe().await;
+
+        // Try to get the chunked file or directory from geode
+        let metadata_result = self.get_metadata(hash, path, seeders_pub.clone()).await;
+
+        if let Err(e) = metadata_result {
+            // Set resource status to `Incomplete` and send a `MetadataNotFound` event
+            let resource = update_resource!(hash, { status = ResourceStatus::Incomplete });
+            notify_event!(self, MetadataNotFound, resource);
+            return Err(e)
+        }
+        let (mut chunked, metadata_seeder) = metadata_result.unwrap();
 
         // Get a list of all file paths the user wants to fetch
         let resources_read = self.resources.read().await;
@@ -1217,14 +643,7 @@ impl Fud {
         drop(resources_read);
 
         // Create all files (and all necessary directories)
-        for file_path in files_vec.iter() {
-            if !file_path.exists() {
-                if let Some(dir) = file_path.parent() {
-                    fs::create_dir_all(dir).await?;
-                }
-                File::create(&file_path).await?;
-            }
-        }
+        create_all_files(&files_vec).await?;
 
         // Set resource status to `Verifying` and send a `MetadataDownloadCompleted` event
         let resource = update_resource!(hash, {
@@ -1276,7 +695,7 @@ impl Fud {
             chunked.iter().filter(|(hash, _)| chunk_hashes.contains(hash)).cloned().collect();
 
         // Set of the chunks we need to download
-        let missing_chunks: HashSet<blake3::Hash> =
+        let mut missing_chunks: HashSet<blake3::Hash> =
             chunks.iter().filter(|&(_, available)| !available).map(|(chunk, _)| *chunk).collect();
 
         // Update the resource with the chunks/bytes counts
@@ -1291,27 +710,33 @@ impl Fud {
             target_bytes_downloaded = target_bytes_downloaded,
         });
 
-        // If we don't need to download any chunk
-        if missing_chunks.is_empty() {
+        let download_completed = async |chunked: &ChunkedStorage| -> Result<()> {
             // Set resource status to `Seeding` or `Incomplete`
             let resource = update_resource!(hash, {
                 status = match chunked.is_complete() {
                     true => ResourceStatus::Seeding,
                     false => ResourceStatus::Incomplete,
-                }
+                },
+                target_chunks_downloaded = chunks.len() as u64,
+                total_chunks_downloaded = chunked.local_chunks() as u64,
             });
 
             // Announce the resource if we have all chunks
             if chunked.is_complete() {
-                let self_announce =
-                    FudAnnounce { key: *hash, seeders: vec![self_node.clone().into()] };
-                let _ = self.announce(hash, &self_announce, self.seeders_router.clone()).await;
+                let seeders = vec![self.new_seeder(hash).await];
+                let self_announce = FudAnnounce { key: *hash, seeders: seeders.clone() };
+                let _ = self.dht.announce(hash, &seeders, &self_announce).await;
             }
 
             // Send a DownloadCompleted event
             notify_event!(self, DownloadCompleted, resource);
 
-            return Ok(());
+            Ok(())
+        };
+
+        // If we don't need to download any chunk
+        if missing_chunks.is_empty() {
+            return download_completed(&chunked).await;
         }
 
         // Set resource status to `Downloading` and send a MetadataDownloadCompleted event
@@ -1320,25 +745,24 @@ impl Fud {
         });
         notify_event!(self, MetadataDownloadCompleted, resource);
 
-        // Find nodes close to the file hash if we didn't previously fetched them
-        if closest_nodes.is_empty() {
-            closest_nodes = self.lookup_nodes(hash).await.unwrap_or_default();
+        // Start looking up seeders if we did not need to do it for the metadata
+        if metadata_seeder.is_none() {
+            self.lookup_tx.send((*hash, seeders_pub)).await?;
         }
 
-        // Find seeders and remove ourselves from the result
-        let seeders = self.fetch_seeders(&closest_nodes, hash).await;
-
         // Fetch missing chunks from seeders
-        self.fetch_chunks(hash, &mut chunked, &seeders, &missing_chunks).await?;
+        let _ = fetch_chunks(
+            self,
+            hash,
+            &mut chunked,
+            &seeders_sub,
+            metadata_seeder,
+            &mut missing_chunks,
+        )
+        .await;
 
         // Get chunked file from geode
-        let mut chunked = match self.geode.get(hash, path).await {
-            Ok(v) => v,
-            Err(e) => {
-                error!(target: "fud::fetch_resource()", "{e}");
-                return Err(e);
-            }
-        };
+        let mut chunked = self.geode.get(hash, path).await?;
 
         // Set resource status to `Verifying` and send FudEvent::ResourceUpdated
         let resource = update_resource!(hash, { status = ResourceStatus::Verifying });
@@ -1364,26 +788,7 @@ impl Fud {
             return Ok(());
         }
 
-        // Set resource status to `Seeding` or `Incomplete`
-        let resource = update_resource!(hash, {
-            status = match chunked.is_complete() {
-                true => ResourceStatus::Seeding,
-                false => ResourceStatus::Incomplete,
-            },
-            target_chunks_downloaded = chunks.len() as u64,
-            total_chunks_downloaded = chunked.local_chunks() as u64,
-        });
-
-        // Announce the resource if we have all chunks
-        if chunked.is_complete() {
-            let self_announce = FudAnnounce { key: *hash, seeders: vec![self_node.clone().into()] };
-            let _ = self.announce(hash, &self_announce, self.seeders_router.clone()).await;
-        }
-
-        // Send a DownloadCompleted event
-        notify_event!(self, DownloadCompleted, resource);
-
-        Ok(())
+        download_completed(&chunked).await
     }
 
     async fn
write_scraps( @@ -1646,8 +1051,9 @@ impl Fud { drop(resources_write); // Announce the new resource - let fud_announce = FudAnnounce { key: hash, seeders: vec![self_node.into()] }; - let _ = self.announce(&hash, &fud_announce, self.seeders_router.clone()).await; + let seeders = vec![self.new_seeder(&hash).await]; + let fud_announce = FudAnnounce { key: hash, seeders: seeders.clone() }; + let _ = self.dht.announce(&hash, &seeders, &fud_announce).await; // Send InsertCompleted event notify_event!(self, InsertCompleted, { @@ -1698,6 +1104,22 @@ impl Fud { notify_event!(self, ResourceRemoved, { hash: *hash }); } + /// Remove seeders that are older than `expiry_secs` + pub async fn prune_seeders(&self, expiry_secs: u32) { + let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64); + let mut seeders_write = self.dht.hash_table.write().await; + + let keys: Vec<_> = seeders_write.keys().cloned().collect(); + + for key in keys { + let items = seeders_write.get_mut(&key).unwrap(); + items.retain(|item| item.timestamp > expiry_timestamp); + if items.is_empty() { + seeders_write.remove(&key); + } + } + } + /// Stop all tasks. pub async fn stop(&self) { info!("Stopping fetch tasks..."); @@ -1707,23 +1129,30 @@ impl Fud { fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect(); drop(fetch_tasks); - // Stop all fetch tasks for task in cloned_fetch_tasks.values() { task.stop().await; } info!("Stopping put tasks..."); - // Create a clone of put_tasks because `task.stop()` needs a write lock let put_tasks = self.put_tasks.read().await; let cloned_put_tasks: HashMap> = put_tasks.iter().map(|(key, value)| (key.clone(), value.clone())).collect(); drop(put_tasks); - // Stop all put tasks for task in cloned_put_tasks.values() { task.stop().await; } + info!("Stopping lookup tasks..."); + let lookup_tasks = self.lookup_tasks.read().await; + let cloned_lookup_tasks: HashMap> = + lookup_tasks.iter().map(|(key, value)| (*key, value.clone())).collect(); + drop(lookup_tasks); + + for task in cloned_lookup_tasks.values() { + task.stop().await; + } + // Stop all other tasks let mut tasks = self.tasks.write().await; for (name, task) in tasks.clone() { diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs index 00b051bec..e26ed8c26 100644 --- a/bin/fud/fud/src/main.rs +++ b/bin/fud/fud/src/main.rs @@ -85,7 +85,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { let event_pub = Publisher::new(); let fud: Arc = - Arc::new(Fud::new(args_, p2p.clone(), &sled_db, event_pub.clone(), ex.clone()).await?); + Fud::new(args_, p2p.clone(), &sled_db, event_pub.clone(), ex.clone()).await?; fud.start_tasks().await; diff --git a/bin/fud/fud/src/proto.rs b/bin/fud/fud/src/proto.rs index 8c3330a7c..28281127d 100644 --- a/bin/fud/fud/src/proto.rs +++ b/bin/fud/fud/src/proto.rs @@ -22,7 +22,7 @@ use smol::Executor; use std::{path::StripPrefixError, sync::Arc}; use darkfi::{ - dht::{DhtHandler, DhtRouterItem}, + dht::DhtHandler, geode::hash_to_string, impl_p2p_message, net::{ @@ -35,7 +35,10 @@ use darkfi::{ use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature}; use darkfi_serial::{SerialDecodable, SerialEncodable}; -use super::{Fud, FudNode}; +use crate::{ + dht::{FudNode, FudSeeder}, + Fud, +}; /// Message representing a file reply from the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -56,7 +59,7 @@ impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudAnnounce 
{ pub key: blake3::Hash, - pub seeders: Vec>, + pub seeders: Vec, } impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION); @@ -127,7 +130,8 @@ impl_p2p_message!( /// Message representing a find seeders reply on the network #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct FudFindSeedersReply { - pub seeders: Vec>, + pub seeders: Vec, + pub nodes: Vec, } impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION); @@ -216,7 +220,7 @@ impl ProtocolFud { let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; if let Some(node) = node { - self.fud.update_node(&node).await; + self.fud.dht.update_node(&node).await; } if self.handle_fud_chunk_request(&request).await { @@ -359,7 +363,7 @@ impl ProtocolFud { let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; if let Some(node) = node { - self.fud.update_node(&node).await; + self.fud.dht.update_node(&node).await; } let reply = FudFindNodesReply { @@ -388,21 +392,38 @@ impl ProtocolFud { let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; if let Some(node) = node { - self.fud.update_node(&node).await; + self.fud.dht.update_node(&node).await; } - let router = self.fud.seeders_router.read().await; + let router = self.fud.dht.hash_table.read().await; let peers = router.get(&request.key); match peers { Some(seeders) => { let _ = self .channel - .send(&FudFindSeedersReply { seeders: seeders.iter().cloned().collect() }) + .send(&FudFindSeedersReply { + seeders: seeders.to_vec(), + nodes: self + .fud + .dht() + .find_neighbors(&request.key, self.fud.dht().settings.k) + .await, + }) .await; } None => { - let _ = self.channel.send(&FudFindSeedersReply { seeders: vec![] }).await; + let _ = self + .channel + .send(&FudFindSeedersReply { + seeders: vec![], + nodes: self + .fud + .dht() + .find_neighbors(&request.key, self.fud.dht().settings.k) + .await, + }) + .await; } }; } @@ -424,7 +445,7 @@ impl ProtocolFud { let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; if let Some(node) = node { - self.fud.update_node(&node).await; + self.fud.dht.update_node(&node).await; } let mut seeders = vec![]; @@ -437,7 +458,7 @@ impl ProtocolFud { seeders.push(seeder); } - self.fud.add_to_router(self.fud.seeders_router.clone(), &request.key, seeders).await; + self.fud.add_value(&request.key, &seeders).await; } } } diff --git a/bin/fud/fud/src/rpc.rs b/bin/fud/fud/src/rpc.rs index 22a77dd5c..224980fb3 100644 --- a/bin/fud/fud/src/rpc.rs +++ b/bin/fud/fud/src/rpc.rs @@ -89,8 +89,8 @@ impl JsonRpcInterface { } // RPCAPI: - // Put a file onto the network. Takes a local filesystem path as a parameter. - // Returns the file hash that serves as a pointer to the uploaded file. + // Put a file/directory onto the network. Takes a local filesystem path as a parameter. + // Returns the resource hash that serves as a pointer to the file/directory. 
// // --> {"jsonrpc": "2.0", "method": "put", "params": ["/foo.txt"], "id": 42} // <-- {"jsonrpc": "2.0", "result: "df4...3db7", "id": 42} @@ -290,8 +290,8 @@ impl JsonRpcInterface { if !params.is_empty() { return JsonError::new(ErrorCode::InvalidParams, None, id).into() } - let mut seeders_router: HashMap = HashMap::new(); - for (hash, items) in self.fud.seeders_router.read().await.iter() { + let mut seeders_table: HashMap = HashMap::new(); + for (hash, items) in self.fud.dht.hash_table.read().await.iter() { let mut nodes = vec![]; for item in items { let mut addresses = vec![]; @@ -303,10 +303,10 @@ impl JsonRpcInterface { JsonValue::Array(addresses), ])); } - seeders_router.insert(hash_to_string(hash), JsonValue::Array(nodes)); + seeders_table.insert(hash_to_string(hash), JsonValue::Array(nodes)); } let mut res: HashMap = HashMap::new(); - res.insert("seeders".to_string(), JsonValue::Object(seeders_router)); + res.insert("seeders".to_string(), JsonValue::Object(seeders_table)); JsonResponse::new(JsonValue::Object(res), id).into() } diff --git a/bin/fud/fud/src/tasks.rs b/bin/fud/fud/src/tasks.rs index 6121dcbfd..4fcedc69f 100644 --- a/bin/fud/fud/src/tasks.rs +++ b/bin/fud/fud/src/tasks.rs @@ -16,7 +16,7 @@ * along with this program. If not, see . */ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use log::{error, info, warn}; @@ -27,18 +27,7 @@ use darkfi::{ Error, Result, }; -use crate::{ - event, - event::notify_event, - proto::{FudAnnounce, FudChunkReply, FudDirectoryReply, FudFileReply}, - Fud, FudEvent, -}; - -pub enum FetchReply { - Directory(FudDirectoryReply), - File(FudFileReply), - Chunk(FudChunkReply), -} +use crate::{event, event::notify_event, proto::FudAnnounce, Fud, FudEvent}; /// Triggered when calling the `fud.get()` method. /// It creates a new StoppableTask (running `fud.fetch_resource()`) and inserts @@ -64,6 +53,13 @@ pub async fn get_task(fud: Arc) -> Result<()> { // stopped (error, manually, or just done). let mut fetch_tasks = fud_2.fetch_tasks.write().await; fetch_tasks.remove(&hash); + + // If there is still a lookup task for this hash, stop it + let lookup_tasks = fud_2.lookup_tasks.read().await; + if let Some(lookup_task) = lookup_tasks.get(&hash) { + lookup_task.stop().await; + } + match res { Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } Err(e) => { @@ -124,6 +120,41 @@ pub async fn put_task(fud: Arc) -> Result<()> { } } +/// Triggered when you need to lookup seeders for a resource. +pub async fn lookup_task(fud: Arc) -> Result<()> { + loop { + let (key, seeders_pub) = fud.lookup_rx.recv().await.unwrap(); + + let mut lookup_tasks = fud.lookup_tasks.write().await; + let task = StoppableTask::new(); + lookup_tasks.insert(key, task.clone()); + drop(lookup_tasks); + + let fud_1 = fud.clone(); + let fud_2 = fud.clone(); + task.start( + async move { + fud_1.dht.lookup_value(&key, seeders_pub).await?; + Ok(()) + }, + move |res| async move { + // Remove the task from the `fud.lookup_tasks` hashmap once it is + // stopped (error, manually, or just done). + let mut lookup_tasks = fud_2.lookup_tasks.write().await; + lookup_tasks.remove(&key); + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => { + error!(target: "dht::lookup_task()", "Error in DHT lookup task: {e}"); + } + } + }, + Error::DetachedTaskStopped, + fud.executor.clone(), + ); + } +} + /// Background task that announces our files once every hour. /// Also removes seeders that did not announce for too long. 
pub async fn announce_seed_task(fud: Arc) -> Result<()> { @@ -132,8 +163,6 @@ pub async fn announce_seed_task(fud: Arc) -> Result<()> { loop { sleep(interval).await; - let seeders = vec![fud.node().await.into()]; - info!(target: "fud::announce_seed_task()", "Verifying seeds..."); let seeding_resources = match fud.verify_resources(None).await { Ok(resources) => resources, @@ -145,17 +174,19 @@ pub async fn announce_seed_task(fud: Arc) -> Result<()> { info!(target: "fud::announce_seed_task()", "Announcing files..."); for resource in seeding_resources { + let seeders = vec![fud.new_seeder(&resource.hash).await]; let _ = fud + .dht .announce( &resource.hash, - &FudAnnounce { key: resource.hash, seeders: seeders.clone() }, - fud.seeders_router.clone(), + &seeders.clone(), + &FudAnnounce { key: resource.hash, seeders }, ) .await; } info!(target: "fud::announce_seed_task()", "Pruning seeders..."); - fud.dht().prune_router(fud.seeders_router.clone(), interval.try_into().unwrap()).await; + fud.prune_seeders(interval.try_into().unwrap()).await; } } @@ -199,11 +230,11 @@ pub async fn node_id_task(fud: Arc) -> Result<()> { drop(buckets); // Removes nodes in the seeders router with unknown BTC block hashes - let mut seeders_router = fud.seeders_router.write().await; - for (key, seeders) in seeders_router.iter_mut() { - for seeder in seeders.clone().iter() { + let mut seeders_table = fud.dht.hash_table.write().await; + for (key, seeders) in seeders_table.iter_mut() { + for (i, seeder) in seeders.clone().iter().enumerate().rev() { if !btc.block_hashes.contains(&seeder.node.data.btc_block_hash) { - seeders.remove(seeder); + seeders.remove(i); info!(target: "fud::node_id_task()", "Removed node {} from the seeders of key {} (BTC block hash too old or unknown)", hash_to_string(&seeder.node.id()), hash_to_string(key)); } } @@ -232,12 +263,9 @@ pub async fn node_id_task(fud: Arc) -> Result<()> { } drop(channel_cache); - // Reset the DHT + // Reset the DHT: removes known nodes and seeders dht.reset().await; - // Reset the seeders router - *fud.seeders_router.write().await = HashMap::new(); - // Update our node data and our secret key *fud.node_data.write().await = node_data; *fud.secret_key.write().await = secret_key; diff --git a/bin/fud/fud/src/util.rs b/bin/fud/fud/src/util.rs index 2dd850379..42c44ca57 100644 --- a/bin/fud/fud/src/util.rs +++ b/bin/fud/fud/src/util.rs @@ -16,7 +16,10 @@ * along with this program. If not, see . */ -use smol::{fs, stream::StreamExt}; +use smol::{ + fs::{self, File}, + stream::StreamExt, +}; use std::{ collections::HashSet, path::{Path, PathBuf}, @@ -45,6 +48,19 @@ pub async fn get_all_files(dir: &Path) -> Result> { Ok(files) } +pub async fn create_all_files(files: &[PathBuf]) -> Result<()> { + for file_path in files.iter() { + if !file_path.exists() { + if let Some(dir) = file_path.parent() { + fs::create_dir_all(dir).await?; + } + File::create(&file_path).await?; + } + } + + Ok(()) +} + /// An enum to represent a set of files, where you can use `All` if you want /// all files without having to specify all of them. /// We could use an `Option>`, but this is more explicit. 
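The retain-and-drop pass in `Fud::prune_seeders` above is small enough to check in isolation. Below is a minimal, self-contained sketch of the same pattern, assuming a hypothetical `Seeder` stand-in for `FudSeeder` and plain `u64` wall-clock seconds in place of `Timestamp` (both assumptions, not the patch's actual types):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for `FudSeeder`: only the field the pruning
// pass inspects. In the patch, `timestamp` is `#[skip_serialize]`d,
// so it is set from the local clock on insertion, never from the wire.
struct Seeder {
    timestamp: u64,
}

/// Drop seeders older than `expiry_secs`, and drop a key entirely once
/// no seeder remains under it (mirrors what `Fud::prune_seeders` does).
fn prune_seeders(table: &mut HashMap<[u8; 32], Vec<Seeder>>, now: u64, expiry_secs: u64) {
    // `saturating_sub` guards the (unlikely) underflow case;
    // the patch subtracts directly.
    let expiry = now.saturating_sub(expiry_secs);
    table.retain(|_key, seeders| {
        seeders.retain(|s| s.timestamp > expiry);
        !seeders.is_empty()
    });
}

fn main() {
    let key = [0u8; 32];
    let mut table = HashMap::new();
    table.insert(key, vec![Seeder { timestamp: 100 }, Seeder { timestamp: 900 }]);
    prune_seeders(&mut table, 1000, 600); // expiry cutoff = 400
    assert_eq!(table[&key].len(), 1); // only the timestamp-900 seeder survives
}
```

The patch collects the keys first and mutates entry by entry, while this sketch uses `HashMap::retain`; the observable result is the same, since both drop expired seeders and then remove keys left empty.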
diff --git a/doc/src/SUMMARY.md b/doc/src/SUMMARY.md index 5cb2fdc18..f7680ebab 100644 --- a/doc/src/SUMMARY.md +++ b/doc/src/SUMMARY.md @@ -23,6 +23,7 @@ - [Tor Nodes](misc/nodes/tor-guide.md) - [I2p Nodes](misc/nodes/i2p-guide.md) - [Nym Nodes](misc/nodes/nym-guide.md) + - [Whonix Setup](misc/nodes/whonix-guide.md) - [Network Troubleshooting](misc/network-troubleshooting.md) - [Merge Mining](testnet/merge-mining.md) diff --git a/src/dht/handler.rs b/src/dht/handler.rs index 84f5b57fd..d5704662b 100644 --- a/src/dht/handler.rs +++ b/src/dht/handler.rs @@ -16,439 +16,48 @@ * along with this program. If not, see . */ -use async_trait::async_trait; -use futures::stream::FuturesUnordered; -use log::{debug, info, warn}; -use num_bigint::BigUint; -use smol::{lock::Semaphore, stream::StreamExt}; use std::{ - collections::{HashMap, HashSet}, - marker::Sync, + marker::{Send, Sync}, sync::Arc, - time::Duration, }; -use super::{ChannelCacheItem, Dht, DhtNode, DhtRouterItem, DhtRouterPtr}; -use crate::{ - geode::hash_to_string, - net::{ - connector::Connector, - session::{Session, SESSION_REFINE, SESSION_SEED}, - ChannelPtr, Message, - }, - system::timeout::timeout, - Error, Result, -}; +use async_trait::async_trait; +use super::{Dht, DhtLookupReply, DhtNode}; +use crate::{net::ChannelPtr, Result}; + +/// Trait for application-specific behaviors over a [`Dht`] #[async_trait] -pub trait DhtHandler: Sync { - fn dht(&self) -> Arc>; +pub trait DhtHandler: Send + Sync + Sized { + type Value: Clone; + type Node: DhtNode; + + /// The [`Dht`] instance + fn dht(&self) -> Arc>; /// Get our own node - async fn node(&self) -> N; + async fn node(&self) -> Self::Node; - /// Send a DHT ping request - async fn ping(&self, channel: ChannelPtr) -> Result; + /// Send a DHT ping request, which is used to know the node data of a peer + /// (and most importantly, its ID/key in the DHT keyspace) + async fn ping(&self, channel: ChannelPtr) -> Result; /// Triggered when we find a new node - async fn on_new_node(&self, node: &N) -> Result<()>; + async fn on_new_node(&self, node: &Self::Node) -> Result<()>; /// Send FIND NODES request to a peer to get nodes close to `key` - async fn fetch_nodes(&self, node: &N, key: &blake3::Hash) -> Result>; + async fn find_nodes(&self, node: &Self::Node, key: &blake3::Hash) -> Result>; - /// Announce message for a key, and add ourselves to router - async fn announce( + /// Send FIND VALUE request to a peer to get a value and/or nodes close to `key` + async fn find_value( &self, + node: &Self::Node, key: &blake3::Hash, - message: &M, - router: DhtRouterPtr, - ) -> Result<()> - where - N: 'async_trait, - { - let self_node = self.node().await; - if self_node.addresses().is_empty() { - return Err(().into()); // TODO - } + ) -> Result>; - self.add_to_router(router.clone(), key, vec![self_node.clone().into()]).await; - let nodes = self.lookup_nodes(key).await?; - info!(target: "dht::DhtHandler::announce()", "Announcing {} to {} nodes", hash_to_string(key), nodes.len()); + /// Add a value to our hash table + async fn add_value(&self, key: &blake3::Hash, value: &Self::Value); - for node in nodes { - let channel_res = self.get_channel(&node, None).await; - if let Ok(channel) = channel_res { - let _ = channel.send(message).await; - self.cleanup_channel(channel).await; - } - } - - Ok(()) - } - - /// Lookup our own node id to bootstrap our DHT - async fn bootstrap(&self) { - self.dht().set_bootstrapped(true).await; - - let self_node_id = self.node().await.id(); - debug!(target: 
"dht::DhtHandler::bootstrap()", "DHT bootstrapping {}", hash_to_string(&self_node_id)); - let nodes = self.lookup_nodes(&self_node_id).await; - - if nodes.is_err() || nodes.map_or(true, |v| v.is_empty()) { - self.dht().set_bootstrapped(false).await; - } - } - - /// Add a node in the correct bucket - async fn add_node(&self, node: N) - where - N: 'async_trait, - { - let self_node = self.node().await; - - // Do not add ourselves to the buckets - if node.id() == self_node.id() { - return; - } - - // Don't add this node if it has any external address that is the same as one of ours - let node_addresses = node.addresses(); - if self_node.addresses().iter().any(|addr| node_addresses.contains(addr)) { - return; - } - - // Do not add a node to the buckets if it does not have an address - if node.addresses().is_empty() { - return; - } - - let bucket_index = self.dht().get_bucket_index(&self.node().await.id(), &node.id()).await; - let buckets_lock = self.dht().buckets.clone(); - let mut buckets = buckets_lock.write().await; - let bucket = &mut buckets[bucket_index]; - - // Node is already in the bucket - if bucket.nodes.iter().any(|n| n.id() == node.id()) { - return; - } - - // Bucket is full - if bucket.nodes.len() >= self.dht().settings.k { - // Ping the least recently seen node - if let Ok(channel) = self.get_channel(&bucket.nodes[0], None).await { - let ping_res = self.ping(channel.clone()).await; - self.cleanup_channel(channel).await; - if ping_res.is_ok() { - // Ping was successful, move the least recently seen node to the tail - let n = bucket.nodes.remove(0); - bucket.nodes.push(n); - return; - } - } - - // Ping was not successful, remove the least recently seen node and add the new node - bucket.nodes.remove(0); - bucket.nodes.push(node); - return; - } - - // Bucket is not full - bucket.nodes.push(node); - } - - /// Move a node to the tail in its bucket, - /// to show that it is the most recently seen in the bucket. - /// If the node is not in a bucket it will be added using `add_node` - async fn update_node(&self, node: &N) { - let bucket_index = self.dht().get_bucket_index(&self.node().await.id(), &node.id()).await; - let buckets_lock = self.dht().buckets.clone(); - let mut buckets = buckets_lock.write().await; - let bucket = &mut buckets[bucket_index]; - - let node_index = bucket.nodes.iter().position(|n| n.id() == node.id()); - if node_index.is_none() { - drop(buckets); - self.add_node(node.clone()).await; - return; - } - - let n = bucket.nodes.remove(node_index.unwrap()); - bucket.nodes.push(n); - } - - /// Wait to acquire a semaphore, then run `self.fetch_nodes`. - /// This is meant to be used in `lookup_nodes`. 
- async fn fetch_nodes_sp( - &self, - semaphore: Arc, - node: N, - key: &blake3::Hash, - ) -> (N, Result>) - where - N: 'async_trait, - { - let _permit = semaphore.acquire().await; - (node.clone(), self.fetch_nodes(&node, key).await) - } - - /// Find `k` nodes closest to a key - async fn lookup_nodes(&self, key: &blake3::Hash) -> Result> { - info!(target: "dht::DhtHandler::lookup_nodes()", "Starting node lookup for key {}", bs58::encode(key.as_bytes()).into_string()); - - let self_node_id = self.node().await.id(); - let k = self.dht().settings.k; - let a = self.dht().settings.alpha; - let semaphore = Arc::new(Semaphore::new(self.dht().settings.concurrency)); - let mut futures = FuturesUnordered::new(); - - // Nodes we did not send a request to (yet), sorted by distance from `key` - let mut nodes_to_visit = self.dht().find_neighbors(key, k).await; - // Nodes with a pending request or a request completed - let mut visited_nodes = HashSet::::new(); - // Nodes that responded to our request, sorted by distance from `key` - let mut result = Vec::::new(); - - // Create the first `alpha` tasks - for _ in 0..a { - match nodes_to_visit.pop() { - Some(node) => { - visited_nodes.insert(node.id()); - futures.push(self.fetch_nodes_sp(semaphore.clone(), node, key)); - } - None => { - break; - } - } - } - - while let Some((queried_node, value_result)) = futures.next().await { - match value_result { - Ok(mut nodes) => { - info!(target: "dht::DhtHandler::lookup_nodes()", "Queried {}, got {} nodes", hash_to_string(&queried_node.id()), nodes.len()); - - // Remove ourselves and already known nodes from the new nodes - nodes.retain(|node| { - node.id() != self_node_id && - !visited_nodes.contains(&node.id()) && - !nodes_to_visit.iter().any(|n| n.id() == node.id()) - }); - - // Add new nodes to our buckets - for node in nodes.clone() { - self.add_node(node).await; - } - - // Add nodes to the list of nodes to visit - nodes_to_visit.extend(nodes.clone()); - self.dht().sort_by_distance(&mut nodes_to_visit, key); - - // Update nearest_nodes - result.push(queried_node.clone()); - self.dht().sort_by_distance(&mut result, key); - - // Early termination check: - // Stop if our furthest visited node is closer than the closest node we will query, - // and we already have `k` or more nodes in the result set - if result.len() >= k { - if let Some(furthest) = result.last() { - if let Some(next_node) = nodes_to_visit.first() { - let furthest_dist = BigUint::from_bytes_be( - &self.dht().distance(key, &furthest.id()), - ); - let next_dist = BigUint::from_bytes_be( - &self.dht().distance(key, &next_node.id()), - ); - if furthest_dist < next_dist { - info!(target: "dht::DhtHandler::lookup_nodes()", "Early termination for lookup nodes"); - break; - } - } - } - } - - // Create the `alpha` tasks - for _ in 0..a { - match nodes_to_visit.pop() { - Some(node) => { - visited_nodes.insert(node.id()); - futures.push(self.fetch_nodes_sp(semaphore.clone(), node, key)); - } - None => { - break; - } - } - } - } - Err(e) => { - warn!(target: "dht::DhtHandler::lookup_nodes", "Error looking for nodes: {e}"); - } - } - } - - result.truncate(k); - return Ok(result.to_vec()) - } - - /// Get a channel (existing or create a new one) to `node` about `topic`. - /// Don't forget to call `cleanup_channel()` once you are done with it. 
- async fn get_channel(&self, node: &N, topic: Option) -> Result { - let channel_cache_lock = self.dht().channel_cache.clone(); - let mut channel_cache = channel_cache_lock.write().await; - - // Get existing channels for this node, regardless of topic - let channels: HashMap> = channel_cache - .iter() - .filter(|&(_, item)| item.node == *node) - .map(|(&key, item)| (key, item.clone())) - .collect(); - - let (channel_id, topic, usage_count) = - // If we already have a channel for this node and topic, use it - if let Some((cid, cached)) = channels.iter().find(|&(_, c)| c.topic == topic) { - (Some(*cid), cached.topic, cached.usage_count) - } - // If we have a topicless channel for this node, use it - else if let Some((cid, cached)) = channels.iter().find(|&(_, c)| c.topic.is_none()) { - (Some(*cid), topic, cached.usage_count) - } - // If we don't need any specific topic, use the first channel we have - else if topic.is_none() { - match channels.iter().next() { - Some((cid, cached)) => (Some(*cid), cached.topic, cached.usage_count), - _ => (None, topic, 0), - } - } - // There is no existing channel we can use, we will create one - else { - (None, topic, 0) - }; - - // If we found an existing channel we can use, try to use it - if let Some(channel_id) = channel_id { - if let Some(channel) = self.dht().p2p.get_channel(channel_id) { - if channel.session_type_id() & (SESSION_SEED | SESSION_REFINE) != 0 { - return Err(Error::Custom( - "Could not get a channel (for DHT) as this is a seed or refine session" - .to_string(), - )); - } - - if channel.is_stopped() { - channel.clone().start(self.dht().executor.clone()); - } - - channel_cache.insert( - channel_id, - ChannelCacheItem { node: node.clone(), topic, usage_count: usage_count + 1 }, - ); - return Ok(channel); - } - } - - drop(channel_cache); - - // Create a channel - for addr in node.addresses().clone() { - let session_out = self.dht().p2p.session_outbound(); - let session_weak = Arc::downgrade(&self.dht().p2p.session_outbound()); - - let connector = Connector::new(self.dht().p2p.settings(), session_weak); - let dur = Duration::from_secs(self.dht().settings.timeout); - let Ok(connect_res) = timeout(dur, connector.connect(&addr)).await else { - warn!(target: "dht::DhtHandler::get_channel()", "Timeout trying to connect to {addr}"); - return Err(Error::ConnectTimeout); - }; - if connect_res.is_err() { - warn!(target: "dht::DhtHandler::get_channel()", "Error while connecting to {addr}: {}", connect_res.unwrap_err()); - continue; - } - let (_, channel) = connect_res.unwrap(); - - if channel.session_type_id() & (SESSION_SEED | SESSION_REFINE) != 0 { - return Err(Error::Custom( - "Could not create a channel (for DHT) as this is a seed or refine session" - .to_string(), - )); - } - - let register_res = - session_out.register_channel(channel.clone(), self.dht().executor.clone()).await; - if register_res.is_err() { - channel.clone().stop().await; - warn!(target: "dht::DhtHandler::get_channel()", "Error while registering channel {}: {}", channel.info.id, register_res.unwrap_err()); - continue; - } - - let mut channel_cache = channel_cache_lock.write().await; - channel_cache.insert( - channel.info.id, - ChannelCacheItem { node: node.clone(), topic, usage_count: 1 }, - ); - - return Ok(channel) - } - - Err(Error::Custom("Could not create channel".to_string())) - } - - /// Decrement the channel usage count, if it becomes 0 then set the topic - /// to None, so that this channel is available for another task - async fn cleanup_channel(&self, channel: ChannelPtr) 
{ - let channel_cache_lock = self.dht().channel_cache.clone(); - let mut channel_cache = channel_cache_lock.write().await; - - if let Some(cached) = channel_cache.get_mut(&channel.info.id) { - if cached.usage_count > 0 { - cached.usage_count -= 1; - } - - // If the channel is not used by anything, remove the topic - if cached.usage_count == 0 { - cached.topic = None; - } - } - } - - /// Add nodes as a provider for a key - async fn add_to_router( - &self, - router: DhtRouterPtr, - key: &blake3::Hash, - router_items: Vec>, - ) where - N: 'async_trait, - { - let mut router_items = router_items.clone(); - router_items.retain(|item| !item.node.addresses().is_empty()); - - debug!(target: "dht::DhtHandler::add_to_router()", "Inserting {} nodes to key {}", router_items.len(), bs58::encode(key.as_bytes()).into_string()); - - let mut router_write = router.write().await; - let key_r = router_write.get_mut(key); - - let router_cache_lock = self.dht().router_cache.clone(); - let mut router_cache = router_cache_lock.write().await; - - // Add to router - if let Some(k) = key_r { - k.retain(|it| !router_items.contains(it)); - k.extend(router_items.clone()); - } else { - let mut hs = HashSet::new(); - hs.extend(router_items.clone()); - router_write.insert(*key, hs); - } - - // Add to router_cache - for router_item in router_items { - let keys = router_cache.get_mut(&router_item.node.id()); - if let Some(k) = keys { - k.insert(*key); - } else { - let mut keys = HashSet::new(); - keys.insert(*key); - router_cache.insert(router_item.node.id(), keys); - } - } - } + /// Defines how keys are printed/logged + fn key_to_string(key: &blake3::Hash) -> String; } diff --git a/src/dht/mod.rs b/src/dht/mod.rs index 09bdfb981..0ce818a72 100644 --- a/src/dht/mod.rs +++ b/src/dht/mod.rs @@ -20,19 +20,30 @@ use std::{ cmp::Eq, collections::{HashMap, HashSet}, fmt::Debug, - hash::{Hash, Hasher}, + hash::Hash, marker::{Send, Sync}, - sync::Arc, + sync::{Arc, Weak}, + time::Duration, }; -use async_trait::async_trait; +use futures::stream::FuturesUnordered; +use log::{debug, info, warn}; use num_bigint::BigUint; -use smol::lock::RwLock; +use smol::{ + lock::{RwLock, Semaphore}, + stream::StreamExt, +}; use url::Url; -use darkfi_serial::{SerialDecodable, SerialEncodable}; - -use crate::{net::P2pPtr, system::ExecutorPtr, util::time::Timestamp}; +use crate::{ + net::{ + connector::Connector, + session::{Session, SESSION_REFINE, SESSION_SEED}, + ChannelPtr, Message, P2pPtr, + }, + system::{timeout::timeout, ExecutorPtr, PublisherPtr}, + Error, Result, +}; pub mod settings; pub use settings::{DhtSettings, DhtSettingsOpt}; @@ -66,36 +77,23 @@ macro_rules! 
impl_dht_node_defaults { } pub use impl_dht_node_defaults; +enum DhtLookupType { + Nodes(blake3::Hash), + Value(blake3::Hash, PublisherPtr>), +} + +pub enum DhtLookupReply { + Nodes(Vec), + Value(V), + NodesAndValue(Vec, V), +} + pub struct DhtBucket { pub nodes: Vec, } -/// "Router" means: Key -> Set of nodes (+ additional data for each node) -pub type DhtRouterPtr = Arc>>>>; - -#[derive(Debug, Clone, SerialEncodable, SerialDecodable, Eq)] -pub struct DhtRouterItem { - pub node: N, - pub timestamp: u64, -} - -impl Hash for DhtRouterItem { - fn hash(&self, state: &mut H) { - self.node.id().hash(state); - } -} - -impl PartialEq for DhtRouterItem { - fn eq(&self, other: &Self) -> bool { - self.node.id() == other.node.id() - } -} - -impl From for DhtRouterItem { - fn from(node: N) -> Self { - DhtRouterItem { node, timestamp: Timestamp::current_time().inner() } - } -} +/// Our local hash table, storing DHT keys and values +pub type DhtHashTable = Arc>>; #[derive(Clone)] pub struct ChannelCacheItem { @@ -104,7 +102,7 @@ pub struct ChannelCacheItem { /// Topic is a hash that you set to remember what the channel is about, /// it's not shared with the peer. If you ask for a channel (with - /// `handler.get_channel()`) for a specific topic, it will give you a + /// `dht.get_channel()`) for a specific topic, it will give you a /// channel that has no topic, has the same topic, or a new /// channel. topic: Option, @@ -115,25 +113,28 @@ pub struct ChannelCacheItem { usage_count: u32, } -pub struct Dht { +pub struct Dht { + /// [`DhtHandler`] that implements application-specific behaviors over a [`Dht`] + pub handler: RwLock>, /// Are we bootstrapped? pub bootstrapped: Arc>, /// Vec of buckets - pub buckets: Arc>>>, + pub buckets: Arc>>>, + /// Our local hash table, storing a part of the full DHT keys/values + pub hash_table: DhtHashTable, /// Number of buckets pub n_buckets: usize, /// Channel ID -> ChannelCacheItem - pub channel_cache: Arc>>>, - /// Node ID -> Set of keys - pub router_cache: Arc>>>, - + pub channel_cache: Arc>>>, + /// DHT settings pub settings: DhtSettings, - + /// P2P network pointer pub p2p: P2pPtr, + /// Global multithreaded executor reference pub executor: ExecutorPtr, } -impl Dht { +impl Dht { pub async fn new(settings: &DhtSettings, p2p: P2pPtr, ex: ExecutorPtr) -> Self { // Create empty buckets let mut buckets = vec![]; @@ -142,11 +143,12 @@ impl Dht { } Self { + handler: RwLock::new(Weak::new()), buckets: Arc::new(RwLock::new(buckets)), + hash_table: Arc::new(RwLock::new(HashMap::new())), n_buckets: 256, bootstrapped: Arc::new(RwLock::new(false)), channel_cache: Arc::new(RwLock::new(HashMap::new())), - router_cache: Arc::new(RwLock::new(HashMap::new())), settings: settings.clone(), @@ -155,6 +157,10 @@ impl Dht { } } + pub async fn handler(&self) -> Arc { + self.handler.read().await.upgrade().unwrap() + } + pub async fn is_bootstrapped(&self) -> bool { let bootstrapped = self.bootstrapped.read().await; *bootstrapped @@ -180,7 +186,7 @@ impl Dht { } /// Sort `nodes` by distance from `key` - pub fn sort_by_distance(&self, nodes: &mut [N], key: &blake3::Hash) { + pub fn sort_by_distance(&self, nodes: &mut [H::Node], key: &blake3::Hash) { nodes.sort_by(|a, b| { let distance_a = BigUint::from_bytes_be(&self.distance(key, &a.id())); let distance_b = BigUint::from_bytes_be(&self.distance(key, &b.id())); @@ -211,7 +217,7 @@ impl Dht { /// Get `n` closest known nodes to a key /// TODO: Can be optimized - pub async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec { + pub 
async fn find_neighbors(&self, key: &blake3::Hash, n: usize) -> Vec { let buckets_lock = self.buckets.clone(); let buckets = buckets_lock.read().await; @@ -230,34 +236,18 @@ impl Dht { neighbors } - /// Channel ID -> DhtNode - pub async fn get_node_from_channel(&self, channel_id: u32) -> Option { + /// Channel ID -> [`DhtNode`] + pub async fn get_node_from_channel(&self, channel_id: u32) -> Option { let channel_cache_lock = self.channel_cache.clone(); let channel_cache = channel_cache_lock.read().await; - if let Some(cached) = channel_cache.get(&channel_id).cloned() { - return Some(cached.node) + if let Some(cached) = channel_cache.get(&channel_id) { + return Some(cached.node.clone()) } None } - /// Remove nodes in router that are older than expiry_secs - pub async fn prune_router(&self, router: DhtRouterPtr, expiry_secs: u32) { - let expiry_timestamp = Timestamp::current_time().inner() - (expiry_secs as u64); - let mut router_write = router.write().await; - - let keys: Vec<_> = router_write.keys().cloned().collect(); - - for key in keys { - let items = router_write.get_mut(&key).unwrap(); - items.retain(|item| item.timestamp > expiry_timestamp); - if items.is_empty() { - router_write.remove(&key); - } - } - } - - /// Reset the DHT state + /// Reset the DHT state (nodes and hash table) pub async fn reset(&self) { let mut bootstrapped = self.bootstrapped.write().await; *bootstrapped = false; @@ -268,5 +258,361 @@ impl Dht { } *self.buckets.write().await = buckets; + *self.hash_table.write().await = HashMap::new(); + } + + /// Add `value` to our hash table and send `message` for a `key` to the closest nodes found + pub async fn announce( + &self, + key: &blake3::Hash, + value: &H::Value, + message: &M, + ) -> Result<()> { + let self_node = self.handler().await.node().await; + if self_node.addresses().is_empty() { + return Err(().into()); // TODO + } + + self.handler().await.add_value(key, value).await; + let nodes = self.lookup_nodes(key).await?; + info!(target: "dht::announce()", "Announcing {} to {} nodes", H::key_to_string(key), nodes.len()); + + for node in nodes { + let channel_res = self.get_channel(&node, None).await; + if let Ok(channel) = channel_res { + let _ = channel.send(message).await; + self.cleanup_channel(channel).await; + } + } + + Ok(()) + } + + /// Lookup our own node id to bootstrap our DHT + pub async fn bootstrap(&self) { + self.set_bootstrapped(true).await; + + let self_node_id = self.handler().await.node().await.id(); + debug!(target: "dht::bootstrap()", "DHT bootstrapping {}", H::key_to_string(&self_node_id)); + let nodes = self.lookup_nodes(&self_node_id).await; + + if nodes.is_err() || nodes.map_or(true, |v| v.is_empty()) { + self.set_bootstrapped(false).await; + } + } + + /// Add a node in the correct bucket + pub async fn add_node(&self, node: H::Node) { + let self_node = self.handler().await.node().await; + + // Do not add ourselves to the buckets + if node.id() == self_node.id() { + return; + } + + // Don't add this node if it has any external address that is the same as one of ours + let node_addresses = node.addresses(); + if self_node.addresses().iter().any(|addr| node_addresses.contains(addr)) { + return; + } + + // Do not add a node to the buckets if it does not have an address + if node.addresses().is_empty() { + return; + } + + let bucket_index = + self.get_bucket_index(&self.handler().await.node().await.id(), &node.id()).await; + let buckets_lock = self.buckets.clone(); + let mut buckets = buckets_lock.write().await; + let bucket = &mut 
buckets[bucket_index]; + + // Node is already in the bucket + if bucket.nodes.iter().any(|n| n.id() == node.id()) { + return; + } + + // Bucket is full + if bucket.nodes.len() >= self.settings.k { + // Ping the least recently seen node + if let Ok(channel) = self.get_channel(&bucket.nodes[0], None).await { + let ping_res = self.handler().await.ping(channel.clone()).await; + self.cleanup_channel(channel).await; + if ping_res.is_ok() { + // Ping was successful, move the least recently seen node to the tail + let n = bucket.nodes.remove(0); + bucket.nodes.push(n); + return; + } + } + + // Ping was not successful, remove the least recently seen node and add the new node + bucket.nodes.remove(0); + bucket.nodes.push(node); + return; + } + + // Bucket is not full + bucket.nodes.push(node); + } + + /// Move a node to the tail in its bucket, + /// to show that it is the most recently seen in the bucket. + /// If the node is not in a bucket it will be added using `add_node` + pub async fn update_node(&self, node: &H::Node) { + let bucket_index = + self.get_bucket_index(&self.handler().await.node().await.id(), &node.id()).await; + let buckets_lock = self.buckets.clone(); + let mut buckets = buckets_lock.write().await; + let bucket = &mut buckets[bucket_index]; + + let node_index = bucket.nodes.iter().position(|n| n.id() == node.id()); + if node_index.is_none() { + drop(buckets); + self.add_node(node.clone()).await; + return; + } + + let n = bucket.nodes.remove(node_index.unwrap()); + bucket.nodes.push(n); + } + + /// Lookup algorithm for both nodes lookup and value lookup + async fn lookup(&self, lookup_type: DhtLookupType) -> Result> { + let (key, value_pub) = match lookup_type { + DhtLookupType::Nodes(key) => (key, None), + DhtLookupType::Value(key, ref pub_ptr) => (key, Some(pub_ptr)), + }; + + let (k, a) = (self.settings.k, self.settings.alpha); + let semaphore = Arc::new(Semaphore::new(self.settings.concurrency)); + + let mut unique_nodes = HashSet::new(); + let mut nodes_to_visit = self.find_neighbors(&key, k).await; + let mut result = Vec::new(); + let mut futures = FuturesUnordered::new(); + + let distance_check = |(furthest, next): (&H::Node, &H::Node)| { + BigUint::from_bytes_be(&self.distance(&key, &furthest.id())) < + BigUint::from_bytes_be(&self.distance(&key, &next.id())) + }; + + let lookup = async |node: H::Node, key| { + let _permit = semaphore.acquire().await; + let n = node.clone(); + let handler = self.handler().await; + match &lookup_type { + DhtLookupType::Nodes(_) => { + (n, handler.find_nodes(&node, key).await.map(DhtLookupReply::Nodes)) + } + DhtLookupType::Value(_, _) => (n, handler.find_value(&node, key).await), + } + }; + + let spawn_futures = async |nodes_to_visit: &mut Vec, + unique_nodes: &mut HashSet<_>, + futures: &mut FuturesUnordered<_>| { + for _ in 0..a { + if let Some(node) = nodes_to_visit.pop() { + unique_nodes.insert(node.id()); + futures.push(Box::pin(lookup(node, &key))); + } + } + }; + + spawn_futures(&mut nodes_to_visit, &mut unique_nodes, &mut futures).await; // Initial alpha tasks + + while let Some((queried_node, res)) = futures.next().await { + if let Err(e) = res { + warn!(target: "dht::lookup()", "Error in DHT lookup: {e}"); + + // Spawn next `alpha` futures if there are no more futures but + // we still have nodes to visit + if futures.is_empty() { + spawn_futures(&mut nodes_to_visit, &mut unique_nodes, &mut futures).await; + } + + continue; + } + + let (nodes, value) = match res.unwrap() { + DhtLookupReply::Nodes(nodes) => (Some(nodes), None), + 
DhtLookupReply::Value(value) => (None, Some(value)), + DhtLookupReply::NodesAndValue(nodes, value) => (Some(nodes), Some(value)), + }; + + if let Some(value) = value { + if let Some(publisher) = value_pub { + publisher.notify(Some(value)).await; + } + } + + if let Some(mut nodes) = nodes { + let self_id = self.handler().await.node().await.id(); + nodes.retain(|node| node.id() != self_id && unique_nodes.insert(node.id())); + + nodes_to_visit.extend(nodes.clone()); + self.sort_by_distance(&mut nodes_to_visit, &key); + } + + result.push(queried_node); + self.sort_by_distance(&mut result, &key); + + // Early termination logic + if result.len() >= k && + result.last().zip(nodes_to_visit.first()).is_some_and(distance_check) + { + break; + } + + // Spawn next `alpha` futures + spawn_futures(&mut nodes_to_visit, &mut unique_nodes, &mut futures).await; + } + + if let Some(publisher) = value_pub { + publisher.notify(None).await; + } + + Ok(result.into_iter().take(k).collect()) + } + + /// Find `k` nodes closest to a key + pub async fn lookup_nodes(&self, key: &blake3::Hash) -> Result> { + info!(target: "dht::lookup_nodes()", "Starting node lookup for key {}", H::key_to_string(key)); + self.lookup(DhtLookupType::Nodes(*key)).await + } + + /// Find value for `key` + pub async fn lookup_value( + &self, + key: &blake3::Hash, + value_pub: PublisherPtr>, + ) -> Result> { + info!(target: "dht::lookup_value()", "Starting value lookup for key {}", H::key_to_string(key)); + self.lookup(DhtLookupType::Value(*key, value_pub)).await + } + + /// Get a channel (existing or create a new one) to `node` about `topic`. + /// Don't forget to call `cleanup_channel()` once you are done with it. + pub async fn get_channel( + &self, + node: &H::Node, + topic: Option, + ) -> Result { + let channel_cache_lock = self.channel_cache.clone(); + let mut channel_cache = channel_cache_lock.write().await; + + // Get existing channels for this node, regardless of topic + let channels: HashMap> = channel_cache + .iter() + .filter(|&(_, item)| item.node == *node) + .map(|(&key, item)| (key, item.clone())) + .collect(); + + let (channel_id, topic, usage_count) = + // If we already have a channel for this node and topic, use it + if let Some((cid, cached)) = channels.iter().find(|&(_, c)| c.topic == topic) { + (Some(*cid), cached.topic, cached.usage_count) + } + // If we have a topicless channel for this node, use it + else if let Some((cid, cached)) = channels.iter().find(|&(_, c)| c.topic.is_none()) { + (Some(*cid), topic, cached.usage_count) + } + // If we don't need any specific topic, use the first channel we have + else if topic.is_none() { + match channels.iter().next() { + Some((cid, cached)) => (Some(*cid), cached.topic, cached.usage_count), + _ => (None, topic, 0), + } + } + // There is no existing channel we can use, we will create one + else { + (None, topic, 0) + }; + + // If we found an existing channel we can use, try to use it + if let Some(channel_id) = channel_id { + if let Some(channel) = self.p2p.get_channel(channel_id) { + if channel.session_type_id() & (SESSION_SEED | SESSION_REFINE) != 0 { + return Err(Error::Custom( + "Could not get a channel (for DHT) as this is a seed or refine session" + .to_string(), + )); + } + + if channel.is_stopped() { + channel.clone().start(self.executor.clone()); + } + + channel_cache.insert( + channel_id, + ChannelCacheItem { node: node.clone(), topic, usage_count: usage_count + 1 }, + ); + return Ok(channel); + } + } + + drop(channel_cache); + + // Create a channel + for addr in 
node.addresses().clone() { + let session_out = self.p2p.session_outbound(); + let session_weak = Arc::downgrade(&self.p2p.session_outbound()); + + let connector = Connector::new(self.p2p.settings(), session_weak); + let dur = Duration::from_secs(self.settings.timeout); + let Ok(connect_res) = timeout(dur, connector.connect(&addr)).await else { + warn!(target: "dht::get_channel()", "Timeout trying to connect to {addr}"); + return Err(Error::ConnectTimeout); + }; + if connect_res.is_err() { + warn!(target: "dht::get_channel()", "Error while connecting to {addr}: {}", connect_res.unwrap_err()); + continue; + } + let (_, channel) = connect_res.unwrap(); + + if channel.session_type_id() & (SESSION_SEED | SESSION_REFINE) != 0 { + return Err(Error::Custom( + "Could not create a channel (for DHT) as this is a seed or refine session" + .to_string(), + )); + } + + let register_res = + session_out.register_channel(channel.clone(), self.executor.clone()).await; + if register_res.is_err() { + channel.clone().stop().await; + warn!(target: "dht::get_channel()", "Error while registering channel {}: {}", channel.info.id, register_res.unwrap_err()); + continue; + } + + let mut channel_cache = channel_cache_lock.write().await; + channel_cache.insert( + channel.info.id, + ChannelCacheItem { node: node.clone(), topic, usage_count: 1 }, + ); + + return Ok(channel) + } + + Err(Error::Custom("Could not create channel".to_string())) + } + + /// Decrement the channel usage count, if it becomes 0 then set the topic + /// to None, so that this channel is available for another task + pub async fn cleanup_channel(&self, channel: ChannelPtr) { + let channel_cache_lock = self.channel_cache.clone(); + let mut channel_cache = channel_cache_lock.write().await; + + if let Some(cached) = channel_cache.get_mut(&channel.info.id) { + if cached.usage_count > 0 { + cached.usage_count -= 1; + } + + // If the channel is not used by anything, remove the topic + if cached.usage_count == 0 { + cached.topic = None; + } + } } } diff --git a/src/dht/tasks.rs b/src/dht/tasks.rs index 88c0994e4..53a8ad166 100644 --- a/src/dht/tasks.rs +++ b/src/dht/tasks.rs @@ -27,7 +27,7 @@ use crate::{ /// Send a DHT ping request when there is a new channel, to know the node id of the new peer, /// Then fill the channel cache and the buckets -pub async fn channel_task, N: DhtNode>(handler: Arc) -> Result<()> { +pub async fn channel_task(handler: Arc) -> Result<()> { loop { let channel_sub = handler.dht().p2p.hosts().subscribe_channel().await; let res = channel_sub.receive().await; @@ -51,7 +51,7 @@ pub async fn channel_task, N: DhtNode>(handler: Arc) -> Resu let ping_res = handler.ping(channel.clone()).await; if let Err(e) = ping_res { - warn!(target: "dht::DhtHandler::channel_task()", "Error while pinging (requesting node id) {}: {e}", channel.address()); + warn!(target: "dht::channel_task()", "Error while pinging (requesting node id) {}: {e}", channel.address()); // channel.stop().await; continue; } @@ -66,7 +66,7 @@ pub async fn channel_task, N: DhtNode>(handler: Arc) -> Resu drop(channel_cache); if !node.addresses().is_empty() { - handler.add_node(node.clone()).await; + handler.dht().add_node(node.clone()).await; let _ = handler.on_new_node(&node.clone()).await; } }
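Two pieces of the refactored `Dht` carry most of the algorithmic weight and may be easier to review against a condensed model.

First, the unified `Dht::lookup` drives node and value lookups with the same iterative loop: query up to `alpha` candidates concurrently, fold any nodes they return into a distance-sorted frontier, and terminate early once the `k` closest responders all beat the closest unvisited candidate. The following synchronous sketch uses `u64` IDs with XOR distance and a plain closure in place of the async `find_nodes`/`find_value` RPC (all simplifications; the real loop also rate-limits with a semaphore). It visits the closest candidates first, as in textbook Kademlia:

```rust
use std::collections::HashSet;

/// Iterative lookup over a toy keyspace. `query(n)` stands in for the
/// FIND NODES RPC and returns the neighbors `n` knows close to `key`.
fn lookup(key: u64, bootstrap: Vec<u64>, k: usize, alpha: usize, query: impl Fn(u64) -> Vec<u64>) -> Vec<u64> {
    assert!(k > 0 && alpha > 0);
    let mut frontier = bootstrap; // unvisited candidates, sorted by distance
    frontier.sort_by_key(|id| id ^ key);
    let mut seen: HashSet<u64> = frontier.iter().copied().collect();
    let mut responders: Vec<u64> = Vec::new();

    while !frontier.is_empty() {
        // Next batch of up to `alpha` closest candidates (these run
        // concurrently under a semaphore in the real code).
        let take = alpha.min(frontier.len());
        let batch: Vec<u64> = frontier.drain(..take).collect();
        for node in batch {
            for n in query(node) {
                // Skip duplicates; the real code also skips our own ID.
                if seen.insert(n) {
                    frontier.push(n);
                }
            }
            responders.push(node);
        }
        frontier.sort_by_key(|id| id ^ key);
        responders.sort_by_key(|id| id ^ key);

        // Early termination: the k-th closest responder is already
        // closer than everything left to visit.
        if responders.len() >= k {
            if let (Some(furthest), Some(next)) = (responders.get(k - 1), frontier.first()) {
                if (furthest ^ key) < (next ^ key) {
                    break;
                }
            }
        }
    }
    responders.truncate(k);
    responders
}
```

For value lookups the same loop notifies the caller's publisher the moment a peer returns a value, then publishes `None` once the iteration ends; that streaming behavior is what lets fud's `get_metadata` and chunk fetching consume seeders as they are discovered instead of blocking on the full lookup.

Second, `Dht::add_node` keeps the classic Kademlia eviction policy: a full bucket gives up its least-recently-seen node only after that node fails a liveness ping, which biases the routing table toward long-lived peers. A compact sketch of just that decision, with a `ping_ok` closure standing in for the real ping-over-channel round trip (an assumption for brevity):

```rust
/// A bucket holds at most `k` nodes, ordered least- to most-recently seen.
struct Bucket<N: PartialEq> {
    nodes: Vec<N>,
    k: usize,
}

impl<N: PartialEq> Bucket<N> {
    fn insert(&mut self, node: N, ping_ok: impl FnOnce(&N) -> bool) {
        if self.nodes.contains(&node) {
            return; // already tracked; `update_node` handles refreshes
        }
        if self.nodes.len() >= self.k {
            if ping_ok(&self.nodes[0]) {
                // Oldest node is alive: move it to the tail, drop the newcomer.
                let oldest = self.nodes.remove(0);
                self.nodes.push(oldest);
                return;
            }
            // Oldest node is unresponsive: evict it, admit the newcomer.
            self.nodes.remove(0);
        }
        self.nodes.push(node);
    }
}
```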