diff --git a/Cargo.lock b/Cargo.lock index 8eb94c880..0775166b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3376,12 +3376,45 @@ dependencies = [ "winapi", ] +[[package]] +name = "fu" +version = "0.4.1" +dependencies = [ + "clap 4.5.31", + "darkfi", + "log", + "simplelog", + "smol", + "url", +] + [[package]] name = "fuchsia-cprng" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" +[[package]] +name = "fud" +version = "0.4.1" +dependencies = [ + "async-trait", + "blake3 1.6.0", + "darkfi", + "darkfi-serial", + "easy-parallel", + "log", + "serde", + "signal-hook", + "signal-hook-async-std", + "simplelog", + "smol", + "structopt", + "structopt-toml", + "tinyjson", + "url", +] + [[package]] name = "funty" version = "2.0.0" diff --git a/Cargo.toml b/Cargo.toml index eaebd2447..c7b7daa16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,8 +26,8 @@ members = [ "bin/explorer/explorerd", "bin/darkfi-mmproxy", "bin/drk", - #"bin/fud/fu", - #"bin/fud/fud", + "bin/fud/fu", + "bin/fud/fud", "bin/genev/genevd", "bin/genev/genev-cli", "bin/darkirc", diff --git a/Makefile b/Makefile index 3c1dca6c1..57803e49e 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,9 @@ BINS = \ lilith \ taud \ vanityaddr \ - explorerd + explorerd \ + fud \ + fu all: $(BINS) @@ -130,6 +132,20 @@ explorerd: explorerd_bundle_contracts_src: contracts $(MAKE) -C bin/explorer/explorerd bundle_contracts_src +fud: + $(MAKE) -C bin/fud/$@ \ + PREFIX="$(PREFIX)" \ + CARGO="$(CARGO)" \ + RUST_TARGET="$(RUST_TARGET)" \ + RUSTFLAGS="$(RUSTFLAGS)" + +fu: + $(MAKE) -C bin/fud/$@ \ + PREFIX="$(PREFIX)" \ + CARGO="$(CARGO)" \ + RUST_TARGET="$(RUST_TARGET)" \ + RUSTFLAGS="$(RUSTFLAGS)" + # -- END OF BINS -- fmt: diff --git a/bin/fud/README.md b/bin/fud/README.md index 9c2c30453..d44ba468f 100644 --- a/bin/fud/README.md +++ b/bin/fud/README.md @@ -31,9 +31,9 @@ OPTIONS: --transports ... Prefered transports for outbound connections (repeatable flag) ``` -On first execution, daemon will create default config file ~/.config/darkfi/fud_config.toml. +On first execution, daemon will create default config file ~/.config/darkfi/fud/fud_config.toml. Configuration must be verified and application should be configured accordingly. -Additionaly, default content folder will be created at ~/.config/darkfi/fud. +Additionaly, default content folder will be created at ~/.local/share/darkfi/fud. Run fud as follows: @@ -42,7 +42,7 @@ Run fud as follows: 13:23:04 [INFO] Starting JSON-RPC server 13:23:04 [INFO] Starting sync P2P network 13:23:04 [WARN] Skipping seed sync process since no seeds are configured. -13:23:04 [INFO] Initializing fud dht state for folder: "/home/x/.config/darkfi/fud" +13:23:04 [INFO] Initializing fud dht state for folder: "/home/x/.local/share/darkfi/fud" 13:23:04 [INFO] Not configured for accepting incoming connections. 13:23:04 [INFO] JSON-RPC listener bound to tcp://127.0.0.1:13337 13:23:04 [INFO] Entry: seedd_config.toml diff --git a/bin/fud/fu/Cargo.toml b/bin/fud/fu/Cargo.toml index 0cac0b525..da9a0d8ca 100644 --- a/bin/fud/fu/Cargo.toml +++ b/bin/fud/fu/Cargo.toml @@ -12,12 +12,11 @@ repository = "https://codeberg.org/darkrenaissance/darkfi" darkfi = {path = "../../../", features = ["util", "rpc"]} # Async -async-std = {version = "1.13.0", features = ["attributes"]} +smol = "2.0.2" # Misc clap = {version = "4.4.11", features = ["derive"]} log = "0.4.26" -serde_json = "1.0.139" simplelog = "0.12.2" url = "2.5.4" diff --git a/bin/fud/fu/Makefile b/bin/fud/fu/Makefile new file mode 100644 index 000000000..ca48fb817 --- /dev/null +++ b/bin/fud/fu/Makefile @@ -0,0 +1,44 @@ +.POSIX: + +# Install prefix +PREFIX = $(HOME)/.cargo + +# Cargo binary +CARGO = cargo +nightly + +# Compile target +RUST_TARGET = $(shell rustc -Vv | grep '^host: ' | cut -d' ' -f2) +# Uncomment when doing musl static builds +#RUSTFLAGS = -C target-feature=+crt-static -C link-self-contained=yes + +SRC = \ + Cargo.toml \ + ../../../Cargo.toml \ + $(shell find src -type f -name '*.rs') \ + $(shell find ../../../src -type f -name '*.rs') \ + +BIN = $(shell grep '^name = ' Cargo.toml | cut -d' ' -f3 | tr -d '"') + +all: $(BIN) + +$(BIN): $(SRC) + RUSTFLAGS="$(RUSTFLAGS)" $(CARGO) build --target=$(RUST_TARGET) --release --package $@ + cp -f ../../../target/$(RUST_TARGET)/release/$@ $@ + cp -f ../../../target/$(RUST_TARGET)/release/$@ ../../../$@ + +clippy: all + RUSTFLAGS="$(RUSTFLAGS)" $(CARGO) clippy --target=$(RUST_TARGET) --release --package $(BIN) --tests + +clean: + rm -f $(BIN) ../../../$(BIN) + +install: all + mkdir -p $(DESTDIR)$(PREFIX)/bin + cp -f $(BIN) $(DESTDIR)$(PREFIX)/bin + chmod 755 $(DESTDIR)$(PREFIX)/bin/$(BIN) + +uninstall: + rm -f $(DESTDIR)$(PREFIX)/bin/$(BIN) + +.PHONY: all clean install uninstall + diff --git a/bin/fud/fu/src/main.rs b/bin/fud/fu/src/main.rs index d7fc56658..fd920b2b3 100644 --- a/bin/fud/fu/src/main.rs +++ b/bin/fud/fu/src/main.rs @@ -18,15 +18,15 @@ use clap::{Parser, Subcommand}; use log::info; -use serde_json::json; use simplelog::{ColorChoice, TermLogger, TerminalMode}; +use std::sync::Arc; use url::Url; use darkfi::{ cli_desc, - rpc::{client::RpcClient, jsonrpc::JsonRequest}, + rpc::{client::RpcClient, jsonrpc::JsonRequest, util::JsonValue}, util::cli::{get_log_config, get_log_level}, - Result, + Error, Result, }; #[derive(Parser)] @@ -47,15 +47,14 @@ struct Args { #[derive(Subcommand)] enum Subcmd { - /// List fud folder contents - List, - - /// Sync fud folder contents and signal network for record changes - Sync, - /// Retrieve provided file name from the fud network Get { - #[clap(short, long)] + /// File name + file: String, + }, + + /// Put a file onto the fud network + Put { /// File name file: String, }, @@ -66,85 +65,105 @@ struct Fu { } impl Fu { - async fn close_connection(&self) -> Result<()> { - self.rpc_client.close().await + async fn close_connection(&self) { + self.rpc_client.stop().await; } - async fn list(&self) -> Result<()> { - let req = JsonRequest::new("list", json!([])); - let rep = self.rpc_client.request(req).await?; - - // Extract response - let content = rep[0].as_array().unwrap(); - let new = rep[1].as_array().unwrap(); - let deleted = rep[2].as_array().unwrap(); - - // Print info - info!("----------Content-------------"); - if content.is_empty() { - info!("No file records exists in DHT."); - } else { - for name in content { - info!("\t{}", name.as_str().unwrap()); - } - } - info!("------------------------------"); - - info!("----------New files-----------"); - if new.is_empty() { - info!("No new files to import."); - } else { - for name in new { - info!("\t{}", name.as_str().unwrap()); - } - } - info!("------------------------------"); - - info!("----------Removed keys--------"); - if deleted.is_empty() { - info!("No keys were removed."); - } else { - for key in deleted { - info!("\t{}", key.as_str().unwrap()); - } - } - info!("------------------------------"); - - Ok(()) - } - - async fn sync(&self) -> Result<()> { - let req = JsonRequest::new("sync", json!([])); - self.rpc_client.request(req).await?; - info!("Daemon synced successfully!"); - Ok(()) - } + // async fn list(&self) -> Result<()> { + // let req = JsonRequest::new("list", JsonValue::Array(vec![])); + // let rep = self.rpc_client.request(req).await?; + // + // // Extract response + // let content: Vec = rep[0].clone().try_into().unwrap(); + // let new: Vec = rep[1].clone().try_into().unwrap(); + // let deleted: Vec = rep[2].clone().try_into().unwrap(); + // + // // Print info + // info!("----------Content-------------"); + // if content.is_empty() { + // info!("No file records exists in DHT."); + // } else { + // for name in content { + // info!("\t{}", String::try_from(name).unwrap()); + // } + // } + // info!("------------------------------"); + // + // info!("----------New files-----------"); + // if new.is_empty() { + // info!("No new files to import."); + // } else { + // for name in new { + // info!("\t{}", String::try_from(name).unwrap()); + // } + // } + // info!("------------------------------"); + // + // info!("----------Removed keys--------"); + // if deleted.is_empty() { + // info!("No keys were removed."); + // } else { + // for key in deleted { + // info!("\t{}", String::try_from(key).unwrap()); + // } + // } + // info!("------------------------------"); + // + // Ok(()) + // } + // + // async fn sync(&self) -> Result<()> { + // let req = JsonRequest::new("sync", JsonValue::Array(vec![])); + // self.rpc_client.request(req).await?; + // info!("Daemon synced successfully!"); + // Ok(()) + // } async fn get(&self, file: String) -> Result<()> { - let req = JsonRequest::new("get", json!([file])); + let req = JsonRequest::new("get", JsonValue::Array(vec![JsonValue::String(file)])); let rep = self.rpc_client.request(req).await?; - let path = rep.as_str().unwrap(); + let path = rep.stringify().unwrap(); info!("File waits you at: {}", path); Ok(()) } + + async fn put(&self, file: String) -> Result<()> { + let req = JsonRequest::new("put", JsonValue::Array(vec![JsonValue::String(file)])); + let rep = self.rpc_client.request(req).await?; + match rep { + JsonValue::String(file_id) => { + println!("{}", file_id); + Ok(()) + } + _ => Err(Error::ParseFailed("File ID is not a string")), + } + } } -#[async_std::main] -async fn main() -> Result<()> { +fn main() -> Result<()> { let args = Args::parse(); let log_level = get_log_level(args.verbose); let log_config = get_log_config(args.verbose); TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?; - let rpc_client = RpcClient::new(args.endpoint, None).await?; - let fu = Fu { rpc_client }; + let ex = Arc::new(smol::Executor::new()); + smol::block_on(async { + ex.run(async { + let rpc_client = RpcClient::new(args.endpoint, ex.clone()).await?; + let fu = Fu { rpc_client }; - match args.command { - Subcmd::List => fu.list().await, - Subcmd::Sync => fu.sync().await, - Subcmd::Get { file } => fu.get(file).await, - }?; + match args.command { + // Subcmd::List => fu.list().await, + // Subcmd::Sync => fu.sync().await, + Subcmd::Get { file } => fu.get(file).await, + Subcmd::Put { file } => fu.put(file).await, + }?; - fu.close_connection().await + fu.close_connection().await; + + Ok(()) + }) + .await + }) } diff --git a/bin/fud/fud/Makefile b/bin/fud/fud/Makefile new file mode 100644 index 000000000..ca48fb817 --- /dev/null +++ b/bin/fud/fud/Makefile @@ -0,0 +1,44 @@ +.POSIX: + +# Install prefix +PREFIX = $(HOME)/.cargo + +# Cargo binary +CARGO = cargo +nightly + +# Compile target +RUST_TARGET = $(shell rustc -Vv | grep '^host: ' | cut -d' ' -f2) +# Uncomment when doing musl static builds +#RUSTFLAGS = -C target-feature=+crt-static -C link-self-contained=yes + +SRC = \ + Cargo.toml \ + ../../../Cargo.toml \ + $(shell find src -type f -name '*.rs') \ + $(shell find ../../../src -type f -name '*.rs') \ + +BIN = $(shell grep '^name = ' Cargo.toml | cut -d' ' -f3 | tr -d '"') + +all: $(BIN) + +$(BIN): $(SRC) + RUSTFLAGS="$(RUSTFLAGS)" $(CARGO) build --target=$(RUST_TARGET) --release --package $@ + cp -f ../../../target/$(RUST_TARGET)/release/$@ $@ + cp -f ../../../target/$(RUST_TARGET)/release/$@ ../../../$@ + +clippy: all + RUSTFLAGS="$(RUSTFLAGS)" $(CARGO) clippy --target=$(RUST_TARGET) --release --package $(BIN) --tests + +clean: + rm -f $(BIN) ../../../$(BIN) + +install: all + mkdir -p $(DESTDIR)$(PREFIX)/bin + cp -f $(BIN) $(DESTDIR)$(PREFIX)/bin + chmod 755 $(DESTDIR)$(PREFIX)/bin/$(BIN) + +uninstall: + rm -f $(DESTDIR)$(PREFIX)/bin/$(BIN) + +.PHONY: all clean install uninstall + diff --git a/bin/fud/fud/fud_config.toml b/bin/fud/fud/fud_config.toml index 9294615d6..e90dd789e 100644 --- a/bin/fud/fud/fud_config.toml +++ b/bin/fud/fud/fud_config.toml @@ -7,31 +7,7 @@ ## uncommenting, or by using the command-line. # Path to the contents directory -#folder = "~/.config/darkfi/fud" - -# P2P accept addresses -#p2p_accept = ["tls://127.0.0.1:13337"] - -# P2P external addresses -#p2p_external = ["tls://127.0.0.1:13337"] - -# Connection slots -#slots = 8 - -# Seed nodes to connect to -seeds = ["tls://lilith0.dark.fi:13337", "tls://lilith1.dark.fi:13337"] - -# Peers to connect to -#peers = [] - -# Prefered transports for outbound connections -#transports = ["tls", "tcp"] - -# Enable localnet hosts -#localnet = false - -# Enable channel log -#channel_log = false +base_dir = "~/.local/share/darkfi/fud" # JSON-RPC settings [rpc] @@ -39,4 +15,58 @@ seeds = ["tls://lilith0.dark.fi:13337", "tls://lilith1.dark.fi:13337"] rpc_listen = "tcp://127.0.0.1:13336" # Disabled RPC methods -#rpc_disabled_methods = [] +rpc_disabled_methods = ["p2p.get_info"] + +# P2P network settings +[net] +# Path to the P2P datastore +p2p_datastore = "~/.local/share/darkfi/fud" + +# Path to a configured hostlist for saving known peers +hostlist = "~/.local/share/darkfi/fud/p2p_hostlist.tsv" + +## P2P accept addresses +# inbound = ["tcp://0.0.0.0:13337"] + +## Outbound connection slots +# outbound_connections = 8 + +## Inbound connection slots +#inbound_connections = 8 + +## White connection percent +# gold_connect_count = 2 + +## White connection percent +# white_connect_percent = 70 + +## Addresses we want to advertise to peers (optional) +## These should be reachable externally +#external_addrs = ["tcp+tls://my.resolveable.address:26661"] + +## Seed nodes to connect to +seeds = [ + #"tcp+tls://lilith0.dark.fi:5262", + #"tcp+tls://lilith1.dark.fi:5262", + #"tor://czzulj66rr5kq3uhidzn7fh4qvt3vaxaoldukuxnl5vipayuj7obo7id.onion:5263", + #"tor://vgbfkcu5hcnlnwd2lz26nfoa6g6quciyxwbftm6ivvrx74yvv5jnaoid.onion:5273", +] + +## Manual peers to connect to +#peers = [] + +# Whitelisted transports for outbound connections +allowed_transports = ["tcp", "tcp+tls"] +#allowed_transports = ["tor"] +#allowed_transports = ["tor", "tor+tls"] + +# Enable transport mixing +# Allows mixing transports, e.g. tor+tls:// connecting to tcp+tls:// +# By default this is not allowed. +transport_mixing = false + +# Nodes to avoid interacting with for the duration of the program, in the +# format ["host", ["scheme", "scheme"], [port, port]]. +# If scheme is left empty it will default to "tcp+tls". +# If ports are left empty all ports from this peer will be blocked. +#blacklist = [["example.com", ["tcp"], [8551, 23331]]] diff --git a/bin/fud/fud/src/main.rs b/bin/fud/fud/src/main.rs index f51549697..af6d7087d 100644 --- a/bin/fud/fud/src/main.rs +++ b/bin/fud/fud/src/main.rs @@ -38,11 +38,15 @@ use darkfi::{ async_daemonize, cli_desc, geode::Geode, net::{ - self, connector::Connector, protocol::ProtocolVersion, session::Session, - settings::SettingsOpt, P2p, P2pPtr, + connector::Connector, + protocol::ProtocolVersion, + session::{Session, SESSION_DEFAULT}, + settings::SettingsOpt, + P2p, P2pPtr, }, rpc::{ - jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult}, + jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult, JsonSubscriber}, + p2p_method::HandlerP2p, server::{listen_and_serve, RequestHandler}, settings::{RpcSettings, RpcSettingsOpt}, }, @@ -102,10 +106,17 @@ pub struct Fud { file_fetch_tx: channel::Sender<(blake3::Hash, Result<()>)>, file_fetch_rx: channel::Receiver<(blake3::Hash, Result<()>)>, + file_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>, + file_fetch_end_rx: channel::Receiver<(blake3::Hash, Result<()>)>, chunk_fetch_tx: channel::Sender<(blake3::Hash, Result<()>)>, chunk_fetch_rx: channel::Receiver<(blake3::Hash, Result<()>)>, + chunk_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>, + chunk_fetch_end_rx: channel::Receiver<(blake3::Hash, Result<()>)>, rpc_connections: Mutex>, + + /// dnet JSON-RPC subscriber + dnet_sub: JsonSubscriber, } #[async_trait] @@ -117,7 +128,9 @@ impl RequestHandler<()> for Fud { "put" => self.put(req.id, req.params).await, "get" => self.get(req.id, req.params).await, - "dnet_switch" => self.dnet_switch(req.id, req.params).await, + "dnet.switch" => self.dnet_switch(req.id, req.params).await, + "dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await, + "p2p.get_info" => self.p2p_get_info(req.id, req.params).await, _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), } } @@ -194,7 +207,7 @@ impl Fud { info!("Requested file {} not found in Geode, triggering fetch", file_hash); self.file_fetch_tx.send((file_hash, Ok(()))).await.unwrap(); info!("Waiting for background file fetch task..."); - let (i_file_hash, status) = self.file_fetch_rx.recv().await.unwrap(); + let (i_file_hash, status) = self.file_fetch_end_rx.recv().await.unwrap(); match status { Ok(()) => { let ch_file = self.geode.get(&file_hash).await.unwrap(); @@ -244,18 +257,17 @@ impl Fud { for chunk in missing_chunks { self.chunk_fetch_tx.send((chunk, Ok(()))).await.unwrap(); - let (i_chunk_hash, status) = self.chunk_fetch_rx.recv().await.unwrap(); + let (i_chunk_hash, status) = self.chunk_fetch_end_rx.recv().await.unwrap(); match status { Ok(()) => { let m = FudChunkPut { chunk_hash: i_chunk_hash }; self.p2p.broadcast(&m).await; - break } Err(Error::GeodeChunkRouteNotFound) => continue, Err(e) => panic!("{}", e), - } + }; } let chunked_file = match self.geode.get(&file_hash).await { @@ -296,13 +308,35 @@ impl Fud { let switch = params[0].get::().unwrap(); if *switch { - self.p2p.dnet_enable().await; + self.p2p.dnet_enable(); } else { - self.p2p.dnet_disable().await; + self.p2p.dnet_disable(); } JsonResponse::new(JsonValue::Boolean(true), id).into() } + + // RPCAPI: + // Initializes a subscription to p2p dnet events. + // Once a subscription is established, `fud` will send JSON-RPC notifications of + // new network events to the subscriber. + // + // --> {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [`event`]} + pub async fn dnet_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult { + let params = params.get::>().unwrap(); + if !params.is_empty() { + return JsonError::new(ErrorCode::InvalidParams, None, id).into() + } + + self.dnet_sub.clone().into() + } +} + +impl HandlerP2p for Fud { + fn p2p(&self) -> P2pPtr { + self.p2p.clone() + } } /// Background task that receives file fetch requests and tries to @@ -319,7 +353,10 @@ async fn fetch_file_task(fud: Arc, executor: Arc>) -> Result<( if peers.is_none() { warn!("File {} not in routing table, cannot fetch", file_hash); - fud.file_fetch_tx.send((file_hash, Err(Error::GeodeFileRouteNotFound))).await.unwrap(); + fud.file_fetch_end_tx + .send((file_hash, Err(Error::GeodeFileRouteNotFound))) + .await + .unwrap(); continue } @@ -335,12 +372,8 @@ async fn fetch_file_task(fud: Arc, executor: Arc>) -> Result<( let connector = Connector::new(fud.p2p.settings(), session_weak); match connector.connect(peer).await { Ok((url, channel)) => { - let proto_ver = ProtocolVersion::new( - channel.clone(), - fud.p2p.settings().clone(), - fud.p2p.hosts().clone(), - ) - .await; + let proto_ver = + ProtocolVersion::new(channel.clone(), fud.p2p.settings().clone()).await; let handshake_task = session_out.perform_handshake_protocols( proto_ver, @@ -357,6 +390,8 @@ async fn fetch_file_task(fud: Arc, executor: Arc>) -> Result<( continue } + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; let msg_subscriber = channel.subscribe_msg::().await.unwrap(); let request = FudFileRequest { file_hash }; @@ -400,12 +435,15 @@ async fn fetch_file_task(fud: Arc, executor: Arc>) -> Result<( if !found { warn!("Did not manage to fetch {} file metadata", file_hash); - fud.file_fetch_tx.send((file_hash, Err(Error::GeodeFileRouteNotFound))).await.unwrap(); + fud.file_fetch_end_tx + .send((file_hash, Err(Error::GeodeFileRouteNotFound))) + .await + .unwrap(); continue } info!("Successfully fetched {} file metadata", file_hash); - fud.file_fetch_tx.send((file_hash, Ok(()))).await.unwrap(); + fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap(); } } @@ -423,7 +461,7 @@ async fn fetch_chunk_task(fud: Arc, executor: Arc>) -> Result< if peers.is_none() { warn!("Chunk {} not in routing table, cannot fetch", chunk_hash); - fud.chunk_fetch_tx + fud.chunk_fetch_end_tx .send((chunk_hash, Err(Error::GeodeChunkRouteNotFound))) .await .unwrap(); @@ -442,12 +480,8 @@ async fn fetch_chunk_task(fud: Arc, executor: Arc>) -> Result< let connector = Connector::new(fud.p2p.settings(), session_weak); match connector.connect(peer).await { Ok((url, channel)) => { - let proto_ver = ProtocolVersion::new( - channel.clone(), - fud.p2p.settings().clone(), - fud.p2p.hosts().clone(), - ) - .await; + let proto_ver = + ProtocolVersion::new(channel.clone(), fud.p2p.settings().clone()).await; let handshake_task = session_out.perform_handshake_protocols( proto_ver, @@ -464,6 +498,8 @@ async fn fetch_chunk_task(fud: Arc, executor: Arc>) -> Result< continue } + let msg_subsystem = channel.message_subsystem(); + msg_subsystem.add_dispatch::().await; let msg_subscriber = channel.subscribe_msg::().await.unwrap(); let request = FudChunkRequest { chunk_hash }; @@ -516,7 +552,7 @@ async fn fetch_chunk_task(fud: Arc, executor: Arc>) -> Result< if !found { warn!("Did not manage to fetch {} chunk", chunk_hash); - fud.chunk_fetch_tx + fud.chunk_fetch_end_tx .send((chunk_hash, Err(Error::GeodeChunkRouteNotFound))) .await .unwrap(); @@ -524,7 +560,7 @@ async fn fetch_chunk_task(fud: Arc, executor: Arc>) -> Result< } info!("Successfully fetched {} chunk", chunk_hash); - fud.chunk_fetch_tx.send((chunk_hash, Ok(()))).await.unwrap(); + fud.chunk_fetch_end_tx.send((chunk_hash, Ok(()))).await.unwrap(); } } @@ -541,11 +577,37 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { let geode = Geode::new(&basedir).await?; info!("Instantiating P2P network"); - let p2p = P2p::new(args.net.into(), ex.clone()).await; + let p2p = P2p::new(args.net.into(), ex.clone()).await?; + + info!("Starting dnet subs task"); + let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); + let dnet_sub_ = dnet_sub.clone(); + let p2p_ = p2p.clone(); + let dnet_task = StoppableTask::new(); + dnet_task.clone().start( + async move { + let dnet_sub = p2p_.dnet_subscribe().await; + loop { + let event = dnet_sub.receive().await; + debug!("Got dnet event: {:?}", event); + dnet_sub_.notify(vec![event.into()].into()).await; + } + }, + |res| async { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => panic!("{}", e), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); // Daemon instantiation let (file_fetch_tx, file_fetch_rx) = smol::channel::unbounded(); + let (file_fetch_end_tx, file_fetch_end_rx) = smol::channel::unbounded(); let (chunk_fetch_tx, chunk_fetch_rx) = smol::channel::unbounded(); + let (chunk_fetch_end_tx, chunk_fetch_end_rx) = smol::channel::unbounded(); let fud = Arc::new(Fud { metadata_router, chunks_router, @@ -553,9 +615,14 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { geode, file_fetch_tx, file_fetch_rx, + file_fetch_end_tx, + file_fetch_end_rx, chunk_fetch_tx, chunk_fetch_rx, + chunk_fetch_end_tx, + chunk_fetch_end_rx, rpc_connections: Mutex::new(HashSet::new()), + dnet_sub, }); info!(target: "fud", "Starting fetch file task"); @@ -606,7 +673,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { let registry = p2p.protocol_registry(); let fud_ = fud.clone(); registry - .register(net::SESSION_NET, move |channel, p2p| { + .register(SESSION_DEFAULT, move |channel, p2p| { let fud_ = fud_.clone(); async move { ProtocolFud::init(fud_, channel, p2p).await.unwrap() } }) diff --git a/bin/fud/fud/src/proto.rs b/bin/fud/fud/src/proto.rs index ab5d6beb3..22a848e38 100644 --- a/bin/fud/fud/src/proto.rs +++ b/bin/fud/fud/src/proto.rs @@ -20,7 +20,7 @@ use std::{collections::HashSet, sync::Arc}; use async_trait::async_trait; use darkfi::{ - geode::MAX_CHUNK_SIZE, + geode::{read_until_filled, MAX_CHUNK_SIZE}, impl_p2p_message, net::{ metering::{DEFAULT_METERING_CONFIGURATION, MeteringConfiguration}, @@ -31,7 +31,7 @@ use darkfi::{ }; use darkfi_serial::{SerialDecodable, SerialEncodable}; use log::{debug, error}; -use smol::{fs::File, io::AsyncReadExt, Executor}; +use smol::{fs::File, Executor}; use url::Url; use super::Fud; @@ -282,6 +282,13 @@ impl ProtocolFud { metadata_lock.insert(fud_file.file_hash, peers); } } + + let excluded_peers: Vec = metadata_lock + .get(&fud_file.file_hash) + .unwrap_or(&HashSet::new()) + .iter() + .cloned() + .collect(); drop(metadata_lock); let mut chunks_lock = self.fud.chunks_router.write().await; @@ -310,7 +317,8 @@ impl ProtocolFud { self.p2p .broadcast_with_exclude( &route, - &[self.channel.address().clone(), fud_file.peer.clone()], + &[vec![self.channel.address().clone(), fud_file.peer.clone()], excluded_peers] + .concat(), ) .await; } @@ -344,6 +352,12 @@ impl ProtocolFud { chunks_lock.insert(fud_chunk.chunk_hash, peers); } } + let excluded_peers: Vec = chunks_lock + .get(&fud_chunk.chunk_hash) + .unwrap_or(&HashSet::new()) + .iter() + .cloned() + .collect(); drop(chunks_lock); // Relay this knowledge of the new route @@ -353,7 +367,8 @@ impl ProtocolFud { self.p2p .broadcast_with_exclude( &route, - &[self.channel.address().clone(), fud_chunk.peer.clone()], + &[vec![self.channel.address().clone(), fud_chunk.peer.clone()], excluded_peers] + .concat(), ) .await; } @@ -434,9 +449,9 @@ impl ProtocolFud { // The consistency should already be checked in Geode, so we're // fine not checking and unwrapping here. - let mut buf = [0u8; MAX_CHUNK_SIZE]; + let mut buf = vec![0u8; MAX_CHUNK_SIZE]; let mut chunk_fd = File::open(&chunk_path).await.unwrap(); - let bytes_read = chunk_fd.read(&mut buf).await.unwrap(); + let bytes_read = read_until_filled(&mut chunk_fd, &mut buf).await.unwrap(); let chunk_slice = &buf[..bytes_read]; let reply = FudChunkReply { chunk: chunk_slice.to_vec() }; diff --git a/src/geode/mod.rs b/src/geode/mod.rs index be9f4d36e..6c6f9e11e 100644 --- a/src/geode/mod.rs +++ b/src/geode/mod.rs @@ -61,9 +61,11 @@ use std::{collections::HashSet, path::PathBuf}; use futures::AsyncRead; use log::{debug, info, warn}; use smol::{ - fs, - fs::{File, OpenOptions}, - io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Cursor, SeekFrom}, + fs::{self, File, OpenOptions}, + io::{ + self, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Cursor, + SeekFrom, + }, stream::StreamExt, }; @@ -111,6 +113,23 @@ pub struct Geode { chunks_path: PathBuf, } +pub async fn read_until_filled( + mut stream: impl AsyncRead + Unpin, + buffer: &mut [u8], +) -> io::Result { + let mut total_bytes_read = 0; + + while total_bytes_read < buffer.len() { + let bytes_read = stream.read(&mut buffer[total_bytes_read..]).await?; + if bytes_read == 0 { + break; // EOF reached + } + total_bytes_read += bytes_read; + } + + Ok(total_bytes_read) +} + impl Geode { /// Instantiate a new [`Geode`] object. /// `base_path` defines the root directory where Geode will store its @@ -183,7 +202,7 @@ impl Geode { }; // Perform consistency check - let Ok(bytes_read) = chunk_fd.read(&mut buf).await else { + let Ok(bytes_read) = read_until_filled(&mut chunk_fd, &mut buf).await else { deleted_chunk_paths.insert(chunk_path); deleted_chunks.insert(chunk_hash); buf = [0u8; MAX_CHUNK_SIZE]; @@ -269,7 +288,8 @@ impl Geode { let mut chunk_hashes = vec![]; let mut buf = [0u8; MAX_CHUNK_SIZE]; - while let Ok(bytes_read) = stream.read(&mut buf).await { + loop { + let bytes_read = read_until_filled(&mut stream, &mut buf).await?; if bytes_read == 0 { break } @@ -285,11 +305,11 @@ impl Geode { // reading from disk. let mut chunk_path = self.chunks_path.clone(); chunk_path.push(chunk_hash.to_hex().as_str()); - let mut chunk_fd = + let chunk_fd = OpenOptions::new().read(true).write(true).create(true).open(&chunk_path).await?; let mut fs_buf = [0u8; MAX_CHUNK_SIZE]; - let fs_bytes_read = chunk_fd.read(&mut fs_buf).await?; + let fs_bytes_read = read_until_filled(chunk_fd, &mut fs_buf).await?; let fs_chunk_slice = &fs_buf[..fs_bytes_read]; let fs_chunk_hash = blake3::hash(fs_chunk_slice); @@ -300,9 +320,16 @@ impl Geode { chunk_path, ); // Here the chunk is broken, so we'll truncate and write the new one. + let mut chunk_fd = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&chunk_path) + .await?; chunk_fd.set_len(0).await?; chunk_fd.seek(SeekFrom::Start(0)).await?; chunk_fd.write_all(chunk_slice).await?; + chunk_fd.flush().await?; } else { debug!( target: "geode::insert()", @@ -325,6 +352,8 @@ impl Geode { file_fd.write(format!("{}\n", ch.to_hex().as_str()).as_bytes()).await?; } + file_fd.flush().await?; + Ok((file_hash, chunk_hashes)) } @@ -344,6 +373,7 @@ impl Geode { for ch in chunk_hashes { file_fd.write(format!("{}\n", ch.to_hex().as_str()).as_bytes()).await?; } + file_fd.flush().await?; Ok(()) } @@ -356,7 +386,7 @@ impl Geode { let mut cursor = Cursor::new(&stream); let mut chunk = [0u8; MAX_CHUNK_SIZE]; - let bytes_read = cursor.read(&mut chunk).await?; + let bytes_read = read_until_filled(&mut cursor, &mut chunk).await?; let chunk_slice = &chunk[..bytes_read]; let chunk_hash = blake3::hash(chunk_slice); @@ -364,6 +394,7 @@ impl Geode { chunk_path.push(chunk_hash.to_hex().as_str()); let mut chunk_fd = File::create(&chunk_path).await?; chunk_fd.write_all(chunk_slice).await?; + chunk_fd.flush().await?; Ok(chunk_hash) } @@ -393,7 +424,7 @@ impl Geode { let mut chunked_file = ChunkedFile::new(&chunk_hashes); // Iterate over chunks and find which chunks we have available locally. - let mut buf = [0u8; MAX_CHUNK_SIZE]; + let mut buf = vec![]; for (chunk_hash, chunk_path) in chunked_file.0.iter_mut() { let mut c_path = self.chunks_path.clone(); c_path.push(chunk_hash.to_hex().as_str()); @@ -405,17 +436,17 @@ impl Geode { // Perform chunk consistency check let mut chunk_fd = File::open(&c_path).await?; - let bytes_read = chunk_fd.read(&mut buf).await?; + let bytes_read = chunk_fd.read_to_end(&mut buf).await?; let chunk_slice = &buf[..bytes_read]; let hashed_chunk = blake3::hash(chunk_slice); if &hashed_chunk != chunk_hash { // The chunk is corrupted/inconsistent. Garbage collection should run. - buf = [0u8; MAX_CHUNK_SIZE]; + buf = vec![]; continue } *chunk_path = Some(c_path); - buf = [0u8; MAX_CHUNK_SIZE]; + buf = vec![]; } Ok(chunked_file) @@ -434,9 +465,9 @@ impl Geode { } // Perform chunk consistency check - let mut buf = [0u8; MAX_CHUNK_SIZE]; + let mut buf = vec![]; let mut chunk_fd = File::open(&chunk_path).await?; - let bytes_read = chunk_fd.read(&mut buf).await?; + let bytes_read = chunk_fd.read_to_end(&mut buf).await?; let chunk_slice = &buf[..bytes_read]; let hashed_chunk = blake3::hash(chunk_slice); if &hashed_chunk != chunk_hash {