dht, fud: handle multiple concurrent downloads

This commit is contained in:
epiphany
2025-06-26 17:24:42 +02:00
parent c05a5ce085
commit 4d2f042f35
6 changed files with 351 additions and 181 deletions

View File

@@ -39,7 +39,7 @@ use darkfi::{
dht::{Dht, DhtHandler, DhtNode, DhtRouterItem, DhtRouterPtr},
geode::{hash_to_string, ChunkedStorage, FileSequence, Geode, MAX_CHUNK_SIZE},
net::{ChannelPtr, P2pPtr},
system::PublisherPtr,
system::{PublisherPtr, StoppableTask},
util::path::expand_path,
Error, Result,
};
@@ -54,7 +54,9 @@ use proto::{
/// FudEvent
pub mod event;
use event::{ChunkDownloadCompleted, ChunkNotFound, FudEvent, ResourceUpdated};
use event::{
ChunkDownloadCompleted, ChunkNotFound, FudEvent, MetadataDownloadCompleted, ResourceUpdated,
};
/// Resource definition
pub mod resource;
@@ -130,11 +132,11 @@ pub struct Fud {
get_tx: channel::Sender<(blake3::Hash, PathBuf)>,
get_rx: channel::Receiver<(blake3::Hash, PathBuf)>,
metadata_fetch_tx: channel::Sender<(Vec<DhtNode>, blake3::Hash, PathBuf)>,
metadata_fetch_rx: channel::Receiver<(Vec<DhtNode>, blake3::Hash, PathBuf)>,
metadata_fetch_end_tx: channel::Sender<Result<()>>,
metadata_fetch_end_rx: channel::Receiver<Result<()>>,
/// Currently active downloading tasks (running the `fud.fetch_resource()` method)
fetch_tasks: Arc<RwLock<HashMap<blake3::Hash, Arc<StoppableTask>>>>,
/// Used to send events to fud clients
event_publisher: PublisherPtr<FudEvent>,
}
@@ -175,7 +177,7 @@ impl DhtHandler for Fud {
// Send keys that are closer to this node than we are
let self_id = self.dht().node_id;
let channel = self.get_channel(node).await?;
let channel = self.get_channel(node, None).await?;
for (key, seeders) in self.seeders_router.read().await.iter() {
let node_distance = BigUint::from_bytes_be(&self.dht().distance(key, &node.id));
let self_distance = BigUint::from_bytes_be(&self.dht().distance(key, &self_id));
@@ -188,14 +190,15 @@ impl DhtHandler for Fud {
.await;
}
}
self.cleanup_channel(channel).await;
Ok(())
}
async fn fetch_nodes(&self, node: &DhtNode, key: &blake3::Hash) -> Result<Vec<DhtNode>> {
debug!(target: "fud::DhtHandler::fetch_value()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id));
debug!(target: "fud::DhtHandler::fetch_nodes()", "Fetching nodes close to {} from node {}", hash_to_string(key), hash_to_string(&node.id));
let channel = self.get_channel(node).await?;
let channel = self.get_channel(node, None).await?;
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudFindNodesReply>().await;
let msg_subscriber_nodes = channel.subscribe_msg::<FudFindNodesReply>().await.unwrap();
@@ -206,6 +209,7 @@ impl DhtHandler for Fud {
let reply = msg_subscriber_nodes.receive_with_timeout(self.dht().settings.timeout).await?;
msg_subscriber_nodes.unsubscribe().await;
self.cleanup_channel(channel).await;
Ok(reply.nodes.clone())
}
@@ -222,8 +226,6 @@ impl Fud {
event_publisher: PublisherPtr<FudEvent>,
) -> Result<Self> {
let (get_tx, get_rx) = smol::channel::unbounded();
let (metadata_fetch_tx, metadata_fetch_rx) = smol::channel::unbounded();
let (metadata_fetch_end_tx, metadata_fetch_end_rx) = smol::channel::unbounded();
// Hashmap used for routing
let seeders_router = Arc::new(RwLock::new(HashMap::new()));
@@ -244,10 +246,7 @@ impl Fud {
resources: Arc::new(RwLock::new(HashMap::new())),
get_tx,
get_rx,
metadata_fetch_tx,
metadata_fetch_rx,
metadata_fetch_end_tx,
metadata_fetch_end_rx,
fetch_tasks: Arc::new(RwLock::new(HashMap::new())),
event_publisher,
};
@@ -430,7 +429,7 @@ impl Fud {
let mut seeders: HashSet<DhtRouterItem> = HashSet::new();
for node in nodes {
let channel = match self.get_channel(node).await {
let channel = match self.get_channel(node, None).await {
Ok(channel) => channel,
Err(e) => {
warn!(target: "fud::fetch_seeders()", "Could not get a channel for node {}: {}", hash_to_string(&node.id), e);
@@ -444,6 +443,7 @@ impl Fud {
Ok(msg_subscriber) => msg_subscriber,
Err(e) => {
warn!(target: "fud::fetch_seeders()", "Error subscribing to msg: {}", e);
self.cleanup_channel(channel).await;
continue;
}
};
@@ -452,6 +452,7 @@ impl Fud {
if let Err(e) = send_res {
warn!(target: "fud::fetch_seeders()", "Error while sending FudFindSeedersRequest: {}", e);
msg_subscriber.unsubscribe().await;
self.cleanup_channel(channel).await;
continue;
}
@@ -461,11 +462,13 @@ impl Fud {
Err(e) => {
warn!(target: "fud::fetch_seeders()", "Error waiting for reply: {}", e);
msg_subscriber.unsubscribe().await;
self.cleanup_channel(channel).await;
continue;
}
};
msg_subscriber.unsubscribe().await;
self.cleanup_channel(channel).await;
seeders.extend(reply.seeders.clone());
}
@@ -499,7 +502,7 @@ impl Fud {
};
while let Some(seeder) = shuffled_seeders.pop() {
let channel = match self.get_channel(&seeder.node).await {
let channel = match self.get_channel(&seeder.node, Some(*hash)).await {
Ok(channel) => channel,
Err(e) => {
warn!(target: "fud::fetch_missing_chunks()", "Could not get a channel for node {}: {}", hash_to_string(&seeder.node.id), e);
@@ -608,6 +611,8 @@ impl Fud {
msg_subscriber_notfound.unsubscribe().await;
}
self.cleanup_channel(channel).await;
// Stop when there are no missing chunks
if remaining_chunks.is_empty() {
break;
@@ -618,21 +623,23 @@ impl Fud {
}
/// Fetch a single resource metadata from `nodes`.
/// If the file is smaller than a single chunk then seeder can send the
/// chunk directly, we will create the file from it on path `path`.
/// If the resource is a file smaller than a single chunk then seeder can send the
/// chunk directly, and we will create the file from it on path `path`.
/// 1. Request seeders from those nodes
/// 2. Request the metadata from the seeders
/// 3. Insert metadata to geode using the reply
pub async fn fetch_metadata(
&self,
nodes: &Vec<DhtNode>,
hash: &blake3::Hash,
) -> Option<FetchReply> {
nodes: &Vec<DhtNode>,
path: &Path,
) -> Result<()> {
let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
let mut result: Option<FetchReply> = None;
for node in nodes {
// 1. Request list of seeders
let channel = match self.get_channel(node).await {
let channel = match self.get_channel(node, Some(*hash)).await {
Ok(channel) => channel,
Err(e) => {
warn!(target: "fud::fetch_metadata()", "Could not get a channel for node {}: {}", hash_to_string(&node.id), e);
@@ -654,6 +661,7 @@ impl Fud {
if let Err(e) = send_res {
warn!(target: "fud::fetch_metadata()", "Error while sending FudFindSeedersRequest: {}", e);
msg_subscriber.unsubscribe().await;
self.cleanup_channel(channel).await;
continue;
}
@@ -663,6 +671,7 @@ impl Fud {
Err(e) => {
warn!(target: "fud::fetch_metadata()", "Error waiting for reply: {}", e);
msg_subscriber.unsubscribe().await;
self.cleanup_channel(channel).await;
continue;
}
};
@@ -671,6 +680,7 @@ impl Fud {
info!(target: "fud::fetch_metadata()", "Found {} seeders for {} (from {})", seeders.len(), hash_to_string(hash), hash_to_string(&node.id));
msg_subscriber.unsubscribe().await;
self.cleanup_channel(channel).await;
// 2. Request the file/chunk from the seeders
while let Some(seeder) = seeders.pop() {
@@ -680,7 +690,7 @@ impl Fud {
}
queried_seeders.insert(seeder.node.id);
if let Ok(channel) = self.get_channel(&seeder.node).await {
if let Ok(channel) = self.get_channel(&seeder.node, Some(*hash)).await {
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudChunkReply>().await;
msg_subsystem.add_dispatch::<FudFileReply>().await;
@@ -702,6 +712,7 @@ impl Fud {
msg_subscriber_file.unsubscribe().await;
msg_subscriber_dir.unsubscribe().await;
msg_subscriber_notfound.unsubscribe().await;
self.cleanup_channel(channel).await;
continue;
}
@@ -721,6 +732,7 @@ impl Fud {
msg_subscriber_file.unsubscribe().await;
msg_subscriber_dir.unsubscribe().await;
msg_subscriber_notfound.unsubscribe().await;
self.cleanup_channel(channel).await;
};
// Wait for a FudChunkReply, FudFileReply, FudDirectoryReply, or FudNotFound
@@ -797,12 +809,73 @@ impl Fud {
}
}
result
// We did not find the resource
if result.is_none() {
return Err(Error::GeodeFileRouteNotFound)
}
// 3. Insert metadata to geode using the reply
// At this point the reply content is already verified
match result.unwrap() {
FetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => {
// Convert all file paths from String to PathBuf
let mut files: Vec<_> = files
.into_iter()
.map(|(path_str, size)| (PathBuf::from(path_str), size))
.collect();
self.geode.sort_files(&mut files);
if let Err(e) = self.geode.insert_metadata(hash, &chunk_hashes, &files).await {
error!(target: "fud::fetch_metadata()", "Failed inserting directory {} to Geode: {}", hash_to_string(hash), e);
return Err(e)
}
}
FetchReply::File(FudFileReply { chunk_hashes }) => {
if let Err(e) = self.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
error!(target: "fud::fetch_metadata()", "Failed inserting file {} to Geode: {}", hash_to_string(hash), e);
return Err(e)
}
}
// Looked for a file but got a chunk: the entire file fits in a single chunk
FetchReply::Chunk(FudChunkReply { chunk }) => {
info!(target: "fud::fetch_metadata()", "File fits in a single chunk");
let chunk_hash = blake3::hash(&chunk);
let _ = self.geode.insert_metadata(hash, &[chunk_hash], &[]).await;
let mut chunked_file = ChunkedStorage::new(
&[chunk_hash],
&[(path.to_path_buf(), chunk.len() as u64)],
false,
);
if let Err(e) = self.geode.write_chunk(&mut chunked_file, &chunk).await {
error!(target: "fud::fetch_metadata()", "Failed inserting chunk {} to Geode: {}", hash_to_string(&chunk_hash), e);
return Err(e)
};
}
};
Ok(())
}
/// Start downloading a file or directory from the network to `path`.
/// This creates a new task in `fetch_tasks` calling `fetch_resource()`.
pub async fn get(&self, hash: &blake3::Hash, path: &Path) -> Result<()> {
let fetch_tasks = self.fetch_tasks.read().await;
if fetch_tasks.contains_key(hash) {
return Err(Error::Custom(format!(
"Resource {} is already being downloaded",
hash_to_string(hash)
)))
}
drop(fetch_tasks);
self.get_tx.send((*hash, path.to_path_buf())).await?;
Ok(())
}
/// Download a file or directory from the network to `path`.
/// This creates a new task in `fetch_tasks` calling `fetch_resource()`.
pub async fn get(&self, hash: &blake3::Hash, path: &Path) -> Result<()> {
/// Called when `get()` creates a new fetch task.
pub async fn fetch_resource(&self, hash: &blake3::Hash, path: &Path) -> Result<()> {
let self_node = self.dht().node().await;
let mut closest_nodes = vec![];
@@ -859,12 +932,7 @@ impl Fud {
closest_nodes = self.lookup_nodes(hash).await.unwrap_or_default();
// Fetch file or directory metadata
self.metadata_fetch_tx
.send((closest_nodes.clone(), *hash, path.to_path_buf()))
.await
.unwrap();
info!(target: "self::get()", "Waiting for background file fetch task...");
match self.metadata_fetch_end_rx.recv().await.unwrap() {
match self.fetch_metadata(hash, &closest_nodes, path).await {
// The file metadata was found and inserted into geode
Ok(()) => self.geode.get(hash, path).await.unwrap(),
// We could not find the metadata, or any other error occured
@@ -903,36 +971,44 @@ impl Fud {
}
}
// Set resource status to `Verifying` and send FudEvent::MetadataDownloadCompleted
let mut resources_write = self.resources.write().await;
if let Some(resource) = resources_write.get_mut(hash) {
resource.status = ResourceStatus::Verifying;
resource.chunks_total = chunked.len() as u64;
resource.rtype = match chunked.is_dir() {
false => ResourceType::File,
true => ResourceType::Directory,
};
self.event_publisher
.notify(FudEvent::MetadataDownloadCompleted(MetadataDownloadCompleted {
hash: *hash,
resource: resource.clone(),
}))
.await;
}
drop(resources_write);
// Mark locally available chunks as such
if let Err(e) = self.geode.verify_chunks(&mut chunked).await {
error!(target: "self::get()", "Error while verifying chunks: {}", e);
return Err(e);
}
// Set resource status to `Downloading`
// Set resource.chunks_downloaded and send FudEvent::ResourceUpdated
let mut resources_write = self.resources.write().await;
let resource = match resources_write.get_mut(hash) {
Some(resource) => {
resource.status = ResourceStatus::Downloading;
resource.chunks_downloaded = chunked.local_chunks() as u64;
resource.chunks_total = chunked.len() as u64;
resource.rtype = match chunked.is_dir() {
false => ResourceType::File,
true => ResourceType::Directory,
};
resource.clone()
}
None => return Ok(()), // Resource was removed, abort
};
drop(resources_write);
if let Some(resource) = resources_write.get_mut(hash) {
resource.chunks_downloaded = chunked.local_chunks() as u64;
// Send a MetadataDownloadCompleted event
self.event_publisher
.notify(FudEvent::MetadataDownloadCompleted(event::MetadataDownloadCompleted {
hash: *hash,
resource: resource.clone(),
}))
.await;
self.event_publisher
.notify(FudEvent::ResourceUpdated(ResourceUpdated {
hash: *hash,
resource: resource.clone(),
}))
.await;
}
drop(resources_write);
// If `chunked` is a file that is bigger than the all its chunks,
// truncate the file to the chunks.
@@ -947,7 +1023,7 @@ impl Fud {
}
}
// If the file is already complete, we don't need to download any chunk
// If the resource is already complete, we don't need to download any chunk
if chunked.is_complete() {
// Announce the file
let self_announce = FudAnnounce { key: *hash, seeders: vec![self_node.clone().into()] };
@@ -976,6 +1052,25 @@ impl Fud {
return Ok(());
}
// Set resource status to `Downloading`
let mut resources_write = self.resources.write().await;
let resource = match resources_write.get_mut(hash) {
Some(resource) => {
resource.status = ResourceStatus::Downloading;
resource.clone()
}
None => return Ok(()), // Resource was removed, abort
};
drop(resources_write);
// Send a MetadataDownloadCompleted event
self.event_publisher
.notify(FudEvent::MetadataDownloadCompleted(event::MetadataDownloadCompleted {
hash: *hash,
resource: resource.clone(),
}))
.await;
// Find nodes close to the file hash if we didn't previously fetched them
if closest_nodes.is_empty() {
closest_nodes = self.lookup_nodes(hash).await.unwrap_or_default();
@@ -1001,6 +1096,22 @@ impl Fud {
return Err(e);
}
};
// Set resource status to `Verifying` and send FudEvent::ResourceUpdated
let mut resources_write = self.resources.write().await;
if let Some(resource) = resources_write.get_mut(hash) {
resource.status = ResourceStatus::Verifying;
self.event_publisher
.notify(FudEvent::ResourceUpdated(ResourceUpdated {
hash: *hash,
resource: resource.clone(),
}))
.await;
}
drop(resources_write);
// Verify all chunks
self.geode.verify_chunks(&mut chunked).await?;
// We fetched all chunks, but the file is not complete
@@ -1149,4 +1260,18 @@ impl Fud {
.notify(FudEvent::ResourceRemoved(event::ResourceRemoved { hash: *hash }))
.await;
}
/// Stop all tasks in `fetch_tasks`.
pub async fn stop(&self) {
// Create a clone of fetch_tasks because `task.stop()` needs a write lock
let fetch_tasks = self.fetch_tasks.read().await;
let cloned_fetch_tasks: HashMap<blake3::Hash, Arc<StoppableTask>> =
fetch_tasks.iter().map(|(key, value)| (*key, value.clone())).collect();
drop(fetch_tasks);
// Stop all tasks
for task in cloned_fetch_tasks.values() {
task.stop().await;
}
}
}

View File

@@ -41,7 +41,7 @@ use fud::{
get_node_id,
proto::{FudFindNodesReply, ProtocolFud},
rpc::JsonRpcInterface,
tasks::{announce_seed_task, fetch_metadata_task, get_task},
tasks::{announce_seed_task, get_task},
Fud,
};
@@ -179,24 +179,10 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
ex.clone(),
);
info!(target: "fud", "Starting fetch metadata task");
let file_task = StoppableTask::new();
file_task.clone().start(
fetch_metadata_task(fud.clone()),
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => error!(target: "fud", "Failed starting fetch metadata task: {}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
info!(target: "fud", "Starting get task");
let get_task_ = StoppableTask::new();
get_task_.clone().start(
get_task(fud.clone()),
get_task(fud.clone(), ex.clone()),
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
@@ -288,8 +274,8 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
signals_handler.wait_termination(signals_task).await?;
info!(target: "fud", "Caught termination signal, cleaning up and exiting...");
info!(target: "fud", "Stopping fetch file task...");
file_task.stop().await;
info!(target: "fud", "Stopping fetch tasks...");
fud.stop().await;
info!(target: "fud", "Stopping get task...");
get_task_.stop().await;

View File

@@ -116,8 +116,8 @@ impl JsonRpcInterface {
}
// RPCAPI:
// Fetch a file from the network. Takes a file hash and file path (absolute or relative) as parameters.
// Returns the path where the file will be located once downloaded.
// Fetch a resource from the network. Takes a hash and path (absolute or relative) as parameters.
// Returns the path where the resource will be located once downloaded.
//
// --> {"jsonrpc": "2.0", "method": "get", "params": ["1211...abfd", "~/myfile.jpg"], "id": 42}
// <-- {"jsonrpc": "2.0", "result": "/home/user/myfile.jpg", "id": 42}
@@ -157,7 +157,10 @@ impl JsonRpcInterface {
None => self.fud.downloads_path.join(&hash_str),
};
let _ = self.fud.get_tx.send((hash, path.clone())).await;
// Start downloading the resource
if let Err(e) = self.fud.get(&hash, &path).await {
return JsonError::new(ErrorCode::InternalError, Some(e.to_string()), id).into()
}
JsonResponse::new(JsonValue::String(path.to_string_lossy().to_string()), id).into()
}

View File

@@ -17,12 +17,11 @@
*/
use log::{error, info};
use std::{path::PathBuf, sync::Arc};
use std::sync::Arc;
use darkfi::{
dht::DhtHandler,
geode::{hash_to_string, ChunkedStorage},
system::sleep,
system::{sleep, ExecutorPtr, StoppableTask},
Error, Result,
};
@@ -37,71 +36,40 @@ pub enum FetchReply {
Chunk(FudChunkReply),
}
/// Triggered when calling the `get` RPC method
pub async fn get_task(fud: Arc<Fud>) -> Result<()> {
/// Triggered when calling the `fud.get()` method.
/// It creates a new StoppableTask (running `fud.fetch_resource()`) and inserts
/// it into the `fud.fetch_tasks` hashmap. When the task is stopped it's
/// removed from the hashmap.
pub async fn get_task(fud: Arc<Fud>, executor: ExecutorPtr) -> Result<()> {
loop {
let (file_hash, file_path) = fud.get_rx.recv().await.unwrap();
let (hash, path) = fud.get_rx.recv().await.unwrap();
let _ = fud.get(&file_hash, &file_path).await;
}
}
// Create the new task
let mut fetch_tasks = fud.fetch_tasks.write().await;
let task = StoppableTask::new();
fetch_tasks.insert(hash, task.clone());
drop(fetch_tasks);
/// Background task that receives file fetch requests and tries to
/// fetch objects from the network using the routing table.
/// TODO: This can be optimised a lot for connection reuse, etc.
pub async fn fetch_metadata_task(fud: Arc<Fud>) -> Result<()> {
info!(target: "fud::fetch_metadata_task()", "Started background metadata fetch task");
loop {
let (nodes, hash, path) = fud.metadata_fetch_rx.recv().await.unwrap();
info!(target: "fud::fetch_metadata_task()", "Fetching metadata for {}", hash_to_string(&hash));
let reply = fud.fetch_metadata(&nodes, &hash).await;
if reply.is_none() {
fud.metadata_fetch_end_tx.send(Err(Error::GeodeFileRouteNotFound)).await.unwrap();
continue
}
let reply = reply.unwrap();
// At this point the reply content was already verified in `fud.fetch_metadata`
match reply {
FetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => {
// Convert all file paths from String to PathBuf
let mut files: Vec<_> = files
.into_iter()
.map(|(path_str, size)| (PathBuf::from(path_str), size))
.collect();
fud.geode.sort_files(&mut files);
if let Err(e) = fud.geode.insert_metadata(&hash, &chunk_hashes, &files).await {
error!(target: "fud::fetch_metadata_task()", "Failed inserting directory {} to Geode: {}", hash_to_string(&hash), e);
fud.metadata_fetch_end_tx.send(Err(e)).await.unwrap();
continue
// Start the new task
let fud_1 = fud.clone();
let fud_2 = fud.clone();
task.start(
async move { fud_1.fetch_resource(&hash, &path).await },
move |res| async move {
// Remove the task from the `fud.fetch_tasks` hashmap once it is
// stopped (error, manually, or just done).
let mut fetch_tasks = fud_2.fetch_tasks.write().await;
fetch_tasks.remove(&hash);
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => {
error!(target: "fud::get_task()", "Error while fetching resource: {}", e)
}
}
fud.metadata_fetch_end_tx.send(Ok(())).await.unwrap();
}
FetchReply::File(FudFileReply { chunk_hashes }) => {
if let Err(e) = fud.geode.insert_metadata(&hash, &chunk_hashes, &[]).await {
error!(target: "fud::fetch_metadata_task()", "Failed inserting file {} to Geode: {}", hash_to_string(&hash), e);
fud.metadata_fetch_end_tx.send(Err(e)).await.unwrap();
continue
}
fud.metadata_fetch_end_tx.send(Ok(())).await.unwrap();
}
// Looked for a file but got a chunk: the entire file fits in a single chunk
FetchReply::Chunk(FudChunkReply { chunk }) => {
info!(target: "fud::fetch_metadata_task()", "File fits in a single chunk");
let chunk_hash = blake3::hash(&chunk);
let _ = fud.geode.insert_metadata(&hash, &[chunk_hash], &[]).await;
let mut chunked_file =
ChunkedStorage::new(&[chunk_hash], &[(path, chunk.len() as u64)], false);
if let Err(e) = fud.geode.write_chunk(&mut chunked_file, &chunk).await {
error!(target: "fud::fetch_metadata_task()", "Failed inserting chunk {} to Geode: {}", hash_to_string(&chunk_hash), e);
fud.metadata_fetch_end_tx.send(Err(e)).await.unwrap();
continue
};
fud.metadata_fetch_end_tx.send(Ok(())).await.unwrap();
}
}
},
Error::DetachedTaskStopped,
executor.clone(),
);
}
}
@@ -115,16 +83,16 @@ pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
let seeders = vec![fud.dht().node().await.into()];
info!(target: "fud::announce_task()", "Verifying seeds...");
info!(target: "fud::announce_seed_task()", "Verifying seeds...");
let seeding_resources = match fud.verify_resources(None).await {
Ok(resources) => resources,
Err(e) => {
error!(target: "fud::announce_task()", "Error while verifying seeding resources: {}", e);
error!(target: "fud::announce_seed_task()", "Error while verifying seeding resources: {}", e);
continue;
}
};
info!(target: "fud::announce_task()", "Announcing files...");
info!(target: "fud::announce_seed_task()", "Announcing files...");
for resource in seeding_resources {
let _ = fud
.announce(
@@ -135,7 +103,7 @@ pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
.await;
}
info!(target: "fud::announce_task()", "Pruning seeders...");
info!(target: "fud::announce_seed_task()", "Pruning seeders...");
fud.dht().prune_router(fud.seeders_router.clone(), interval.try_into().unwrap()).await;
}
}

View File

@@ -21,9 +21,13 @@ use futures::stream::FuturesUnordered;
use log::{debug, info, warn};
use num_bigint::BigUint;
use smol::{lock::Semaphore, stream::StreamExt};
use std::{collections::HashSet, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use super::{Dht, DhtNode, DhtRouterItem, DhtRouterPtr};
use super::{ChannelCacheItem, Dht, DhtNode, DhtRouterItem, DhtRouterPtr};
use crate::{
net::{
connector::Connector,
@@ -63,9 +67,10 @@ pub trait DhtHandler {
let nodes = self.lookup_nodes(key).await?;
for node in nodes {
let channel = self.get_channel(&node).await;
if let Ok(ch) = channel {
let _ = ch.send(message).await;
let channel_res = self.get_channel(&node, None).await;
if let Ok(channel) = channel_res {
let _ = channel.send(message).await;
self.cleanup_channel(channel).await;
}
}
@@ -87,7 +92,7 @@ pub trait DhtHandler {
let mut channel_cache = channel_cache_lock.write().await;
// Skip this channel if it's stopped or not new.
if channel.is_stopped() || channel_cache.values().any(|&v| v == channel.info.id) {
if channel.is_stopped() || channel_cache.keys().any(|&k| k == channel.info.id) {
continue;
}
// Skip this channel if it's a seed or refine session.
@@ -95,21 +100,27 @@ pub trait DhtHandler {
continue;
}
let node = self.ping(channel.clone()).await;
sleep(1).await;
let ping_res = self.ping(channel.clone()).await;
if let Ok(n) = node {
channel_cache.insert(n.id, channel.info.id);
drop(channel_cache);
if let Err(e) = ping_res {
warn!(target: "dht::DhtHandler::channel_task()", "Error while pinging (requesting node id) {}: {}", channel.address(), e);
channel.stop().await;
continue;
}
let node_cache_lock = self.dht().node_cache.clone();
let mut node_cache = node_cache_lock.write().await;
node_cache.insert(channel.info.id, n.clone());
drop(node_cache);
let node = ping_res.unwrap();
if !n.addresses.is_empty() {
self.add_node(n.clone()).await;
let _ = self.on_new_node(&n.clone()).await;
}
channel_cache.entry(channel.info.id).or_insert_with(|| ChannelCacheItem {
node: node.clone(),
topic: None,
usage_count: 0,
});
drop(channel_cache);
if !node.addresses.is_empty() {
self.add_node(node.clone()).await;
let _ = self.on_new_node(&node.clone()).await;
}
}
}
@@ -121,10 +132,10 @@ pub trait DhtHandler {
let channel_cache_lock = self.dht().channel_cache.clone();
let mut channel_cache = channel_cache_lock.write().await;
for (node_id, channel_id) in channel_cache.clone() {
for (channel_id, _) in channel_cache.clone() {
let channel = self.dht().p2p.get_channel(channel_id);
if channel.is_none() || (channel.is_some() && channel.unwrap().is_stopped()) {
channel_cache.remove(&node_id);
if channel.is_none() {
channel_cache.remove(&channel_id);
}
}
}
@@ -155,9 +166,9 @@ pub trait DhtHandler {
// Bucket is full
if bucket.nodes.len() >= self.dht().settings.k {
// Ping the least recently seen node
let channel = self.get_channel(&bucket.nodes[0]).await;
if channel.is_ok() {
let ping_res = self.ping(channel.unwrap()).await;
if let Ok(channel) = self.get_channel(&bucket.nodes[0], None).await {
let ping_res = self.ping(channel.clone()).await;
self.cleanup_channel(channel).await;
if ping_res.is_ok() {
// Ping was successful, move the least recently seen node to the tail
let n = bucket.nodes.remove(0);
@@ -303,13 +314,43 @@ pub trait DhtHandler {
return Ok(result.to_vec())
}
/// Get an existing channel, or create a new one
async fn get_channel(&self, node: &DhtNode) -> Result<ChannelPtr> {
/// Get a channel (existing or create a new one) to `node` about `topic`.
/// Don't forget to call `cleanup_channel()` once you are done with it.
async fn get_channel(&self, node: &DhtNode, topic: Option<blake3::Hash>) -> Result<ChannelPtr> {
let channel_cache_lock = self.dht().channel_cache.clone();
let channel_cache = channel_cache_lock.read().await;
let mut channel_cache = channel_cache_lock.write().await;
if let Some(channel_id) = channel_cache.get(&node.id) {
if let Some(channel) = self.dht().p2p.get_channel(*channel_id) {
// Get existing channels for this node, regardless of topic
let channels: HashMap<u32, ChannelCacheItem> = channel_cache
.iter()
.filter(|&(_, item)| item.node == *node)
.map(|(&key, item)| (key, item.clone()))
.collect();
let (channel_id, topic, usage_count) =
// If we already have a channel for this node and topic, use it
if let Some((cid, cached)) = channels.iter().find(|&(_, c)| c.topic == topic) {
(Some(*cid), cached.topic, cached.usage_count)
}
// If we have a topicless channel for this node, use it
else if let Some((cid, cached)) = channels.iter().find(|&(_, c)| c.topic.is_none()) {
(Some(*cid), topic, cached.usage_count)
}
// If we don't need any specific topic, use the first channel we have
else if topic.is_none() {
match channels.iter().next() {
Some((cid, cached)) => (Some(*cid), cached.topic, cached.usage_count),
_ => (None, topic, 0),
}
}
// There is no existing channel we can use, we will create one
else {
(None, topic, 0)
};
// If we found an existing channel we can use, try to use it
if let Some(channel_id) = channel_id {
if let Some(channel) = self.dht().p2p.get_channel(channel_id) {
if channel.session_type_id() & (SESSION_SEED | SESSION_REFINE) != 0 {
return Err(Error::Custom(
"Could not get a channel (for DHT) as this is a seed or refine session"
@@ -320,6 +361,11 @@ pub trait DhtHandler {
if channel.is_stopped() {
channel.clone().start(self.dht().executor.clone());
}
channel_cache.insert(
channel_id,
ChannelCacheItem { node: node.clone(), topic, usage_count: usage_count + 1 },
);
return Ok(channel);
}
}
@@ -356,12 +402,35 @@ pub trait DhtHandler {
continue;
}
channel_cache.insert(
channel.info.id,
ChannelCacheItem { node: node.clone(), topic, usage_count: 1 },
);
return Ok(channel)
}
Err(Error::Custom("Could not create channel".to_string()))
}
/// Decrement the channel usage count, if it becomes 0 then set the topic
/// to None, so that this channel is available for another task
async fn cleanup_channel(&self, channel: ChannelPtr) {
let channel_cache_lock = self.dht().channel_cache.clone();
let mut channel_cache = channel_cache_lock.write().await;
if let Some(cached) = channel_cache.get_mut(&channel.info.id) {
if cached.usage_count > 0 {
cached.usage_count -= 1;
}
// If the channel is not used by anything, remove the topic
if cached.usage_count == 0 {
cached.topic = None;
}
}
}
/// Add nodes as a provider for a key
async fn add_to_router(
&self,

View File

@@ -85,6 +85,24 @@ impl From<DhtNode> for DhtRouterItem {
}
}
#[derive(Clone)]
pub struct ChannelCacheItem {
/// The DHT node the channel is connected to.
node: DhtNode,
/// Topic is a hash that you set to remember what the channel is about,
/// it's not shared with the peer. If you ask for a channel (with
/// `handler.get_channel()`) for a specific topic, it will give you a
/// channel that has no topic, has the same topic, or a new
/// channel.
topic: Option<blake3::Hash>,
/// Usage count increments when you call `handler.get_channel()` and
/// decrements when you call `handler.cleanup_channel()`. A channel's
/// topic is cleared on cleanup if its usage count is zero.
usage_count: u32,
}
pub struct Dht {
/// Our own node id
pub node_id: blake3::Hash,
@@ -94,10 +112,8 @@ pub struct Dht {
pub buckets: Arc<RwLock<Vec<DhtBucket>>>,
/// Number of buckets
pub n_buckets: usize,
/// Channel ID -> Node ID
pub node_cache: Arc<RwLock<HashMap<u32, DhtNode>>>,
/// Node ID -> Channel ID
pub channel_cache: Arc<RwLock<HashMap<blake3::Hash, u32>>>,
/// Channel ID -> ChannelCacheItem
pub channel_cache: Arc<RwLock<HashMap<u32, ChannelCacheItem>>>,
/// Node ID -> Set of keys
pub router_cache: Arc<RwLock<HashMap<blake3::Hash, HashSet<blake3::Hash>>>>,
@@ -125,7 +141,6 @@ impl Dht {
buckets: Arc::new(RwLock::new(buckets)),
n_buckets: 256,
bootstrapped: Arc::new(RwLock::new(false)),
node_cache: Arc::new(RwLock::new(HashMap::new())),
channel_cache: Arc::new(RwLock::new(HashMap::new())),
router_cache: Arc::new(RwLock::new(HashMap::new())),
@@ -230,9 +245,13 @@ impl Dht {
/// Channel ID -> DhtNode
pub async fn get_node_from_channel(&self, channel_id: u32) -> Option<DhtNode> {
let node_cache_lock = self.node_cache.clone();
let node_cache = node_cache_lock.read().await;
node_cache.get(&channel_id).cloned()
let channel_cache_lock = self.channel_cache.clone();
let channel_cache = channel_cache_lock.read().await;
if let Some(cached) = channel_cache.get(&channel_id).cloned() {
return Some(cached.node)
}
None
}
/// Remove nodes in router that are older than expiry_secs