fu: add lookup command, rename list-buckets and list-seeders to buckets and seeders

This commit is contained in:
epiphany
2025-11-11 11:55:22 +00:00
parent c66d9a7fe6
commit 097e43afed

View File

@@ -98,16 +98,22 @@ enum Subcmd {
},
/// Get the current node buckets
ListBuckets {},
Buckets {},
/// Get the router state
ListSeeders {},
Seeders {},
/// Verify local files
Verify {
/// File hashes
files: Option<Vec<String>>,
},
/// Lookup seeders of a resource from the network
Lookup {
/// Resource hash
hash: String,
},
}
struct Fu {
@@ -453,7 +459,7 @@ impl Fu {
Ok(())
}
async fn list_buckets(&self) -> Result<()> {
async fn buckets(&self) -> Result<()> {
let req = JsonRequest::new("list_buckets", JsonValue::Array(vec![]));
let rep = self.rpc_client.request(req).await?;
let buckets: Vec<JsonValue> = rep.try_into().unwrap();
@@ -501,7 +507,7 @@ impl Fu {
Ok(())
}
async fn list_seeders(&self) -> Result<()> {
async fn seeders(&self) -> Result<()> {
let req = JsonRequest::new("list_seeders", JsonValue::Array(vec![]));
let rep = self.rpc_client.request(req).await?;
@@ -782,6 +788,95 @@ impl Fu {
self.rpc_client.request(req).await?;
Ok(())
}
async fn lookup(&self, hash: String, ex: ExecutorPtr) -> Result<()> {
let publisher = Publisher::new();
let subscription = Arc::new(publisher.clone().subscribe().await);
let subscriber_task = StoppableTask::new();
let publisher_ = publisher.clone();
let rpc_client_ = self.rpc_client.clone();
subscriber_task.clone().start(
async move {
let req = JsonRequest::new("subscribe", JsonValue::Array(vec![]));
rpc_client_.subscribe(req, publisher).await
},
move |res| async move {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => {
error!("{e}");
publisher_
.notify(JsonResult::Error(JsonError::new(
ErrorCode::InternalError,
None,
0,
)))
.await;
}
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
let req =
JsonRequest::new("lookup", JsonValue::Array(vec![JsonValue::String(hash.clone())]));
let rpc_client_lookup = RpcClient::new(self.endpoint.clone(), ex.clone()).await?;
rpc_client_lookup.request(req).await?;
let print_seeders = |info: &HashMap<String, JsonValue>| {
let seeders = info.get("seeders").unwrap().get::<Vec<JsonValue>>().unwrap();
for seeder in seeders {
let seeder = seeder.get::<HashMap<String, JsonValue>>().unwrap();
let node: HashMap<String, JsonValue> =
seeder.get("node").unwrap().clone().try_into().unwrap();
let node_id: String = node.get("id").unwrap().clone().try_into().unwrap();
let addresses: Vec<JsonValue> =
node.get("addresses").unwrap().clone().try_into().unwrap();
let tree: Vec<_> = addresses
.into_iter()
.map(|addr| TreeNode::key(TryInto::<String>::try_into(addr).unwrap()))
.collect();
print_tree(node_id.as_str(), &tree);
}
};
loop {
match subscription.receive().await {
JsonResult::Notification(n) => {
let params = n.params.get::<HashMap<String, JsonValue>>().unwrap();
let info =
params.get("info").unwrap().get::<HashMap<String, JsonValue>>().unwrap();
let hash_ = match info.get("hash") {
Some(hash_value) => hash_value.get::<String>().unwrap(),
None => continue,
};
if hash != *hash_ {
continue;
}
match params.get("event").unwrap().get::<String>().unwrap().as_str() {
"seeders_found" => {
print_seeders(info);
break
}
_ => {}
}
}
JsonResult::Error(e) => {
return Err(Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}")))
}
x => {
return Err(Error::UnexpectedJsonRpc(format!(
"Got unexpected data from JSON-RPC: {x:?}"
)))
}
}
}
Ok(())
}
}
fn main() -> Result<()> {
@@ -801,9 +896,10 @@ fn main() -> Result<()> {
Subcmd::Ls {} => fu.list_resources().await,
Subcmd::Watch {} => fu.watch(ex.clone()).await,
Subcmd::Rm { hash } => fu.remove(hash).await,
Subcmd::ListBuckets {} => fu.list_buckets().await,
Subcmd::ListSeeders {} => fu.list_seeders().await,
Subcmd::Buckets {} => fu.buckets().await,
Subcmd::Seeders {} => fu.seeders().await,
Subcmd::Verify { files } => fu.verify(files).await,
Subcmd::Lookup { hash } => fu.lookup(hash, ex.clone()).await,
}?;
Ok(())