diff --git a/Cargo.lock b/Cargo.lock index 799179b68..b301288ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2169,24 +2169,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "dhtd" -version = "0.4.1" -dependencies = [ - "async-std", - "async-trait", - "blake3", - "darkfi", - "darkfi-serial", - "easy-parallel", - "libsqlite3-sys", - "log", - "rand 0.8.5", - "simplelog", - "smol", - "url", -] - [[package]] name = "digest" version = "0.9.0" @@ -2771,20 +2753,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "fu" -version = "0.4.1" -dependencies = [ - "async-std", - "clap 4.3.19", - "darkfi", - "libsqlite3-sys", - "log", - "serde_json", - "simplelog", - "url", -] - [[package]] name = "fud" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index 11d8bc42a..5ef0bc7ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ members = [ "bin/drk", "bin/faucetd", #"bin/fud/fu", - #"bin/fud/fud", + "bin/fud/fud", "bin/genev/genevd", "bin/genev/genev-cli", "bin/darkirc", @@ -171,7 +171,6 @@ blockchain = [ geode = [ "async-std", "blake3", - "log", ] event-graph = [ diff --git a/bin/fud/fud/.gitignore b/bin/fud/fud/.gitignore deleted file mode 100644 index 96ef6c0b9..000000000 --- a/bin/fud/fud/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/target -Cargo.lock diff --git a/bin/fud/fud/Cargo.toml b/bin/fud/fud/Cargo.toml index 79716e7ba..9fc30f908 100644 --- a/bin/fud/fud/Cargo.toml +++ b/bin/fud/fud/Cargo.toml @@ -9,8 +9,8 @@ homepage = "https://dark.fi" repository = "https://github.com/darkrenaissance/darkfi" [dependencies] -darkfi = {path = "../../../", features = ["dht", "rpc"]} -darkfi-serial = {path = "../../../src/serial"} +darkfi = {path = "../../../", features = ["geode", "rpc"]} +darkfi-serial = {path = "../../../src/serial", features = ["hash"]} # Misc async-trait = "0.1.72" diff --git a/bin/fud/fud/src/error.rs b/bin/fud/fud/src/error.rs deleted file mode 100644 index 043a6011f..000000000 --- a/bin/fud/fud/src/error.rs +++ /dev/null @@ -1,48 +0,0 @@ -/* This file is part of DarkFi (https://dark.fi) - * - * Copyright (C) 2020-2023 Dyne.org foundation - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -use serde_json::Value; - -use darkfi::rpc::jsonrpc::{ErrorCode::ServerError, JsonError, JsonResult}; - -pub enum RpcError { - UnknownKey = -35107, - QueryFailed = -35108, - KeyInsertFail = -35110, - KeyRemoveFail = -35111, - WaitingNetworkError = -35112, - FileGenerationFail = -35113, -} - -fn to_tuple(e: RpcError) -> (i64, String) { - let msg = match e { - RpcError::UnknownKey => "Did not find key", - RpcError::QueryFailed => "Failed to query key", - RpcError::KeyInsertFail => "Failed to insert key", - RpcError::KeyRemoveFail => "Failed to remove key", - RpcError::WaitingNetworkError => "Error while waiting network response.", - RpcError::FileGenerationFail => "Failed to generate file for key", - }; - - (e as i64, msg.to_string()) -} - -pub fn server_error(e: RpcError, id: Value) -> JsonResult { - let (code, msg) = to_tuple(e); - JsonError::new(ServerError(code), Some(msg), id).into() -} diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs index 06ac14361..1a3b55131 100644 --- a/bin/fud/fud/src/main.rs +++ b/bin/fud/fud/src/main.rs @@ -16,343 +16,156 @@ * along with this program. If not, see . */ -use std::{collections::HashSet, fs, path::PathBuf}; +use std::collections::{HashMap, HashSet}; -use async_std::{stream::StreamExt, sync::Arc}; +use async_std::{ + fs::File, + stream::StreamExt, + sync::{Arc, RwLock}, +}; use async_trait::async_trait; -use darkfi_serial::serialize; -use log::{debug, error, info, warn}; +use log::{error, info}; use serde_json::{json, Value}; -use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml}; +use smol::Executor; +use structopt_toml::{structopt::StructOpt, StructOptToml}; use url::Url; use darkfi::{ async_daemonize, cli_desc, - dht::{waiting_for_response, Dht, DhtPtr}, - net, + geode::Geode, + net::{self, settings::SettingsOpt, P2p, P2pPtr}, rpc::{ - jsonrpc::{ - ErrorCode, - ErrorCode::{InvalidParams, MethodNotFound}, - JsonError, JsonRequest, JsonResponse, JsonResult, - }, + jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult}, server::{listen_and_serve, RequestHandler}, }, util::path::expand_path, Result, }; -mod error; -use error::{server_error, RpcError}; +/// P2P protocols +mod proto; +use proto::{FudFilePut, ProtocolFud}; const CONFIG_FILE: &str = "fud_config.toml"; const CONFIG_FILE_CONTENTS: &str = include_str!("../fud_config.toml"); -#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] +#[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)] #[serde(default)] #[structopt(name = "fud", about = cli_desc!())] struct Args { - #[structopt(short, long)] - /// Configuration file to use - config: Option, - - #[structopt(long, default_value = "~/.config/darkfi/fud")] - /// Path to the contents directory - folder: String, + #[structopt(short, parse(from_occurrences))] + /// Increase verbosity (-vvv supported) + verbose: u8, #[structopt(long, default_value = "tcp://127.0.0.1:13336")] /// JSON-RPC listen URL rpc_listen: Url, - #[structopt(long)] - /// P2P accept addresses (repeatable flag) - p2p_accept: Vec, - - #[structopt(long)] - /// P2P external addresses (repeatable flag) - p2p_external: Vec, - - #[structopt(long, default_value = "8")] - /// Connection slots - slots: usize, - - #[structopt(long)] - /// Connect to seed (repeatable flag) - seeds: Vec, - - #[structopt(long)] - /// Connect to peer (repeatable flag) - peers: Vec, - - #[structopt(long)] - /// Prefered transports for outbound connections (repeatable flag) - transports: Vec, - - #[structopt(long)] - /// Enable localnet hosts - localnet: bool, - - #[structopt(long)] - /// Enable channel log - channel_log: bool, - #[structopt(short, long)] - /// Set log file to ouput into + /// Configuration file to use + config: Option, + + #[structopt(long)] + /// Set log file path to output daemon logs into log: Option, - #[structopt(short, parse(from_occurrences))] - /// Increase verbosity (-vvv supported) - verbose: u8, + #[structopt(long, default_value = "~/.local/share/fud")] + /// Base directory for filesystem storage + base_dir: String, + + #[structopt(flatten)] + /// Network settings + net: SettingsOpt, } -/// Struct representing the daemon. pub struct Fud { - /// Daemon dht state - dht: DhtPtr, + /// Routing table for file metadata + metadata_router: Arc>>>, + /// Routing table for file chunks + chunks_router: Arc>>>, + /// Pointer to the P2P network instance + p2p: P2pPtr, + /// The Geode instance + geode: Geode, +} - /// Path to the contents directory - folder: PathBuf, +#[async_trait] +impl RequestHandler for Fud { + async fn handle_request(&self, req: JsonRequest) -> JsonResult { + if !req.params.is_array() { + return JsonError::new(ErrorCode::InvalidParams, None, req.id).into() + } + + let params = req.params.as_array().unwrap(); + + match req.method.as_str() { + Some("put") => return self.put(req.id, params).await, + Some("get") => return self.get(req.id, params).await, + + Some("ping") => return self.pong(req.id, params).await, + Some("dnet_switch") => return self.dnet_switch(req.id, params).await, + Some("dnet_info") => return self.dnet_info(req.id, params).await, + Some(_) | None => return JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), + } + } } impl Fud { - pub async fn new(dht: DhtPtr, folder: PathBuf) -> Result { - Ok(Self { dht, folder }) - } - - /// Initialize fud dht state by reading the contents folder and generating - /// the corresponding dht records. - async fn init(&self) -> Result<()> { - info!("Initializing fud dht state for folder: {:?}", self.folder); - - if !self.folder.exists() { - fs::create_dir_all(&self.folder)?; - } - - let entries = fs::read_dir(&self.folder).unwrap(); - { - let mut lock = self.dht.write().await; - - // Sync lookup map with network - if let Err(e) = lock.sync_lookup_map().await { - error!("Failed to sync lookup map: {}", e); - } - - for entry in entries { - let e = entry.unwrap(); - let name = String::from(e.file_name().to_str().unwrap()); - info!("Entry: {}", name); - let key_hash = blake3::hash(&serialize(&name)); - let value: Vec = std::fs::read(e.path()).unwrap(); - if let Err(e) = lock.insert(key_hash, value).await { - error!("Failed to insert key: {}", e); - } - } - } - - Ok(()) - } - - /// Signaling fud network that node goes offline. - async fn disconnect(&self) -> Result<()> { - debug!("Peer disconnecting, signaling network"); - - { - let mut lock = self.dht.write().await; - let records = lock.map.clone(); - for key in records.keys() { - let result = lock.remove(*key).await; - match result { - Ok(option) => match option { - Some(k) => { - debug!("Hash key removed: {}", k); - } - None => { - warn!("Did not find key: {}", key); - } - }, - Err(e) => { - error!("Failed to remove key: {}", e); - } - } - } - } - - Ok(()) - } - // RPCAPI: - // Returns all folder contents, with file changes. - // --> {"jsonrpc": "2.0", "method": "list", "params": [], "id": 1} - // <-- {"jsonrpc": "2.0", "result": "[[files],[new],[deleted]", "id": 1} - pub async fn list(&self, id: Value, _params: &[Value]) -> JsonResult { - let mut content = HashSet::new(); - let mut new = HashSet::new(); - let mut deleted = HashSet::new(); - - let entries = fs::read_dir(&self.folder).unwrap(); - let records = self.dht.read().await.map.clone(); - let mut entries_hashes = HashSet::new(); - - // We iterate files for new records - for entry in entries { - let e = entry.unwrap(); - let name = String::from(e.file_name().to_str().unwrap()); - let key_hash = blake3::hash(&serialize(&name)); - entries_hashes.insert(key_hash); - - if records.contains_key(&key_hash) { - content.insert(name.clone()); - } else { - new.insert(name); - } - } - - // We check records for removed files - for key in records.keys() { - if entries_hashes.contains(key) { - continue - } - deleted.insert(key.to_string()); - } - - JsonResponse::new(json!((content, new, deleted)), id).into() - } - - // RPCAPI: - // Iterate contents folder and dht for potential changes. - // --> {"jsonrpc": "2.0", "method": "sync", "params": [], "id": 1} - // <-- {"jsonrpc": "2.0", "result": "true", "id": 1} - pub async fn sync(&self, id: Value, _params: &[Value]) -> JsonResult { - info!("Sync process started"); - - let entries = fs::read_dir(&self.folder).unwrap(); - { - let mut lock = self.dht.write().await; - let records = lock.map.clone(); - let mut entries_hashes = HashSet::new(); - - // We iterate files for new records - for entry in entries { - let e = entry.unwrap(); - let name = String::from(e.file_name().to_str().unwrap()); - info!("Entry: {}", name); - let key_hash = blake3::hash(&serialize(&name)); - entries_hashes.insert(key_hash); - - if records.contains_key(&key_hash) { - continue - } - - let value: Vec = std::fs::read(e.path()).unwrap(); - if let Err(e) = lock.insert(key_hash, value).await { - error!("Failed to insert key: {}", e); - return server_error(RpcError::KeyInsertFail, id) - } - } - - // We check records for removed files - let records = lock.map.clone(); - for key in records.keys() { - if entries_hashes.contains(key) { - continue - } - - let result = lock.remove(*key).await; - match result { - Ok(option) => match option { - Some(k) => { - debug!("Hash key removed: {}", k); - } - None => { - warn!("Did not find key: {}", key); - } - }, - Err(e) => { - error!("Failed to remove key: {}", e); - return server_error(RpcError::KeyRemoveFail, id) - } - } - } - } - - JsonResponse::new(json!(true), id).into() - } - - // RPCAPI: - // Checks if provided key exists and retrieve it from the local map or queries the network. - // Returns key or not found message. - // --> {"jsonrpc": "2.0", "method": "get", "params": ["name"], "id": 1} - // <-- {"jsonrpc": "2.0", "result": "path", "id": 1} - async fn get(&self, id: Value, params: &[Value]) -> JsonResult { + // Put a file onto the network. Takes a local filesystem path as a parameter. + // Returns the fil hashe that serves as a pointer to the uploaded file. + // + // --> {"jsonrpc": "2.0", "method": "put", "params": ["/foo.txt"], "id": 42} + // <-- {"jsonrpc": "2.0", "result: "df4...3db7", "id": 42} + async fn put(&self, id: Value, params: &[Value]) -> JsonResult { if params.len() != 1 || !params[0].is_string() { - return JsonError::new(InvalidParams, None, id).into() + return JsonError::new(ErrorCode::InvalidParams, None, id).into() } - let key = params[0].as_str().unwrap().to_string(); - let key_hash = blake3::hash(&serialize(&key)); + let path = match expand_path(params[0].as_str().unwrap()) { + Ok(v) => v, + Err(_) => return JsonError::new(ErrorCode::InvalidParams, None, id).into(), + }; - // We execute this sequence to prevent lock races between threads - // Verify key exists - let exists = self.dht.read().await.contains_key(key_hash); - if exists.is_none() { - info!("Did not find key: {}", key); - return server_error(RpcError::UnknownKey, id) - } - - // Check if key is local or should query network - let path = self.folder.join(key.clone()); - let local = exists.unwrap(); - if local { - match self.dht.read().await.get(key_hash) { - Some(_) => return JsonResponse::new(json!(path), id).into(), - None => { - info!("Did not find key: {}", key); - return server_error(RpcError::UnknownKey, id) - } - } - } - - info!("Key doesn't exist locally, querring network..."); - if let Err(e) = self.dht.read().await.request_key(key_hash).await { - error!("Failed to query key: {}", e); - return server_error(RpcError::QueryFailed, id) - } - - info!("Waiting response..."); - match waiting_for_response(self.dht.clone()).await { - Ok(response) => { - match response { - Some(resp) => { - info!("Key found!"); - // Optionally, we insert the key to our local map - if let Err(e) = - self.dht.write().await.insert(resp.key, resp.value.clone()).await - { - error!("Failed to insert key: {}", e); - return server_error(RpcError::KeyInsertFail, id) - } - - if let Err(e) = std::fs::write(path.clone(), resp.value) { - error!("Failed to generate file for key: {}", e); - return server_error(RpcError::FileGenerationFail, id) - } - JsonResponse::new(json!(path), id).into() - } - None => { - info!("Did not find key: {}", key); - server_error(RpcError::UnknownKey, id) - } - } - } + // A valid path was passed. Let's see if we can read it, and if so, + // add it to Geode. + let fd = match File::open(&path).await { + Ok(v) => v, Err(e) => { - error!("Error while waiting network response: {}", e); - server_error(RpcError::WaitingNetworkError, id) + error!("Failed to open {:?}: {}", path, e); + return JsonError::new(ErrorCode::InvalidParams, None, id).into() } - } + }; + + let (file_hash, chunk_hashes) = match self.geode.insert(fd).await { + Ok(v) => v, + Err(e) => { + error!("Failed inserting file {:?} to geode: {}", path, e); + // FIXME: Custom error here + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + }; + + // TODO: Broadcast file_hash and chunk_hashes as FudFilePut {}; + let fud_file = FudFilePut { file_hash, chunk_hashes }; + self.p2p.broadcast(&fud_file).await; + + JsonResponse::new(json!(file_hash.to_hex().as_str()), id).into() + } + + // RPCAPI: + // Fetch a file from the network. Takes a file hash as parameter. + // Returns the path to the local file containing the metadata. + // + // --> {"jsonrpc": "2.0", "method": "get", "params": ["1211...abfd"], "id": 42} + // <-- {"jsonrpc": "2.0", "result: "~/.local/share/fud/files/1211...abfd", "id": 42} + async fn get(&self, id: Value, _params: &[Value]) -> JsonResult { + JsonResponse::new(json!([]), id).into() } // RPCAPI: // Replies to a ping method. + // // --> {"jsonrpc": "2.0", "method": "ping", "params": [], "id": 42} // <-- {"jsonrpc": "2.0", "result": "pong", "id": 42} async fn pong(&self, id: Value, _params: &[Value]) -> JsonResult { @@ -372,9 +185,9 @@ impl Fud { } if params[0].as_bool().unwrap() { - self.dht.read().await.p2p.dnet_enable().await; + self.p2p.dnet_enable().await; } else { - self.dht.read().await.p2p.dnet_disable().await; + self.p2p.dnet_disable().await; } JsonResponse::new(json!(true), id).into() @@ -385,82 +198,52 @@ impl Fud { // --> {"jsonrpc": "2.0", "method": "dnet_info", "params": [], "id": 42} // <-- {"jsonrpc": "2.0", result": {"nodeID": [], "nodeinfo": [], "id": 42} async fn dnet_info(&self, id: Value, _params: &[Value]) -> JsonResult { - let dnet_info = self.dht.read().await.p2p.dnet_info().await; - JsonResponse::new(net::P2p::map_dnet_info(dnet_info), id).into() - } -} - -#[async_trait] -impl RequestHandler for Fud { - async fn handle_request(&self, req: JsonRequest) -> JsonResult { - if !req.params.is_array() { - return JsonError::new(InvalidParams, None, req.id).into() - } - - let params = req.params.as_array().unwrap(); - - match req.method.as_str() { - Some("list") => return self.list(req.id, params).await, - Some("sync") => return self.sync(req.id, params).await, - Some("get") => return self.get(req.id, params).await, - Some("ping") => return self.pong(req.id, params).await, - Some("dnet_switch") => return self.dnet_switch(req.id, params).await, - Some("dnet_info") => return self.dnet_info(req.id, params).await, - Some(_) | None => return JsonError::new(MethodNotFound, None, req.id).into(), - } + let dnet_info = self.p2p.dnet_info().await; + JsonResponse::new(P2p::map_dnet_info(dnet_info), id).into() } } async_daemonize!(realmain); -async fn realmain(args: Args, ex: Arc>) -> Result<()> { +async fn realmain(args: Args, ex: Arc>) -> Result<()> { + // The working directory for this daemon and geode. + let basedir = expand_path(&args.base_dir)?; + + // Hashmaps used for routing + let metadata_router = Arc::new(RwLock::new(HashMap::new())); + let chunks_router = Arc::new(RwLock::new(HashMap::new())); + + info!("Instantiating Geode instance"); + let geode = Geode::new(&basedir.into()).await?; + + info!("Instantiating P2P network"); + let p2p = P2p::new(args.net.into()).await; + + // Daemon instantiation + let fud = Arc::new(Fud { metadata_router, chunks_router, p2p: p2p.clone(), geode }); + let _fud = fud.clone(); + + info!("Starting JSON-RPC server on {}", args.rpc_listen); + let _ex = ex.clone(); + ex.spawn(listen_and_serve(args.rpc_listen, fud, _ex)).detach(); + + info!("Starting P2P protocols"); + let registry = p2p.protocol_registry(); + registry + .register(net::SESSION_ALL, move |channel, p2p| { + let _fud = _fud.clone(); + async move { ProtocolFud::init(_fud, channel, p2p).await.unwrap() } + }) + .await; + p2p.clone().start(ex.clone()).await?; + // Signal handling for graceful termination. let (signals_handler, signals_task) = SignalHandler::new()?; - - // P2P network - let network_settings = net::Settings { - inbound_addrs: args.p2p_accept, - outbound_connections: args.slots, - external_addrs: args.p2p_external, - peers: args.peers.clone(), - seeds: args.seeds.clone(), - allowed_transports: args.transports, - localnet: args.localnet, - ..Default::default() - }; - - let p2p = net::P2p::new(network_settings).await; - - // Initialize daemon dht - let dht = Dht::new(None, p2p.clone(), signals_handler.term_rx.clone(), ex.clone()).await?; - - // Initialize daemon - let folder = expand_path(&args.folder)?; - let fud = Fud::new(dht.clone(), folder).await?; - let fud = Arc::new(fud); - - // JSON-RPC server - info!("Starting JSON-RPC server"); - let _ex = ex.clone(); - ex.spawn(listen_and_serve(args.rpc_listen, fud.clone(), _ex)).detach(); - - info!("Starting sync P2P network"); - p2p.clone().start(ex.clone()).await?; - let _ex = ex.clone(); - let _p2p = p2p.clone(); - ex.spawn(async move { - if let Err(e) = _p2p.run(_ex).await { - error!("Failed starting P2P network: {}", e); - } - }) - .detach(); - - fud.init().await?; - - // Wait for termination signal signals_handler.wait_termination(signals_task).await?; info!("Caught termination signal, cleaning up and exiting..."); - fud.disconnect().await?; + info!("Stopping P2P network"); + p2p.stop().await; + info!("Bye!"); Ok(()) } diff --git a/bin/fud/fud/src/proto.rs b/bin/fud/fud/src/proto.rs new file mode 100644 index 000000000..50d0aaf34 --- /dev/null +++ b/bin/fud/fud/src/proto.rs @@ -0,0 +1,331 @@ +/* This file is part of DarkFi (https://dark.fi) + * + * Copyright (C) 2020-2023 Dyne.org foundation + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +use std::collections::HashSet; + +use async_std::sync::Arc; +use async_trait::async_trait; +use darkfi::{ + impl_p2p_message, + net::{ + ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, + ProtocolJobsManager, ProtocolJobsManagerPtr, + }, + Result, +}; +use darkfi_serial::{SerialDecodable, SerialEncodable}; +use log::{debug, error}; +use smol::Executor; +use url::Url; + +use super::Fud; + +/// Message representing a new file on the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudFilePut { + pub file_hash: blake3::Hash, + pub chunk_hashes: Vec, +} +impl_p2p_message!(FudFilePut, "FudFilePut"); + +/// Message representing a new chunk on the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudChunkPut { + pub chunk_hash: blake3::Hash, +} +impl_p2p_message!(FudChunkPut, "FudChunkPut"); + +/// Message representing a new route for a file on the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudFileRoute { + pub file_hash: blake3::Hash, + pub chunk_hashes: Vec, + pub peer: Url, +} +impl_p2p_message!(FudFileRoute, "FudFileRoute"); + +/// Message representing a new route for a chunk on the network +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct FudChunkRoute { + pub chunk_hash: blake3::Hash, + pub peer: Url, +} +impl_p2p_message!(FudChunkRoute, "FudChunkRoute"); + +/// P2P protocol implementation for fud. +pub struct ProtocolFud { + channel_address: Url, + file_put_sub: MessageSubscription, + chunk_put_sub: MessageSubscription, + file_route_sub: MessageSubscription, + chunk_route_sub: MessageSubscription, + fud: Arc, + p2p: P2pPtr, + jobsman: ProtocolJobsManagerPtr, +} + +impl ProtocolFud { + pub async fn init(fud: Arc, channel: ChannelPtr, p2p: P2pPtr) -> Result { + debug!( + target: "fud::proto::ProtocolFud::init()", + "Adding ProtocolFud to the protocol registry" + ); + + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; + + let file_put_sub = channel.subscribe_msg::().await?; + let chunk_put_sub = channel.subscribe_msg::().await?; + let file_route_sub = channel.subscribe_msg::().await?; + let chunk_route_sub = channel.subscribe_msg::().await?; + + Ok(Arc::new(Self { + channel_address: channel.address().clone(), + file_put_sub, + chunk_put_sub, + file_route_sub, + chunk_route_sub, + fud, + p2p, + jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()), + })) + } + + async fn handle_fud_file_put(self: Arc) -> Result<()> { + debug!(target: "fud::ProtocolFud::handle_fud_file_put()", "START"); + + loop { + let fud_file = match self.file_put_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!( + target: "fud::ProtocolFud::handle_fud_file_put()", + "recv fail: {}", e, + ); + continue + } + }; + + // TODO: This approach is naive and optimistic. Needs to be fixed. + let mut metadata_lock = self.fud.metadata_router.write().await; + let file_route = metadata_lock.get_mut(&fud_file.file_hash); + match file_route { + Some(peers) => { + peers.insert(self.channel_address.clone()); + } + None => { + let mut peers = HashSet::new(); + peers.insert(self.channel_address.clone()); + metadata_lock.insert(fud_file.file_hash, peers); + } + } + drop(metadata_lock); + + let mut chunks_lock = self.fud.chunks_router.write().await; + for chunk in &fud_file.chunk_hashes { + let chunk_route = chunks_lock.get_mut(chunk); + match chunk_route { + Some(peers) => { + peers.insert(self.channel_address.clone()); + } + None => { + let mut peers = HashSet::new(); + peers.insert(self.channel_address.clone()); + chunks_lock.insert(*chunk, peers); + } + } + } + drop(chunks_lock); + + // Relay this knowledge of the new route + let route = FudFileRoute { + file_hash: fud_file.file_hash, + chunk_hashes: fud_file.chunk_hashes.clone(), + peer: self.channel_address.clone(), + }; + + self.p2p.broadcast_with_exclude(&route, &[self.channel_address.clone()]).await; + } + } + + async fn handle_fud_chunk_put(self: Arc) -> Result<()> { + debug!(target: "fud::ProtocolFud::handle_fud_chunk_put()", "START"); + + loop { + let fud_chunk = match self.chunk_put_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!( + target: "fud::ProtocolFud::handle_fud_chunk_put()", + "recv fail: {}", e, + ); + continue + } + }; + + // TODO: This approach is naive and optimistic. Needs to be fixed. + let mut chunks_lock = self.fud.chunks_router.write().await; + let chunk_route = chunks_lock.get_mut(&fud_chunk.chunk_hash); + match chunk_route { + Some(peers) => { + peers.insert(self.channel_address.clone()); + } + None => { + let mut peers = HashSet::new(); + peers.insert(self.channel_address.clone()); + chunks_lock.insert(fud_chunk.chunk_hash, peers); + } + } + drop(chunks_lock); + + // Relay this knowledge of the new route + let route = FudChunkRoute { + chunk_hash: fud_chunk.chunk_hash, + peer: self.channel_address.clone(), + }; + + self.p2p.broadcast_with_exclude(&route, &[self.channel_address.clone()]).await; + } + } + + async fn handle_fud_file_route(self: Arc) -> Result<()> { + debug!(target: "fud::ProtocolFud::handle_fud_file_route()", "START"); + + loop { + let fud_file = match self.file_route_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!( + target: "fud::ProtocolFud::handle_fud_file_route()", + "recv fail: {}", e, + ); + continue + } + }; + + // TODO: This approach is naive and optimistic. Needs to be fixed. + let mut metadata_lock = self.fud.metadata_router.write().await; + let file_route = metadata_lock.get_mut(&fud_file.file_hash); + match file_route { + Some(peers) => { + peers.insert(fud_file.peer.clone()); + } + None => { + let mut peers = HashSet::new(); + peers.insert(fud_file.peer.clone()); + metadata_lock.insert(fud_file.file_hash, peers); + } + } + drop(metadata_lock); + + let mut chunks_lock = self.fud.chunks_router.write().await; + for chunk in &fud_file.chunk_hashes { + let chunk_route = chunks_lock.get_mut(chunk); + match chunk_route { + Some(peers) => { + peers.insert(fud_file.peer.clone()); + } + None => { + let mut peers = HashSet::new(); + peers.insert(fud_file.peer.clone()); + chunks_lock.insert(*chunk, peers); + } + } + } + drop(chunks_lock); + + // Relay this knowledge of the new route + let route = FudFileRoute { + file_hash: fud_file.file_hash, + chunk_hashes: fud_file.chunk_hashes.clone(), + peer: fud_file.peer.clone(), + }; + + self.p2p + .broadcast_with_exclude( + &route, + &[self.channel_address.clone(), fud_file.peer.clone()], + ) + .await; + } + } + + async fn handle_fud_chunk_route(self: Arc) -> Result<()> { + debug!(target: "fud::ProtocolFud::handle_fud_chunk_route()", "START"); + + loop { + let fud_chunk = match self.chunk_route_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!( + target: "fud::ProtocolFud::handle_fud_chunk_put()", + "recv fail: {}", e, + ); + continue + } + }; + + // TODO: This approach is naive and optimistic. Needs to be fixed. + let mut chunks_lock = self.fud.chunks_router.write().await; + let chunk_route = chunks_lock.get_mut(&fud_chunk.chunk_hash); + match chunk_route { + Some(peers) => { + peers.insert(fud_chunk.peer.clone()); + } + None => { + let mut peers = HashSet::new(); + peers.insert(fud_chunk.peer.clone()); + chunks_lock.insert(fud_chunk.chunk_hash, peers); + } + } + drop(chunks_lock); + + // Relay this knowledge of the new route + let route = + FudChunkRoute { chunk_hash: fud_chunk.chunk_hash, peer: fud_chunk.peer.clone() }; + + self.p2p + .broadcast_with_exclude( + &route, + &[self.channel_address.clone(), fud_chunk.peer.clone()], + ) + .await; + } + } +} + +#[async_trait] +impl ProtocolBase for ProtocolFud { + async fn start(self: Arc, executor: Arc>) -> Result<()> { + debug!(target: "fud::ProtocolFud::start()", "START"); + self.jobsman.clone().start(executor.clone()); + self.jobsman.clone().spawn(self.clone().handle_fud_file_put(), executor.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_fud_chunk_put(), executor.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_fud_file_route(), executor.clone()).await; + self.jobsman.clone().spawn(self.clone().handle_fud_chunk_route(), executor.clone()).await; + debug!(target: "fud::ProtocolFud::start()", "END"); + Ok(()) + } + + fn name(&self) -> &'static str { + "ProtocolFud" + } +} diff --git a/src/geode/mod.rs b/src/geode/mod.rs index 83fbdf4a2..8323aa776 100644 --- a/src/geode/mod.rs +++ b/src/geode/mod.rs @@ -64,6 +64,7 @@ use async_std::{ path::PathBuf, stream::StreamExt, }; +use futures::AsyncRead; use log::{debug, info, warn}; use crate::{Error, Result}; @@ -122,10 +123,7 @@ impl Geode { fs::create_dir_all(&files_path).await?; fs::create_dir_all(&chunks_path).await?; - // Instantiate Self and perform initial garbage collection. - let self_ = Self { files_path, chunks_path }; - self_.garbage_collect().await?; - Ok(self_) + Ok(Self { files_path, chunks_path }) } /// Attempt to read chunk hashes from a given file path and return @@ -262,16 +260,14 @@ impl Geode { /// file name, and the file's chunks, respectively. pub async fn insert( &self, - stream: impl AsRef<[u8]>, + mut stream: impl AsyncRead + Unpin, ) -> Result<(blake3::Hash, Vec)> { info!(target: "geode::insert()", "[Geode] Inserting file..."); let mut file_hasher = blake3::Hasher::new(); let mut chunk_hashes = vec![]; - - let mut cursor = Cursor::new(&stream); let mut buf = [0u8; MAX_CHUNK_SIZE]; - while let Ok(bytes_read) = cursor.read(&mut buf).await { + while let Ok(bytes_read) = stream.read(&mut buf).await { if bytes_read == 0 { break }