fud, fu, geode: fixes & improvements

darkfi
2025-03-02 17:10:51 +01:00
committed by epiphany1
parent 10edfbdf1a
commit e219954da2
12 changed files with 455 additions and 157 deletions

Cargo.lock (generated)
View File

@@ -3376,12 +3376,45 @@ dependencies = [
"winapi",
]
[[package]]
name = "fu"
version = "0.4.1"
dependencies = [
"clap 4.5.31",
"darkfi",
"log",
"simplelog",
"smol",
"url",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "fud"
version = "0.4.1"
dependencies = [
"async-trait",
"blake3 1.6.0",
"darkfi",
"darkfi-serial",
"easy-parallel",
"log",
"serde",
"signal-hook",
"signal-hook-async-std",
"simplelog",
"smol",
"structopt",
"structopt-toml",
"tinyjson",
"url",
]
[[package]]
name = "funty"
version = "2.0.0"

View File

@@ -26,8 +26,8 @@ members = [
"bin/explorer/explorerd",
"bin/darkfi-mmproxy",
"bin/drk",
#"bin/fud/fu",
#"bin/fud/fud",
"bin/fud/fu",
"bin/fud/fud",
"bin/genev/genevd",
"bin/genev/genev-cli",
"bin/darkirc",

View File

@@ -31,7 +31,9 @@ BINS = \
lilith \
taud \
vanityaddr \
explorerd
explorerd \
fud \
fu
all: $(BINS)
@@ -130,6 +132,20 @@ explorerd:
explorerd_bundle_contracts_src: contracts
$(MAKE) -C bin/explorer/explorerd bundle_contracts_src
fud:
$(MAKE) -C bin/fud/$@ \
PREFIX="$(PREFIX)" \
CARGO="$(CARGO)" \
RUST_TARGET="$(RUST_TARGET)" \
RUSTFLAGS="$(RUSTFLAGS)"
fu:
$(MAKE) -C bin/fud/$@ \
PREFIX="$(PREFIX)" \
CARGO="$(CARGO)" \
RUST_TARGET="$(RUST_TARGET)" \
RUSTFLAGS="$(RUSTFLAGS)"
# -- END OF BINS --
fmt:

View File

@@ -31,9 +31,9 @@ OPTIONS:
--transports <transports>... Preferred transports for outbound connections (repeatable flag)
```
On first execution, the daemon will create a default config file ~/.config/darkfi/fud_config.toml.
On first execution, the daemon will create a default config file ~/.config/darkfi/fud/fud_config.toml.
The configuration must be verified, and the application should be configured accordingly.
Additionally, a default content folder will be created at ~/.config/darkfi/fud.
Additionally, a default content folder will be created at ~/.local/share/darkfi/fud.
Run fud as follows:
@@ -42,7 +42,7 @@ Run fud as follows:
13:23:04 [INFO] Starting JSON-RPC server
13:23:04 [INFO] Starting sync P2P network
13:23:04 [WARN] Skipping seed sync process since no seeds are configured.
13:23:04 [INFO] Initializing fud dht state for folder: "/home/x/.config/darkfi/fud"
13:23:04 [INFO] Initializing fud dht state for folder: "/home/x/.local/share/darkfi/fud"
13:23:04 [INFO] Not configured for accepting incoming connections.
13:23:04 [INFO] JSON-RPC listener bound to tcp://127.0.0.1:13337
13:23:04 [INFO] Entry: seedd_config.toml

View File

@@ -12,12 +12,11 @@ repository = "https://codeberg.org/darkrenaissance/darkfi"
darkfi = {path = "../../../", features = ["util", "rpc"]}
# Async
async-std = {version = "1.13.0", features = ["attributes"]}
smol = "2.0.2"
# Misc
clap = {version = "4.4.11", features = ["derive"]}
log = "0.4.26"
serde_json = "1.0.139"
simplelog = "0.12.2"
url = "2.5.4"

bin/fud/fu/Makefile (new file)
View File

@@ -0,0 +1,44 @@
.POSIX:
# Install prefix
PREFIX = $(HOME)/.cargo
# Cargo binary
CARGO = cargo +nightly
# Compile target
RUST_TARGET = $(shell rustc -Vv | grep '^host: ' | cut -d' ' -f2)
# Uncomment when doing musl static builds
#RUSTFLAGS = -C target-feature=+crt-static -C link-self-contained=yes
SRC = \
Cargo.toml \
../../../Cargo.toml \
$(shell find src -type f -name '*.rs') \
$(shell find ../../../src -type f -name '*.rs') \

BIN = $(shell grep '^name = ' Cargo.toml | cut -d' ' -f3 | tr -d '"')
all: $(BIN)
$(BIN): $(SRC)
RUSTFLAGS="$(RUSTFLAGS)" $(CARGO) build --target=$(RUST_TARGET) --release --package $@
cp -f ../../../target/$(RUST_TARGET)/release/$@ $@
cp -f ../../../target/$(RUST_TARGET)/release/$@ ../../../$@
clippy: all
RUSTFLAGS="$(RUSTFLAGS)" $(CARGO) clippy --target=$(RUST_TARGET) --release --package $(BIN) --tests
clean:
rm -f $(BIN) ../../../$(BIN)
install: all
mkdir -p $(DESTDIR)$(PREFIX)/bin
cp -f $(BIN) $(DESTDIR)$(PREFIX)/bin
chmod 755 $(DESTDIR)$(PREFIX)/bin/$(BIN)
uninstall:
rm -f $(DESTDIR)$(PREFIX)/bin/$(BIN)
.PHONY: all clean install uninstall

View File

@@ -18,15 +18,15 @@
use clap::{Parser, Subcommand};
use log::info;
use serde_json::json;
use simplelog::{ColorChoice, TermLogger, TerminalMode};
use std::sync::Arc;
use url::Url;
use darkfi::{
cli_desc,
rpc::{client::RpcClient, jsonrpc::JsonRequest},
rpc::{client::RpcClient, jsonrpc::JsonRequest, util::JsonValue},
util::cli::{get_log_config, get_log_level},
Result,
Error, Result,
};
#[derive(Parser)]
@@ -47,15 +47,14 @@ struct Args {
#[derive(Subcommand)]
enum Subcmd {
/// List fud folder contents
List,
/// Sync fud folder contents and signal network for record changes
Sync,
/// Retrieve provided file name from the fud network
Get {
#[clap(short, long)]
/// File name
file: String,
},
/// Put a file onto the fud network
Put {
/// File name
file: String,
},
@@ -66,85 +65,105 @@ struct Fu {
}
impl Fu {
async fn close_connection(&self) -> Result<()> {
self.rpc_client.close().await
async fn close_connection(&self) {
self.rpc_client.stop().await;
}
async fn list(&self) -> Result<()> {
let req = JsonRequest::new("list", json!([]));
let rep = self.rpc_client.request(req).await?;
// Extract response
let content = rep[0].as_array().unwrap();
let new = rep[1].as_array().unwrap();
let deleted = rep[2].as_array().unwrap();
// Print info
info!("----------Content-------------");
if content.is_empty() {
info!("No file records exists in DHT.");
} else {
for name in content {
info!("\t{}", name.as_str().unwrap());
}
}
info!("------------------------------");
info!("----------New files-----------");
if new.is_empty() {
info!("No new files to import.");
} else {
for name in new {
info!("\t{}", name.as_str().unwrap());
}
}
info!("------------------------------");
info!("----------Removed keys--------");
if deleted.is_empty() {
info!("No keys were removed.");
} else {
for key in deleted {
info!("\t{}", key.as_str().unwrap());
}
}
info!("------------------------------");
Ok(())
}
async fn sync(&self) -> Result<()> {
let req = JsonRequest::new("sync", json!([]));
self.rpc_client.request(req).await?;
info!("Daemon synced successfully!");
Ok(())
}
// async fn list(&self) -> Result<()> {
// let req = JsonRequest::new("list", JsonValue::Array(vec![]));
// let rep = self.rpc_client.request(req).await?;
//
// // Extract response
// let content: Vec<JsonValue> = rep[0].clone().try_into().unwrap();
// let new: Vec<JsonValue> = rep[1].clone().try_into().unwrap();
// let deleted: Vec<JsonValue> = rep[2].clone().try_into().unwrap();
//
// // Print info
// info!("----------Content-------------");
// if content.is_empty() {
// info!("No file records exists in DHT.");
// } else {
// for name in content {
// info!("\t{}", String::try_from(name).unwrap());
// }
// }
// info!("------------------------------");
//
// info!("----------New files-----------");
// if new.is_empty() {
// info!("No new files to import.");
// } else {
// for name in new {
// info!("\t{}", String::try_from(name).unwrap());
// }
// }
// info!("------------------------------");
//
// info!("----------Removed keys--------");
// if deleted.is_empty() {
// info!("No keys were removed.");
// } else {
// for key in deleted {
// info!("\t{}", String::try_from(key).unwrap());
// }
// }
// info!("------------------------------");
//
// Ok(())
// }
//
// async fn sync(&self) -> Result<()> {
// let req = JsonRequest::new("sync", JsonValue::Array(vec![]));
// self.rpc_client.request(req).await?;
// info!("Daemon synced successfully!");
// Ok(())
// }
async fn get(&self, file: String) -> Result<()> {
let req = JsonRequest::new("get", json!([file]));
let req = JsonRequest::new("get", JsonValue::Array(vec![JsonValue::String(file)]));
let rep = self.rpc_client.request(req).await?;
let path = rep.as_str().unwrap();
let path = rep.stringify().unwrap();
info!("File waits you at: {}", path);
Ok(())
}
async fn put(&self, file: String) -> Result<()> {
let req = JsonRequest::new("put", JsonValue::Array(vec![JsonValue::String(file)]));
let rep = self.rpc_client.request(req).await?;
match rep {
JsonValue::String(file_id) => {
println!("{}", file_id);
Ok(())
}
_ => Err(Error::ParseFailed("File ID is not a string")),
}
}
}
#[async_std::main]
async fn main() -> Result<()> {
fn main() -> Result<()> {
let args = Args::parse();
let log_level = get_log_level(args.verbose);
let log_config = get_log_config(args.verbose);
TermLogger::init(log_level, log_config, TerminalMode::Mixed, ColorChoice::Auto)?;
let rpc_client = RpcClient::new(args.endpoint, None).await?;
let fu = Fu { rpc_client };
let ex = Arc::new(smol::Executor::new());
smol::block_on(async {
ex.run(async {
let rpc_client = RpcClient::new(args.endpoint, ex.clone()).await?;
let fu = Fu { rpc_client };
match args.command {
Subcmd::List => fu.list().await,
Subcmd::Sync => fu.sync().await,
Subcmd::Get { file } => fu.get(file).await,
}?;
match args.command {
// Subcmd::List => fu.list().await,
// Subcmd::Sync => fu.sync().await,
Subcmd::Get { file } => fu.get(file).await,
Subcmd::Put { file } => fu.put(file).await,
}?;
fu.close_connection().await
fu.close_connection().await;
Ok(())
})
.await
})
}
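
Note: the rewritten entrypoint drops `#[async_std::main]` and drives a smol executor by hand, passing the executor handle into `RpcClient::new` so the client can spawn its I/O tasks on it. A minimal sketch of that pattern in isolation (the names here are illustrative, not fu's):

```rust
use std::sync::Arc;

fn main() {
    let ex = Arc::new(smol::Executor::new());

    // Block the main thread on the executor; tasks spawned on `ex`
    // are polled while the inner future runs.
    smol::block_on(ex.run(async {
        let task = ex.spawn(async { 2 + 2 });
        println!("result: {}", task.await);
    }));
}
```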

bin/fud/fud/Makefile (new file)
View File

@@ -0,0 +1,44 @@
.POSIX:
# Install prefix
PREFIX = $(HOME)/.cargo
# Cargo binary
CARGO = cargo +nightly
# Compile target
RUST_TARGET = $(shell rustc -Vv | grep '^host: ' | cut -d' ' -f2)
# Uncomment when doing musl static builds
#RUSTFLAGS = -C target-feature=+crt-static -C link-self-contained=yes
SRC = \
Cargo.toml \
../../../Cargo.toml \
$(shell find src -type f -name '*.rs') \
$(shell find ../../../src -type f -name '*.rs') \

BIN = $(shell grep '^name = ' Cargo.toml | cut -d' ' -f3 | tr -d '"')
all: $(BIN)
$(BIN): $(SRC)
RUSTFLAGS="$(RUSTFLAGS)" $(CARGO) build --target=$(RUST_TARGET) --release --package $@
cp -f ../../../target/$(RUST_TARGET)/release/$@ $@
cp -f ../../../target/$(RUST_TARGET)/release/$@ ../../../$@
clippy: all
RUSTFLAGS="$(RUSTFLAGS)" $(CARGO) clippy --target=$(RUST_TARGET) --release --package $(BIN) --tests
clean:
rm -f $(BIN) ../../../$(BIN)
install: all
mkdir -p $(DESTDIR)$(PREFIX)/bin
cp -f $(BIN) $(DESTDIR)$(PREFIX)/bin
chmod 755 $(DESTDIR)$(PREFIX)/bin/$(BIN)
uninstall:
rm -f $(DESTDIR)$(PREFIX)/bin/$(BIN)
.PHONY: all clean install uninstall

View File

@@ -7,31 +7,7 @@
## uncommenting, or by using the command-line.
# Path to the contents directory
#folder = "~/.config/darkfi/fud"
# P2P accept addresses
#p2p_accept = ["tls://127.0.0.1:13337"]
# P2P external addresses
#p2p_external = ["tls://127.0.0.1:13337"]
# Connection slots
#slots = 8
# Seed nodes to connect to
seeds = ["tls://lilith0.dark.fi:13337", "tls://lilith1.dark.fi:13337"]
# Peers to connect to
#peers = []
# Preferred transports for outbound connections
#transports = ["tls", "tcp"]
# Enable localnet hosts
#localnet = false
# Enable channel log
#channel_log = false
base_dir = "~/.local/share/darkfi/fud"
# JSON-RPC settings
[rpc]
@@ -39,4 +15,58 @@ seeds = ["tls://lilith0.dark.fi:13337", "tls://lilith1.dark.fi:13337"]
rpc_listen = "tcp://127.0.0.1:13336"
# Disabled RPC methods
#rpc_disabled_methods = []
rpc_disabled_methods = ["p2p.get_info"]
# P2P network settings
[net]
# Path to the P2P datastore
p2p_datastore = "~/.local/share/darkfi/fud"
# Path to a configured hostlist for saving known peers
hostlist = "~/.local/share/darkfi/fud/p2p_hostlist.tsv"
## P2P accept addresses
# inbound = ["tcp://0.0.0.0:13337"]
## Outbound connection slots
# outbound_connections = 8
## Inbound connection slots
#inbound_connections = 8
## Gold connection count
# gold_connect_count = 2
## White connection percent
# white_connect_percent = 70
## Addresses we want to advertise to peers (optional)
## These should be reachable externally
#external_addrs = ["tcp+tls://my.resolveable.address:26661"]
## Seed nodes to connect to
seeds = [
#"tcp+tls://lilith0.dark.fi:5262",
#"tcp+tls://lilith1.dark.fi:5262",
#"tor://czzulj66rr5kq3uhidzn7fh4qvt3vaxaoldukuxnl5vipayuj7obo7id.onion:5263",
#"tor://vgbfkcu5hcnlnwd2lz26nfoa6g6quciyxwbftm6ivvrx74yvv5jnaoid.onion:5273",
]
## Manual peers to connect to
#peers = []
# Whitelisted transports for outbound connections
allowed_transports = ["tcp", "tcp+tls"]
#allowed_transports = ["tor"]
#allowed_transports = ["tor", "tor+tls"]
# Enable transport mixing
# Allows mixing transports, e.g. tor+tls:// connecting to tcp+tls://
# By default this is not allowed.
transport_mixing = false
# Nodes to avoid interacting with for the duration of the program, in the
# format ["host", ["scheme", "scheme"], [port, port]].
# If scheme is left empty it will default to "tcp+tls".
# If ports are left empty all ports from this peer will be blocked.
#blacklist = [["example.com", ["tcp"], [8551, 23331]]]

View File

@@ -38,11 +38,15 @@ use darkfi::{
async_daemonize, cli_desc,
geode::Geode,
net::{
self, connector::Connector, protocol::ProtocolVersion, session::Session,
settings::SettingsOpt, P2p, P2pPtr,
connector::Connector,
protocol::ProtocolVersion,
session::{Session, SESSION_DEFAULT},
settings::SettingsOpt,
P2p, P2pPtr,
},
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult},
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResponse, JsonResult, JsonSubscriber},
p2p_method::HandlerP2p,
server::{listen_and_serve, RequestHandler},
settings::{RpcSettings, RpcSettingsOpt},
},
@@ -102,10 +106,17 @@ pub struct Fud {
file_fetch_tx: channel::Sender<(blake3::Hash, Result<()>)>,
file_fetch_rx: channel::Receiver<(blake3::Hash, Result<()>)>,
file_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>,
file_fetch_end_rx: channel::Receiver<(blake3::Hash, Result<()>)>,
chunk_fetch_tx: channel::Sender<(blake3::Hash, Result<()>)>,
chunk_fetch_rx: channel::Receiver<(blake3::Hash, Result<()>)>,
chunk_fetch_end_tx: channel::Sender<(blake3::Hash, Result<()>)>,
chunk_fetch_end_rx: channel::Receiver<(blake3::Hash, Result<()>)>,
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
/// dnet JSON-RPC subscriber
dnet_sub: JsonSubscriber,
}
#[async_trait]
@@ -117,7 +128,9 @@ impl RequestHandler<()> for Fud {
"put" => self.put(req.id, req.params).await,
"get" => self.get(req.id, req.params).await,
"dnet_switch" => self.dnet_switch(req.id, req.params).await,
"dnet.switch" => self.dnet_switch(req.id, req.params).await,
"dnet.subscribe_events" => self.dnet_subscribe_events(req.id, req.params).await,
"p2p.get_info" => self.p2p_get_info(req.id, req.params).await,
_ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
@@ -194,7 +207,7 @@ impl Fud {
info!("Requested file {} not found in Geode, triggering fetch", file_hash);
self.file_fetch_tx.send((file_hash, Ok(()))).await.unwrap();
info!("Waiting for background file fetch task...");
let (i_file_hash, status) = self.file_fetch_rx.recv().await.unwrap();
let (i_file_hash, status) = self.file_fetch_end_rx.recv().await.unwrap();
match status {
Ok(()) => {
let ch_file = self.geode.get(&file_hash).await.unwrap();
@@ -244,18 +257,17 @@ impl Fud {
for chunk in missing_chunks {
self.chunk_fetch_tx.send((chunk, Ok(()))).await.unwrap();
let (i_chunk_hash, status) = self.chunk_fetch_rx.recv().await.unwrap();
let (i_chunk_hash, status) = self.chunk_fetch_end_rx.recv().await.unwrap();
match status {
Ok(()) => {
let m = FudChunkPut { chunk_hash: i_chunk_hash };
self.p2p.broadcast(&m).await;
break
}
Err(Error::GeodeChunkRouteNotFound) => continue,
Err(e) => panic!("{}", e),
}
};
}
let chunked_file = match self.geode.get(&file_hash).await {
@@ -296,13 +308,35 @@ impl Fud {
let switch = params[0].get::<bool>().unwrap();
if *switch {
self.p2p.dnet_enable().await;
self.p2p.dnet_enable();
} else {
self.p2p.dnet_disable().await;
self.p2p.dnet_disable();
}
JsonResponse::new(JsonValue::Boolean(true), id).into()
}
// RPCAPI:
// Initializes a subscription to p2p dnet events.
// Once a subscription is established, `fud` will send JSON-RPC notifications of
// new network events to the subscriber.
//
// --> {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "method": "dnet.subscribe_events", "params": [`event`]}
pub async fn dnet_subscribe_events(&self, id: u16, params: JsonValue) -> JsonResult {
let params = params.get::<Vec<JsonValue>>().unwrap();
if !params.is_empty() {
return JsonError::new(ErrorCode::InvalidParams, None, id).into()
}
self.dnet_sub.clone().into()
}
}
impl HandlerP2p for Fud {
fn p2p(&self) -> P2pPtr {
self.p2p.clone()
}
}
/// Background task that receives file fetch requests and tries to
@@ -319,7 +353,10 @@ async fn fetch_file_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<(
if peers.is_none() {
warn!("File {} not in routing table, cannot fetch", file_hash);
fud.file_fetch_tx.send((file_hash, Err(Error::GeodeFileRouteNotFound))).await.unwrap();
fud.file_fetch_end_tx
.send((file_hash, Err(Error::GeodeFileRouteNotFound)))
.await
.unwrap();
continue
}
@@ -335,12 +372,8 @@ async fn fetch_file_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<(
let connector = Connector::new(fud.p2p.settings(), session_weak);
match connector.connect(peer).await {
Ok((url, channel)) => {
let proto_ver = ProtocolVersion::new(
channel.clone(),
fud.p2p.settings().clone(),
fud.p2p.hosts().clone(),
)
.await;
let proto_ver =
ProtocolVersion::new(channel.clone(), fud.p2p.settings().clone()).await;
let handshake_task = session_out.perform_handshake_protocols(
proto_ver,
@@ -357,6 +390,8 @@ async fn fetch_file_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<(
continue
}
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudFileReply>().await;
let msg_subscriber = channel.subscribe_msg::<FudFileReply>().await.unwrap();
let request = FudFileRequest { file_hash };
@@ -400,12 +435,15 @@ async fn fetch_file_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<(
if !found {
warn!("Did not manage to fetch {} file metadata", file_hash);
fud.file_fetch_tx.send((file_hash, Err(Error::GeodeFileRouteNotFound))).await.unwrap();
fud.file_fetch_end_tx
.send((file_hash, Err(Error::GeodeFileRouteNotFound)))
.await
.unwrap();
continue
}
info!("Successfully fetched {} file metadata", file_hash);
fud.file_fetch_tx.send((file_hash, Ok(()))).await.unwrap();
fud.file_fetch_end_tx.send((file_hash, Ok(()))).await.unwrap();
}
}
@@ -423,7 +461,7 @@ async fn fetch_chunk_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<
if peers.is_none() {
warn!("Chunk {} not in routing table, cannot fetch", chunk_hash);
fud.chunk_fetch_tx
fud.chunk_fetch_end_tx
.send((chunk_hash, Err(Error::GeodeChunkRouteNotFound)))
.await
.unwrap();
@@ -442,12 +480,8 @@ async fn fetch_chunk_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<
let connector = Connector::new(fud.p2p.settings(), session_weak);
match connector.connect(peer).await {
Ok((url, channel)) => {
let proto_ver = ProtocolVersion::new(
channel.clone(),
fud.p2p.settings().clone(),
fud.p2p.hosts().clone(),
)
.await;
let proto_ver =
ProtocolVersion::new(channel.clone(), fud.p2p.settings().clone()).await;
let handshake_task = session_out.perform_handshake_protocols(
proto_ver,
@@ -464,6 +498,8 @@ async fn fetch_chunk_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<
continue
}
let msg_subsystem = channel.message_subsystem();
msg_subsystem.add_dispatch::<FudChunkReply>().await;
let msg_subscriber = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
let request = FudChunkRequest { chunk_hash };
@@ -516,7 +552,7 @@ async fn fetch_chunk_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<
if !found {
warn!("Did not manage to fetch {} chunk", chunk_hash);
fud.chunk_fetch_tx
fud.chunk_fetch_end_tx
.send((chunk_hash, Err(Error::GeodeChunkRouteNotFound)))
.await
.unwrap();
@@ -524,7 +560,7 @@ async fn fetch_chunk_task(fud: Arc<Fud>, executor: Arc<Executor<'_>>) -> Result<
}
info!("Successfully fetched {} chunk", chunk_hash);
fud.chunk_fetch_tx.send((chunk_hash, Ok(()))).await.unwrap();
fud.chunk_fetch_end_tx.send((chunk_hash, Ok(()))).await.unwrap();
}
}
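
Note: both fetch tasks share the underlying fix here. Requests and completions previously travelled over the same channel pair, so a caller waiting on `file_fetch_rx`/`chunk_fetch_rx` could dequeue the very request it had just sent instead of a result. The new `*_end_*` channels split the two directions. A minimal sketch of the pattern with smol channels (illustrative names, not fud's types):

```rust
use smol::channel;

fn main() {
    smol::block_on(async {
        // Requests flow one way...
        let (req_tx, req_rx) = channel::unbounded::<u64>();
        // ...completions flow back on a separate channel.
        let (end_tx, end_rx) = channel::unbounded::<u64>();

        // Stand-in for fetch_file_task / fetch_chunk_task.
        let worker = smol::spawn(async move {
            while let Ok(id) = req_rx.recv().await {
                // ... fetch work would happen here ...
                end_tx.send(id).await.unwrap();
            }
        });

        req_tx.send(42).await.unwrap();
        // The requester waits on the *end* channel, never on the
        // request channel it just wrote to.
        assert_eq!(end_rx.recv().await.unwrap(), 42);

        drop(req_tx);
        worker.await;
    });
}
```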
@@ -541,11 +577,37 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
let geode = Geode::new(&basedir).await?;
info!("Instantiating P2P network");
let p2p = P2p::new(args.net.into(), ex.clone()).await;
let p2p = P2p::new(args.net.into(), ex.clone()).await?;
info!("Starting dnet subs task");
let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
let dnet_sub_ = dnet_sub.clone();
let p2p_ = p2p.clone();
let dnet_task = StoppableTask::new();
dnet_task.clone().start(
async move {
let dnet_sub = p2p_.dnet_subscribe().await;
loop {
let event = dnet_sub.receive().await;
debug!("Got dnet event: {:?}", event);
dnet_sub_.notify(vec![event.into()].into()).await;
}
},
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => panic!("{}", e),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
// Daemon instantiation
let (file_fetch_tx, file_fetch_rx) = smol::channel::unbounded();
let (file_fetch_end_tx, file_fetch_end_rx) = smol::channel::unbounded();
let (chunk_fetch_tx, chunk_fetch_rx) = smol::channel::unbounded();
let (chunk_fetch_end_tx, chunk_fetch_end_rx) = smol::channel::unbounded();
let fud = Arc::new(Fud {
metadata_router,
chunks_router,
@@ -553,9 +615,14 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
geode,
file_fetch_tx,
file_fetch_rx,
file_fetch_end_tx,
file_fetch_end_rx,
chunk_fetch_tx,
chunk_fetch_rx,
chunk_fetch_end_tx,
chunk_fetch_end_rx,
rpc_connections: Mutex::new(HashSet::new()),
dnet_sub,
});
info!(target: "fud", "Starting fetch file task");
@@ -606,7 +673,7 @@ async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
let registry = p2p.protocol_registry();
let fud_ = fud.clone();
registry
.register(net::SESSION_NET, move |channel, p2p| {
.register(SESSION_DEFAULT, move |channel, p2p| {
let fud_ = fud_.clone();
async move { ProtocolFud::init(fud_, channel, p2p).await.unwrap() }
})
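
Note: the dnet plumbing follows darkfi's usual subscriber pattern: a `JsonSubscriber` is created for the method name, a `StoppableTask` pumps p2p dnet events into it via `notify`, and the `dnet.subscribe_events` handler simply hands the subscriber back as a `JsonResult`. A condensed sketch of that flow, assuming the `JsonSubscriber` and `StoppableTask` APIs exactly as used in this diff (the module path `darkfi::system::StoppableTask` is an assumption):

```rust
use std::sync::Arc;

use darkfi::{net::P2pPtr, rpc::jsonrpc::JsonSubscriber, system::StoppableTask, Error};
use smol::Executor;

// Returns the subscriber; the daemon stores it and returns
// `self.dnet_sub.clone().into()` from the RPC method handler.
fn wire_dnet_sub(p2p: P2pPtr, ex: Arc<Executor<'static>>) -> JsonSubscriber {
    let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");

    let dnet_sub_ = dnet_sub.clone();
    let dnet_task = StoppableTask::new();
    dnet_task.clone().start(
        // Forward every p2p dnet event to JSON-RPC subscribers.
        async move {
            let sub = p2p.dnet_subscribe().await;
            loop {
                let event = sub.receive().await;
                dnet_sub_.notify(vec![event.into()].into()).await;
            }
        },
        |res| async {
            match res {
                Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
                Err(e) => panic!("{}", e),
            }
        },
        Error::DetachedTaskStopped,
        ex,
    );

    dnet_sub
}
```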

View File

@@ -20,7 +20,7 @@ use std::{collections::HashSet, sync::Arc};
use async_trait::async_trait;
use darkfi::{
geode::MAX_CHUNK_SIZE,
geode::{read_until_filled, MAX_CHUNK_SIZE},
impl_p2p_message,
net::{
metering::{DEFAULT_METERING_CONFIGURATION, MeteringConfiguration},
@@ -31,7 +31,7 @@ use darkfi::{
};
use darkfi_serial::{SerialDecodable, SerialEncodable};
use log::{debug, error};
use smol::{fs::File, io::AsyncReadExt, Executor};
use smol::{fs::File, Executor};
use url::Url;
use super::Fud;
@@ -282,6 +282,13 @@ impl ProtocolFud {
metadata_lock.insert(fud_file.file_hash, peers);
}
}
let excluded_peers: Vec<Url> = metadata_lock
.get(&fud_file.file_hash)
.unwrap_or(&HashSet::new())
.iter()
.cloned()
.collect();
drop(metadata_lock);
let mut chunks_lock = self.fud.chunks_router.write().await;
@@ -310,7 +317,8 @@ impl ProtocolFud {
self.p2p
.broadcast_with_exclude(
&route,
&[self.channel.address().clone(), fud_file.peer.clone()],
&[vec![self.channel.address().clone(), fud_file.peer.clone()], excluded_peers]
.concat(),
)
.await;
}
@@ -344,6 +352,12 @@ impl ProtocolFud {
chunks_lock.insert(fud_chunk.chunk_hash, peers);
}
}
let excluded_peers: Vec<Url> = chunks_lock
.get(&fud_chunk.chunk_hash)
.unwrap_or(&HashSet::new())
.iter()
.cloned()
.collect();
drop(chunks_lock);
// Relay this knowledge of the new route
@@ -353,7 +367,8 @@ impl ProtocolFud {
self.p2p
.broadcast_with_exclude(
&route,
&[self.channel.address().clone(), fud_chunk.peer.clone()],
&[vec![self.channel.address().clone(), fud_chunk.peer.clone()], excluded_peers]
.concat(),
)
.await;
}
@@ -434,9 +449,9 @@ impl ProtocolFud {
// The consistency should already be checked in Geode, so we're
// fine not checking and unwrapping here.
let mut buf = [0u8; MAX_CHUNK_SIZE];
let mut buf = vec![0u8; MAX_CHUNK_SIZE];
let mut chunk_fd = File::open(&chunk_path).await.unwrap();
let bytes_read = chunk_fd.read(&mut buf).await.unwrap();
let bytes_read = read_until_filled(&mut chunk_fd, &mut buf).await.unwrap();
let chunk_slice = &buf[..bytes_read];
let reply = FudChunkReply { chunk: chunk_slice.to_vec() };
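
Note: the other change in this file is that relayed route announcements now also exclude every peer already recorded in the router for that hash, not just the announcing channel and the origin peer, presumably because known providers gain nothing from the echo. A small sketch of the exclusion-list construction (a hypothetical helper, not in the diff):

```rust
use std::collections::HashSet;
use url::Url;

fn build_exclude_list(
    channel_addr: &Url,
    origin_peer: &Url,
    known_peers: Option<&HashSet<Url>>,
) -> Vec<Url> {
    // Peers we already track as providers for this hash.
    let excluded: Vec<Url> =
        known_peers.map(|s| s.iter().cloned().collect()).unwrap_or_default();

    // Same shape as the diff: [vec![channel, origin], excluded].concat()
    [vec![channel_addr.clone(), origin_peer.clone()], excluded].concat()
}
```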

View File

@@ -61,9 +61,11 @@ use std::{collections::HashSet, path::PathBuf};
use futures::AsyncRead;
use log::{debug, info, warn};
use smol::{
fs,
fs::{File, OpenOptions},
io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Cursor, SeekFrom},
fs::{self, File, OpenOptions},
io::{
self, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, Cursor,
SeekFrom,
},
stream::StreamExt,
};
@@ -111,6 +113,23 @@ pub struct Geode {
chunks_path: PathBuf,
}
pub async fn read_until_filled(
mut stream: impl AsyncRead + Unpin,
buffer: &mut [u8],
) -> io::Result<usize> {
let mut total_bytes_read = 0;
while total_bytes_read < buffer.len() {
let bytes_read = stream.read(&mut buffer[total_bytes_read..]).await?;
if bytes_read == 0 {
break; // EOF reached
}
total_bytes_read += bytes_read;
}
Ok(total_bytes_read)
}
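
Note: `read_until_filled` exists because a single `read` may legally return fewer bytes than requested even when more data is coming; looping until the buffer is full or EOF makes chunk reads deterministic. A usage sketch, assuming the helper above is in scope (`read_one_chunk` and its use of `MAX_CHUNK_SIZE` are illustrative):

```rust
use smol::fs::File;

// Read one chunk-sized block from a file, tolerating short reads.
// Returns only the bytes actually read (fewer than MAX_CHUNK_SIZE at EOF).
async fn read_one_chunk(path: &str) -> std::io::Result<Vec<u8>> {
    let mut fd = File::open(path).await?;
    let mut buf = vec![0u8; MAX_CHUNK_SIZE];
    let n = read_until_filled(&mut fd, &mut buf).await?;
    buf.truncate(n);
    Ok(buf)
}
```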
impl Geode {
/// Instantiate a new [`Geode`] object.
/// `base_path` defines the root directory where Geode will store its
@@ -183,7 +202,7 @@ impl Geode {
};
// Perform consistency check
let Ok(bytes_read) = chunk_fd.read(&mut buf).await else {
let Ok(bytes_read) = read_until_filled(&mut chunk_fd, &mut buf).await else {
deleted_chunk_paths.insert(chunk_path);
deleted_chunks.insert(chunk_hash);
buf = [0u8; MAX_CHUNK_SIZE];
@@ -269,7 +288,8 @@ impl Geode {
let mut chunk_hashes = vec![];
let mut buf = [0u8; MAX_CHUNK_SIZE];
while let Ok(bytes_read) = stream.read(&mut buf).await {
loop {
let bytes_read = read_until_filled(&mut stream, &mut buf).await?;
if bytes_read == 0 {
break
}
@@ -285,11 +305,11 @@ impl Geode {
// reading from disk.
let mut chunk_path = self.chunks_path.clone();
chunk_path.push(chunk_hash.to_hex().as_str());
let mut chunk_fd =
let chunk_fd =
OpenOptions::new().read(true).write(true).create(true).open(&chunk_path).await?;
let mut fs_buf = [0u8; MAX_CHUNK_SIZE];
let fs_bytes_read = chunk_fd.read(&mut fs_buf).await?;
let fs_bytes_read = read_until_filled(chunk_fd, &mut fs_buf).await?;
let fs_chunk_slice = &fs_buf[..fs_bytes_read];
let fs_chunk_hash = blake3::hash(fs_chunk_slice);
@@ -300,9 +320,16 @@ impl Geode {
chunk_path,
);
// Here the chunk is broken, so we'll truncate and write the new one.
let mut chunk_fd = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&chunk_path)
.await?;
chunk_fd.set_len(0).await?;
chunk_fd.seek(SeekFrom::Start(0)).await?;
chunk_fd.write_all(chunk_slice).await?;
chunk_fd.flush().await?;
} else {
debug!(
target: "geode::insert()",
@@ -325,6 +352,8 @@ impl Geode {
file_fd.write(format!("{}\n", ch.to_hex().as_str()).as_bytes()).await?;
}
file_fd.flush().await?;
Ok((file_hash, chunk_hashes))
}
@@ -344,6 +373,7 @@ impl Geode {
for ch in chunk_hashes {
file_fd.write(format!("{}\n", ch.to_hex().as_str()).as_bytes()).await?;
}
file_fd.flush().await?;
Ok(())
}
@@ -356,7 +386,7 @@ impl Geode {
let mut cursor = Cursor::new(&stream);
let mut chunk = [0u8; MAX_CHUNK_SIZE];
let bytes_read = cursor.read(&mut chunk).await?;
let bytes_read = read_until_filled(&mut cursor, &mut chunk).await?;
let chunk_slice = &chunk[..bytes_read];
let chunk_hash = blake3::hash(chunk_slice);
@@ -364,6 +394,7 @@ impl Geode {
chunk_path.push(chunk_hash.to_hex().as_str());
let mut chunk_fd = File::create(&chunk_path).await?;
chunk_fd.write_all(chunk_slice).await?;
chunk_fd.flush().await?;
Ok(chunk_hash)
}
@@ -393,7 +424,7 @@ impl Geode {
let mut chunked_file = ChunkedFile::new(&chunk_hashes);
// Iterate over chunks and find which chunks we have available locally.
let mut buf = [0u8; MAX_CHUNK_SIZE];
let mut buf = vec![];
for (chunk_hash, chunk_path) in chunked_file.0.iter_mut() {
let mut c_path = self.chunks_path.clone();
c_path.push(chunk_hash.to_hex().as_str());
@@ -405,17 +436,17 @@ impl Geode {
// Perform chunk consistency check
let mut chunk_fd = File::open(&c_path).await?;
let bytes_read = chunk_fd.read(&mut buf).await?;
let bytes_read = chunk_fd.read_to_end(&mut buf).await?;
let chunk_slice = &buf[..bytes_read];
let hashed_chunk = blake3::hash(chunk_slice);
if &hashed_chunk != chunk_hash {
// The chunk is corrupted/inconsistent. Garbage collection should run.
buf = [0u8; MAX_CHUNK_SIZE];
buf = vec![];
continue
}
*chunk_path = Some(c_path);
buf = [0u8; MAX_CHUNK_SIZE];
buf = vec![];
}
Ok(chunked_file)
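
Note: both `get()` and `get_chunk()` below rely on the same invariant: a chunk file is only trusted if the BLAKE3 hash of its bytes matches the hash it is named after; anything else is treated as corrupt and left for garbage collection. A standalone sketch of that check (a hypothetical helper, not part of Geode):

```rust
use smol::{fs::File, io::AsyncReadExt};

// True if the chunk file's contents hash back to the expected chunk hash.
async fn chunk_is_consistent(path: &str, expected: &blake3::Hash) -> std::io::Result<bool> {
    let mut fd = File::open(path).await?;
    let mut buf = vec![];
    fd.read_to_end(&mut buf).await?;
    Ok(&blake3::hash(&buf) == expected)
}
```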
@@ -434,9 +465,9 @@ impl Geode {
}
// Perform chunk consistency check
let mut buf = [0u8; MAX_CHUNK_SIZE];
let mut buf = vec![];
let mut chunk_fd = File::open(&chunk_path).await?;
let bytes_read = chunk_fd.read(&mut buf).await?;
let bytes_read = chunk_fd.read_to_end(&mut buf).await?;
let chunk_slice = &buf[..bytes_read];
let hashed_chunk = blake3::hash(chunk_slice);
if &hashed_chunk != chunk_hash {