fud: add resource.rs, add fu watch, add fu rm, merge the announcing and pruning tasks

darkfi
2025-04-05 14:29:01 +02:00
committed by epiphany1
parent 011b80e071
commit ff112dd6c2
8 changed files with 654 additions and 115 deletions

Cargo.lock

@@ -3385,6 +3385,7 @@ dependencies = [
"log",
"simplelog",
"smol",
"termcolor",
"url",
]


@@ -19,6 +19,7 @@ clap = {version = "4.4.11", features = ["derive"]}
log = "0.4.26"
simplelog = "0.12.2"
url = "2.5.4"
termcolor = "1.4.1"
[lints]
workspace = true


@@ -19,11 +19,13 @@
use clap::{Parser, Subcommand};
use log::error;
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use smol::lock::RwLock;
use std::{
collections::HashMap,
io::{stdout, Write},
sync::Arc,
};
use termcolor::{Color, ColorSpec, StandardStream, WriteColor};
use url::Url;
use darkfi::{
@@ -70,6 +72,15 @@ enum Subcmd {
file: String,
},
/// Watch the status of all known resources
Watch {},
/// Remove a resource from fud
Rm {
/// Resource hash
hash: String,
},
/// Get the current node buckets
ListBuckets {},
@@ -152,22 +163,37 @@ impl Fu {
}
match params.get("event").unwrap().get::<String>().unwrap().as_str() {
"file_download_completed" => {
- chunks_total =
- *info.get("chunk_count").unwrap().get::<f64>().unwrap() as usize;
- print_progress_bar(chunks_downloaded, chunks_total);
- }
- "chunk_download_completed" => {
- chunks_downloaded += 1;
- print_progress_bar(chunks_downloaded, chunks_total);
- }
- "download_completed" => {
- let info = params
- .get("info")
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
chunks_total =
*resource.get("chunks_total").unwrap().get::<f64>().unwrap()
as usize;
print_progress_bar(chunks_downloaded, chunks_total);
}
"chunk_download_completed" => {
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
chunks_downloaded =
*resource.get("chunks_downloaded").unwrap().get::<f64>().unwrap()
as usize;
print_progress_bar(chunks_downloaded, chunks_total);
}
"download_completed" => {
let file_path = info.get("file_path").unwrap().get::<String>().unwrap();
- chunks_downloaded = chunks_total;
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
chunks_downloaded =
*resource.get("chunks_downloaded").unwrap().get::<f64>().unwrap()
as usize;
print_progress_bar(chunks_downloaded, chunks_total);
println!("\nDownload completed:\n{}", file_path);
return Ok(());
@@ -186,11 +212,6 @@ impl Fu {
}
"download_error" => {
// An error that caused the download to be unsuccessful
- let info = params
- .get("info")
- .unwrap()
- .get::<HashMap<String, JsonValue>>()
- .unwrap();
println!();
return Err(Error::Custom(
info.get("error").unwrap().get::<String>().unwrap().to_string(),
@@ -278,6 +299,218 @@ impl Fu {
Ok(())
}
async fn watch(&self, ex: ExecutorPtr) -> Result<()> {
let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
let rep = self.rpc_client.request(req).await?;
let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
let resources: Arc<RwLock<Vec<HashMap<String, JsonValue>>>> = Arc::new(RwLock::new(vec![]));
let publisher = Publisher::new();
let subscription = Arc::new(publisher.clone().subscribe().await);
let subscriber_task = StoppableTask::new();
let publisher_ = publisher.clone();
let rpc_client_ = self.rpc_client.clone();
subscriber_task.clone().start(
async move {
let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
rpc_client_.subscribe(req, publisher).await
},
move |res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => {
error!("{}", e);
publisher_
.notify(JsonResult::Error(JsonError::new(
ErrorCode::InternalError,
None,
0,
)))
.await;
}
}
},
Error::DetachedTaskStopped,
ex,
);
let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
let mut update_resource = async |resource: &HashMap<String, JsonValue>| {
let hash = resource.get("hash").unwrap().get::<String>().unwrap();
let mut resources_write = resources.write().await;
let i = match resources_write
.iter()
.position(|r| r.get("hash").unwrap().get::<String>().unwrap() == hash)
{
Some(i) => {
resources_write.remove(i);
resources_write.insert(i, resource.clone());
i
}
None => {
resources_write.push(resource.clone());
resources_write.len() - 1
}
};
// Move the cursor to this resource's row and clear that line
// (row 1 holds the header, so resource i lives on row i + 2;
// ESC[<row>;1H positions the cursor, ESC[2K erases the line)
print!("\x1b[{};1H\x1B[2K", i + 2);
let hash = resource.get("hash").unwrap().get::<String>().unwrap();
print!("\r{:>44} ", hash,);
let status = resource.get("status").unwrap().get::<String>().unwrap();
tstdout
.set_color(
ColorSpec::new()
.set_fg(match status.as_str() {
"downloading" => Some(Color::Blue),
"seeding" => Some(Color::Green),
"discovering" => Some(Color::Magenta),
"incomplete" => Some(Color::Red),
_ => None,
})
.set_bold(true),
)
.unwrap();
print!("{:>11} ", status,);
tstdout.reset().unwrap();
let chunks_downloaded =
*resource.get("chunks_downloaded").unwrap().get::<f64>().unwrap() as usize;
let chunks_total =
*resource.get("chunks_total").unwrap().get::<f64>().unwrap() as usize;
match chunks_total {
0 => {
print!("{:>5.1} {:>9}", 0.0, format!("{}/?", chunks_downloaded));
}
_ => {
let percent = chunks_downloaded as f64 / chunks_total as f64 * 100.0;
print!(
"{:>5.1} {:>9}",
percent,
format!("{}/{}", chunks_downloaded, chunks_total)
);
}
};
println!();
// Move the cursor to the first line below the table
print!("\x1b[{};1H", resources_write.len() + 2);
stdout().flush().unwrap();
};
let print_begin = async || {
// Clear the screen and move the cursor to the top-left corner
print!("\x1B[2J\x1B[1;1H");
// Print column headers
println!("\x1b[4m{:>44} {:>11} {:>5} {:>9}\x1b[0m", "Hash", "Status", "%", "Chunks");
};
print_begin().await;
if resources_json.is_empty() {
println!("No known resources");
} else {
for resource in resources_json.iter() {
let resource = resource.get::<HashMap<String, JsonValue>>().unwrap();
update_resource(resource).await;
}
}
loop {
match subscription.receive().await {
JsonResult::Notification(n) => {
let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
let info =
params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
match params.get("event").unwrap().get::<String>().unwrap().as_str() {
"download_started" => {
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
update_resource(resource).await;
}
"file_download_completed" => {
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
update_resource(resource).await;
}
"chunk_download_completed" => {
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
update_resource(resource).await;
}
"download_completed" => {
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
update_resource(resource).await;
}
"resource_removed" => {
{
let hash = info.get("file_hash").unwrap().get::<String>().unwrap();
let mut resources_write = resources.write().await;
let i = resources_write.iter().position(|r| {
r.get("hash").unwrap().get::<String>().unwrap() == hash
});
if let Some(i) = i {
resources_write.remove(i);
}
}
let r = resources.read().await.clone();
print_begin().await;
for resource in r.iter() {
update_resource(resource).await;
}
}
"missing_chunks" => {
let resource = info
.get("resource")
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
update_resource(resource).await;
}
"download_error" => {
// An error that caused the download to be unsuccessful
}
_ => {}
}
}
JsonResult::Error(e) => {
return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
}
x => {
return Err(Error::UnexpectedJsonRpc(format!(
"Got unexpected data from JSON-RPC: {x:?}"
)))
}
}
}
}
async fn remove(&self, hash: String) -> Result<()> {
let req = JsonRequest::new("remove", JsonValue::Array(vec![JsonValue::String(hash)]));
self.rpc_client.request(req).await?;
Ok(())
}
}
fn main() -> Result<()> {
@@ -296,6 +529,8 @@ fn main() -> Result<()> {
match args.command {
Subcmd::Get { file, name } => fu.get(file, name, ex.clone()).await,
Subcmd::Put { file } => fu.put(file).await,
Subcmd::Watch {} => fu.watch(ex.clone()).await,
Subcmd::Rm { hash } => fu.remove(hash).await,
Subcmd::ListBuckets {} => fu.list_buckets().await,
Subcmd::ListSeeders {} => fu.list_seeders().await,
}?;
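For reference, `fu watch` draws one row per resource under the underlined header, coloring the status word per the mapping above (blue while downloading, green while seeding, magenta while discovering, red when incomplete). With hypothetical, shortened hashes (real ones fill the {:>44} column) and hypothetical counts, the table looks roughly like:

                                        Hash      Status     %    Chunks
                                 1211...abfd  downloading  31.2      5/16
                                 76fa...09c2      seeding 100.0     16/16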


@@ -24,6 +24,7 @@ use std::{
};
use num_bigint::BigUint;
use resource::{Resource, ResourceStatus};
use rpc::{ChunkDownloadCompleted, ChunkNotFound};
use tasks::FetchReply;
@@ -45,7 +46,7 @@ use structopt_toml::{structopt::StructOpt, StructOptToml};
use darkfi::{
async_daemonize, cli_desc,
- geode::{hash_to_string, Geode},
geode::{hash_to_string, ChunkedFile, Geode},
net::{session::SESSION_DEFAULT, settings::SettingsOpt, ChannelPtr, P2p, P2pPtr},
rpc::{
jsonrpc::JsonSubscriber,
@@ -67,6 +68,7 @@ use proto::{
};
mod dht;
mod resource;
mod rpc;
mod tasks;
@@ -124,6 +126,9 @@ pub struct Fud {
/// The DHT instance
dht: Arc<Dht>,
/// Resources (current status of all downloads/seeds)
resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
get_tx: channel::Sender<(u16, blake3::Hash, PathBuf, Result<()>)>,
get_rx: channel::Receiver<(u16, blake3::Hash, PathBuf, Result<()>)>,
file_fetch_tx: channel::Sender<(blake3::Hash, Result<()>)>,
@@ -137,9 +142,9 @@ pub struct Fud {
dnet_sub: JsonSubscriber,
/// Download JSON-RPC subscriber
- download_sub: JsonSubscriber,
event_sub: JsonSubscriber,
- download_publisher: PublisherPtr<FudEvent>,
event_publisher: PublisherPtr<FudEvent>,
}
impl HandlerP2p for Fud {
@@ -226,19 +231,95 @@ impl Fud {
/// Add ourselves to `seeders_router` for the files we already have.
/// Skipped if we have no external address.
async fn init(&self) -> Result<()> {
info!(target: "fud::init()", "Finding resources...");
let hashes = self.geode.list_files().await?;
let mut resources_write = self.resources.write().await;
for hash in hashes {
resources_write.insert(
hash,
Resource {
hash,
status: ResourceStatus::Incomplete,
chunks_total: 0,
chunks_downloaded: 0,
},
);
}
drop(resources_write);
info!(target: "fud::init()", "Verifying resources...");
let resources = self.get_seeding_resources().await?;
if self.dht().node.clone().addresses.is_empty() {
return Ok(());
}
- let self_router_items: Vec<DhtRouterItem> = vec![self.dht().node.clone().into()];
- let hashes = self.geode.list_files().await?;
- for hash in hashes {
- self.add_to_router(self.seeders_router.clone(), &hash, self_router_items.clone()).await;
info!(target: "fud::init()", "Start seeding...");
let self_router_items: Vec<DhtRouterItem> = vec![self.dht().node.clone().into()];
for resource in resources {
self.add_to_router(
self.seeders_router.clone(),
&resource.hash,
self_router_items.clone(),
)
.await;
}
Ok(())
}
/// Verify that resources are complete and uncorrupted.
/// If a resource is incomplete or corrupted, its status is set to Incomplete.
/// If a resource is complete, its status is set to Seeding.
/// Resources that are currently Downloading or Discovering are skipped.
/// Returns the list of verified, complete resources we are seeding.
async fn get_seeding_resources(&self) -> Result<Vec<Resource>> {
let mut resources_write = self.resources.write().await;
let update_resource =
async |resource: &mut Resource,
status: ResourceStatus,
chunked_file: Option<&ChunkedFile>| {
resource.status = status;
resource.chunks_total = match chunked_file {
Some(chunked_file) => chunked_file.len() as u64,
None => 0,
};
resource.chunks_downloaded = match chunked_file {
Some(chunked_file) => chunked_file.local_chunks() as u64,
None => 0,
};
};
let mut seeding_resources: Vec<Resource> = vec![];
for (_, mut resource) in resources_write.iter_mut() {
match resource.status {
ResourceStatus::Seeding => {}
ResourceStatus::Incomplete => {}
_ => continue,
};
// Make sure the resource is not corrupted or incomplete
let chunked_file = match self.geode.get(&resource.hash).await {
Ok(v) => v,
Err(_) => {
update_resource(&mut resource, ResourceStatus::Incomplete, None).await;
continue;
}
};
if !chunked_file.is_complete() {
update_resource(&mut resource, ResourceStatus::Incomplete, Some(&chunked_file))
.await;
continue;
}
update_resource(&mut resource, ResourceStatus::Seeding, Some(&chunked_file)).await;
seeding_resources.push(resource.clone());
}
Ok(seeding_resources)
}
/// Query nodes close to `key` to find the seeders
async fn fetch_seeders(&self, key: &blake3::Hash) -> HashSet<DhtRouterItem> {
let closest_nodes = self.lookup_nodes(key).await; // Find the `k` closest nodes
@@ -298,7 +379,7 @@ impl Fud {
file_hash: &blake3::Hash,
chunk_hashes: &HashSet<blake3::Hash>,
seeders: &HashSet<DhtRouterItem>,
- ) {
) -> Result<()> {
let mut remaining_chunks = chunk_hashes.clone();
let mut shuffled_seeders = {
let mut vec: Vec<_> = seeders.iter().cloned().collect();
@@ -368,11 +449,24 @@ impl Fud {
continue; // Skip to next chunk, will retry this chunk later
}
// Update resource `chunks_downloaded`
let mut resources_write = self.resources.write().await;
let resource = match resources_write.get_mut(file_hash) {
Some(resource) => {
resource.status = ResourceStatus::Downloading;
resource.chunks_downloaded += 1;
resource.clone()
}
None => return Ok(()) // Resource was removed, abort
};
drop(resources_write);
info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id));
- self.download_publisher
self.event_publisher
.notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted {
file_hash: *file_hash,
chunk_hash,
resource,
}))
.await;
remaining_chunks.remove(&chunk_hash);
@@ -390,7 +484,7 @@ impl Fud {
break; // Switch to another seeder
}
info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id));
- self.download_publisher
self.event_publisher
.notify(FudEvent::ChunkNotFound(ChunkNotFound {
file_hash: *file_hash,
chunk_hash,
@@ -409,6 +503,8 @@ impl Fud {
break;
}
}
Ok(())
}
/// Fetch a single file metadata from the network.
@@ -633,9 +729,8 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
Ok(mut file) => {
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
- let buf: [u8; 44] = buffer.try_into().expect("Node ID must have 44 characters");
let mut out_buf = [0u8; 32];
- bs58::decode(buf).onto(&mut out_buf)?;
bs58::decode(buffer).onto(&mut out_buf)?;
let node_id = blake3::Hash::from_bytes(out_buf);
Ok(node_id)
}
@@ -656,7 +751,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!(target: "fud", "Your node ID: {}", hash_to_string(&node_id_));
// Daemon instantiation
- let download_sub = JsonSubscriber::new("get");
let event_sub = JsonSubscriber::new("event");
let (get_tx, get_rx) = smol::channel::unbounded();
let (file_fetch_tx, file_fetch_rx) = smol::channel::unbounded();
let (file_fetch_end_tx, file_fetch_end_rx) = smol::channel::unbounded();
@@ -668,6 +763,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
geode,
downloads_path,
dht: dht.clone(),
resources: Arc::new(RwLock::new(HashMap::new())),
get_tx,
get_rx,
file_fetch_tx,
@@ -676,22 +772,22 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
file_fetch_end_rx,
rpc_connections: Mutex::new(HashSet::new()),
dnet_sub,
- download_sub: download_sub.clone(),
- download_publisher: Publisher::new(),
event_sub: event_sub.clone(),
event_publisher: Publisher::new(),
});
fud.init().await?;
- info!("Starting download subs task");
- let download_sub_ = download_sub.clone();
info!(target: "fud", "Starting download subs task");
let event_sub_ = event_sub.clone();
let fud_ = fud.clone();
- let download_task = StoppableTask::new();
- download_task.clone().start(
let event_task = StoppableTask::new();
event_task.clone().start(
async move {
- let download_sub = fud_.download_publisher.clone().subscribe().await;
let event_sub = fud_.event_publisher.clone().subscribe().await;
loop {
- let event = download_sub.receive().await;
- debug!("Got download event: {:?}", event);
- download_sub_.notify(event.into()).await;
let event = event_sub.receive().await;
debug!(target: "fud", "Got event: {:?}", event);
event_sub_.notify(event.into()).await;
}
},
|res| async {
@@ -786,19 +882,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
Error::DetachedTaskStopped,
ex.clone(),
);
- let prune_task = StoppableTask::new();
- let fud_ = fud.clone();
- prune_task.clone().start(
- async move { tasks::prune_seeders_task(fud_.clone()).await },
- |res| async {
- match res {
- Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
- Err(e) => error!(target: "fud", "Failed starting prune seeders task: {}", e),
- }
- },
- Error::DetachedTaskStopped,
- ex.clone(),
- );
let announce_task = StoppableTask::new();
let fud_ = fud.clone();
announce_task.clone().start(
@@ -816,7 +899,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
- info!("Caught termination signal, cleaning up and exiting...");
info!(target: "fud", "Caught termination signal, cleaning up and exiting...");
info!(target: "fud", "Stopping fetch file task...");
file_task.stop().await;
@@ -833,7 +916,6 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!(target: "fud", "Stopping DHT tasks");
dht_channel_task.stop().await;
dht_disconnect_task.stop().await;
- prune_task.stop().await;
announce_task.stop().await;
info!("Bye!");


@@ -0,0 +1,58 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi::{geode::hash_to_string, rpc::util::json_map};
use tinyjson::JsonValue;
#[derive(Clone, Debug)]
pub enum ResourceStatus {
Downloading,
Seeding,
Discovering,
Incomplete,
}
#[derive(Clone, Debug)]
pub struct Resource {
pub hash: blake3::Hash,
pub status: ResourceStatus,
pub chunks_total: u64,
pub chunks_downloaded: u64,
}
impl From<Resource> for JsonValue {
fn from(rs: Resource) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&rs.hash))),
(
"status",
JsonValue::String(
match rs.status {
ResourceStatus::Downloading => "downloading",
ResourceStatus::Seeding => "seeding",
ResourceStatus::Discovering => "discovering",
ResourceStatus::Incomplete => "incomplete",
}
.to_string(),
),
),
("chunks_total", JsonValue::Number(rs.chunks_total as f64)),
("chunks_downloaded", JsonValue::Number(rs.chunks_downloaded as f64)),
])
}
}
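A resource thus serializes to a flat JSON object of this shape (hash value hypothetical):

{"hash": "1211...abfd", "status": "downloading", "chunks_total": 16, "chunks_downloaded": 5}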


@@ -21,7 +21,12 @@ use std::{
path::PathBuf,
};
use crate::{dht::DhtHandler, proto::FudAnnounce, Fud};
use crate::{
dht::DhtHandler,
proto::FudAnnounce,
resource::{Resource, ResourceStatus},
Fud,
};
use async_trait::async_trait;
use darkfi::{
geode::hash_to_string,
@@ -33,7 +38,7 @@ use darkfi::{
},
system::StoppableTaskPtr,
util::path::expand_path,
Error,
Error, Result,
};
use log::{error, info};
use smol::{
@@ -51,12 +56,14 @@ impl RequestHandler<()> for Fud {
"put" => self.put(req.id, req.params).await,
"get" => self.get(req.id, req.params).await,
"subscribe" => self.subscribe(req.id, req.params).await,
"remove" => self.remove_resource(req.id, req.params).await,
"list_resources" => self.list_resources(req.id, req.params).await,
"list_buckets" => self.list_buckets(req.id, req.params).await,
"list_seeders" => self.list_seeders(req.id, req.params).await,
"dnet.switch" => self.dnet_switch(req.id, req.params).await,
"dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
"p2p.get_info" => self.p2p_get_info(req.id, req.params).await,
- "list_buckets" => self.list_buckets(req.id, req.params).await,
- "list_seeders" => self.list_seeders(req.id, req.params).await,
_ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
@@ -172,7 +179,7 @@ impl Fud {
// --> {"jsonrpc": "2.0", "method": "subscribe", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "method": "subscribe", "params": `event`}
async fn subscribe(&self, _id: u16, _params: JsonValue) -> JsonResult {
- self.download_sub.clone().into()
self.event_sub.clone().into()
}
// RPCAPI:
@@ -216,7 +223,27 @@ impl Fud {
}
// RPCAPI:
- // Returns the current buckets
// Returns resources from the database.
//
// --> {"jsonrpc": "2.0", "method": "list_resources", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [{"hash": "1211...abfd", "status": "seeding", "chunks_total": 4, "chunks_downloaded": 4}], "id": 1}
pub async fn list_resources(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if !params.is_empty() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let resources_read = self.resources.read().await;
let mut resources: Vec<JsonValue> = vec![];
for (_, resource) in resources_read.iter() {
resources.push(resource.clone().into());
}
JsonResponse::new(JsonValue::Array(resources), id).into()
}
// RPCAPI:
// Returns the current buckets.
//
// --> {"jsonrpc": "2.0", "method": "list_buckets", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [[["abcdef", ["tcp://127.0.0.1:13337"]]]], "id": 1}
@@ -245,9 +272,9 @@ impl Fud {
}
// RPCAPI:
- // Returns the content of the seeders router
// Returns the content of the seeders router.
//
- // --> {"jsonrpc": "2.0", "method": "list_routes", "params": [], "id": 1}
// --> {"jsonrpc": "2.0", "method": "list_seeders", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": {"seeders": {"abcdef": ["ghijkl"]}}, "id": 1}
pub async fn list_seeders(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
@@ -267,27 +294,62 @@ impl Fud {
JsonResponse::new(JsonValue::Object(res), id).into()
}
// RPCAPI:
// Removes a resource.
//
// --> {"jsonrpc": "2.0", "method": "remove", "params": ["1211...abfd"], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [], "id": 1}
pub async fn remove_resource(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if params.len() != 1 || !params[0].is_string() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let mut hash_buf = [0u8; 32];
match bs58::decode(params[0].get::<String>().unwrap().as_str()).onto(&mut hash_buf) {
Ok(_) => {}
Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(),
}
let hash = blake3::Hash::from_bytes(hash_buf);
let mut resources_write = self.resources.write().await;
resources_write.remove(&hash);
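// Note: this only forgets the in-memory entry and emits the `resource_removed`
// event below; chunks already written to the Geode store are not deleted here.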
drop(resources_write);
self.event_publisher
.notify(FudEvent::ResourceRemoved(ResourceRemoved { file_hash: hash }))
.await;
JsonResponse::new(JsonValue::Array(vec![]), id).into()
}
}
#[derive(Clone, Debug)]
pub struct DownloadStarted {
pub file_hash: blake3::Hash,
pub file_path: PathBuf,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct ChunkDownloadCompleted {
pub file_hash: blake3::Hash,
pub chunk_hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct FileDownloadCompleted {
pub file_hash: blake3::Hash,
pub chunk_count: usize,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct DownloadCompleted {
pub file_hash: blake3::Hash,
pub file_path: PathBuf,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct ResourceRemoved {
pub file_hash: blake3::Hash,
}
#[derive(Clone, Debug)]
pub struct ChunkNotFound {
@@ -301,6 +363,7 @@ pub struct FileNotFound {
#[derive(Clone, Debug)]
pub struct MissingChunks {
pub file_hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct DownloadError {
@@ -314,6 +377,7 @@ pub enum FudEvent {
ChunkDownloadCompleted(ChunkDownloadCompleted),
FileDownloadCompleted(FileDownloadCompleted),
DownloadCompleted(DownloadCompleted),
ResourceRemoved(ResourceRemoved),
ChunkNotFound(ChunkNotFound),
FileNotFound(FileNotFound),
MissingChunks(MissingChunks),
@@ -325,6 +389,7 @@ impl From<DownloadStarted> for JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("file_path", JsonValue::String(info.file_path.to_string_lossy().to_string())),
("resource", info.resource.into()),
])
}
}
@@ -333,6 +398,7 @@ impl From<ChunkDownloadCompleted> for JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("chunk_hash", JsonValue::String(hash_to_string(&info.chunk_hash))),
("resource", info.resource.into()),
])
}
}
@@ -340,7 +406,7 @@ impl From<FileDownloadCompleted> for JsonValue {
fn from(info: FileDownloadCompleted) -> JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
- ("chunk_count", JsonValue::Number(info.chunk_count as f64)),
("resource", info.resource.into()),
])
}
}
@@ -349,9 +415,15 @@ impl From<DownloadCompleted> for JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("file_path", JsonValue::String(info.file_path.to_string_lossy().to_string())),
("resource", info.resource.into()),
])
}
}
impl From<ResourceRemoved> for JsonValue {
fn from(info: ResourceRemoved) -> JsonValue {
json_map([("file_hash", JsonValue::String(hash_to_string(&info.file_hash)))])
}
}
impl From<ChunkNotFound> for JsonValue {
fn from(info: ChunkNotFound) -> JsonValue {
json_map([
@@ -369,6 +441,7 @@ impl From<MissingChunks> for JsonValue {
fn from(info: MissingChunks) -> JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("resource", info.resource.into()),
])
}
}
@@ -395,13 +468,18 @@ impl From<FudEvent> for JsonValue {
FudEvent::DownloadCompleted(info) => {
json_map([("event", json_str("download_completed")), ("info", info.into())])
}
FudEvent::ResourceRemoved(info) => {
json_map([("event", json_str("resource_removed")), ("info", info.into())])
}
FudEvent::ChunkNotFound(info) => {
json_map([("event", json_str("chunk_not_found")), ("info", info.into())])
}
FudEvent::FileNotFound(info) => {
json_map([("event", json_str("file_not_found")), ("info", info.into())])
}
- FudEvent::MissingChunks(info) => json_map([("event", json_str("missing_chunks")), ("info", info.into())]),
FudEvent::MissingChunks(info) => {
json_map([("event", json_str("missing_chunks")), ("info", info.into())])
}
FudEvent::DownloadError(info) => {
json_map([("event", json_str("download_error")), ("info", info.into())])
}
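Putting the event map and the info payloads together, a subscriber receives notification params of this shape (hash values hypothetical):

{"event": "chunk_download_completed", "info": {"file_hash": "1211...abfd", "chunk_hash": "76fa...09c2", "resource": {"hash": "1211...abfd", "status": "downloading", "chunks_total": 16, "chunks_downloaded": 5}}}

This is the `event`/`info` pair that `fu` matches on in its get and watch loops above.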
@@ -411,13 +489,25 @@ impl From<FudEvent> for JsonValue {
impl Fud {
/// Handle `get` RPC request
- pub async fn handle_get(&self, file_hash: &blake3::Hash, file_path: &PathBuf) {
pub async fn handle_get(&self, file_hash: &blake3::Hash, file_path: &PathBuf) -> Result<()> {
let self_node = self.dht().node.clone();
- self.download_publisher
// Add resource to `self.resources`
let resource = Resource {
hash: *file_hash,
status: ResourceStatus::Discovering,
chunks_total: 0,
chunks_downloaded: 0,
};
let mut resources_write = self.resources.write().await;
resources_write.insert(*file_hash, resource.clone());
drop(resources_write);
self.event_publisher
.notify(FudEvent::DownloadStarted(DownloadStarted {
file_hash: *file_hash,
file_path: file_path.clone(),
resource,
}))
.await;
@@ -433,23 +523,42 @@ impl Fud {
Ok(()) => self.geode.get(&i_file_hash).await.unwrap(),
Err(Error::GeodeFileRouteNotFound) => {
- self.download_publisher
self.event_publisher
.notify(FudEvent::FileNotFound(FileNotFound { file_hash: *file_hash }))
.await;
- return;
return Err(Error::GeodeFileRouteNotFound);
}
- Err(e) => panic!("{}", e),
Err(e) => {
error!(target: "fud::handle_get()", "{}", e);
return Err(e);
}
}
}
- Err(e) => panic!("{}", e),
Err(e) => {
error!(target: "fud::handle_get()", "{}", e);
return Err(e);
}
};
- self.download_publisher
// Set resource status to `Downloading`
let mut resources_write = self.resources.write().await;
let resource = match resources_write.get_mut(file_hash) {
Some(resource) => {
resource.status = ResourceStatus::Downloading;
resource.chunks_downloaded = chunked_file.local_chunks() as u64;
resource.chunks_total = chunked_file.len() as u64;
resource.clone()
}
None => return Ok(()), // Resource was removed, abort
};
drop(resources_write);
self.event_publisher
.notify(FudEvent::FileDownloadCompleted(FileDownloadCompleted {
file_hash: *file_hash,
chunk_count: chunked_file.len(),
resource: resource.clone(),
}))
.await;
@@ -460,21 +569,38 @@ impl Fud {
return match self.geode.assemble_file(file_hash, &chunked_file, file_path).await {
Ok(_) => {
- self.download_publisher
// Set resource status to `Seeding`
let mut resources_write = self.resources.write().await;
let resource = match resources_write.get_mut(file_hash) {
Some(resource) => {
resource.status = ResourceStatus::Seeding;
resource.chunks_downloaded = chunked_file.len() as u64;
resource.clone()
}
None => return Ok(()), // Resource was removed, abort
};
drop(resources_write);
self.event_publisher
.notify(FudEvent::DownloadCompleted(DownloadCompleted {
file_hash: *file_hash,
file_path: file_path.clone(),
resource,
}))
.await;
Ok(())
}
Err(e) => {
error!(target: "fud::handle_get()", "{}", e);
- self.download_publisher
self.event_publisher
.notify(FudEvent::DownloadError(DownloadError {
file_hash: *file_hash,
error: e.to_string(),
}))
.await;
Err(e)
}
};
}
@@ -486,44 +612,69 @@ impl Fud {
for (chunk, path) in chunked_file.iter() {
if path.is_none() {
missing_chunks.insert(*chunk);
- } else {
- self.download_publisher
- .notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted {
- file_hash: *file_hash,
- chunk_hash: *chunk,
- }))
- .await;
}
}
// Fetch missing chunks from seeders
- self.fetch_chunks(file_hash, &missing_chunks, &seeders).await;
self.fetch_chunks(file_hash, &missing_chunks, &seeders).await?;
let chunked_file = match self.geode.get(file_hash).await {
Ok(v) => v,
- Err(e) => panic!("{}", e),
Err(e) => {
error!(target: "fud::handle_get()", "{}", e);
return Err(e);
}
};
// We fetched all chunks, but the file is not complete
// (some chunks were missing from all seeders)
if !chunked_file.is_complete() {
- self.download_publisher.notify(FudEvent::MissingChunks(MissingChunks {
- file_hash: *file_hash,
- })).await;
// Set resource status to `Incomplete`
let mut resources_write = self.resources.write().await;
let resource = match resources_write.get_mut(file_hash) {
Some(resource) => {
resource.status = ResourceStatus::Incomplete;
resource.clone()
}
None => return Ok(()), // Resource was removed, abort
};
drop(resources_write);
self.event_publisher
.notify(FudEvent::MissingChunks(MissingChunks { file_hash: *file_hash, resource }))
.await;
return Ok(());
}
let self_announce =
FudAnnounce { key: *file_hash, seeders: vec![self_node.clone().into()] };
let _ = self.announce(file_hash, &self_announce, self.seeders_router.clone()).await;
// Set resource status to `Seeding`
let mut resources_write = self.resources.write().await;
let resource = match resources_write.get_mut(file_hash) {
Some(resource) => {
resource.status = ResourceStatus::Seeding;
resource.chunks_downloaded = chunked_file.len() as u64;
resource.clone()
}
None => return Ok(()), // Resource was removed, abort
};
drop(resources_write);
match self.geode.assemble_file(file_hash, &chunked_file, file_path).await {
Ok(_) => {
- self.download_publisher
self.event_publisher
.notify(FudEvent::DownloadCompleted(DownloadCompleted {
file_hash: *file_hash,
file_path: file_path.clone(),
resource,
}))
.await;
}
Err(e) => {
error!(target: "fud::handle_get()", "{}", e);
- self.download_publisher
self.event_publisher
.notify(FudEvent::DownloadError(DownloadError {
file_hash: *file_hash,
error: e.to_string(),
@@ -531,5 +682,7 @@ impl Fud {
.await;
}
};
Ok(())
}
}
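Taken together, `handle_get` walks a resource through a small status state machine, publishing a FudEvent at every transition (a summary of the code above, not new behavior):

Discovering -> Downloading (file metadata fetched, chunk counts known)
Downloading -> Seeding (all chunks fetched and the file assembled)
Downloading -> Incomplete (some chunks unavailable from every seeder)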


@@ -94,38 +94,37 @@ pub async fn fetch_file_task(fud: Arc<Fud>) -> Result<()> {
}
}
- /// Background task that removes seeders that did not announce a file/chunk
- /// for more than an hour.
- pub async fn prune_seeders_task(fud: Arc<Fud>) -> Result<()> {
- sleep(120).await;
- loop {
- sleep(3600).await; // TODO: Make a setting
- info!(target: "fud::prune_seeders_task()", "Pruning seeders...");
- fud.dht().prune_router(fud.seeders_router.clone(), 3600).await;
- }
- }
/// Background task that announces our files and chunks once every hour.
/// Also removes seeders that did not announce for too long.
pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
let interval = 3600; // TODO: Make a setting
loop {
- sleep(3600).await; // TODO: Make a setting
sleep(interval).await;
let seeders = vec![fud.dht().node.clone().into()];
- info!(target: "fud::announce_task()", "Announcing files...");
- let file_hashes = fud.geode.list_files().await;
- if let Ok(files) = file_hashes {
- for file in files {
- let _ = fud
- .announce(
- &file,
- &FudAnnounce { key: file, seeders: seeders.clone() },
- fud.seeders_router.clone(),
- )
- .await;
info!(target: "fud::announce_task()", "Verifying seeds...");
let seeding_resources = match fud.get_seeding_resources().await {
Ok(resources) => resources,
Err(e) => {
error!(target: "fud::announce_task()", "Error while verifying seeding resources: {}", e);
continue;
}
};
info!(target: "fud::announce_task()", "Announcing files...");
for resource in seeding_resources {
let _ = fud
.announce(
&resource.hash,
&FudAnnounce { key: resource.hash, seeders: seeders.clone() },
fud.seeders_router.clone(),
)
.await;
}
info!(target: "fud::announce_task()", "Pruning seeders...");
fud.dht().prune_router(fud.seeders_router.clone(), interval.try_into().unwrap()).await;
}
}


@@ -117,6 +117,11 @@ impl ChunkedFile {
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
/// Return the number of chunks available locally.
pub fn local_chunks(&self) -> usize {
self.0.iter().filter(|(_, p)| p.is_some()).count()
}
}
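`local_chunks()` is the counter fud feeds into a resource's `chunks_downloaded` field; a minimal sketch of the pairing, mirroring the assignments in the fud daemon above:

// Derive a resource's progress counters from its chunked file:
resource.chunks_total = chunked_file.len() as u64;
resource.chunks_downloaded = chunked_file.local_chunks() as u64;
// When the two are equal, chunked_file.is_complete() should hold as well.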
/// Chunk-based file storage interface.
@@ -449,6 +454,11 @@ impl Geode {
}
};
// Make sure the chunk hashes match with the file hash
if !self.verify_file(file_hash, &chunk_hashes) {
return Err(Error::GeodeNeedsGc);
}
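// (Assumption: `verify_file` recomputes the file hash from the ordered chunk
// hashes and compares it against `file_hash`; a mismatch means the stored
// metadata is stale, hence the `GeodeNeedsGc` error.)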
let mut chunked_file = ChunkedFile::new(&chunk_hashes);
// Iterate over chunks and find which chunks we have available locally.