From 4d2f042f35bdd26deab19b3e8a4c79994fd55761 Mon Sep 17 00:00:00 2001 From: epiphany Date: Thu, 26 Jun 2025 17:24:42 +0200 Subject: [PATCH] dht, fud: handle multiple concurrent downloads --- bin/fud/fud/src/lib.rs | 235 ++++++++++++++++++++++++++++++--------- bin/fud/fud/src/main.rs | 22 +--- bin/fud/fud/src/rpc.rs | 9 +- bin/fud/fud/src/tasks.rs | 104 ++++++----------- src/dht/handler.rs | 127 ++++++++++++++++----- src/dht/mod.rs | 35 ++++-- 6 files changed, 351 insertions(+), 181 deletions(-) diff --git a/bin/fud/fud/src/lib.rs b/bin/fud/fud/src/lib.rs index ada3abe0a..4c54c8244 100644 --- a/bin/fud/fud/src/lib.rs +++ b/bin/fud/fud/src/lib.rs @@ -39,7 +39,7 @@ use darkfi::{ dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr}, geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE}, net::{ChannelPtr, P2pPtr}, - system::PublisherPtr, + system::{PublisherPtr, StoppableTask}, util::path::expand_path, Error, Result, }; @@ -54,7 +54,9 @@ use proto::{ /// FudEvent pub mod event; -use event::{ChunkDownloadCompleted, ChunkNotFound, FudEvent, ResourceUpdated}; +use event::{ + ChunkDownloadCompleted, ChunkNotFound, FudEvent, MetadataDownloadCompleted, ResourceUpdated, +}; /// Resource definition pub mod resource; @@ -130,11 +132,11 @@ pub struct Fud { get_tx: channel::Sender<(blake3::Hash, PathBuf)>, get_rx: channel::Receiver<(blake3::Hash, PathBuf)>, - metadata_fetch_tx: channel::Sender<(Vec, blake3::Hash, PathBuf)>, - metadata_fetch_rx: channel::Receiver<(Vec, blake3::Hash, PathBuf)>, - metadata_fetch_end_tx: channel::Sender>, - metadata_fetch_end_rx: channel::Receiver>, + /// Currently active downloading tasks (running the `fud.fetch_resource()` method) + fetch_tasks: Arc>>>, + + /// Used to send events to fud clients event_publisher: PublisherPtr, } @@ -175,7 +177,7 @@ impl DhtHandler for Fud { // Send keys that are closer to this node than we are let self_id = self.dht().node_id; - let channel = self.get_channel(node).await?; + let channel = self.get_channel(node, None).await?; for (key, seeders) in self.seeders_router.read().await.iter() { let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id)); let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id)); @@ -188,14 +190,15 @@ impl DhtHandler for Fud { .await; } } + self.cleanup_channel(channel).await; Ok(()) } async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result> { - debug!(target: "fud::DhtHandler::fetch_value()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id)); + debug!(target: "fud::DhtHandler::fetch_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id)); - let channel = self.get_channel(node).await?; + let channel = self.get_channel(node, None).await?; let msg_subsystem = channel.message_subsystem(); msg_subsystem.add_dispatch::().await; let msg_subscriber_nodes = channel.subscribe_msg::().await.unwrap(); @@ -206,6 +209,7 @@ impl DhtHandler for Fud { let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().settings.timeout).await?; msg_subscriber_nodes.unsubscribe().await; + self.cleanup_channel(channel).await; Ok(reply.nodes.clone()) } @@ -222,8 +226,6 @@ impl Fud { event_publisher: PublisherPtr, ) -> Result { let (get_tx, get_rx) = smol::channel::unbounded(); - let (metadata_fetch_tx, metadata_fetch_rx) = smol::channel::unbounded(); - let (metadata_fetch_end_tx, metadata_fetch_end_rx) = smol::channel::unbounded(); // Hashmap used for routing let seeders_router = Arc::new(RwLock::new(HashMap::new())); @@ -244,10 +246,7 @@ impl Fud { resources: Arc::new(RwLock::new(HashMap::new())), get_tx, get_rx, - metadata_fetch_tx, - metadata_fetch_rx, - metadata_fetch_end_tx, - metadata_fetch_end_rx, + fetch_tasks: Arc::new(RwLock::new(HashMap::new())), event_publisher, }; @@ -430,7 +429,7 @@ impl Fud { let mut seeders: HashSet = HashSet::new(); for node in nodes { - let channel = match self.get_channel(node).await { + let channel = match self.get_channel(node, None).await { Ok(channel) => channel, Err(e) => { warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {}", hash_to_string(&node.id), e); @@ -444,6 +443,7 @@ impl Fud { Ok(msg_subscriber) => msg_subscriber, Err(e) => { warn!(target: "fud::fetch_seeders()", "Error subscribing to msg: {}", e); + self.cleanup_channel(channel).await; continue; } }; @@ -452,6 +452,7 @@ impl Fud { if let Err(e) = send_res { warn!(target: "fud::fetch_seeders()", "Error while sending FudFindSeedersRequest: {}", e); msg_subscriber.unsubscribe().await; + self.cleanup_channel(channel).await; continue; } @@ -461,11 +462,13 @@ impl Fud { Err(e) => { warn!(target: "fud::fetch_seeders()", "Error waiting for reply: {}", e); msg_subscriber.unsubscribe().await; + self.cleanup_channel(channel).await; continue; } }; msg_subscriber.unsubscribe().await; + self.cleanup_channel(channel).await; seeders.extend(reply.seeders.clone()); } @@ -499,7 +502,7 @@ impl Fud { }; while let Some(seeder) = shuffled_seeders.pop() { - let channel = match self.get_channel(&seeder.node).await { + let channel = match self.get_channel(&seeder.node, Some(*hash)).await { Ok(channel) => channel, Err(e) => { warn!(target: "fud::fetch_missing_chunks()", "Could not get a channel for node {}: {}", hash_to_string(&seeder.node.id), e); @@ -608,6 +611,8 @@ impl Fud { msg_subscriber_notfound.unsubscribe().await; } + self.cleanup_channel(channel).await; + // Stop when there are no missing chunks if remaining_chunks.is_empty() { break; @@ -618,21 +623,23 @@ impl Fud { } /// Fetch a single resource metadata from `nodes`. - /// If the file is smaller than a single chunk then seeder can send the - /// chunk directly, we will create the file from it on path `path`. + /// If the resource is a file smaller than a single chunk then seeder can send the + /// chunk directly, and we will create the file from it on path `path`. /// 1. Request seeders from those nodes /// 2. Request the metadata from the seeders + /// 3. Insert metadata to geode using the reply pub async fn fetch_metadata( &self, - nodes: &Vec, hash: &blake3::Hash, - ) -> Option { + nodes: &Vec, + path: &Path, + ) -> Result<()> { let mut queried_seeders: HashSet = HashSet::new(); let mut result: Option = None; for node in nodes { // 1. Request list of seeders - let channel = match self.get_channel(node).await { + let channel = match self.get_channel(node, Some(*hash)).await { Ok(channel) => channel, Err(e) => { warn!(target: "fud::fetch_metadata()", "Could not get a channel for node {}: {}", hash_to_string(&node.id), e); @@ -654,6 +661,7 @@ impl Fud { if let Err(e) = send_res { warn!(target: "fud::fetch_metadata()", "Error while sending FudFindSeedersRequest: {}", e); msg_subscriber.unsubscribe().await; + self.cleanup_channel(channel).await; continue; } @@ -663,6 +671,7 @@ impl Fud { Err(e) => { warn!(target: "fud::fetch_metadata()", "Error waiting for reply: {}", e); msg_subscriber.unsubscribe().await; + self.cleanup_channel(channel).await; continue; } }; @@ -671,6 +680,7 @@ impl Fud { info!(target: "fud::fetch_metadata()", "Found {} seeders for {} (from {})", seeders.len(), hash_to_string(hash), hash_to_string(&node.id)); msg_subscriber.unsubscribe().await; + self.cleanup_channel(channel).await; // 2. Request the file/chunk from the seeders while let Some(seeder) = seeders.pop() { @@ -680,7 +690,7 @@ impl Fud { } queried_seeders.insert(seeder.node.id); - if let Ok(channel) = self.get_channel(&seeder.node).await { + if let Ok(channel) = self.get_channel(&seeder.node, Some(*hash)).await { let msg_subsystem = channel.message_subsystem(); msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; @@ -702,6 +712,7 @@ impl Fud { msg_subscriber_file.unsubscribe().await; msg_subscriber_dir.unsubscribe().await; msg_subscriber_notfound.unsubscribe().await; + self.cleanup_channel(channel).await; continue; } @@ -721,6 +732,7 @@ impl Fud { msg_subscriber_file.unsubscribe().await; msg_subscriber_dir.unsubscribe().await; msg_subscriber_notfound.unsubscribe().await; + self.cleanup_channel(channel).await; }; // Wait for a FudChunkReply, FudFileReply, FudDirectoryReply, or FudNotFound @@ -797,12 +809,73 @@ impl Fud { } } - result + // We did not find the resource + if result.is_none() { + return Err(Error::GeodeFileRouteNotFound) + } + + // 3. Insert metadata to geode using the reply + // At this point the reply content is already verified + match result.unwrap() { + FetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => { + // Convert all file paths from String to PathBuf + let mut files: Vec<_> = files + .into_iter() + .map(|(path_str, size)| (PathBuf::from(path_str), size)) + .collect(); + + self.geode.sort_files(&mut files); + if let Err(e) = self.geode.insert_metadata(hash, &chunk_hashes, &files).await { + error!(target: "fud::fetch_metadata()", "Failed inserting directory {} to Geode: {}", hash_to_string(hash), e); + return Err(e) + } + } + FetchReply::File(FudFileReply { chunk_hashes }) => { + if let Err(e) = self.geode.insert_metadata(hash, &chunk_hashes, &[]).await { + error!(target: "fud::fetch_metadata()", "Failed inserting file {} to Geode: {}", hash_to_string(hash), e); + return Err(e) + } + } + // Looked for a file but got a chunk: the entire file fits in a single chunk + FetchReply::Chunk(FudChunkReply { chunk }) => { + info!(target: "fud::fetch_metadata()", "File fits in a single chunk"); + let chunk_hash = blake3::hash(&chunk); + let _ = self.geode.insert_metadata(hash, &[chunk_hash], &[]).await; + let mut chunked_file = ChunkedStorage::new( + &[chunk_hash], + &[(path.to_path_buf(), chunk.len() as u64)], + false, + ); + if let Err(e) = self.geode.write_chunk(&mut chunked_file, &chunk).await { + error!(target: "fud::fetch_metadata()", "Failed inserting chunk {} to Geode: {}", hash_to_string(&chunk_hash), e); + return Err(e) + }; + } + }; + + Ok(()) + } + + /// Start downloading a file or directory from the network to `path`. + /// This creates a new task in `fetch_tasks` calling `fetch_resource()`. + pub async fn get(&self, hash: &blake3::Hash, path: &Path) -> Result<()> { + let fetch_tasks = self.fetch_tasks.read().await; + if fetch_tasks.contains_key(hash) { + return Err(Error::Custom(format!( + "Resource {} is already being downloaded", + hash_to_string(hash) + ))) + } + drop(fetch_tasks); + + self.get_tx.send((*hash, path.to_path_buf())).await?; + + Ok(()) } /// Download a file or directory from the network to `path`. - /// This creates a new task in `fetch_tasks` calling `fetch_resource()`. - pub async fn get(&self, hash: &blake3::Hash, path: &Path) -> Result<()> { + /// Called when `get()` creates a new fetch task. + pub async fn fetch_resource(&self, hash: &blake3::Hash, path: &Path) -> Result<()> { let self_node = self.dht().node().await; let mut closest_nodes = vec![]; @@ -859,12 +932,7 @@ impl Fud { closest_nodes = self.lookup_nodes(hash).await.unwrap_or_default(); // Fetch file or directory metadata - self.metadata_fetch_tx - .send((closest_nodes.clone(), *hash, path.to_path_buf())) - .await - .unwrap(); - info!(target: "self::get()", "Waiting for background file fetch task..."); - match self.metadata_fetch_end_rx.recv().await.unwrap() { + match self.fetch_metadata(hash, &closest_nodes, path).await { // The file metadata was found and inserted into geode Ok(()) => self.geode.get(hash, path).await.unwrap(), // We could not find the metadata, or any other error occured @@ -903,36 +971,44 @@ impl Fud { } } + // Set resource status to `Verifying` and send FudEvent::MetadataDownloadCompleted + let mut resources_write = self.resources.write().await; + if let Some(resource) = resources_write.get_mut(hash) { + resource.status = ResourceStatus::Verifying; + resource.chunks_total = chunked.len() as u64; + resource.rtype = match chunked.is_dir() { + false => ResourceType::File, + true => ResourceType::Directory, + }; + + self.event_publisher + .notify(FudEvent::MetadataDownloadCompleted(MetadataDownloadCompleted { + hash: *hash, + resource: resource.clone(), + })) + .await; + } + drop(resources_write); + // Mark locally available chunks as such if let Err(e) = self.geode.verify_chunks(&mut chunked).await { error!(target: "self::get()", "Error while verifying chunks: {}", e); return Err(e); } - // Set resource status to `Downloading` + // Set resource.chunks_downloaded and send FudEvent::ResourceUpdated let mut resources_write = self.resources.write().await; - let resource = match resources_write.get_mut(hash) { - Some(resource) => { - resource.status = ResourceStatus::Downloading; - resource.chunks_downloaded = chunked.local_chunks() as u64; - resource.chunks_total = chunked.len() as u64; - resource.rtype = match chunked.is_dir() { - false => ResourceType::File, - true => ResourceType::Directory, - }; - resource.clone() - } - None => return Ok(()), // Resource was removed, abort - }; - drop(resources_write); + if let Some(resource) = resources_write.get_mut(hash) { + resource.chunks_downloaded = chunked.local_chunks() as u64; - // Send a MetadataDownloadCompleted event - self.event_publisher - .notify(FudEvent::MetadataDownloadCompleted(event::MetadataDownloadCompleted { - hash: *hash, - resource: resource.clone(), - })) - .await; + self.event_publisher + .notify(FudEvent::ResourceUpdated(ResourceUpdated { + hash: *hash, + resource: resource.clone(), + })) + .await; + } + drop(resources_write); // If `chunked` is a file that is bigger than the all its chunks, // truncate the file to the chunks. @@ -947,7 +1023,7 @@ impl Fud { } } - // If the file is already complete, we don't need to download any chunk + // If the resource is already complete, we don't need to download any chunk if chunked.is_complete() { // Announce the file let self_announce = FudAnnounce { key: *hash, seeders: vec![self_node.clone().into()] }; @@ -976,6 +1052,25 @@ impl Fud { return Ok(()); } + // Set resource status to `Downloading` + let mut resources_write = self.resources.write().await; + let resource = match resources_write.get_mut(hash) { + Some(resource) => { + resource.status = ResourceStatus::Downloading; + resource.clone() + } + None => return Ok(()), // Resource was removed, abort + }; + drop(resources_write); + + // Send a MetadataDownloadCompleted event + self.event_publisher + .notify(FudEvent::MetadataDownloadCompleted(event::MetadataDownloadCompleted { + hash: *hash, + resource: resource.clone(), + })) + .await; + // Find nodes close to the file hash if we didn't previously fetched them if closest_nodes.is_empty() { closest_nodes = self.lookup_nodes(hash).await.unwrap_or_default(); @@ -1001,6 +1096,22 @@ impl Fud { return Err(e); } }; + + // Set resource status to `Verifying` and send FudEvent::ResourceUpdated + let mut resources_write = self.resources.write().await; + if let Some(resource) = resources_write.get_mut(hash) { + resource.status = ResourceStatus::Verifying; + + self.event_publisher + .notify(FudEvent::ResourceUpdated(ResourceUpdated { + hash: *hash, + resource: resource.clone(), + })) + .await; + } + drop(resources_write); + + // Verify all chunks self.geode.verify_chunks(&mut chunked).await?; // We fetched all chunks, but the file is not complete @@ -1149,4 +1260,18 @@ impl Fud { .notify(FudEvent::ResourceRemoved(event::ResourceRemoved { hash: *hash })) .await; } + + /// Stop all tasks in `fetch_tasks`. + pub async fn stop(&self) { + // Create a clone of fetch_tasks because `task.stop()` needs a write lock + let fetch_tasks = self.fetch_tasks.read().await; + let cloned_fetch_tasks: HashMap> = + fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect(); + drop(fetch_tasks); + + // Stop all tasks + for task in cloned_fetch_tasks.values() { + task.stop().await; + } + } } diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs index 7be39e852..0fa3bbc76 100644 --- a/bin/fud/fud/src/main.rs +++ b/bin/fud/fud/src/main.rs @@ -41,7 +41,7 @@ use fud::{ get_node_id, proto::{FudFindNodesReply, ProtocolFud}, rpc::JsonRpcInterface, - tasks::{announce_seed_task, fetch_metadata_task, get_task}, + tasks::{announce_seed_task, get_task}, Fud, }; @@ -179,24 +179,10 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { ex.clone(), ); - info!(target: "fud", "Starting fetch metadata task"); - let file_task = StoppableTask::new(); - file_task.clone().start( - fetch_metadata_task(fud.clone()), - |res| async { - match res { - Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } - Err(e) => error!(target: "fud", "Failed starting fetch metadata task: {}", e), - } - }, - Error::DetachedTaskStopped, - ex.clone(), - ); - info!(target: "fud", "Starting get task"); let get_task_ = StoppableTask::new(); get_task_.clone().start( - get_task(fud.clone()), + get_task(fud.clone(), ex.clone()), |res| async { match res { Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } @@ -288,8 +274,8 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { signals_handler.wait_termination(signals_task).await?; info!(target: "fud", "Caught termination signal, cleaning up and exiting..."); - info!(target: "fud", "Stopping fetch file task..."); - file_task.stop().await; + info!(target: "fud", "Stopping fetch tasks..."); + fud.stop().await; info!(target: "fud", "Stopping get task..."); get_task_.stop().await; diff --git a/bin/fud/fud/src/rpc.rs b/bin/fud/fud/src/rpc.rs index 435b13df8..ad0d5d14b 100644 --- a/bin/fud/fud/src/rpc.rs +++ b/bin/fud/fud/src/rpc.rs @@ -116,8 +116,8 @@ impl JsonRpcInterface { } // RPCAPI: - // Fetch a file from the network. Takes a file hash and file path (absolute or relative) as parameters. - // Returns the path where the file will be located once downloaded. + // Fetch a resource from the network. Takes a hash and path (absolute or relative) as parameters. + // Returns the path where the resource will be located once downloaded. // // --> {"jsonrpc": "2.0", "method": "get", "params": ["1211...abfd", "~/myfile.jpg"], "id": 42} // <-- {"jsonrpc": "2.0", "result": "/home/user/myfile.jpg", "id": 42} @@ -157,7 +157,10 @@ impl JsonRpcInterface { None => self.fud.downloads_path.join(&hash_str), }; - let _ = self.fud.get_tx.send((hash, path.clone())).await; + // Start downloading the resource + if let Err(e) = self.fud.get(&hash, &path).await { + return JsonError::new(ErrorCode::InternalError, Some(e.to_string()), id).into() + } JsonResponse::new(JsonValue::String(path.to_string_lossy().to_string()), id).into() } diff --git a/bin/fud/fud/src/tasks.rs b/bin/fud/fud/src/tasks.rs index e1b677a4d..82c58382c 100644 --- a/bin/fud/fud/src/tasks.rs +++ b/bin/fud/fud/src/tasks.rs @@ -17,12 +17,11 @@ */ use log::{error, info}; -use std::{path::PathBuf, sync::Arc}; +use std::sync::Arc; use darkfi::{ dht::DhtHandler, - geode::{hash_to_string, ChunkedStorage}, - system::sleep, + system::{sleep, ExecutorPtr, StoppableTask}, Error, Result, }; @@ -37,71 +36,40 @@ pub enum FetchReply { Chunk(FudChunkReply), } -/// Triggered when calling the `get` RPC method -pub async fn get_task(fud: Arc) -> Result<()> { +/// Triggered when calling the `fud.get()` method. +/// It creates a new StoppableTask (running `fud.fetch_resource()`) and inserts +/// it into the `fud.fetch_tasks` hashmap. When the task is stopped it's +/// removed from the hashmap. +pub async fn get_task(fud: Arc, executor: ExecutorPtr) -> Result<()> { loop { - let (file_hash, file_path) = fud.get_rx.recv().await.unwrap(); + let (hash, path) = fud.get_rx.recv().await.unwrap(); - let _ = fud.get(&file_hash, &file_path).await; - } -} + // Create the new task + let mut fetch_tasks = fud.fetch_tasks.write().await; + let task = StoppableTask::new(); + fetch_tasks.insert(hash, task.clone()); + drop(fetch_tasks); -/// Background task that receives file fetch requests and tries to -/// fetch objects from the network using the routing table. -/// TODO: This can be optimised a lot for connection reuse, etc. -pub async fn fetch_metadata_task(fud: Arc) -> Result<()> { - info!(target: "fud::fetch_metadata_task()", "Started background metadata fetch task"); - loop { - let (nodes, hash, path) = fud.metadata_fetch_rx.recv().await.unwrap(); - info!(target: "fud::fetch_metadata_task()", "Fetching metadata for {}", hash_to_string(&hash)); - - let reply = fud.fetch_metadata(&nodes, &hash).await; - if reply.is_none() { - fud.metadata_fetch_end_tx.send(Err(Error::GeodeFileRouteNotFound)).await.unwrap(); - continue - } - let reply = reply.unwrap(); - - // At this point the reply content was already verified in `fud.fetch_metadata` - match reply { - FetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => { - // Convert all file paths from String to PathBuf - let mut files: Vec<_> = files - .into_iter() - .map(|(path_str, size)| (PathBuf::from(path_str), size)) - .collect(); - - fud.geode.sort_files(&mut files); - if let Err(e) = fud.geode.insert_metadata(&hash, &chunk_hashes, &files).await { - error!(target: "fud::fetch_metadata_task()", "Failed inserting directory {} to Geode: {}", hash_to_string(&hash), e); - fud.metadata_fetch_end_tx.send(Err(e)).await.unwrap(); - continue + // Start the new task + let fud_1 = fud.clone(); + let fud_2 = fud.clone(); + task.start( + async move { fud_1.fetch_resource(&hash, &path).await }, + move |res| async move { + // Remove the task from the `fud.fetch_tasks` hashmap once it is + // stopped (error, manually, or just done). + let mut fetch_tasks = fud_2.fetch_tasks.write().await; + fetch_tasks.remove(&hash); + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => { + error!(target: "fud::get_task()", "Error while fetching resource: {}", e) + } } - fud.metadata_fetch_end_tx.send(Ok(())).await.unwrap(); - } - FetchReply::File(FudFileReply { chunk_hashes }) => { - if let Err(e) = fud.geode.insert_metadata(&hash, &chunk_hashes, &[]).await { - error!(target: "fud::fetch_metadata_task()", "Failed inserting file {} to Geode: {}", hash_to_string(&hash), e); - fud.metadata_fetch_end_tx.send(Err(e)).await.unwrap(); - continue - } - fud.metadata_fetch_end_tx.send(Ok(())).await.unwrap(); - } - // Looked for a file but got a chunk: the entire file fits in a single chunk - FetchReply::Chunk(FudChunkReply { chunk }) => { - info!(target: "fud::fetch_metadata_task()", "File fits in a single chunk"); - let chunk_hash = blake3::hash(&chunk); - let _ = fud.geode.insert_metadata(&hash, &[chunk_hash], &[]).await; - let mut chunked_file = - ChunkedStorage::new(&[chunk_hash], &[(path, chunk.len() as u64)], false); - if let Err(e) = fud.geode.write_chunk(&mut chunked_file, &chunk).await { - error!(target: "fud::fetch_metadata_task()", "Failed inserting chunk {} to Geode: {}", hash_to_string(&chunk_hash), e); - fud.metadata_fetch_end_tx.send(Err(e)).await.unwrap(); - continue - }; - fud.metadata_fetch_end_tx.send(Ok(())).await.unwrap(); - } - } + }, + Error::DetachedTaskStopped, + executor.clone(), + ); } } @@ -115,16 +83,16 @@ pub async fn announce_seed_task(fud: Arc) -> Result<()> { let seeders = vec![fud.dht().node().await.into()]; - info!(target: "fud::announce_task()", "Verifying seeds..."); + info!(target: "fud::announce_seed_task()", "Verifying seeds..."); let seeding_resources = match fud.verify_resources(None).await { Ok(resources) => resources, Err(e) => { - error!(target: "fud::announce_task()", "Error while verifying seeding resources: {}", e); + error!(target: "fud::announce_seed_task()", "Error while verifying seeding resources: {}", e); continue; } }; - info!(target: "fud::announce_task()", "Announcing files..."); + info!(target: "fud::announce_seed_task()", "Announcing files..."); for resource in seeding_resources { let _ = fud .announce( @@ -135,7 +103,7 @@ pub async fn announce_seed_task(fud: Arc) -> Result<()> { .await; } - info!(target: "fud::announce_task()", "Pruning seeders..."); + info!(target: "fud::announce_seed_task()", "Pruning seeders..."); fud.dht().prune_router(fud.seeders_router.clone(), interval.try_into().unwrap()).await; } } diff --git a/src/dht/handler.rs b/src/dht/handler.rs index 149e96973..f361c3c02 100644 --- a/src/dht/handler.rs +++ b/src/dht/handler.rs @@ -21,9 +21,13 @@ use futures::stream::FuturesUnordered; use log::{debug, info, warn}; use num_bigint::BigUint; use smol::{lock::Semaphore, stream::StreamExt}; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; -use super::{Dht, DhtNode, DhtRouterItem, DhtRouterPtr}; +use super::{ChannelCacheItem, Dht, DhtNode, DhtRouterItem, DhtRouterPtr}; use crate::{ net::{ connector::Connector, @@ -63,9 +67,10 @@ pub trait DhtHandler { let nodes = self.lookup_nodes(key).await?; for node in nodes { - let channel = self.get_channel(&node).await; - if let Ok(ch) = channel { - let _ = ch.send(message).await; + let channel_res = self.get_channel(&node, None).await; + if let Ok(channel) = channel_res { + let _ = channel.send(message).await; + self.cleanup_channel(channel).await; } } @@ -87,7 +92,7 @@ pub trait DhtHandler { let mut channel_cache = channel_cache_lock.write().await; // Skip this channel if it's stopped or not new. - if channel.is_stopped() || channel_cache.values().any(|&v| v == channel.info.id) { + if channel.is_stopped() || channel_cache.keys().any(|&k| k == channel.info.id) { continue; } // Skip this channel if it's a seed or refine session. @@ -95,21 +100,27 @@ pub trait DhtHandler { continue; } - let node = self.ping(channel.clone()).await; + sleep(1).await; + let ping_res = self.ping(channel.clone()).await; - if let Ok(n) = node { - channel_cache.insert(n.id, channel.info.id); - drop(channel_cache); + if let Err(e) = ping_res { + warn!(target: "dht::DhtHandler::channel_task()", "Error while pinging (requesting node id) {}: {}", channel.address(), e); + channel.stop().await; + continue; + } - let node_cache_lock = self.dht().node_cache.clone(); - let mut node_cache = node_cache_lock.write().await; - node_cache.insert(channel.info.id, n.clone()); - drop(node_cache); + let node = ping_res.unwrap(); - if !n.addresses.is_empty() { - self.add_node(n.clone()).await; - let _ = self.on_new_node(&n.clone()).await; - } + channel_cache.entry(channel.info.id).or_insert_with(|| ChannelCacheItem { + node: node.clone(), + topic: None, + usage_count: 0, + }); + drop(channel_cache); + + if !node.addresses.is_empty() { + self.add_node(node.clone()).await; + let _ = self.on_new_node(&node.clone()).await; } } } @@ -121,10 +132,10 @@ pub trait DhtHandler { let channel_cache_lock = self.dht().channel_cache.clone(); let mut channel_cache = channel_cache_lock.write().await; - for (node_id, channel_id) in channel_cache.clone() { + for (channel_id, _) in channel_cache.clone() { let channel = self.dht().p2p.get_channel(channel_id); - if channel.is_none() || (channel.is_some() && channel.unwrap().is_stopped()) { - channel_cache.remove(&node_id); + if channel.is_none() { + channel_cache.remove(&channel_id); } } } @@ -155,9 +166,9 @@ pub trait DhtHandler { // Bucket is full if bucket.nodes.len() >= self.dht().settings.k { // Ping the least recently seen node - let channel = self.get_channel(&bucket.nodes[0]).await; - if channel.is_ok() { - let ping_res = self.ping(channel.unwrap()).await; + if let Ok(channel) = self.get_channel(&bucket.nodes[0], None).await { + let ping_res = self.ping(channel.clone()).await; + self.cleanup_channel(channel).await; if ping_res.is_ok() { // Ping was successful, move the least recently seen node to the tail let n = bucket.nodes.remove(0); @@ -303,13 +314,43 @@ pub trait DhtHandler { return Ok(result.to_vec()) } - /// Get an existing channel, or create a new one - async fn get_channel(&self, node: &DhtNode) -> Result { + /// Get a channel (existing or create a new one) to `node` about `topic`. + /// Don't forget to call `cleanup_channel()` once you are done with it. + async fn get_channel(&self, node: &DhtNode, topic: Option) -> Result { let channel_cache_lock = self.dht().channel_cache.clone(); - let channel_cache = channel_cache_lock.read().await; + let mut channel_cache = channel_cache_lock.write().await; - if let Some(channel_id) = channel_cache.get(&node.id) { - if let Some(channel) = self.dht().p2p.get_channel(*channel_id) { + // Get existing channels for this node, regardless of topic + let channels: HashMap = channel_cache + .iter() + .filter(|&(_, item)| item.node == *node) + .map(|(&key, item)| (key, item.clone())) + .collect(); + + let (channel_id, topic, usage_count) = + // If we already have a channel for this node and topic, use it + if let Some((cid, cached)) = channels.iter().find(|&(_, c)| c.topic == topic) { + (Some(*cid), cached.topic, cached.usage_count) + } + // If we have a topicless channel for this node, use it + else if let Some((cid, cached)) = channels.iter().find(|&(_, c)| c.topic.is_none()) { + (Some(*cid), topic, cached.usage_count) + } + // If we don't need any specific topic, use the first channel we have + else if topic.is_none() { + match channels.iter().next() { + Some((cid, cached)) => (Some(*cid), cached.topic, cached.usage_count), + _ => (None, topic, 0), + } + } + // There is no existing channel we can use, we will create one + else { + (None, topic, 0) + }; + + // If we found an existing channel we can use, try to use it + if let Some(channel_id) = channel_id { + if let Some(channel) = self.dht().p2p.get_channel(channel_id) { if channel.session_type_id() & (SESSION_SEED | SESSION_REFINE) != 0 { return Err(Error::Custom( "Could not get a channel (for DHT) as this is a seed or refine session" @@ -320,6 +361,11 @@ pub trait DhtHandler { if channel.is_stopped() { channel.clone().start(self.dht().executor.clone()); } + + channel_cache.insert( + channel_id, + ChannelCacheItem { node: node.clone(), topic, usage_count: usage_count + 1 }, + ); return Ok(channel); } } @@ -356,12 +402,35 @@ pub trait DhtHandler { continue; } + channel_cache.insert( + channel.info.id, + ChannelCacheItem { node: node.clone(), topic, usage_count: 1 }, + ); + return Ok(channel) } Err(Error::Custom("Could not create channel".to_string())) } + /// Decrement the channel usage count, if it becomes 0 then set the topic + /// to None, so that this channel is available for another task + async fn cleanup_channel(&self, channel: ChannelPtr) { + let channel_cache_lock = self.dht().channel_cache.clone(); + let mut channel_cache = channel_cache_lock.write().await; + + if let Some(cached) = channel_cache.get_mut(&channel.info.id) { + if cached.usage_count > 0 { + cached.usage_count -= 1; + } + + // If the channel is not used by anything, remove the topic + if cached.usage_count == 0 { + cached.topic = None; + } + } + } + /// Add nodes as a provider for a key async fn add_to_router( &self, diff --git a/src/dht/mod.rs b/src/dht/mod.rs index 165919049..68a577319 100644 --- a/src/dht/mod.rs +++ b/src/dht/mod.rs @@ -85,6 +85,24 @@ impl From for DhtRouterItem { } } +#[derive(Clone)] +pub struct ChannelCacheItem { + /// The DHT node the channel is connected to. + node: DhtNode, + + /// Topic is a hash that you set to remember what the channel is about, + /// it's not shared with the peer. If you ask for a channel (with + /// `handler.get_channel()`) for a specific topic, it will give you a + /// channel that has no topic, has the same topic, or a new + /// channel. + topic: Option, + + /// Usage count increments when you call `handler.get_channel()` and + /// decrements when you call `handler.cleanup_channel()`. A channel's + /// topic is cleared on cleanup if its usage count is zero. + usage_count: u32, +} + pub struct Dht { /// Our own node id pub node_id: blake3::Hash, @@ -94,10 +112,8 @@ pub struct Dht { pub buckets: Arc>>, /// Number of buckets pub n_buckets: usize, - /// Channel ID -> Node ID - pub node_cache: Arc>>, - /// Node ID -> Channel ID - pub channel_cache: Arc>>, + /// Channel ID -> ChannelCacheItem + pub channel_cache: Arc>>, /// Node ID -> Set of keys pub router_cache: Arc>>>, @@ -125,7 +141,6 @@ impl Dht { buckets: Arc::new(RwLock::new(buckets)), n_buckets: 256, bootstrapped: Arc::new(RwLock::new(false)), - node_cache: Arc::new(RwLock::new(HashMap::new())), channel_cache: Arc::new(RwLock::new(HashMap::new())), router_cache: Arc::new(RwLock::new(HashMap::new())), @@ -230,9 +245,13 @@ impl Dht { /// Channel ID -> DhtNode pub async fn get_node_from_channel(&self, channel_id: u32) -> Option { - let node_cache_lock = self.node_cache.clone(); - let node_cache = node_cache_lock.read().await; - node_cache.get(&channel_id).cloned() + let channel_cache_lock = self.channel_cache.clone(); + let channel_cache = channel_cache_lock.read().await; + if let Some(cached) = channel_cache.get(&channel_id).cloned() { + return Some(cached.node) + } + + None } /// Remove nodes in router that are older than expiry_secs