fud, geode: chunks are written and fetched from the full file

Author: epiphany
Date:   2025-05-18 19:14:38 +02:00
Parent: a6fb5f8852
Commit: 55e08ae605

9 changed files with 648 additions and 548 deletions
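
The core of the change: geode no longer stores standalone chunk files; each chunk is read from and written to its slot inside the full file, at offset chunk_index * MAX_CHUNK_SIZE. A minimal sketch of that offset arithmetic, mirroring what Geode::write_chunk() in this diff does after resolving the index from the chunk hash (synchronous std I/O here for brevity; the daemon itself uses smol's async equivalents):

use std::fs::OpenOptions;
use std::io::{Result, Seek, SeekFrom, Write};

/// Chunk size used by geode (from this diff).
const MAX_CHUNK_SIZE: usize = 262_144;

/// Write `chunk` into slot `chunk_index` of the full file at `path`.
fn write_chunk_at(path: &str, chunk_index: usize, chunk: &[u8]) -> Result<()> {
    let position = (chunk_index as u64) * (MAX_CHUNK_SIZE as u64);
    let mut fd = OpenOptions::new().write(true).create(true).open(path)?;
    fd.seek(SeekFrom::Start(position))?;
    fd.write_all(chunk)?;
    fd.flush()
}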

Cargo.lock (generated)

@@ -3076,6 +3076,7 @@ dependencies = [
"signal-hook",
"signal-hook-async-std",
"simplelog",
"sled-overlay",
"smol",
"structopt",
"structopt-toml",


@@ -9,7 +9,7 @@ homepage = "https://dark.fi"
repository = "https://codeberg.org/darkrenaissance/darkfi"
[dependencies]
darkfi = {path = "../../../", features = ["async-daemonize", "geode", "rpc", "dht"]}
darkfi = {path = "../../../", features = ["async-daemonize", "geode", "rpc", "dht", "sled-overlay"]}
darkfi-serial = {version = "0.4.2", features = ["hash"]}
# Misc
@@ -30,6 +30,9 @@ signal-hook = "0.3.17"
simplelog = "0.12.2"
smol = "2.0.2"
# Database
sled-overlay = "0.1.8"
# Argument parsing
serde = {version = "1.0.219", features = ["derive"]}
structopt = "0.3.26"

bin/fud/fud/src/event.rs (new file)

@@ -0,0 +1,206 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use tinyjson::JsonValue;
use darkfi::{
geode::hash_to_string,
rpc::util::{json_map, json_str},
};
use crate::resource::Resource;
#[derive(Clone, Debug)]
pub struct DownloadStarted {
pub hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct ChunkDownloadCompleted {
pub hash: blake3::Hash,
pub chunk_hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct FileDownloadCompleted {
pub hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct DownloadCompleted {
pub hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct ResourceUpdated {
pub hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct ResourceRemoved {
pub hash: blake3::Hash,
}
#[derive(Clone, Debug)]
pub struct ChunkNotFound {
pub hash: blake3::Hash,
pub chunk_hash: blake3::Hash,
}
#[derive(Clone, Debug)]
pub struct FileNotFound {
pub hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct MissingChunks {
pub hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct DownloadError {
pub hash: blake3::Hash,
pub error: String,
}
#[derive(Clone, Debug)]
pub enum FudEvent {
DownloadStarted(DownloadStarted),
ChunkDownloadCompleted(ChunkDownloadCompleted),
FileDownloadCompleted(FileDownloadCompleted),
DownloadCompleted(DownloadCompleted),
ResourceUpdated(ResourceUpdated),
ResourceRemoved(ResourceRemoved),
ChunkNotFound(ChunkNotFound),
FileNotFound(FileNotFound),
MissingChunks(MissingChunks),
DownloadError(DownloadError),
}
impl From<DownloadStarted> for JsonValue {
fn from(info: DownloadStarted) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&info.hash))),
("resource", info.resource.into()),
])
}
}
impl From<ChunkDownloadCompleted> for JsonValue {
fn from(info: ChunkDownloadCompleted) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&info.hash))),
("chunk_hash", JsonValue::String(hash_to_string(&info.chunk_hash))),
("resource", info.resource.into()),
])
}
}
impl From<FileDownloadCompleted> for JsonValue {
fn from(info: FileDownloadCompleted) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&info.hash))),
("resource", info.resource.into()),
])
}
}
impl From<DownloadCompleted> for JsonValue {
fn from(info: DownloadCompleted) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&info.hash))),
("resource", info.resource.into()),
])
}
}
impl From<ResourceUpdated> for JsonValue {
fn from(info: ResourceUpdated) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&info.hash))),
("resource", info.resource.into()),
])
}
}
impl From<ResourceRemoved> for JsonValue {
fn from(info: ResourceRemoved) -> JsonValue {
json_map([("hash", JsonValue::String(hash_to_string(&info.hash)))])
}
}
impl From<ChunkNotFound> for JsonValue {
fn from(info: ChunkNotFound) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&info.hash))),
("chunk_hash", JsonValue::String(hash_to_string(&info.chunk_hash))),
])
}
}
impl From<FileNotFound> for JsonValue {
fn from(info: FileNotFound) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&info.hash))),
("resource", info.resource.into()),
])
}
}
impl From<MissingChunks> for JsonValue {
fn from(info: MissingChunks) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&info.hash))),
("resource", info.resource.into()),
])
}
}
impl From<DownloadError> for JsonValue {
fn from(info: DownloadError) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&info.hash))),
("error", JsonValue::String(info.error)),
])
}
}
impl From<FudEvent> for JsonValue {
fn from(event: FudEvent) -> JsonValue {
match event {
FudEvent::DownloadStarted(info) => {
json_map([("event", json_str("download_started")), ("info", info.into())])
}
FudEvent::ChunkDownloadCompleted(info) => {
json_map([("event", json_str("chunk_download_completed")), ("info", info.into())])
}
FudEvent::FileDownloadCompleted(info) => {
json_map([("event", json_str("file_download_completed")), ("info", info.into())])
}
FudEvent::DownloadCompleted(info) => {
json_map([("event", json_str("download_completed")), ("info", info.into())])
}
FudEvent::ResourceUpdated(info) => {
json_map([("event", json_str("resource_updated")), ("info", info.into())])
}
FudEvent::ResourceRemoved(info) => {
json_map([("event", json_str("resource_removed")), ("info", info.into())])
}
FudEvent::ChunkNotFound(info) => {
json_map([("event", json_str("chunk_not_found")), ("info", info.into())])
}
FudEvent::FileNotFound(info) => {
json_map([("event", json_str("file_not_found")), ("info", info.into())])
}
FudEvent::MissingChunks(info) => {
json_map([("event", json_str("missing_chunks")), ("info", info.into())])
}
FudEvent::DownloadError(info) => {
json_map([("event", json_str("download_error")), ("info", info.into())])
}
}
}
}
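
Each FudEvent reaches subscribe RPC clients as a two-key JSON map built by the From impls above. An illustrative (not captured) payload for a ChunkDownloadCompleted event, with hashes borrowed from the geode module docs and the resource fields elided:

{
  "event": "chunk_download_completed",
  "info": {
    "hash": "B9fFKaEYphw2oH5PDbeL1TTAcSzL6ax84p8SjBKzuYzX",
    "chunk_hash": "2bQPxSR8Frz7S7JW3DRAzEtkrHfLXB1CN65V7az77pUp",
    "resource": { "hash": "...", "path": "...", "status": "..." }
  }
}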


@@ -21,6 +21,7 @@ use futures::{future::FutureExt, pin_mut, select};
use log::{debug, error, info, warn};
use num_bigint::BigUint;
use rand::{prelude::IteratorRandom, rngs::OsRng, seq::SliceRandom, RngCore};
use sled_overlay::sled;
use smol::{
channel,
fs::{File, OpenOptions},
@@ -56,8 +57,8 @@ use darkfi::{
Error, Result,
};
use event::{ChunkDownloadCompleted, ChunkNotFound, FudEvent, ResourceUpdated};
use resource::{Resource, ResourceStatus};
use rpc::{ChunkDownloadCompleted, ChunkNotFound, FudEvent};
use tasks::FetchReply;
/// P2P protocols
@@ -68,6 +69,7 @@ use proto::{
FudPingRequest, ProtocolFud,
};
mod event;
mod resource;
mod rpc;
mod tasks;
@@ -101,7 +103,7 @@ struct Args {
/// Default path to store downloaded files (defaults to <base_dir>/downloads)
downloads_path: Option<String>,
#[structopt(short, long, default_value = "60")]
#[structopt(long, default_value = "60")]
/// Chunk transfer timeout in seconds
chunk_timeout: u64,
@@ -140,10 +142,13 @@ pub struct Fud {
/// Resources (current status of all downloads/seeds)
resources: Arc<RwLock<HashMap<blake3::Hash, Resource>>>,
/// Sled tree containing "resource hash -> path on the filesystem"
path_tree: sled::Tree,
get_tx: channel::Sender<(u16, blake3::Hash, PathBuf, Result<()>)>,
get_rx: channel::Receiver<(u16, blake3::Hash, PathBuf, Result<()>)>,
file_fetch_tx: channel::Sender<(Vec<DhtNode>, blake3::Hash, Result<()>)>,
file_fetch_rx: channel::Receiver<(Vec<DhtNode>, blake3::Hash, Result<()>)>,
file_fetch_tx: channel::Sender<(Vec<DhtNode>, blake3::Hash, PathBuf, Result<()>)>,
file_fetch_rx: channel::Receiver<(Vec<DhtNode>, blake3::Hash, PathBuf, Result<()>)>,
file_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>,
file_fetch_end_rx: channel::Receiver<(blake3::Hash, Result<()>)>,
@@ -242,13 +247,37 @@ impl Fud {
/// Skipped if we have no external address.
async fn init(&self) -> Result<()> {
info!(target: "fud::init()", "Finding resources...");
let hashes = self.geode.list_files().await?;
let mut resources_write = self.resources.write().await;
for hash in hashes {
for result in self.path_tree.iter() {
if result.is_err() {
continue;
}
// Parse hash
let (hash, path) = result.unwrap();
let hash_bytes: [u8; 32] = match hash.to_vec().try_into() {
Ok(v) => v,
Err(_) => continue,
};
let hash = blake3::Hash::from_bytes(hash_bytes);
// Parse path
let path_bytes = path.to_vec();
let path_str = match std::str::from_utf8(&path_bytes) {
Ok(v) => v,
Err(_) => continue,
};
let path: PathBuf = match expand_path(path_str) {
Ok(v) => v,
Err(_) => continue,
};
// Add resource
resources_write.insert(
hash,
Resource {
hash,
path,
status: ResourceStatus::Incomplete,
chunks_total: 0,
chunks_downloaded: 0,
@@ -258,7 +287,7 @@ impl Fud {
drop(resources_write);
info!(target: "fud::init()", "Verifying resources...");
let resources = self.get_seeding_resources().await?;
let resources = self.verify_resources(None).await?;
let self_node = self.dht().node().await;
@@ -281,11 +310,23 @@ impl Fud {
Ok(())
}
/// Get resource path from hash using the sled db
fn hash_to_path(&self, hash: &blake3::Hash) -> Result<Option<PathBuf>> {
if let Some(value) = self.path_tree.get(hash.as_bytes())? {
let path: PathBuf = expand_path(std::str::from_utf8(&value)?)?;
return Ok(Some(path));
}
Ok(None)
}
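
The path tree read by hash_to_path() is a plain sled key/value mapping from the 32 raw hash bytes to a UTF-8 path string. A self-contained sketch of the round trip, assuming an illustrative database path and file name (the daemon additionally runs the loaded string through expand_path()):

use sled_overlay::sled;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file_hash = blake3::hash(b"example");
    let db = sled::open("/tmp/fud-example-db")?;
    let path_tree = db.open_tree("path")?;

    // Store: hash bytes -> path string, as put() and handle_get() do.
    path_tree.insert(file_hash.as_bytes(), "~/myfile.jpg".as_bytes())?;

    // Load: decode the value back into a path string, as hash_to_path() does.
    if let Some(value) = path_tree.get(file_hash.as_bytes())? {
        println!("{}", std::str::from_utf8(&value)?);
    }
    Ok(())
}
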
/// Verify if resources are complete and uncorrupted.
/// If a resource is incomplete or corrupted, its status is changed to Incomplete.
/// If a resource is complete, its status is changed to Seeding.
/// Takes an optional list of hashes.
/// If no hash is given (None), it verifies all resources.
/// Returns the list of verified and uncorrupted/complete seeding resources.
async fn get_seeding_resources(&self) -> Result<Vec<Resource>> {
async fn verify_resources(&self, hashes: Option<Vec<blake3::Hash>>) -> Result<Vec<Resource>> {
let mut resources_write = self.resources.write().await;
let update_resource =
@@ -301,10 +342,23 @@ impl Fud {
Some(chunked_file) => chunked_file.local_chunks() as u64,
None => 0,
};
self.event_publisher
.notify(FudEvent::ResourceUpdated(ResourceUpdated {
hash: resource.hash,
resource: resource.clone(),
}))
.await;
};
let mut seeding_resources: Vec<Resource> = vec![];
for (_, mut resource) in resources_write.iter_mut() {
if let Some(ref hashes_list) = hashes {
if !hashes_list.contains(&resource.hash) {
continue;
}
}
match resource.status {
ResourceStatus::Seeding => {}
ResourceStatus::Incomplete => {}
@@ -312,7 +366,14 @@ impl Fud {
};
// Make sure the resource is not corrupted or incomplete
let chunked_file = match self.geode.get(&resource.hash).await {
let resource_path = match self.hash_to_path(&resource.hash) {
Ok(Some(v)) => v,
Ok(None) | Err(_) => {
update_resource(&mut resource, ResourceStatus::Incomplete, None).await;
continue;
}
};
let chunked_file = match self.geode.get(&resource.hash, &resource_path).await {
Ok(v) => v,
Err(_) => {
update_resource(&mut resource, ResourceStatus::Incomplete, None).await;
@@ -388,6 +449,7 @@ impl Fud {
/// Fetch chunks for a file from `seeders`
async fn fetch_chunks(
&self,
file_path: &PathBuf,
file_hash: &blake3::Hash,
chunk_hashes: &HashSet<blake3::Hash>,
seeders: &HashSet<DhtRouterItem>,
@@ -407,6 +469,7 @@ impl Fud {
continue;
}
};
let mut chunks_to_query = remaining_chunks.clone();
info!("Requesting chunks from seeder {}", hash_to_string(&seeder.node.id));
loop {
let msg_subsystem = channel.message_subsystem();
@@ -415,8 +478,6 @@ impl Fud {
let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();
let mut chunks_to_query = remaining_chunks.clone();
// Select a chunk to request
let mut chunk_hash: Option<blake3::Hash> = None;
if let Some(&random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
@@ -428,8 +489,10 @@ impl Fud {
break; // Switch to another seeder
}
let chunk_hash = chunk_hash.unwrap();
chunks_to_query.remove(&chunk_hash);
let send_res = channel.send(&FudFindRequest { key: chunk_hash }).await;
let send_res =
channel.send(&FudFindRequest { info: Some(*file_hash), key: chunk_hash }).await;
if let Err(e) = send_res {
warn!(target: "fud::fetch_chunks()", "Error while sending FudFindRequest: {}", e);
break; // Switch to another seeder
@@ -449,10 +512,9 @@ impl Fud {
warn!(target: "fud::fetch_chunks()", "Error waiting for chunk reply: {}", e);
break; // Switch to another seeder
}
chunks_to_query.remove(&chunk_hash);
let reply = chunk_reply.unwrap();
match self.geode.insert_chunk(&reply.chunk).await {
match self.geode.write_chunk(file_hash, file_path, &reply.chunk).await {
Ok(inserted_hash) => {
if inserted_hash != chunk_hash {
warn!("Received chunk does not match requested chunk");
@@ -461,7 +523,7 @@ impl Fud {
continue; // Skip to next chunk, will retry this chunk later
}
// Upade resource `chunks_downloaded`
// Update resource `chunks_downloaded`
let mut resources_write = self.resources.write().await;
let resource = match resources_write.get_mut(file_hash) {
Some(resource) => {
@@ -476,7 +538,7 @@ impl Fud {
info!(target: "fud::fetch_chunks()", "Received chunk {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id));
self.event_publisher
.notify(FudEvent::ChunkDownloadCompleted(ChunkDownloadCompleted {
file_hash: *file_hash,
hash: *file_hash,
chunk_hash,
resource,
}))
@@ -498,11 +560,10 @@ impl Fud {
info!(target: "fud::fetch_chunks()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id));
self.event_publisher
.notify(FudEvent::ChunkNotFound(ChunkNotFound {
file_hash: *file_hash,
hash: *file_hash,
chunk_hash,
}))
.await;
chunks_to_query.remove(&chunk_hash);
}
};
@@ -593,7 +654,8 @@ impl Fud {
let msg_subscriber_notfound =
channel.subscribe_msg::<FudNotFound>().await.unwrap();
let send_res = channel.send(&FudFindRequest { key: file_hash }).await;
let send_res =
channel.send(&FudFindRequest { info: None, key: file_hash }).await;
if let Err(e) = send_res {
warn!(target: "fud::fetch_file_metadata()", "Error while sending FudFindRequest: {}", e);
msg_subscriber_chunk.unsubscribe().await;
@@ -697,8 +759,12 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
// Hashmap used for routing
let seeders_router = Arc::new(RwLock::new(HashMap::new()));
// Sled database init
info!("Instantiating database");
let sled_db = sled::open(basedir.join("db"))?;
info!("Instantiating Geode instance");
let geode = Geode::new(&basedir).await?;
let geode = Geode::new(&basedir /*, sled_db, "geode"*/).await?;
info!("Instantiating P2P network");
let net_settings: NetSettings = args.net.into();
@@ -771,6 +837,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
downloads_path,
chunk_timeout: args.chunk_timeout,
dht: dht.clone(),
path_tree: sled_db.open_tree("path")?,
resources: Arc::new(RwLock::new(HashMap::new())),
get_tx,
get_rx,


@@ -18,12 +18,12 @@
use async_trait::async_trait;
use log::{debug, error, info};
use smol::{fs::File, Executor};
use smol::Executor;
use std::sync::Arc;
use darkfi::{
dht::{DhtHandler, DhtNode, DhtRouterItem},
geode::{hash_to_string, read_until_filled, MAX_CHUNK_SIZE},
geode::hash_to_string,
impl_p2p_message,
net::{
metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
@@ -79,6 +79,7 @@ impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURA
/// Message representing a find file/chunk request from the network
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindRequest {
pub info: Option<blake3::Hash>,
pub key: blake3::Hash,
}
impl_p2p_message!(FudFindRequest, "FudFindRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
@@ -202,63 +203,87 @@ impl ProtocolFud {
self.fud.update_node(&node).await;
}
// Chunk
{
let chunk_res = self.fud.geode.get_chunk(&request.key).await;
// TODO: Run geode GC
if let Ok(chunk_path) = chunk_res {
let mut buf = vec![0u8; MAX_CHUNK_SIZE];
let mut chunk_fd = File::open(&chunk_path).await.unwrap();
let bytes_read = read_until_filled(&mut chunk_fd, &mut buf).await.unwrap();
let chunk_slice = &buf[..bytes_read];
let reply = FudChunkReply { chunk: chunk_slice.to_vec() };
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending chunk");
let _ = self.channel.send(&reply).await;
continue;
}
if self.handle_fud_chunk_request(&request).await {
continue;
}
// File
{
let file_res = self.fud.geode.get(&request.key).await;
// TODO: Run geode GC
if let Ok(chunked_file) = file_res {
// If the file has a single chunk, just reply with the chunk
if chunked_file.len() == 1 {
let chunk_res =
self.fud.geode.get_chunk(&chunked_file.iter().next().unwrap().0).await;
if let Ok(chunk_path) = chunk_res {
let mut buf = vec![0u8; MAX_CHUNK_SIZE];
let mut chunk_fd = File::open(&chunk_path).await.unwrap();
let bytes_read =
read_until_filled(&mut chunk_fd, &mut buf).await.unwrap();
let chunk_slice = &buf[..bytes_read];
let reply = FudChunkReply { chunk: chunk_slice.to_vec() };
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending chunk (file has a single chunk)");
let _ = self.channel.send(&reply).await;
continue;
}
}
// Otherwise reply with the file metadata
let reply = FudFileReply {
chunk_hashes: chunked_file.iter().map(|(chunk, _)| *chunk).collect(),
};
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending file");
let _ = self.channel.send(&reply).await;
continue;
}
if self.handle_fud_file_request(&request).await {
continue;
}
// Request did not match anything we have
let reply = FudNotFound {};
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", hash_to_string(&request.key));
let _ = self.channel.send(&reply).await;
}
}
/// If the FudFindRequest matches a chunk we have, handle it.
/// Returns true if the chunk was found.
async fn handle_fud_chunk_request(&self, request: &FudFindRequest) -> bool {
let file_hash = request.info;
if file_hash.is_none() {
return false;
}
let file_hash = file_hash.unwrap();
let file_path = self.fud.hash_to_path(&file_hash).ok().flatten();
if file_path.is_none() {
return false;
}
let file_path = file_path.unwrap();
let chunk = self.fud.geode.get_chunk(&request.key, &file_hash, &file_path).await;
if let Ok(chunk) = chunk {
// TODO: Run geode GC
let reply = FudChunkReply { chunk };
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending chunk");
let _ = self.channel.send(&reply).await;
return true;
}
false
}
/// If the FudFindRequest matches a file we have, handle it
/// Returns true if the file was found.
async fn handle_fud_file_request(&self, request: &FudFindRequest) -> bool {
let file_path = self.fud.hash_to_path(&request.key).ok().flatten();
if file_path.is_none() {
return false;
}
let file_path = file_path.unwrap();
let chunked_file = self.fud.geode.get(&request.key, &file_path).await.ok();
if chunked_file.is_none() {
return false;
}
let chunked_file = chunked_file.unwrap();
// If the file has a single chunk, just reply with the chunk
if chunked_file.len() == 1 {
let chunk = self
.fud
.geode
.get_chunk(&chunked_file.iter().next().unwrap().0, &request.key, &file_path)
.await;
if let Ok(chunk) = chunk {
// TODO: Run geode GC
let reply = FudChunkReply { chunk };
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending chunk (file has a single chunk)");
let _ = self.channel.send(&reply).await;
return true;
}
return false;
}
// Otherwise reply with the file metadata
let reply =
FudFileReply { chunk_hashes: chunked_file.iter().map(|(chunk, _)| *chunk).collect() };
info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Sending file");
let _ = self.channel.send(&reply).await;
true
}
async fn handle_fud_find_nodes_request(self: Arc<Self>) -> Result<()> {
debug!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "START");


@@ -17,6 +17,7 @@
*/
use darkfi::{geode::hash_to_string, rpc::util::json_map};
use std::path::PathBuf;
use tinyjson::JsonValue;
#[derive(Clone, Debug)]
@@ -30,6 +31,7 @@ pub enum ResourceStatus {
#[derive(Clone, Debug)]
pub struct Resource {
pub hash: blake3::Hash,
pub path: PathBuf,
pub status: ResourceStatus,
pub chunks_total: u64,
pub chunks_downloaded: u64,
@@ -39,6 +41,13 @@ impl From<Resource> for JsonValue {
fn from(rs: Resource) -> JsonValue {
json_map([
("hash", JsonValue::String(hash_to_string(&rs.hash))),
(
"path",
JsonValue::String(match rs.path.into_os_string().into_string() {
Ok(path) => path,
Err(_) => "".to_string(),
}),
),
(
"status",
JsonValue::String(


@@ -35,7 +35,6 @@ use darkfi::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
p2p_method::HandlerP2p,
server::RequestHandler,
util::{json_map, json_str},
},
system::StoppableTaskPtr,
util::path::expand_path,
@@ -43,6 +42,7 @@ use darkfi::{
};
use crate::{
event::{self, FudEvent},
proto::FudAnnounce,
resource::{Resource, ResourceStatus},
Fud,
@@ -61,6 +61,7 @@ impl RequestHandler<()> for Fud {
"list_resources" => self.list_resources(req.id, req.params).await,
"list_buckets" => self.list_buckets(req.id, req.params).await,
"list_seeders" => self.list_seeders(req.id, req.params).await,
"verify" => self.verify(req.id, req.params).await,
"dnet.switch" => self.dnet_switch(req.id, req.params).await,
"dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
@@ -116,7 +117,7 @@ impl Fud {
}
};
let (file_hash, _) = match self.geode.insert(fd).await {
let (file_hash, chunk_hashes) = match self.geode.insert(fd).await {
Ok(v) => v,
Err(e) => {
let error_str = format!("Failed inserting file {:?} to geode: {}", path, e);
@@ -125,6 +126,29 @@ impl Fud {
}
};
// Add path to the sled db
if let Err(e) = self
.path_tree
.insert(file_hash.as_bytes(), path.to_string_lossy().to_string().as_bytes())
{
error!(target: "fud::put()", "Failed inserting new file into sled: {}", e);
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
// Add resource
let mut resources_write = self.resources.write().await;
resources_write.insert(
file_hash,
Resource {
hash: file_hash,
path,
status: ResourceStatus::Seeding,
chunks_total: chunk_hashes.len() as u64,
chunks_downloaded: chunk_hashes.len() as u64,
},
);
drop(resources_write);
// Announce file
let fud_announce = FudAnnounce { key: file_hash, seeders: vec![self_node.into()] };
let _ = self.announce(&file_hash, &fud_announce, self.seeders_router.clone()).await;
@@ -137,7 +161,7 @@ impl Fud {
// Returns the path where the file will be located once downloaded.
//
// --> {"jsonrpc": "2.0", "method": "get", "params": ["1211...abfd", "~/myfile.jpg"], "id": 42}
// <-- {"jsonrpc": "2.0", "method": "get", "params": "/home/user/myfile.jpg"}
// <-- {"jsonrpc": "2.0", "result": "/home/user/myfile.jpg", "id": 42}
async fn get(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if params.len() != 2 || !params[0].is_string() || !params[1].is_string() {
@@ -186,7 +210,7 @@ impl Fud {
// Subscribe to download events.
//
// --> {"jsonrpc": "2.0", "method": "get", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "method": "get", "params": `event`}
// <-- {"jsonrpc": "2.0", "result": `event`, "id": 42}
async fn subscribe(&self, _id: u16, _params: JsonValue) -> JsonResult {
self.event_sub.clone().into()
}
@@ -232,7 +256,7 @@ impl Fud {
}
// RPCAPI:
// Returns resources from the database.
// Returns resources.
//
// --> {"jsonrpc": "2.0", "method": "list_buckets", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": [[["abcdef", ["tcp://127.0.0.1:13337"]]]], "id": 1}
@@ -326,177 +350,49 @@ impl Fud {
drop(resources_write);
self.event_publisher
.notify(FudEvent::ResourceRemoved(ResourceRemoved { file_hash: hash }))
.notify(FudEvent::ResourceRemoved(event::ResourceRemoved { hash }))
.await;
JsonResponse::new(JsonValue::Array(vec![]), id).into()
}
}
#[derive(Clone, Debug)]
pub struct DownloadStarted {
pub file_hash: blake3::Hash,
pub file_path: PathBuf,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct ChunkDownloadCompleted {
pub file_hash: blake3::Hash,
pub chunk_hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct FileDownloadCompleted {
pub file_hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct DownloadCompleted {
pub file_hash: blake3::Hash,
pub file_path: PathBuf,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct ResourceRemoved {
pub file_hash: blake3::Hash,
}
#[derive(Clone, Debug)]
pub struct ChunkNotFound {
pub file_hash: blake3::Hash,
pub chunk_hash: blake3::Hash,
}
#[derive(Clone, Debug)]
pub struct FileNotFound {
pub file_hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct MissingChunks {
pub file_hash: blake3::Hash,
pub resource: Resource,
}
#[derive(Clone, Debug)]
pub struct DownloadError {
pub file_hash: blake3::Hash,
pub error: String,
}
#[derive(Clone, Debug)]
pub enum FudEvent {
DownloadStarted(DownloadStarted),
ChunkDownloadCompleted(ChunkDownloadCompleted),
FileDownloadCompleted(FileDownloadCompleted),
DownloadCompleted(DownloadCompleted),
ResourceRemoved(ResourceRemoved),
ChunkNotFound(ChunkNotFound),
FileNotFound(FileNotFound),
MissingChunks(MissingChunks),
DownloadError(DownloadError),
}
impl From<DownloadStarted> for JsonValue {
fn from(info: DownloadStarted) -> JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("file_path", JsonValue::String(info.file_path.to_string_lossy().to_string())),
("resource", info.resource.into()),
])
}
}
impl From<ChunkDownloadCompleted> for JsonValue {
fn from(info: ChunkDownloadCompleted) -> JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("chunk_hash", JsonValue::String(hash_to_string(&info.chunk_hash))),
("resource", info.resource.into()),
])
}
}
impl From<FileDownloadCompleted> for JsonValue {
fn from(info: FileDownloadCompleted) -> JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("resource", info.resource.into()),
])
}
}
impl From<DownloadCompleted> for JsonValue {
fn from(info: DownloadCompleted) -> JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("file_path", JsonValue::String(info.file_path.to_string_lossy().to_string())),
("resource", info.resource.into()),
])
}
}
impl From<ResourceRemoved> for JsonValue {
fn from(info: ResourceRemoved) -> JsonValue {
json_map([("file_hash", JsonValue::String(hash_to_string(&info.file_hash)))])
}
}
impl From<ChunkNotFound> for JsonValue {
fn from(info: ChunkNotFound) -> JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("chunk_hash", JsonValue::String(hash_to_string(&info.chunk_hash))),
])
}
}
impl From<FileNotFound> for JsonValue {
fn from(info: FileNotFound) -> JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("resource", info.resource.into()),
])
}
}
impl From<MissingChunks> for JsonValue {
fn from(info: MissingChunks) -> JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("resource", info.resource.into()),
])
}
}
impl From<DownloadError> for JsonValue {
fn from(info: DownloadError) -> JsonValue {
json_map([
("file_hash", JsonValue::String(hash_to_string(&info.file_hash))),
("error", JsonValue::String(info.error)),
])
}
}
impl From<FudEvent> for JsonValue {
fn from(event: FudEvent) -> JsonValue {
match event {
FudEvent::DownloadStarted(info) => {
json_map([("event", json_str("download_started")), ("info", info.into())])
}
FudEvent::ChunkDownloadCompleted(info) => {
json_map([("event", json_str("chunk_download_completed")), ("info", info.into())])
}
FudEvent::FileDownloadCompleted(info) => {
json_map([("event", json_str("file_download_completed")), ("info", info.into())])
}
FudEvent::DownloadCompleted(info) => {
json_map([("event", json_str("download_completed")), ("info", info.into())])
}
FudEvent::ResourceRemoved(info) => {
json_map([("event", json_str("resource_removed")), ("info", info.into())])
}
FudEvent::ChunkNotFound(info) => {
json_map([("event", json_str("chunk_not_found")), ("info", info.into())])
}
FudEvent::FileNotFound(info) => {
json_map([("event", json_str("file_not_found")), ("info", info.into())])
}
FudEvent::MissingChunks(info) => {
json_map([("event", json_str("missing_chunks")), ("info", info.into())])
}
FudEvent::DownloadError(info) => {
json_map([("event", json_str("download_error")), ("info", info.into())])
}
// RPCAPI:
// Verifies local files. Takes a list of file hashes as parameters.
// An empty list means all known files.
// Returns an empty array on success.
//
// --> {"jsonrpc": "2.0", "method": "verify", "params": ["1211...abfd"], "id": 42}
// <-- {"jsonrpc": "2.0", "result": [], "id": 42}
async fn verify(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if !params.iter().all(|param| param.is_string()) {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
let hashes = if params.is_empty() {
None
} else {
let hashes_str: Vec<String> =
params.iter().map(|param| param.get::<String>().unwrap().clone()).collect();
let hashes: Result<Vec<blake3::Hash>> = hashes_str
.into_iter()
.map(|hash_str| {
let mut buf = [0u8; 32];
bs58::decode(hash_str).onto(&mut buf)?;
Ok(blake3::Hash::from_bytes(buf))
})
.collect();
if hashes.is_err() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into();
}
Some(hashes.unwrap())
};
if let Err(e) = self.verify_resources(hashes).await {
error!(target: "fud::verify()", "Could not verify resources: {}", e);
return JsonError::new(ErrorCode::InternalError, None, id).into();
}
JsonResponse::new(JsonValue::Array(vec![]), id).into()
}
}
@@ -504,10 +400,16 @@ impl Fud {
/// Handle `get` RPC request
pub async fn handle_get(&self, file_hash: &blake3::Hash, file_path: &PathBuf) -> Result<()> {
let self_node = self.dht().node().await;
let mut closest_nodes = vec![];
// Add path to the sled db
self.path_tree
.insert(file_hash.as_bytes(), file_path.to_string_lossy().to_string().as_bytes())?;
// Add resource to `self.resources`
let resource = Resource {
hash: *file_hash,
path: file_path.clone(),
status: ResourceStatus::Discovering,
chunks_total: 0,
chunks_downloaded: 0,
@@ -516,28 +418,37 @@ impl Fud {
resources_write.insert(*file_hash, resource.clone());
drop(resources_write);
// Send a DownloadStarted event
self.event_publisher
.notify(FudEvent::DownloadStarted(DownloadStarted {
file_hash: *file_hash,
file_path: file_path.clone(),
.notify(FudEvent::DownloadStarted(event::DownloadStarted {
hash: *file_hash,
resource,
}))
.await;
let mut closest_nodes = vec![];
let chunked_file = match self.geode.get(file_hash).await {
// Try to get the chunked file from geode
let chunked_file = match self.geode.get(file_hash, file_path).await {
// We already know the list of chunk hashes for this file
Ok(v) => v,
// The metadata in geode is invalid or corrupted
Err(Error::GeodeNeedsGc) => todo!(),
// If we could not find the file in geode, get the file metadata from the network
Err(Error::GeodeFileNotFound) => {
// Find nodes close to the file hash
info!(target: "self::get()", "Requested file {} not found in Geode, triggering fetch", hash_to_string(file_hash));
closest_nodes = self.lookup_nodes(file_hash).await.unwrap_or_default();
self.file_fetch_tx.send((closest_nodes.clone(), *file_hash, Ok(()))).await.unwrap();
// Fetch file metadata (list of chunk hashes)
self.file_fetch_tx
.send((closest_nodes.clone(), *file_hash, file_path.clone(), Ok(())))
.await
.unwrap();
info!(target: "self::get()", "Waiting for background file fetch task...");
let (i_file_hash, status) = self.file_fetch_end_rx.recv().await.unwrap();
match status {
Ok(()) => self.geode.get(&i_file_hash).await.unwrap(),
// The file metadata was found and inserted into geode
Ok(()) => self.geode.get(&i_file_hash, file_path).await.unwrap(),
// We could not find the file metadata
Err(Error::GeodeFileRouteNotFound) => {
// Set resource status to `Incomplete` and send FudEvent::FileNotFound
let mut resources_write = self.resources.write().await;
@@ -545,8 +456,8 @@ impl Fud {
resource.status = ResourceStatus::Incomplete;
self.event_publisher
.notify(FudEvent::FileNotFound(FileNotFound {
file_hash: *file_hash,
.notify(FudEvent::FileNotFound(event::FileNotFound {
hash: *file_hash,
resource: resource.clone(),
}))
.await;
@@ -581,60 +492,57 @@ impl Fud {
};
drop(resources_write);
// Send a FileDownloadCompleted event
self.event_publisher
.notify(FudEvent::FileDownloadCompleted(FileDownloadCompleted {
file_hash: *file_hash,
.notify(FudEvent::FileDownloadCompleted(event::FileDownloadCompleted {
hash: *file_hash,
resource: resource.clone(),
}))
.await;
// If the file is already complete, we don't need to download any chunk
if chunked_file.is_complete() {
// Announce the file
let self_announce =
FudAnnounce { key: *file_hash, seeders: vec![self_node.clone().into()] };
let _ = self.announce(file_hash, &self_announce, self.seeders_router.clone()).await;
return match self.geode.assemble_file(file_hash, &chunked_file, file_path).await {
Ok(_) => {
// Set resource status to `Seeding`
let mut resources_write = self.resources.write().await;
let resource = match resources_write.get_mut(file_hash) {
Some(resource) => {
resource.status = ResourceStatus::Seeding;
resource.chunks_downloaded = chunked_file.len() as u64;
resource.clone()
}
None => return Ok(()), // Resource was removed, abort
};
drop(resources_write);
self.event_publisher
.notify(FudEvent::DownloadCompleted(DownloadCompleted {
file_hash: *file_hash,
file_path: file_path.clone(),
resource,
}))
.await;
Ok(())
}
Err(e) => {
error!(target: "fud::handle_get()", "{}", e);
self.event_publisher
.notify(FudEvent::DownloadError(DownloadError {
file_hash: *file_hash,
error: e.to_string(),
}))
.await;
Err(e)
// Set resource status to `Seeding`
let mut resources_write = self.resources.write().await;
let resource = match resources_write.get_mut(file_hash) {
Some(resource) => {
resource.status = ResourceStatus::Seeding;
resource.chunks_downloaded = chunked_file.len() as u64;
resource.clone()
}
None => return Ok(()), // Resource was removed, abort
};
drop(resources_write);
// Send a DownloadCompleted event
self.event_publisher
.notify(FudEvent::DownloadCompleted(event::DownloadCompleted {
hash: *file_hash,
resource,
}))
.await;
return Ok(());
}
// Find nodes close to the file hash if we didn't previously fetch them
if closest_nodes.is_empty() {
closest_nodes = self.lookup_nodes(file_hash).await.unwrap_or_default();
}
let seeders = self.fetch_seeders(&closest_nodes, file_hash).await;
// Find seeders and remove ourselves from the result
let seeders = self
.fetch_seeders(&closest_nodes, file_hash)
.await
.iter()
.filter(|seeder| seeder.node.id != self_node.id)
.cloned()
.collect();
// List missing chunks
let mut missing_chunks = HashSet::new();
@@ -645,9 +553,10 @@ impl Fud {
}
// Fetch missing chunks from seeders
self.fetch_chunks(file_hash, &missing_chunks, &seeders).await?;
self.fetch_chunks(file_path, file_hash, &missing_chunks, &seeders).await?;
let chunked_file = match self.geode.get(file_hash).await {
// Get chunked file from geode
let chunked_file = match self.geode.get(file_hash, file_path).await {
Ok(v) => v,
Err(e) => {
error!(target: "fud::handle_get()", "{}", e);
@@ -669,12 +578,17 @@ impl Fud {
};
drop(resources_write);
// Send a MissingChunks event
self.event_publisher
.notify(FudEvent::MissingChunks(MissingChunks { file_hash: *file_hash, resource }))
.notify(FudEvent::MissingChunks(event::MissingChunks {
hash: *file_hash,
resource,
}))
.await;
return Ok(());
}
// Announce the file
let self_announce =
FudAnnounce { key: *file_hash, seeders: vec![self_node.clone().into()] };
let _ = self.announce(file_hash, &self_announce, self.seeders_router.clone()).await;
@@ -691,26 +605,13 @@ impl Fud {
};
drop(resources_write);
match self.geode.assemble_file(file_hash, &chunked_file, file_path).await {
Ok(_) => {
self.event_publisher
.notify(FudEvent::DownloadCompleted(DownloadCompleted {
file_hash: *file_hash,
file_path: file_path.clone(),
resource,
}))
.await;
}
Err(e) => {
error!(target: "fud::handle_get()", "{}", e);
self.event_publisher
.notify(FudEvent::DownloadError(DownloadError {
file_hash: *file_hash,
error: e.to_string(),
}))
.await;
}
};
// Send a DownloadCompleted event
self.event_publisher
.notify(FudEvent::DownloadCompleted(event::DownloadCompleted {
hash: *file_hash,
resource,
}))
.await;
Ok(())
}


@@ -46,7 +46,7 @@ pub enum FetchReply {
pub async fn fetch_file_task(fud: Arc<Fud>) -> Result<()> {
info!(target: "fud::fetch_file_task()", "Started background file fetch task");
loop {
let (nodes, file_hash, _) = fud.file_fetch_rx.recv().await.unwrap();
let (nodes, file_hash, file_path, _) = fud.file_fetch_rx.recv().await.unwrap();
info!(target: "fud::fetch_file_task()", "Fetching file {}", hash_to_string(&file_hash));
let result = fud.fetch_file_metadata(nodes, file_hash).await;
@@ -69,7 +69,7 @@ pub async fn fetch_file_task(fud: Arc<Fud>) -> Result<()> {
info!(target: "fud::fetch_file_task()", "File fits in a single chunk");
let chunk_hash = blake3::hash(&chunk);
let _ = fud.geode.insert_file(&file_hash, &[chunk_hash]).await;
match fud.geode.insert_chunk(&chunk).await {
match fud.geode.write_chunk(&file_hash, &file_path, &chunk).await {
Ok(_) => {}
Err(e) => {
error!(
@@ -104,7 +104,7 @@ pub async fn announce_seed_task(fud: Arc<Fud>) -> Result<()> {
let seeders = vec![fud.dht().node().await.into()];
info!(target: "fud::announce_task()", "Verifying seeds...");
let seeding_resources = match fud.get_seeding_resources().await {
let seeding_resources = match fud.verify_resources(None).await {
Ok(resources) => resources,
Err(e) => {
error!(target: "fud::announce_task()", "Error while verifying seeding resources: {}", e);


@@ -23,20 +23,20 @@
//! `remove` support. File removal should be handled externally, and then it
//! is only required to run `garbage_collect()` to clean things up.
//!
//! The filesystem hierarchy stores two directories: `files` and `chunks`.
//! `chunks` store [`MAX_CHUNK_SIZE`] files, where the filename is a BLAKE3
//! hash of the chunk's contents.
//! `files` store metadata about a full file, which can be retrieved by
//! concatenating the chunks in order. The filename of a file in `files`
//! is the BLAKE3 hash of hashed chunks in the correct order.
//! The filesystem hierarchy stores a `files` directory storing metadata
//! about a full file. The filename of a file in `files` is the BLAKE3
//! hash of hashed chunks in the correct order. Inside the file is the list
//! of the chunks making up the full file.
//!
//! To get the chunks you split the full file into `MAX_CHUNK_SIZE` sized
//! slices, where the last chunk is the only one that can be smaller than
//! that.
//!
//! It might look like the following:
//! ```
//! /files/B9fFKaEYphw2oH5PDbeL1TTAcSzL6ax84p8SjBKzuYzX
//! /files/8nA3ndjFFee3n5wMPLZampLpGaMJi3od4MSyaXPDoF91
//! /files/...
//! /chunks/2bQPxSR8Frz7S7JW3DRAzEtkrHfLXB1CN65V7az77pUp
//! /chunks/CvjvN6MfWQYK54DgKNR7MPgFSZqsCgpWKF2p8ot66CCP
//! /chunks/...
//! ```
//!
//! In the above example, contents of `B9fFKaEYphw2oH5PDbeL1TTAcSzL6ax84p8SjBKzuYzX`
@@ -46,19 +46,15 @@
//! CvjvN6MfWQYK54DgKNR7MPgFSZqsCgpWKF2p8ot66CCP
//! ```
//!
//! This means, in order to retrieve `B9fFKaEYphw2oH5PDbeL1TTAcSzL6ax84p8SjBKzuYzX`,
//! we need to concatenate the files under `/chunks` whose filenames are the
//! hashes found above. The contents of the files in `/chunks` are arbitrary
//! data, and by concatenating them we can retrieve the original file.
//! This means, the file `B9fFKaEYphw2oH5PDbeL1TTAcSzL6ax84p8SjBKzuYzX`
//! is the concatenation of the chunks with the above hashes.
//!
//! It is important to note that multiple files can use the same chunks.
//! This is some kind of naive deduplication, so we actually don't consider
//! chunks to be specific to a single file and therefore when we do garbage
//! collection, we keep chunks and files independent of each other.
//! The full file is not copied, and individual chunks are not stored by
//! geode. Additionally, it does not keep track of the full file's path.
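
A sketch of the hashing scheme just described, mirroring what insert() computes, but over an in-memory buffer instead of the async stream the real code reads:

const MAX_CHUNK_SIZE: usize = 262_144;

/// Hash each MAX_CHUNK_SIZE slice with BLAKE3; the file hash is the
/// BLAKE3 hash of the chunk hashes concatenated in order. Only the
/// last chunk may be smaller than MAX_CHUNK_SIZE.
fn chunk_hashes(data: &[u8]) -> (blake3::Hash, Vec<blake3::Hash>) {
    let mut file_hasher = blake3::Hasher::new();
    let mut hashes = Vec::new();
    for chunk in data.chunks(MAX_CHUNK_SIZE) {
        let chunk_hash = blake3::hash(chunk);
        file_hasher.update(chunk_hash.as_bytes());
        hashes.push(chunk_hash);
    }
    (file_hasher.finalize(), hashes)
}
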
use std::{collections::HashSet, path::PathBuf};
use futures::AsyncRead;
use futures::{AsyncRead, AsyncSeek};
use log::{debug, info, warn};
use smol::{
fs::{self, File, OpenOptions},
@@ -76,8 +72,6 @@ pub const MAX_CHUNK_SIZE: usize = 262_144;
/// Path prefix where file metadata is stored
const FILES_PATH: &str = "files";
/// Path prefix where file chunks are stored
const CHUNKS_PATH: &str = "chunks";
pub fn hash_to_string(hash: &blake3::Hash) -> String {
bs58::encode(hash.as_bytes()).into_string()
@@ -91,7 +85,7 @@ pub fn hash_to_string(hash: &blake3::Hash) -> String {
/// the filesystem where the chunk can be found. If `None`, it
/// is to be assumed that the chunk is not available locally.
#[derive(Clone)]
pub struct ChunkedFile(Vec<(blake3::Hash, Option<PathBuf>)>);
pub struct ChunkedFile(Vec<(blake3::Hash, Option<bool>)>);
impl ChunkedFile {
fn new(hashes: &[blake3::Hash]) -> Self {
@@ -104,7 +98,7 @@ impl ChunkedFile {
}
/// Return an iterator over the chunks and their paths.
pub fn iter(&self) -> core::slice::Iter<'_, (blake3::Hash, Option<PathBuf>)> {
pub fn iter(&self) -> core::slice::Iter<'_, (blake3::Hash, Option<bool>)> {
self.0.iter()
}
@@ -128,8 +122,6 @@ impl ChunkedFile {
pub struct Geode {
/// Path to the filesystem directory where file metadata is stored
files_path: PathBuf,
/// Path to the filesystem directory where file chunks are stored
chunks_path: PathBuf,
}
/// smol::fs::File::read does not guarantee that the buffer will be filled, even if the buffer is
@@ -158,15 +150,12 @@ impl Geode {
/// file metadata and chunks.
pub async fn new(base_path: &PathBuf) -> Result<Self> {
let mut files_path: PathBuf = base_path.into();
let mut chunks_path: PathBuf = base_path.into();
files_path.push(FILES_PATH);
chunks_path.push(CHUNKS_PATH);
// Create necessary directory structure if needed
fs::create_dir_all(&files_path).await?;
fs::create_dir_all(&chunks_path).await?;
Ok(Self { files_path, chunks_path })
Ok(Self { files_path })
}
/// Attempt to read chunk hashes from a given file path and return
@@ -188,75 +177,11 @@ impl Geode {
}
/// Perform garbage collection over the filesystem hierarchy.
/// Returns sets representing deleted files and deleted chunks, respectively.
pub async fn garbage_collect(&self) -> Result<(HashSet<blake3::Hash>, HashSet<blake3::Hash>)> {
/// Returns a set representing deleted files.
pub async fn garbage_collect(&self) -> Result<HashSet<blake3::Hash>> {
info!(target: "geode::garbage_collect()", "[Geode] Performing garbage collection");
// We track corrupt files and chunks here.
// We track corrupt files here.
let mut deleted_files = HashSet::new();
let mut deleted_chunks = HashSet::new();
let mut deleted_chunk_paths = HashSet::new();
// Scan through available chunks and check them for consistency.
let mut chunk_paths = fs::read_dir(&self.chunks_path).await?;
let mut buf = [0u8; MAX_CHUNK_SIZE];
while let Some(chunk) = chunk_paths.next().await {
let Ok(entry) = chunk else { continue };
let chunk_path = entry.path();
// Skip if we're not a plain file
if !chunk_path.is_file() {
continue
}
// Make sure that the filename is a BLAKE3 hash
let file_name = match chunk_path.file_name().and_then(|n| n.to_str()) {
Some(v) => v,
None => continue,
};
let mut hash_buf = [0u8; 32];
let chunk_hash = match bs58::decode(file_name).onto(&mut hash_buf) {
Ok(_) => blake3::Hash::from_bytes(hash_buf),
Err(_) => continue,
};
// If there is a problem with opening the file, remove it.
let Ok(mut chunk_fd) = File::open(&chunk_path).await else {
deleted_chunk_paths.insert(chunk_path);
deleted_chunks.insert(chunk_hash);
continue
};
// Perform consistency check
let Ok(bytes_read) = read_until_filled(&mut chunk_fd, &mut buf).await else {
deleted_chunk_paths.insert(chunk_path);
deleted_chunks.insert(chunk_hash);
buf = [0u8; MAX_CHUNK_SIZE];
continue
};
let chunk_slice = &buf[..bytes_read];
let hashed_chunk = blake3::hash(chunk_slice);
// If the hash doesn't match the filename, remove it.
if chunk_hash != hashed_chunk {
deleted_chunk_paths.insert(chunk_path);
deleted_chunks.insert(chunk_hash);
buf = [0u8; MAX_CHUNK_SIZE];
continue
}
// Seems legit.
buf = [0u8; MAX_CHUNK_SIZE];
}
for chunk_path in &deleted_chunk_paths {
if let Err(e) = fs::remove_file(chunk_path).await {
warn!(
target: "geode::garbage_collect()",
"[Geode] Garbage collect failed to remove corrupted chunk: {}", e,
);
}
}
// Perform health check over file metadata. For now we just ensure they
// have the correct format.
@@ -298,7 +223,7 @@ impl Geode {
}
info!(target: "geode::garbage_collect()", "[Geode] Garbage collection finished");
Ok((deleted_files, deleted_chunks))
Ok(deleted_files)
}
/// Insert a file into Geode. The function expects any kind of byte stream, which
@@ -312,9 +237,9 @@ impl Geode {
info!(target: "geode::insert()", "[Geode] Inserting file...");
let mut file_hasher = blake3::Hasher::new();
let mut chunk_hashes = vec![];
let mut buf = [0u8; MAX_CHUNK_SIZE];
loop {
let mut buf = [0u8; MAX_CHUNK_SIZE];
let bytes_read = read_until_filled(&mut stream, &mut buf).await?;
if bytes_read == 0 {
break
@@ -324,47 +249,6 @@ impl Geode {
let chunk_hash = blake3::hash(chunk_slice);
file_hasher.update(chunk_hash.as_bytes());
chunk_hashes.push(chunk_hash);
// Write the chunk to a file, if necessary. We first perform
// a consistency check and if things are fine, we don't have
// to perform a write, which is usually more expensive than
// reading from disk.
let mut chunk_path = self.chunks_path.clone();
chunk_path.push(hash_to_string(&chunk_hash).as_str());
let chunk_fd =
OpenOptions::new().read(true).write(true).create(true).open(&chunk_path).await?;
let mut fs_buf = [0u8; MAX_CHUNK_SIZE];
let fs_bytes_read = read_until_filled(chunk_fd, &mut fs_buf).await?;
let fs_chunk_slice = &fs_buf[..fs_bytes_read];
let fs_chunk_hash = blake3::hash(fs_chunk_slice);
if fs_chunk_hash != chunk_hash {
debug!(
target: "geode::insert()",
"Existing chunk inconsistent or unavailable. Writing chunk to {:?}",
chunk_path,
);
// Here the chunk is broken, so we'll truncate and write the new one.
let mut chunk_fd = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&chunk_path)
.await?;
chunk_fd.set_len(0).await?;
chunk_fd.seek(SeekFrom::Start(0)).await?;
chunk_fd.write_all(chunk_slice).await?;
chunk_fd.flush().await?;
} else {
debug!(
target: "geode::insert()",
"Existing chunk consistent. Skipping write to {:?}",
chunk_path,
);
}
buf = [0u8; MAX_CHUNK_SIZE];
}
// This hash is the file's chunks hashes hashed in order.
@@ -410,10 +294,16 @@ impl Geode {
Ok(())
}
/// Create and insert a single chunk into Geode given a stream.
/// Write a single chunk into `file_path` given a stream.
/// The file must be inserted into Geode before calling this method.
/// Always overwrites any existing chunk. Returns the chunk hash once inserted.
pub async fn insert_chunk(&self, stream: impl AsRef<[u8]>) -> Result<blake3::Hash> {
info!(target: "geode::insert_chunk()", "[Geode] Inserting single chunk");
pub async fn write_chunk(
&self,
file_hash: &blake3::Hash,
file_path: &PathBuf,
stream: impl AsRef<[u8]>,
) -> Result<blake3::Hash> {
info!(target: "geode::write_chunk()", "[Geode] Writing single chunk");
let mut cursor = Cursor::new(&stream);
let mut chunk = [0u8; MAX_CHUNK_SIZE];
@@ -422,27 +312,43 @@ impl Geode {
let chunk_slice = &chunk[..bytes_read];
let chunk_hash = blake3::hash(chunk_slice);
let mut chunk_path = self.chunks_path.clone();
chunk_path.push(hash_to_string(&chunk_hash).as_str());
let mut chunk_fd = File::create(&chunk_path).await?;
chunk_fd.write_all(chunk_slice).await?;
chunk_fd.flush().await?;
let chunked_file = self.get(file_hash, file_path).await?;
// Get the chunk index in the file from the chunk hash
let chunk_index = match chunked_file.iter().position(|c| c.0 == chunk_hash) {
Some(index) => index,
None => {
return Err(Error::GeodeNeedsGc);
}
};
let position = (chunk_index as u64) * (MAX_CHUNK_SIZE as u64);
// Create the file if it does not exist
if !file_path.exists() {
File::create(&file_path).await?;
}
let mut file_fd = OpenOptions::new().write(true).open(&file_path).await?;
file_fd.seek(SeekFrom::Start(position)).await?;
file_fd.write_all(chunk_slice).await?;
file_fd.flush().await?;
Ok(chunk_hash)
}
/// Fetch file metadata from Geode. Returns [`ChunkedFile`] which gives a list
/// of chunks and optionally file paths to the said chunks. Returns an error if
/// of chunks and booleans indicating whether each chunk is locally present and valid. Returns an error if
/// the read failed in any way (could also be the file does not exist).
pub async fn get(&self, file_hash: &blake3::Hash) -> Result<ChunkedFile> {
pub async fn get(&self, file_hash: &blake3::Hash, file_path: &PathBuf) -> Result<ChunkedFile> {
let file_hash_str = hash_to_string(file_hash);
info!(target: "geode::get()", "[Geode] Getting file chunks for {}...", file_hash_str);
let mut file_path = self.files_path.clone();
file_path.push(file_hash_str);
let mut file_metadata_path = self.files_path.clone();
file_metadata_path.push(file_hash_str);
// Try to read the file metadata. If it's corrupt, return an error signalling
// that garbage collection needs to run.
let chunk_hashes = match Self::read_metadata(&file_path).await {
let chunk_hashes = match Self::read_metadata(&file_metadata_path).await {
Ok(v) => v,
Err(e) => {
return match e {
@@ -461,63 +367,95 @@ impl Geode {
let mut chunked_file = ChunkedFile::new(&chunk_hashes);
// Iterate over chunks and find which chunks we have available locally.
let mut buf = vec![];
for (chunk_hash, chunk_path) in chunked_file.0.iter_mut() {
let mut c_path = self.chunks_path.clone();
c_path.push(hash_to_string(chunk_hash).as_str());
if !c_path.exists() || !c_path.is_file() {
// TODO: We should be aggressive here and remove the non-file.
continue
// Open the file, if we can't we return the chunked file with no locally available chunk.
let mut file = match File::open(&file_path).await {
Ok(v) => v,
Err(_) => {
return Ok(chunked_file);
}
};
// Iterate over chunks and find which chunks we have available locally.
for (chunk_index, (chunk_hash, chunk_valid)) in chunked_file.0.iter_mut().enumerate() {
let chunk = self.read_chunk(&mut file, &chunk_index).await?;
// Perform chunk consistency check
let mut chunk_fd = File::open(&c_path).await?;
let bytes_read = chunk_fd.read_to_end(&mut buf).await?;
let chunk_slice = &buf[..bytes_read];
let hashed_chunk = blake3::hash(chunk_slice);
if &hashed_chunk != chunk_hash {
// The chunk is corrupted/inconsistent. Garbage collection should run.
buf = vec![];
if !self.verify_chunk(chunk_hash, &chunk) {
continue
}
*chunk_path = Some(c_path);
buf = vec![];
*chunk_valid = Some(true);
}
Ok(chunked_file)
}
/// Fetch a single chunk from Geode. Returns a `PathBuf` pointing to the chunk
/// Fetch a single chunk from Geode. Returns a Vec containing the chunk content
/// if it is found.
pub async fn get_chunk(&self, chunk_hash: &blake3::Hash) -> Result<PathBuf> {
let chunk_hash_str = hash_to_string(chunk_hash);
info!(target: "geode::get_chunk()", "[Geode] Getting chunk {}", chunk_hash_str);
let mut chunk_path = self.chunks_path.clone();
chunk_path.push(chunk_hash_str);
pub async fn get_chunk(
&self,
chunk_hash: &blake3::Hash,
file_hash: &blake3::Hash,
file_path: &PathBuf,
) -> Result<Vec<u8>> {
info!(target: "geode::get_chunk()", "[Geode] Getting chunk {}", hash_to_string(chunk_hash));
if !chunk_path.exists() || !chunk_path.is_file() {
// TODO: We should be aggressive here and remove the non-file.
if !file_path.exists() || !file_path.is_file() {
return Err(Error::GeodeChunkNotFound)
}
let mut file_metadata_path = self.files_path.clone();
file_metadata_path.push(hash_to_string(file_hash));
// Try to read the file metadata. If it's corrupt, return an error signalling
// that garbage collection needs to run.
let chunk_hashes = match Self::read_metadata(&file_metadata_path).await {
Ok(v) => v,
Err(e) => {
return match e {
// If the file is not found, return according error.
Error::Io(std::io::ErrorKind::NotFound) => Err(Error::GeodeFileNotFound),
// Anything else should tell the client to do garbage collection
_ => Err(Error::GeodeNeedsGc),
}
}
};
// Get the chunk index in the file from the chunk hash
let chunk_index = match chunk_hashes.iter().position(|&h| h == *chunk_hash) {
Some(index) => index,
None => return Err(Error::GeodeChunkNotFound),
};
// Read the file to get the chunk content
let mut file = File::open(&file_path).await?;
let chunk = self.read_chunk(&mut file, &chunk_index).await?;
// Perform chunk consistency check
let mut buf = vec![];
let mut chunk_fd = File::open(&chunk_path).await?;
let bytes_read = chunk_fd.read_to_end(&mut buf).await?;
if !self.verify_chunk(chunk_hash, &buf[..bytes_read]) {
// The chunk is corrupted
if !self.verify_chunk(chunk_hash, &chunk) {
return Err(Error::GeodeNeedsGc)
}
Ok(chunk_path)
Ok(chunk)
}
/// Read the file at `file_path` to get its chunk with index `chunk_index`.
/// Returns the chunk content in a Vec.
pub async fn read_chunk(
&self,
mut stream: impl AsyncRead + Unpin + AsyncSeek,
chunk_index: &usize,
) -> Result<Vec<u8>> {
let position = (*chunk_index as u64) * (MAX_CHUNK_SIZE as u64);
let mut buf = [0u8; MAX_CHUNK_SIZE];
stream.seek(SeekFrom::Start(position)).await?;
let bytes_read = read_until_filled(stream, &mut buf).await?;
Ok(buf[..bytes_read].to_vec())
}
/// Verifies that the file hash matches the chunk hashes.
pub fn verify_file(&self, file_hash: &blake3::Hash, chunk_hashes: &[blake3::Hash]) -> bool {
info!(target: "geode::verify_file()", "[Geode] Verifying file metadata");
info!(target: "geode::verify_file()", "[Geode] Verifying file metadata for {}", hash_to_string(file_hash));
let mut file_hasher = blake3::Hasher::new();
for chunk_hash in chunk_hashes {
@@ -531,54 +469,4 @@ impl Geode {
pub fn verify_chunk(&self, chunk_hash: &blake3::Hash, chunk_slice: &[u8]) -> bool {
blake3::hash(chunk_slice) == *chunk_hash
}
/// Assemble chunks to create a file.
/// This method does NOT perform a consistency check.
pub async fn assemble_file(
&self,
file_hash: &blake3::Hash,
chunked_file: &ChunkedFile,
file_path: &PathBuf,
) -> Result<()> {
let file_hash_str = hash_to_string(file_hash);
info!(target: "geode::assemble_file()", "[Geode] Assembling file {}", file_hash_str);
if file_path.exists() && file_path.is_dir() {
return Err(Error::Custom("File path is an existing directory".to_string())) // TODO
}
let mut file_fd = File::create(&file_path).await?;
for (_, chunk_path) in chunked_file.iter() {
let mut buf = vec![];
let mut chunk_fd = File::open(chunk_path.clone().unwrap()).await?;
let bytes_read = chunk_fd.read_to_end(&mut buf).await?;
let chunk_slice = &buf[..bytes_read];
file_fd.write(chunk_slice).await?;
file_fd.flush().await?;
}
Ok(())
}
/// List file hashes.
pub async fn list_files(&self) -> Result<Vec<blake3::Hash>> {
info!(target: "geode::list_files()", "[Geode] Listing files");
let mut dir = fs::read_dir(&self.files_path).await?;
let mut file_hashes = vec![];
while let Some(file) = dir.try_next().await? {
let os_file_name = file.file_name();
let file_name = os_file_name.to_string_lossy().to_string();
let mut hash_buf = [0u8; 32];
let file_hash = match bs58::decode(file_name).onto(&mut hash_buf) {
Ok(_) => blake3::Hash::from_bytes(hash_buf),
Err(_) => continue,
};
file_hashes.push(file_hash);
}
Ok(file_hashes)
}
}