fu: handle downloaded bytes, download speed, eta

This commit is contained in:
epiphany
2025-07-30 15:54:35 +02:00
parent f2850953d9
commit c8ca15fc54
4 changed files with 280 additions and 106 deletions

1
Cargo.lock generated
View File

@@ -3048,6 +3048,7 @@ version = "0.5.0"
dependencies = [ dependencies = [
"clap 4.5.38", "clap 4.5.38",
"darkfi", "darkfi",
"fud",
"log", "log",
"simplelog", "simplelog",
"smol", "smol",

View File

@@ -10,6 +10,7 @@ repository = "https://codeberg.org/darkrenaissance/darkfi"
[dependencies] [dependencies]
darkfi = {path = "../../../", features = ["util", "rpc"]} darkfi = {path = "../../../", features = ["util", "rpc"]}
fud = {path = "../fud/"}
# Async # Async
smol = "2.0.2" smol = "2.0.2"

View File

@@ -40,8 +40,15 @@ use darkfi::{
Error, Result, Error, Result,
}; };
use fud::{
resource::{Resource, ResourceStatus},
util::hash_to_string,
};
mod util; mod util;
use crate::util::{status_to_colorspec, type_to_colorspec}; use crate::util::{
format_bytes, format_duration, format_progress_bytes, status_to_colorspec, type_to_colorspec,
};
#[derive(Parser)] #[derive(Parser)]
#[clap(name = "fu", about = cli_desc!(), version)] #[clap(name = "fu", about = cli_desc!(), version)]
@@ -111,15 +118,15 @@ struct Fu {
impl Fu { impl Fu {
async fn get( async fn get(
&self, &self,
file_hash: String, hash: String,
file_path: Option<String>, path: Option<String>,
files: Option<Vec<String>>, files: Option<Vec<String>>,
ex: ExecutorPtr, ex: ExecutorPtr,
) -> Result<()> { ) -> Result<()> {
let publisher = Publisher::new(); let publisher = Publisher::new();
let subscription = Arc::new(publisher.clone().subscribe().await); let subscription = Arc::new(publisher.clone().subscribe().await);
let subscriber_task = StoppableTask::new(); let subscriber_task = StoppableTask::new();
let file_hash_ = file_hash.clone(); let hash_ = hash.clone();
let publisher_ = publisher.clone(); let publisher_ = publisher.clone();
let rpc_client_ = self.rpc_client.clone(); let rpc_client_ = self.rpc_client.clone();
subscriber_task.clone().start( subscriber_task.clone().start(
@@ -150,36 +157,71 @@ impl Fu {
let mut started = false; let mut started = false;
let mut tstdout = StandardStream::stdout(ColorChoice::Auto); let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
let mut print_progress_bar = |info: &HashMap<String, JsonValue>| { let mut print_progress = |info: &HashMap<String, JsonValue>| {
started = true; started = true;
let resource = let rs: Resource = info.get("resource").unwrap().clone().into();
info.get("resource").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
let chunks_downloaded = print!("\x1B[2K\r"); // Clear current line
*resource.get("chunks_downloaded").unwrap().get::<f64>().unwrap() as usize;
let chunks_total = // Progress bar
*resource.get("chunks_target").unwrap().get::<f64>().unwrap() as usize; let percent = if rs.target_bytes_downloaded > rs.target_bytes_size {
let mut status = resource.get("status").unwrap().get::<String>().unwrap().clone(); 1.0
let percent = match chunks_total { } else if rs.target_bytes_size > 0 {
0 => 0f64, rs.target_bytes_downloaded as f64 / rs.target_bytes_size as f64
_ => chunks_downloaded as f64 / chunks_total as f64, } else {
0.0
}; };
let completed = (percent * progress_bar_width as f64) as usize; let completed = (percent * progress_bar_width as f64) as usize;
let remaining = progress_bar_width - completed; let remaining = match progress_bar_width > completed {
true => progress_bar_width - completed,
false => 0,
};
let bar = "=".repeat(completed) + &" ".repeat(remaining); let bar = "=".repeat(completed) + &" ".repeat(remaining);
print!( print!("[{bar}] {:.1}% | ", percent * 100.0);
"\x1B[2K\r[{bar}] {:.1}% | {chunks_downloaded}/{chunks_total} chunks | ",
percent * 100.0 // Downloaded / Total (in bytes)
); if rs.target_bytes_size > 0 {
if remaining == 0 { if rs.target_bytes_downloaded == rs.target_bytes_size {
status = "seeding".to_string(); print!("{} | ", format_bytes(rs.target_bytes_size));
} else {
print!(
"{} | ",
format_progress_bytes(rs.target_bytes_downloaded, rs.target_bytes_size)
);
}
} }
// Download speed (in bytes/sec)
if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
print!("{}/s | ", format_bytes(*rs.speeds.last().unwrap() as u64));
}
// Downloaded / Total (in chunks)
if rs.target_chunks_count > 0 {
let s = if rs.target_chunks_count > 1 { "s" } else { "" };
if rs.target_chunks_downloaded == rs.target_chunks_count {
print!("{} chunk{s} | ", rs.target_chunks_count);
} else {
print!(
"{}/{} chunk{s} | ",
rs.target_chunks_downloaded, rs.target_chunks_count
);
}
}
// ETA
if !rs.speeds.is_empty() && rs.target_chunks_downloaded < rs.target_chunks_count {
print!("ETA: {} | ", format_duration(rs.get_eta()));
}
// Status
let is_done = rs.target_chunks_downloaded == rs.target_chunks_count &&
rs.status.as_str() == "incomplete";
let status = if is_done { ResourceStatus::Seeding } else { rs.status };
tstdout.set_color(&status_to_colorspec(&status)).unwrap(); tstdout.set_color(&status_to_colorspec(&status)).unwrap();
print!( print!(
"{}", "{}",
match status.as_str() { if let ResourceStatus::Seeding = status { "done" } else { status.as_str() }
"seeding" => "done",
s => s,
}
); );
tstdout.reset().unwrap(); tstdout.reset().unwrap();
stdout().flush().unwrap(); stdout().flush().unwrap();
@@ -188,8 +230,8 @@ impl Fu {
let req = JsonRequest::new( let req = JsonRequest::new(
"get", "get",
JsonValue::Array(vec![ JsonValue::Array(vec![
JsonValue::String(file_hash_.clone()), JsonValue::String(hash_.clone()),
JsonValue::String(file_path.unwrap_or_default()), JsonValue::String(path.unwrap_or_default()),
match files { match files {
Some(files) => { Some(files) => {
JsonValue::Array(files.into_iter().map(JsonValue::String).collect()) JsonValue::Array(files.into_iter().map(JsonValue::String).collect())
@@ -209,7 +251,7 @@ impl Fu {
let info = let info =
params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap(); params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
let hash = info.get("hash").unwrap().get::<String>().unwrap(); let hash = info.get("hash").unwrap().get::<String>().unwrap();
if *hash != file_hash_ { if *hash != hash_ {
continue; continue;
} }
match params.get("event").unwrap().get::<String>().unwrap().as_str() { match params.get("event").unwrap().get::<String>().unwrap().as_str() {
@@ -217,22 +259,22 @@ impl Fu {
"metadata_download_completed" | "metadata_download_completed" |
"chunk_download_completed" | "chunk_download_completed" |
"resource_updated" => { "resource_updated" => {
print_progress_bar(info); print_progress(info);
} }
"download_completed" => { "download_completed" => {
let resource = info let resource_json = info
.get("resource") .get("resource")
.unwrap() .unwrap()
.get::<HashMap<String, JsonValue>>() .get::<HashMap<String, JsonValue>>()
.unwrap(); .unwrap();
let file_path = resource.get("path").unwrap().get::<String>().unwrap(); let path = resource_json.get("path").unwrap().get::<String>().unwrap();
print_progress_bar(info); print_progress(info);
println!("\nDownload completed:\n{file_path}"); println!("\nDownload completed:\n{path}");
return Ok(()); return Ok(());
} }
"metadata_not_found" => { "metadata_not_found" => {
println!(); println!();
return Err(Error::Custom(format!("Could not find {file_hash}"))); return Err(Error::Custom(format!("Could not find {hash}")));
} }
"chunk_not_found" => { "chunk_not_found" => {
// A seeder does not have a chunk we are looking for, // A seeder does not have a chunk we are looking for,
@@ -285,37 +327,52 @@ impl Fu {
let req = JsonRequest::new("list_resources", JsonValue::Array(vec![])); let req = JsonRequest::new("list_resources", JsonValue::Array(vec![]));
let rep = self.rpc_client.request(req).await?; let rep = self.rpc_client.request(req).await?;
let resources: Vec<JsonValue> = rep.clone().try_into().unwrap(); let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
let resources: Vec<Resource> = resources_json.into_iter().map(|v| v.into()).collect();
let mut tstdout = StandardStream::stdout(ColorChoice::Auto); let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
for rs in resources.iter() { for resource in resources.iter() {
let resource = rs.get::<HashMap<String, JsonValue>>().unwrap();
let path = resource.get("path").unwrap().get::<String>().unwrap();
let hash = resource.get("hash").unwrap().get::<String>().unwrap().as_str();
let rtype = resource.get("type").unwrap().get::<String>().unwrap();
let chunks_downloaded =
*resource.get("chunks_downloaded").unwrap().get::<f64>().unwrap() as usize;
let chunks_total =
*resource.get("chunks_total").unwrap().get::<f64>().unwrap() as usize;
let status = resource.get("status").unwrap().get::<String>().unwrap();
tstdout.set_color(ColorSpec::new().set_bold(true)).unwrap(); tstdout.set_color(ColorSpec::new().set_bold(true)).unwrap();
println!("{path}"); println!("{}", resource.path.to_string_lossy());
tstdout.reset().unwrap(); tstdout.reset().unwrap();
println!(" ID: {hash}"); println!(" ID: {}", hash_to_string(&resource.hash));
print!(" Type: "); print!(" Type: ");
tstdout.set_color(&type_to_colorspec(rtype)).unwrap(); tstdout.set_color(&type_to_colorspec(&resource.rtype)).unwrap();
println!("{rtype}"); println!("{}", resource.rtype.as_str());
tstdout.reset().unwrap(); tstdout.reset().unwrap();
print!(" Status: "); print!(" Status: ");
tstdout.set_color(&status_to_colorspec(status)).unwrap(); tstdout.set_color(&status_to_colorspec(&resource.status)).unwrap();
println!("{status}"); println!("{}", resource.status.as_str());
tstdout.reset().unwrap(); tstdout.reset().unwrap();
println!( println!(
" Chunks: {chunks_downloaded}/{}", " Chunks: {}/{} ({}/{})",
match chunks_total { resource.total_chunks_downloaded,
match resource.total_chunks_count {
0 => "?".to_string(), 0 => "?".to_string(),
_ => chunks_total.to_string(), _ => resource.total_chunks_count.to_string(),
},
resource.target_chunks_downloaded,
match resource.target_chunks_count {
0 => "?".to_string(),
_ => resource.target_chunks_count.to_string(),
}
);
println!(
" Bytes: {} ({})",
match resource.total_bytes_size {
0 => "?".to_string(),
_ => format_progress_bytes(
resource.total_bytes_downloaded,
resource.total_bytes_size
),
},
match resource.target_bytes_size {
0 => "?".to_string(),
_ => format_progress_bytes(
resource.target_bytes_downloaded,
resource.target_bytes_size
),
} }
); );
} }
@@ -359,13 +416,13 @@ impl Fu {
let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![])); let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![]));
let rep = self.rpc_client.request(req).await?; let rep = self.rpc_client.request(req).await?;
let files: HashMap<String, JsonValue> = rep["seeders"].clone().try_into().unwrap(); let resources: HashMap<String, JsonValue> = rep["seeders"].clone().try_into().unwrap();
if files.is_empty() { if resources.is_empty() {
println!("No known seeders"); println!("No known seeders");
} else { } else {
for (file_hash, node_ids) in files { for (hash, node_ids) in resources {
println!("{file_hash}"); println!("{hash}");
let node_ids: Vec<JsonValue> = node_ids.try_into().unwrap(); let node_ids: Vec<JsonValue> = node_ids.try_into().unwrap();
for node_id in node_ids { for node_id in node_ids {
let node_id: String = node_id.try_into().unwrap(); let node_id: String = node_id.try_into().unwrap();
@@ -382,7 +439,7 @@ impl Fu {
let rep = self.rpc_client.request(req).await?; let rep = self.rpc_client.request(req).await?;
let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap(); let resources_json: Vec<JsonValue> = rep.clone().try_into().unwrap();
let resources: Arc<RwLock<Vec<HashMap<String, JsonValue>>>> = Arc::new(RwLock::new(vec![])); let resources: Arc<RwLock<Vec<Resource>>> = Arc::new(RwLock::new(vec![]));
let publisher = Publisher::new(); let publisher = Publisher::new();
let subscription = Arc::new(publisher.clone().subscribe().await); let subscription = Arc::new(publisher.clone().subscribe().await);
@@ -415,13 +472,9 @@ impl Fu {
let mut tstdout = StandardStream::stdout(ColorChoice::Auto); let mut tstdout = StandardStream::stdout(ColorChoice::Auto);
let mut update_resource = async |resource: &HashMap<String, JsonValue>| { let mut update_resource = async |resource: &Resource| {
let hash = resource.get("hash").unwrap().get::<String>().unwrap();
let mut resources_write = resources.write().await; let mut resources_write = resources.write().await;
let i = match resources_write let i = match resources_write.iter().position(|r| r.hash == resource.hash) {
.iter()
.position(|r| r.get("hash").unwrap().get::<String>().unwrap() == hash)
{
Some(i) => { Some(i) => {
resources_write.remove(i); resources_write.remove(i);
resources_write.insert(i, resource.clone()); resources_write.insert(i, resource.clone());
@@ -436,32 +489,92 @@ impl Fu {
// Move the cursor to the i-th line and clear it // Move the cursor to the i-th line and clear it
print!("\x1b[{};1H\x1B[2K", i + 2); print!("\x1b[{};1H\x1B[2K", i + 2);
let hash = resource.get("hash").unwrap().get::<String>().unwrap(); // Hash
print!("\r{hash:>44} "); print!("\r{:>44} ", hash_to_string(&resource.hash));
let rtype = resource.get("type").unwrap().get::<String>().unwrap(); // Type
tstdout.set_color(&type_to_colorspec(rtype)).unwrap(); tstdout.set_color(&type_to_colorspec(&resource.rtype)).unwrap();
print!("{rtype:>9} "); print!(
"{:>4} ",
match resource.rtype.as_str() {
"unknown" => "?",
"directory" => "dir",
_ => resource.rtype.as_str(),
}
);
tstdout.reset().unwrap(); tstdout.reset().unwrap();
let status = resource.get("status").unwrap().get::<String>().unwrap(); // Status
tstdout.set_color(&status_to_colorspec(status)).unwrap(); tstdout.set_color(&status_to_colorspec(&resource.status)).unwrap();
print!("{status:>11} "); print!("{:>11} ", resource.status.as_str());
tstdout.reset().unwrap(); tstdout.reset().unwrap();
let chunks_downloaded = // Downloaded / Total (in bytes)
*resource.get("chunks_downloaded").unwrap().get::<f64>().unwrap() as usize; match resource.total_bytes_size {
let chunks_total =
*resource.get("chunks_total").unwrap().get::<f64>().unwrap() as usize;
match chunks_total {
0 => { 0 => {
print!("{:>5.1} {:>9}", 0.0, format!("{chunks_downloaded}/?")); print!("{:>5.1} {:>16} ", 0.0, "?");
} }
_ => { _ => {
let percent = chunks_downloaded as f64 / chunks_total as f64 * 100.0; let percent = resource.total_bytes_downloaded as f64 /
print!("{:>5.1} {:>9}", percent, format!("{chunks_downloaded}/{chunks_total}")); resource.total_bytes_size as f64 *
100.0;
if resource.total_bytes_downloaded == resource.total_bytes_size {
print!("{:>5.1} {:>16} ", percent, format_bytes(resource.total_bytes_size));
} else {
print!(
"{:>5.1} {:>16} ",
percent,
format_progress_bytes(
resource.total_bytes_downloaded,
resource.total_bytes_size
)
);
}
} }
}; };
// Downloaded / Total (in chunks)
match resource.total_chunks_count {
0 => {
print!("{:>9} ", format!("{}/?", resource.total_chunks_downloaded));
}
_ => {
if resource.total_chunks_downloaded == resource.total_chunks_count {
print!("{:>9} ", resource.total_chunks_count.to_string());
} else {
print!(
"{:>9} ",
format!(
"{}/{}",
resource.total_chunks_downloaded, resource.total_chunks_count
)
);
}
}
};
// Download speed (in bytes/sec)
let speed_available = resource.total_bytes_downloaded < resource.total_bytes_size &&
resource.status.as_str() == "downloading" &&
!resource.speeds.is_empty();
print!(
"{:>12} ",
match speed_available {
false => "-".to_string(),
true => format!("{}/s", format_bytes(*resource.speeds.last().unwrap() as u64)),
}
);
// ETA
let eta = resource.get_eta();
print!(
"{:>6}",
match eta {
0 => "-".to_string(),
_ => format_duration(eta),
}
);
println!(); println!();
// Move the cursor to end // Move the cursor to end
@@ -475,8 +588,8 @@ impl Fu {
// Print column headers // Print column headers
println!( println!(
"\x1b[4m{:>44} {:>9} {:>11} {:>5} {:>9}\x1b[0m", "\x1b[4m{:>44} {:>4} {:>11} {:>5} {:>16} {:>9} {:>12} {:>6}\x1b[0m",
"Hash", "Type", "Status", "%", "Chunks" "Hash", "Type", "Status", "%", "Bytes", "Chunks", "Speed", "ETA"
); );
}; };
@@ -485,8 +598,8 @@ impl Fu {
println!("No known resources"); println!("No known resources");
} else { } else {
for resource in resources_json.iter() { for resource in resources_json.iter() {
let resource = resource.get::<HashMap<String, JsonValue>>().unwrap(); let rs: Resource = resource.clone().into();
update_resource(resource).await; update_resource(&rs).await;
} }
} }
@@ -504,20 +617,16 @@ impl Fu {
"missing_chunks" | "missing_chunks" |
"metadata_not_found" | "metadata_not_found" |
"resource_updated" => { "resource_updated" => {
let resource = info let resource: Resource = info.get("resource").unwrap().clone().into();
.get("resource") update_resource(&resource).await;
.unwrap()
.get::<HashMap<String, JsonValue>>()
.unwrap();
update_resource(resource).await;
} }
"resource_removed" => { "resource_removed" => {
{ {
let hash = info.get("hash").unwrap().get::<String>().unwrap(); let hash = info.get("hash").unwrap().get::<String>().unwrap();
let mut resources_write = resources.write().await; let mut resources_write = resources.write().await;
let i = resources_write.iter().position(|r| { let i = resources_write
r.get("hash").unwrap().get::<String>().unwrap() == hash .iter()
}); .position(|r| hash_to_string(&r.hash) == *hash);
if let Some(i) = i { if let Some(i) = i {
resources_write.remove(i); resources_write.remove(i);
} }

View File

@@ -18,27 +18,90 @@
use termcolor::{Color, ColorSpec}; use termcolor::{Color, ColorSpec};
pub fn status_to_colorspec(status: &str) -> ColorSpec { use fud::resource::{ResourceStatus, ResourceType};
const UNITS: [&str; 7] = ["B", "KB", "MB", "GB", "TB", "PB", "EB"];
pub fn status_to_colorspec(status: &ResourceStatus) -> ColorSpec {
ColorSpec::new() ColorSpec::new()
.set_fg(match status { .set_fg(match status {
"downloading" => Some(Color::Blue), ResourceStatus::Downloading => Some(Color::Blue),
"seeding" => Some(Color::Green), ResourceStatus::Seeding => Some(Color::Green),
"discovering" => Some(Color::Magenta), ResourceStatus::Discovering => Some(Color::Magenta),
"incomplete" => Some(Color::Red), ResourceStatus::Incomplete => Some(Color::Red),
"verifying" => Some(Color::Yellow), ResourceStatus::Verifying => Some(Color::Yellow),
_ => None,
}) })
.set_bold(true) .set_bold(true)
.clone() .clone()
} }
pub fn type_to_colorspec(rtype: &str) -> ColorSpec { pub fn type_to_colorspec(rtype: &ResourceType) -> ColorSpec {
ColorSpec::new() ColorSpec::new()
.set_fg(match rtype { .set_fg(match rtype {
"file" => Some(Color::Blue), ResourceType::File => Some(Color::Blue),
"directory" => Some(Color::Magenta), ResourceType::Directory => Some(Color::Magenta),
_ => None, ResourceType::Unknown => None,
}) })
.set_bold(true) .set_bold(true)
.clone() .clone()
} }
pub fn format_bytes(bytes: u64) -> String {
let mut size = bytes as f64;
let mut unit_index = 0;
while size >= 1024.0 && unit_index < UNITS.len() - 1 {
size /= 1024.0;
unit_index += 1;
}
format!("{size:.1} {}", UNITS[unit_index])
}
pub fn format_progress_bytes(current: u64, total: u64) -> String {
let mut total = total as f64;
let mut unit_index = 0;
while total >= 1024.0 && unit_index < UNITS.len() - 1 {
total /= 1024.0;
unit_index += 1;
}
let current = (current as f64) / 1024_f64.powi(unit_index as i32);
format!("{current:.1}/{total:.1} {}", UNITS[unit_index])
}
/// Returns a formated string from the duration.
/// - 1 -> 1s
/// - 60 -> 1m
/// - 90 -> 1m30s
pub fn format_duration(seconds: u64) -> String {
if seconds == 0 {
return "0s".to_string();
}
let units = [
(86400, "d"), // days
(3600, "h"), // hours
(60, "m"), // minutes
(1, "s"), // seconds
];
for (i, (unit_seconds, unit_symbol)) in units.iter().enumerate() {
if seconds >= *unit_seconds {
let first = seconds / unit_seconds;
let remaining = seconds % unit_seconds;
if remaining > 0 && i < units.len() - 1 {
let (next_unit_seconds, next_unit_symbol) = units[i + 1];
let second = remaining / next_unit_seconds;
return format!("{first}{unit_symbol}{second}{next_unit_symbol}");
}
return format!("{first}{unit_symbol}");
}
}
"0s".to_string()
}