From ff112dd6c285dd9d62ddaa6e0979db1c226ee25b Mon Sep 17 00:00:00 2001 From: darkfi Date: Sat, 5 Apr 2025 14:29:01 +0200 Subject: [PATCH] fud: add resource.rs, add `fu watch`, add `fu rm`, merge announcing task and pruning task --- Cargo.lock | 1 + bin/fud/fu/Cargo.toml | 1 + bin/fud/fu/src/main.rs | 269 +++++++++++++++++++++++++++++++++--- bin/fud/fud/src/main.rs | 158 ++++++++++++++++----- bin/fud/fud/src/resource.rs | 58 ++++++++ bin/fud/fud/src/rpc.rs | 223 +++++++++++++++++++++++++----- bin/fud/fud/src/tasks.rs | 49 ++++--- src/geode/mod.rs | 10 ++ 8 files changed, 654 insertions(+), 115 deletions(-) create mode 100644 bin/fud/fud/src/resource.rs diff --git a/Cargo.lock b/Cargo.lock index ee88e0d68..b42bbace1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3385,6 +3385,7 @@ dependencies = [ "log", "simplelog", "smol", + "termcolor", "url", ] diff --git a/bin/fud/fu/Cargo.toml b/bin/fud/fu/Cargo.toml index da9a0d8ca..8aa44c8df 100644 --- a/bin/fud/fu/Cargo.toml +++ b/bin/fud/fu/Cargo.toml @@ -19,6 +19,7 @@ clap = {version = "4.4.11", features = ["derive"]} log = "0.4.26" simplelog = "0.12.2" url = "2.5.4" +termcolor = "1.4.1" [lints] workspace = true diff --git a/bin/fud/fu/src/main.rs b/bin/fud/fu/src/main.rs index 191b28b7a..411f8e8f6 100644 --- a/bin/fud/fu/src/main.rs +++ b/bin/fud/fu/src/main.rs @@ -19,11 +19,13 @@ use clap::{Parser, Subcommand}; use log::error; use simplelog::{ColorChoice, TermLogger, TerminalMode}; +use smol::lock::RwLock; use std::{ collections::HashMap, io::{stdout, Write}, sync::Arc, }; +use termcolor::{Color, ColorSpec, StandardStream, WriteColor}; use url::Url; use darkfi::{ @@ -70,6 +72,15 @@ enum Subcmd { file: String, }, + /// Watch + Watch {}, + + /// Remove a resource from fud + Rm { + /// Resource hash + hash: String, + }, + /// Get the current node buckets ListBuckets {}, @@ -152,22 +163,37 @@ impl Fu { } match params.get("event").unwrap().get::().unwrap().as_str() { "file_download_completed" => { - chunks_total = - *info.get("chunk_count").unwrap().get::().unwrap() as usize; - print_progress_bar(chunks_downloaded, chunks_total); - } - "chunk_download_completed" => { - chunks_downloaded += 1; - print_progress_bar(chunks_downloaded, chunks_total); - } - "download_completed" => { - let info = params - .get("info") + let resource = info + .get("resource") .unwrap() .get::>() .unwrap(); + chunks_total = + *resource.get("chunks_total").unwrap().get::().unwrap() + as usize; + print_progress_bar(chunks_downloaded, chunks_total); + } + "chunk_download_completed" => { + let resource = info + .get("resource") + .unwrap() + .get::>() + .unwrap(); + chunks_downloaded = + *resource.get("chunks_downloaded").unwrap().get::().unwrap() + as usize; + print_progress_bar(chunks_downloaded, chunks_total); + } + "download_completed" => { let file_path = info.get("file_path").unwrap().get::().unwrap(); - chunks_downloaded = chunks_total; + let resource = info + .get("resource") + .unwrap() + .get::>() + .unwrap(); + chunks_downloaded = + *resource.get("chunks_downloaded").unwrap().get::().unwrap() + as usize; print_progress_bar(chunks_downloaded, chunks_total); println!("\nDownload completed:\n{}", file_path); return Ok(()); @@ -186,11 +212,6 @@ impl Fu { } "download_error" => { // An error that caused the download to be unsuccessful - let info = params - .get("info") - .unwrap() - .get::>() - .unwrap(); println!(); return Err(Error::Custom( info.get("error").unwrap().get::().unwrap().to_string(), @@ -278,6 +299,218 @@ impl Fu { Ok(()) } + + async fn watch(&self, ex: ExecutorPtr) -> Result<()> { + let req = JsonRequest::new("list_resources", JsonValue::Array(vec![])); + let rep = self.rpc_client.request(req).await?; + + let resources_json: Vec = rep.clone().try_into().unwrap(); + let resources: Arc>>> = Arc::new(RwLock::new(vec![])); + + let publisher = Publisher::new(); + let subscription = Arc::new(publisher.clone().subscribe().await); + let subscriber_task = StoppableTask::new(); + let publisher_ = publisher.clone(); + let rpc_client_ = self.rpc_client.clone(); + subscriber_task.clone().start( + async move { + let req = JsonRequest::new("subscribe", JsonValue::Array(vec![])); + rpc_client_.subscribe(req, publisher).await + }, + move |res| async move { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => { + error!("{}", e); + publisher_ + .notify(JsonResult::Error(JsonError::new( + ErrorCode::InternalError, + None, + 0, + ))) + .await; + } + } + }, + Error::DetachedTaskStopped, + ex, + ); + + let mut tstdout = StandardStream::stdout(ColorChoice::Auto); + + let mut update_resource = async |resource: &HashMap| { + let hash = resource.get("hash").unwrap().get::().unwrap(); + let mut resources_write = resources.write().await; + let i = match resources_write + .iter() + .position(|r| r.get("hash").unwrap().get::().unwrap() == hash) + { + Some(i) => { + resources_write.remove(i); + resources_write.insert(i, resource.clone()); + i + } + None => { + resources_write.push(resource.clone()); + resources_write.len() - 1 + } + }; + + // Move the cursor to the i-th line and clear it + print!("\x1b[{};1H\x1B[2K", i + 2); + + let hash = resource.get("hash").unwrap().get::().unwrap(); + print!("\r{:>44} ", hash,); + + let status = resource.get("status").unwrap().get::().unwrap(); + tstdout + .set_color( + ColorSpec::new() + .set_fg(match status.as_str() { + "downloading" => Some(Color::Blue), + "seeding" => Some(Color::Green), + "discovering" => Some(Color::Magenta), + "incomplete" => Some(Color::Red), + _ => None, + }) + .set_bold(true), + ) + .unwrap(); + print!("{:>11} ", status,); + tstdout.reset().unwrap(); + + let chunks_downloaded = + *resource.get("chunks_downloaded").unwrap().get::().unwrap() as usize; + let chunks_total = + *resource.get("chunks_total").unwrap().get::().unwrap() as usize; + match chunks_total { + 0 => { + print!("{:>5.1} {:>9}", 0.0, format!("{}/?", chunks_downloaded)); + } + _ => { + let percent = chunks_downloaded as f64 / chunks_total as f64 * 100.0; + print!( + "{:>5.1} {:>9}", + percent, + format!("{}/{}", chunks_downloaded, chunks_total) + ); + } + }; + println!(); + + // Move the cursor to end + print!("\x1b[{};1H", resources_write.len() + 2); + stdout().flush().unwrap(); + }; + + let print_begin = async || { + // Clear + print!("\x1B[2J\x1B[1;1H"); + + // Print column headers + println!("\x1b[4m{:>44} {:>11} {:>5} {:>9}\x1b[0m", "Hash", "Status", "%", "Chunks"); + }; + + print_begin().await; + if resources_json.is_empty() { + println!("No known resources"); + } else { + for resource in resources_json.iter() { + let resource = resource.get::>().unwrap(); + update_resource(resource).await; + } + } + + loop { + match subscription.receive().await { + JsonResult::Notification(n) => { + let params = n.params.get::>().unwrap(); + let info = + params.get("info").unwrap().get::>().unwrap(); + match params.get("event").unwrap().get::().unwrap().as_str() { + "download_started" => { + let resource = info + .get("resource") + .unwrap() + .get::>() + .unwrap(); + update_resource(resource).await; + } + "file_download_completed" => { + let resource = info + .get("resource") + .unwrap() + .get::>() + .unwrap(); + update_resource(resource).await; + } + "chunk_download_completed" => { + let resource = info + .get("resource") + .unwrap() + .get::>() + .unwrap(); + update_resource(resource).await; + } + "download_completed" => { + let resource = info + .get("resource") + .unwrap() + .get::>() + .unwrap(); + update_resource(resource).await; + } + "resource_removed" => { + { + let hash = info.get("file_hash").unwrap().get::().unwrap(); + let mut resources_write = resources.write().await; + let i = resources_write.iter().position(|r| { + r.get("hash").unwrap().get::().unwrap() == hash + }); + if let Some(i) = i { + resources_write.remove(i); + } + } + + let r = resources.read().await.clone(); + print_begin().await; + for resource in r.iter() { + update_resource(resource).await; + } + } + "missing_chunks" => { + let resource = info + .get("resource") + .unwrap() + .get::>() + .unwrap(); + update_resource(resource).await; + } + "download_error" => { + // An error that caused the download to be unsuccessful + } + _ => {} + } + } + + JsonResult::Error(e) => { + return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))) + } + + x => { + return Err(Error::UnexpectedJsonRpc(format!( + "Got unexpected data from JSON-RPC: {x:?}" + ))) + } + } + } + } + + async fn remove(&self, hash: String) -> Result<()> { + let req = JsonRequest::new("remove", JsonValue::Array(vec![JsonValue::String(hash)])); + self.rpc_client.request(req).await?; + Ok(()) + } } fn main() -> Result<()> { @@ -296,6 +529,8 @@ fn main() -> Result<()> { match args.command { Subcmd::Get { file, name } => fu.get(file, name, ex.clone()).await, Subcmd::Put { file } => fu.put(file).await, + Subcmd::Watch {} => fu.watch(ex.clone()).await, + Subcmd::Rm { hash } => fu.remove(hash).await, Subcmd::ListBuckets {} => fu.list_buckets().await, Subcmd::ListSeeders {} => fu.list_seeders().await, }?; diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs index 86ff20f67..bed005da8 100644 --- a/bin/fud/fud/src/main.rs +++ b/bin/fud/fud/src/main.rs @@ -24,6 +24,7 @@ use std::{ }; use num_bigint::BigUint; +use resource::{Resource, ResourceStatus}; use rpc::{ChunkDownloadCompleted, ChunkNotFound}; use tasks::FetchReply; @@ -45,7 +46,7 @@ use structopt_toml::{structopt::StructOpt, StructOptToml}; use darkfi::{ async_daemonize, cli_desc, - geode::{hash_to_string, Geode}, + geode::{hash_to_string, ChunkedFile, Geode}, net::{session::SESSION_DEFAULT, settings::SettingsOpt, ChannelPtr, P2p, P2pPtr}, rpc::{ jsonrpc::JsonSubscriber, @@ -67,6 +68,7 @@ use proto::{ }; mod dht; +mod resource; mod rpc; mod tasks; @@ -124,6 +126,9 @@ pub struct Fud { /// The DHT instance dht: Arc, + /// Resources (current status of all downloads/seeds) + resources: Arc>>, + get_tx: channel::Sender<(u16, blake3::Hash, PathBuf, Result<()>)>, get_rx: channel::Receiver<(u16, blake3::Hash, PathBuf, Result<()>)>, file_fetch_tx: channel::Sender<(blake3::Hash, Result<()>)>, @@ -137,9 +142,9 @@ pub struct Fud { dnet_sub: JsonSubscriber, /// Download JSON-RPC subscriber - download_sub: JsonSubscriber, + event_sub: JsonSubscriber, - download_publisher: PublisherPtr, + event_publisher: PublisherPtr, } impl HandlerP2p for Fud { @@ -226,19 +231,95 @@ impl Fud { /// Add ourselves to `seeders_router` for the files we already have. /// Skipped if we have no external address. async fn init(&self) -> Result<()> { + info!(target: "fud::init()", "Finding resources..."); + let hashes = self.geode.list_files().await?; + let mut resources_write = self.resources.write().await; + for hash in hashes { + resources_write.insert( + hash, + Resource { + hash, + status: ResourceStatus::Incomplete, + chunks_total: 0, + chunks_downloaded: 0, + }, + ); + } + drop(resources_write); + + info!(target: "fud::init()", "Verifying resources..."); + let resources = self.get_seeding_resources().await?; + if self.dht().node.clone().addresses.is_empty() { return Ok(()); } - let self_router_items: Vec = vec![self.dht().node.clone().into()]; - let hashes = self.geode.list_files().await?; - for hash in hashes { - self.add_to_router(self.seeders_router.clone(), &hash, self_router_items.clone()).await; + info!(target: "fud::init()", "Start seeding..."); + let self_router_items: Vec = vec![self.dht().node.clone().into()]; + + for resource in resources { + self.add_to_router( + self.seeders_router.clone(), + &resource.hash, + self_router_items.clone(), + ) + .await; } Ok(()) } + /// Verify if resources are complete and uncorrupted. + /// If a resource is incomplete or corrupted, its status is changed to Incomplete. + /// If a resource is complete, its status is changed to Seeding. + /// Returns the list of verified and uncorrupted/complete seeding resources. + async fn get_seeding_resources(&self) -> Result> { + let mut resources_write = self.resources.write().await; + + let update_resource = + async |resource: &mut Resource, + status: ResourceStatus, + chunked_file: Option<&ChunkedFile>| { + resource.status = status; + resource.chunks_total = match chunked_file { + Some(chunked_file) => chunked_file.len() as u64, + None => 0, + }; + resource.chunks_downloaded = match chunked_file { + Some(chunked_file) => chunked_file.local_chunks() as u64, + None => 0, + }; + }; + + let mut seeding_resources: Vec = vec![]; + for (_, mut resource) in resources_write.iter_mut() { + match resource.status { + ResourceStatus::Seeding => {} + ResourceStatus::Incomplete => {} + _ => continue, + }; + + // Make sure the resource is not corrupted or incomplete + let chunked_file = match self.geode.get(&resource.hash).await { + Ok(v) => v, + Err(_) => { + update_resource(&mut resource, ResourceStatus::Incomplete, None).await; + continue; + } + }; + if !chunked_file.is_complete() { + update_resource(&mut resource, ResourceStatus::Incomplete, Some(&chunked_file)) + .await; + continue; + } + + update_resource(&mut resource, ResourceStatus::Seeding, Some(&chunked_file)).await; + seeding_resources.push(resource.clone()); + } + + Ok(seeding_resources) + } + /// Query nodes close to `key` to find the seeders async fn fetch_seeders(&self, key: &blake3::Hash) -> HashSet { let closest_nodes = self.lookup_nodes(key).await; // Find the `k` closest nodes @@ -298,7 +379,7 @@ impl Fud { file_hash: &blake3::Hash, chunk_hashes: &HashSet, seeders: &HashSet, - ) { + ) -> Result<()> { let mut remaining_chunks = chunk_hashes.clone(); let mut shuffled_seeders = { let mut vec: Vec<_> = seeders.iter().cloned().collect(); @@ -368,11 +449,24 @@ impl Fud { continue; // Skip to next chunk, will retry this chunk later } + // Upade resource `chunks_downloaded` + let mut resources_write = self.resources.write().await; + let resource = match resources_write.get_mut(file_hash) { + Some(resource) => { + resource.status = ResourceStatus::Downloading; + resource.chunks_downloaded += 1; + resource.clone() + } + None => return Ok(()) // Resource was removed, abort + }; + drop(resources_write); + info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id)); - self.download_publisher + self.event_publisher .notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted { file_hash: *file_hash, chunk_hash, + resource, })) .await; remaining_chunks.remove(&chunk_hash); @@ -390,7 +484,7 @@ impl Fud { break; // Switch to another seeder } info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id)); - self.download_publisher + self.event_publisher .notify(FudEvent::ChunkNotFound(ChunkNotFound { file_hash: *file_hash, chunk_hash, @@ -409,6 +503,8 @@ impl Fud { break; } } + + Ok(()) } /// Fetch a single file metadata from the network. @@ -633,9 +729,8 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { Ok(mut file) => { let mut buffer = Vec::new(); file.read_to_end(&mut buffer).await?; - let buf: [u8; 44] = buffer.try_into().expect("Node ID must have 44 characters"); let mut out_buf = [0u8; 32]; - bs58::decode(buf).onto(&mut out_buf)?; + bs58::decode(buffer).onto(&mut out_buf)?; let node_id = blake3::Hash::from_bytes(out_buf); Ok(node_id) } @@ -656,7 +751,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!(target: "fud", "Your node ID: {}", hash_to_string(&node_id_)); // Daemon instantiation - let download_sub = JsonSubscriber::new("get"); + let event_sub = JsonSubscriber::new("event"); let (get_tx, get_rx) = smol::channel::unbounded(); let (file_fetch_tx, file_fetch_rx) = smol::channel::unbounded(); let (file_fetch_end_tx, file_fetch_end_rx) = smol::channel::unbounded(); @@ -668,6 +763,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { geode, downloads_path, dht: dht.clone(), + resources: Arc::new(RwLock::new(HashMap::new())), get_tx, get_rx, file_fetch_tx, @@ -676,22 +772,22 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { file_fetch_end_rx, rpc_connections: Mutex::new(HashSet::new()), dnet_sub, - download_sub: download_sub.clone(), - download_publisher: Publisher::new(), + event_sub: event_sub.clone(), + event_publisher: Publisher::new(), }); fud.init().await?; - info!("Starting download subs task"); - let download_sub_ = download_sub.clone(); + info!(target: "fud", "Starting download subs task"); + let event_sub_ = event_sub.clone(); let fud_ = fud.clone(); - let download_task = StoppableTask::new(); - download_task.clone().start( + let event_task = StoppableTask::new(); + event_task.clone().start( async move { - let download_sub = fud_.download_publisher.clone().subscribe().await; + let event_sub = fud_.event_publisher.clone().subscribe().await; loop { - let event = download_sub.receive().await; - debug!("Got download event: {:?}", event); - download_sub_.notify(event.into()).await; + let event = event_sub.receive().await; + debug!(target: "fud", "Got event: {:?}", event); + event_sub_.notify(event.into()).await; } }, |res| async { @@ -786,19 +882,6 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { Error::DetachedTaskStopped, ex.clone(), ); - let prune_task = StoppableTask::new(); - let fud_ = fud.clone(); - prune_task.clone().start( - async move { tasks::prune_seeders_task(fud_.clone()).await }, - |res| async { - match res { - Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } - Err(e) => error!(target: "fud", "Failed starting prune seeders task: {}", e), - } - }, - Error::DetachedTaskStopped, - ex.clone(), - ); let announce_task = StoppableTask::new(); let fud_ = fud.clone(); announce_task.clone().start( @@ -816,7 +899,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { // Signal handling for graceful termination. let (signals_handler, signals_task) = SignalHandler::new(ex)?; signals_handler.wait_termination(signals_task).await?; - info!("Caught termination signal, cleaning up and exiting..."); + info!(target: "fud", "Caught termination signal, cleaning up and exiting..."); info!(target: "fud", "Stopping fetch file task..."); file_task.stop().await; @@ -833,7 +916,6 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { info!(target: "fud", "Stopping DHT tasks"); dht_channel_task.stop().await; dht_disconnect_task.stop().await; - prune_task.stop().await; announce_task.stop().await; info!("Bye!"); diff --git a/bin/fud/fud/src/resource.rs b/bin/fud/fud/src/resource.rs new file mode 100644 index 000000000..cc7b091fa --- /dev/null +++ b/bin/fud/fud/src/resource.rs @@ -0,0 +1,58 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2025 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use darkfi::{geode::hash_to_string, rpc::util::json_map}; +use tinyjson::JsonValue; + +#[derive(Clone, Debug)] +pub enum ResourceStatus { + Downloading, + Seeding, + Discovering, + Incomplete, +} + +#[derive(Clone, Debug)] +pub struct Resource { + pub hash: blake3::Hash, + pub status: ResourceStatus, + pub chunks_total: u64, + pub chunks_downloaded: u64, +} + +impl From for JsonValue { + fn from(rs: Resource) -> JsonValue { + json_map([ + ("hash", JsonValue::String(hash_to_string(&rs.hash))), + ( + "status", + JsonValue::String( + match rs.status { + ResourceStatus::Downloading => "downloading", + ResourceStatus::Seeding => "seeding", + ResourceStatus::Discovering => "discovering", + ResourceStatus::Incomplete => "incomplete", + } + .to_string(), + ), + ), + ("chunks_total", JsonValue::Number(rs.chunks_total as f64)), + ("chunks_downloaded", JsonValue::Number(rs.chunks_downloaded as f64)), + ]) + } +} diff --git a/bin/fud/fud/src/rpc.rs b/bin/fud/fud/src/rpc.rs index c42eabdf6..349f7ce28 100644 --- a/bin/fud/fud/src/rpc.rs +++ b/bin/fud/fud/src/rpc.rs @@ -21,7 +21,12 @@ use std::{ path::PathBuf, }; -use crate::{dht::DhtHandler, proto::FudAnnounce, Fud}; +use crate::{ + dht::DhtHandler, + proto::FudAnnounce, + resource::{Resource, ResourceStatus}, + Fud, +}; use async_trait::async_trait; use darkfi::{ geode::hash_to_string, @@ -33,7 +38,7 @@ use darkfi::{ }, system::StoppableTaskPtr, util::path::expand_path, - Error, + Error, Result, }; use log::{error, info}; use smol::{ @@ -51,12 +56,14 @@ impl RequestHandler<()> for Fud { "put" => self.put(req.id, req.params).await, "get" => self.get(req.id, req.params).await, "subscribe" => self.subscribe(req.id, req.params).await, + "remove" => self.remove_resource(req.id, req.params).await, + "list_resources" => self.list_resources(req.id, req.params).await, + "list_buckets" => self.list_buckets(req.id, req.params).await, + "list_seeders" => self.list_seeders(req.id, req.params).await, "dnet.switch" => self.dnet_switch(req.id, req.params).await, "dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await, "p2p.get_info" => self.p2p_get_info(req.id, req.params).await, - "list_buckets" => self.list_buckets(req.id, req.params).await, - "list_seeders" => self.list_seeders(req.id, req.params).await, _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), } } @@ -172,7 +179,7 @@ impl Fud { // --> {"jsonrpc": "2.0", "method": "get", "params": [], "id": 42} // <-- {"jsonrpc": "2.0", "method": "get", "params": `event`} async fn subscribe(&self, _id: u16, _params: JsonValue) -> JsonResult { - self.download_sub.clone().into() + self.event_sub.clone().into() } // RPCAPI: @@ -216,7 +223,27 @@ impl Fud { } // RPCAPI: - // Returns the current buckets + // Returns resources from the database. + // + // --> {"jsonrpc": "2.0", "method": "list_buckets", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "result": [[["abcdef", ["tcp://127.0.0.1:13337"]]]], "id": 1} + pub async fn list_resources(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if !params.is_empty() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + let resources_read = self.resources.read().await; + let mut resources: Vec = vec![]; + for (_, resource) in resources_read.iter() { + resources.push(resource.clone().into()); + } + + JsonResponse::new(JsonValue::Array(resources), id).into() + } + + // RPCAPI: + // Returns the current buckets. // // --> {"jsonrpc": "2.0", "method": "list_buckets", "params": [], "id": 1} // <-- {"jsonrpc": "2.0", "result": [[["abcdef", ["tcp://127.0.0.1:13337"]]]], "id": 1} @@ -245,9 +272,9 @@ impl Fud { } // RPCAPI: - // Returns the content of the seeders router + // Returns the content of the seeders router. // - // --> {"jsonrpc": "2.0", "method": "list_routes", "params": [], "id": 1} + // --> {"jsonrpc": "2.0", "method": "list_seeders", "params": [], "id": 1} // <-- {"jsonrpc": "2.0", "result": {"seeders": {"abcdef": ["ghijkl"]}}, "id": 1} pub async fn list_seeders(&self, id: u16, params: JsonValue) -> JsonResult { let params = params.get::>().unwrap(); @@ -267,27 +294,62 @@ impl Fud { JsonResponse::new(JsonValue::Object(res), id).into() } + + // RPCAPI: + // Removes a resource. + // + // --> {"jsonrpc": "2.0", "method": "remove", "params": ["1211...abfd"], "id": 1} + // <-- {"jsonrpc": "2.0", "result": [], "id": 1} + pub async fn remove_resource(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if params.len() != 1 || !params[0].is_string() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + let mut hash_buf = [0u8; 32]; + match bs58::decode(params[0].get::().unwrap().as_str()).onto(&mut hash_buf) { + Ok(_) => {} + Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(), + } + + let hash = blake3::Hash::from_bytes(hash_buf); + let mut resources_write = self.resources.write().await; + resources_write.remove(&hash); + drop(resources_write); + + self.event_publisher + .notify(FudEvent::ResourceRemoved(ResourceRemoved { file_hash: hash })) + .await; + + JsonResponse::new(JsonValue::Array(vec![]), id).into() + } } #[derive(Clone, Debug)] pub struct DownloadStarted { pub file_hash: blake3::Hash, pub file_path: PathBuf, + pub resource: Resource, } #[derive(Clone, Debug)] pub struct ChunkDownloadCompleted { pub file_hash: blake3::Hash, pub chunk_hash: blake3::Hash, + pub resource: Resource, } #[derive(Clone, Debug)] pub struct FileDownloadCompleted { pub file_hash: blake3::Hash, - pub chunk_count: usize, + pub resource: Resource, } #[derive(Clone, Debug)] pub struct DownloadCompleted { pub file_hash: blake3::Hash, pub file_path: PathBuf, + pub resource: Resource, +} +#[derive(Clone, Debug)] +pub struct ResourceRemoved { + pub file_hash: blake3::Hash, } #[derive(Clone, Debug)] pub struct ChunkNotFound { @@ -301,6 +363,7 @@ pub struct FileNotFound { #[derive(Clone, Debug)] pub struct MissingChunks { pub file_hash: blake3::Hash, + pub resource: Resource, } #[derive(Clone, Debug)] pub struct DownloadError { @@ -314,6 +377,7 @@ pub enum FudEvent { ChunkDownloadCompleted(ChunkDownloadCompleted), FileDownloadCompleted(FileDownloadCompleted), DownloadCompleted(DownloadCompleted), + ResourceRemoved(ResourceRemoved), ChunkNotFound(ChunkNotFound), FileNotFound(FileNotFound), MissingChunks(MissingChunks), @@ -325,6 +389,7 @@ impl From for JsonValue { json_map([ ("file_hash", JsonValue::String(hash_to_string(&info.file_hash))), ("file_path", JsonValue::String(info.file_path.to_string_lossy().to_string())), + ("resource", info.resource.into()), ]) } } @@ -333,6 +398,7 @@ impl From for JsonValue { json_map([ ("file_hash", JsonValue::String(hash_to_string(&info.file_hash))), ("chunk_hash", JsonValue::String(hash_to_string(&info.chunk_hash))), + ("resource", info.resource.into()), ]) } } @@ -340,7 +406,7 @@ impl From for JsonValue { fn from(info: FileDownloadCompleted) -> JsonValue { json_map([ ("file_hash", JsonValue::String(hash_to_string(&info.file_hash))), - ("chunk_count", JsonValue::Number(info.chunk_count as f64)), + ("resource", info.resource.into()), ]) } } @@ -349,9 +415,15 @@ impl From for JsonValue { json_map([ ("file_hash", JsonValue::String(hash_to_string(&info.file_hash))), ("file_path", JsonValue::String(info.file_path.to_string_lossy().to_string())), + ("resource", info.resource.into()), ]) } } +impl From for JsonValue { + fn from(info: ResourceRemoved) -> JsonValue { + json_map([("file_hash", JsonValue::String(hash_to_string(&info.file_hash)))]) + } +} impl From for JsonValue { fn from(info: ChunkNotFound) -> JsonValue { json_map([ @@ -369,6 +441,7 @@ impl From for JsonValue { fn from(info: MissingChunks) -> JsonValue { json_map([ ("file_hash", JsonValue::String(hash_to_string(&info.file_hash))), + ("resource", info.resource.into()), ]) } } @@ -395,13 +468,18 @@ impl From for JsonValue { FudEvent::DownloadCompleted(info) => { json_map([("event", json_str("download_completed")), ("info", info.into())]) } + FudEvent::ResourceRemoved(info) => { + json_map([("event", json_str("resource_removed")), ("info", info.into())]) + } FudEvent::ChunkNotFound(info) => { json_map([("event", json_str("chunk_not_found")), ("info", info.into())]) } FudEvent::FileNotFound(info) => { json_map([("event", json_str("file_not_found")), ("info", info.into())]) } - FudEvent::MissingChunks(info) => json_map([("event", json_str("missing_chunks")), ("info", info.into())]), + FudEvent::MissingChunks(info) => { + json_map([("event", json_str("missing_chunks")), ("info", info.into())]) + } FudEvent::DownloadError(info) => { json_map([("event", json_str("download_error")), ("info", info.into())]) } @@ -411,13 +489,25 @@ impl From for JsonValue { impl Fud { /// Handle `get` RPC request - pub async fn handle_get(&self, file_hash: &blake3::Hash, file_path: &PathBuf) { + pub async fn handle_get(&self, file_hash: &blake3::Hash, file_path: &PathBuf) -> Result<()> { let self_node = self.dht().node.clone(); - self.download_publisher + // Add resource to `self.resources` + let resource = Resource { + hash: *file_hash, + status: ResourceStatus::Discovering, + chunks_total: 0, + chunks_downloaded: 0, + }; + let mut resources_write = self.resources.write().await; + resources_write.insert(*file_hash, resource.clone()); + drop(resources_write); + + self.event_publisher .notify(FudEvent::DownloadStarted(DownloadStarted { file_hash: *file_hash, file_path: file_path.clone(), + resource, })) .await; @@ -433,23 +523,42 @@ impl Fud { Ok(()) => self.geode.get(&i_file_hash).await.unwrap(), Err(Error::GeodeFileRouteNotFound) => { - self.download_publisher + self.event_publisher .notify(FudEvent::FileNotFound(FileNotFound { file_hash: *file_hash })) .await; - return; + return Err(Error::GeodeFileRouteNotFound); } - Err(e) => panic!("{}", e), + Err(e) => { + error!(target: "fud::handle_get()", "{}", e); + return Err(e); + } } } - Err(e) => panic!("{}", e), + Err(e) => { + error!(target: "fud::handle_get()", "{}", e); + return Err(e); + } }; - self.download_publisher + // Set resource status to `Downloading` + let mut resources_write = self.resources.write().await; + let resource = match resources_write.get_mut(file_hash) { + Some(resource) => { + resource.status = ResourceStatus::Downloading; + resource.chunks_downloaded = chunked_file.local_chunks() as u64; + resource.chunks_total = chunked_file.len() as u64; + resource.clone() + } + None => return Ok(()), // Resource was removed, abort + }; + drop(resources_write); + + self.event_publisher .notify(FudEvent::FileDownloadCompleted(FileDownloadCompleted { file_hash: *file_hash, - chunk_count: chunked_file.len(), + resource: resource.clone(), })) .await; @@ -460,21 +569,38 @@ impl Fud { return match self.geode.assemble_file(file_hash, &chunked_file, file_path).await { Ok(_) => { - self.download_publisher + // Set resource status to `Seeding` + let mut resources_write = self.resources.write().await; + let resource = match resources_write.get_mut(file_hash) { + Some(resource) => { + resource.status = ResourceStatus::Seeding; + resource.chunks_downloaded = chunked_file.len() as u64; + resource.clone() + } + None => return Ok(()), // Resource was removed, abort + }; + drop(resources_write); + + self.event_publisher .notify(FudEvent::DownloadCompleted(DownloadCompleted { file_hash: *file_hash, file_path: file_path.clone(), + resource, })) .await; + + Ok(()) } Err(e) => { error!(target: "fud::handle_get()", "{}", e); - self.download_publisher + self.event_publisher .notify(FudEvent::DownloadError(DownloadError { file_hash: *file_hash, error: e.to_string(), })) .await; + + Err(e) } }; } @@ -486,44 +612,69 @@ impl Fud { for (chunk, path) in chunked_file.iter() { if path.is_none() { missing_chunks.insert(*chunk); - } else { - self.download_publisher - .notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted { - file_hash: *file_hash, - chunk_hash: *chunk, - })) - .await; } } // Fetch missing chunks from seeders - self.fetch_chunks(file_hash, &missing_chunks, &seeders).await; + self.fetch_chunks(file_hash, &missing_chunks, &seeders).await?; let chunked_file = match self.geode.get(file_hash).await { Ok(v) => v, - Err(e) => panic!("{}", e), + Err(e) => { + error!(target: "fud::handle_get()", "{}", e); + return Err(e); + } }; // We fetched all chunks, but the file is not complete // (some chunks were missing from all seeders) if !chunked_file.is_complete() { - self.download_publisher.notify(FudEvent::MissingChunks(MissingChunks { - file_hash: *file_hash, - })).await; + // Set resource status to `Incomplete` + let mut resources_write = self.resources.write().await; + let resource = match resources_write.get_mut(file_hash) { + Some(resource) => { + resource.status = ResourceStatus::Incomplete; + resource.clone() + } + None => return Ok(()), // Resource was removed, abort + }; + drop(resources_write); + + self.event_publisher + .notify(FudEvent::MissingChunks(MissingChunks { file_hash: *file_hash, resource })) + .await; + return Ok(()); } + let self_announce = + FudAnnounce { key: *file_hash, seeders: vec![self_node.clone().into()] }; + let _ = self.announce(file_hash, &self_announce, self.seeders_router.clone()).await; + + // Set resource status to `Seeding` + let mut resources_write = self.resources.write().await; + let resource = match resources_write.get_mut(file_hash) { + Some(resource) => { + resource.status = ResourceStatus::Seeding; + resource.chunks_downloaded = chunked_file.len() as u64; + resource.clone() + } + None => return Ok(()), // Resource was removed, abort + }; + drop(resources_write); + match self.geode.assemble_file(file_hash, &chunked_file, file_path).await { Ok(_) => { - self.download_publisher + self.event_publisher .notify(FudEvent::DownloadCompleted(DownloadCompleted { file_hash: *file_hash, file_path: file_path.clone(), + resource, })) .await; } Err(e) => { error!(target: "fud::handle_get()", "{}", e); - self.download_publisher + self.event_publisher .notify(FudEvent::DownloadError(DownloadError { file_hash: *file_hash, error: e.to_string(), @@ -531,5 +682,7 @@ impl Fud { .await; } }; + + Ok(()) } } diff --git a/bin/fud/fud/src/tasks.rs b/bin/fud/fud/src/tasks.rs index 96923cb25..5b528100d 100644 --- a/bin/fud/fud/src/tasks.rs +++ b/bin/fud/fud/src/tasks.rs @@ -94,38 +94,37 @@ pub async fn fetch_file_task(fud: Arc) -> Result<()> { } } -/// Background task that removes seeders that did not announce a file/chunk -/// for more than an hour. -pub async fn prune_seeders_task(fud: Arc) -> Result<()> { - sleep(120).await; - - loop { - sleep(3600).await; // TODO: Make a setting - - info!(target: "fud::prune_seeders_task()", "Pruning seeders..."); - fud.dht().prune_router(fud.seeders_router.clone(), 3600).await; - } -} - /// Background task that announces our files and chunks once every hour. +/// Also removes seeders that did not announce for too long. pub async fn announce_seed_task(fud: Arc) -> Result<()> { + let interval = 3600; // TODO: Make a setting + loop { - sleep(3600).await; // TODO: Make a setting + sleep(interval).await; let seeders = vec![fud.dht().node.clone().into()]; - info!(target: "fud::announce_task()", "Announcing files..."); - let file_hashes = fud.geode.list_files().await; - if let Ok(files) = file_hashes { - for file in files { - let _ = fud - .announce( - &file, - &FudAnnounce { key: file, seeders: seeders.clone() }, - fud.seeders_router.clone(), - ) - .await; + info!(target: "fud::announce_task()", "Verifying seeds..."); + let seeding_resources = match fud.get_seeding_resources().await { + Ok(resources) => resources, + Err(e) => { + error!(target: "fud::announce_task()", "Error while verifying seeding resources: {}", e); + continue; } + }; + + info!(target: "fud::announce_task()", "Announcing files..."); + for resource in seeding_resources { + let _ = fud + .announce( + &resource.hash, + &FudAnnounce { key: resource.hash, seeders: seeders.clone() }, + fud.seeders_router.clone(), + ) + .await; } + + info!(target: "fud::announce_task()", "Pruning seeders..."); + fud.dht().prune_router(fud.seeders_router.clone(), interval.try_into().unwrap()).await; } } diff --git a/src/geode/mod.rs b/src/geode/mod.rs index c66bebb41..160f176ec 100644 --- a/src/geode/mod.rs +++ b/src/geode/mod.rs @@ -117,6 +117,11 @@ impl ChunkedFile { pub fn is_empty(&self) -> bool { self.0.is_empty() } + + /// Return the number of chunks available locally. + pub fn local_chunks(&self) -> usize { + self.0.iter().filter(|(_, p)| p.is_some()).count() + } } /// Chunk-based file storage interface. @@ -449,6 +454,11 @@ impl Geode { } }; + // Make sure the chunk hashes match with the file hash + if !self.verify_file(file_hash, &chunk_hashes) { + return Err(Error::GeodeNeedsGc); + } + let mut chunked_file = ChunkedFile::new(&chunk_hashes); // Iterate over chunks and find which chunks we have available locally.