drk/interactive: handle zombie subscriptions

This commit is contained in:
skoupidi
2025-10-27 16:26:14 +02:00
parent 3d62bd6a2c
commit bf28b41023
2 changed files with 24 additions and 18 deletions

View File

@@ -399,10 +399,10 @@ pub async fn interactive(drk: &DrkPtr, endpoint: &Url, history_path: &str, ex: &
// where entries are separated by newlines.
let _ = linenoise_history_load(history_file);
// Create a detached task to use for block subscription
// Create two detached tasks to use for block subscription
let mut subscription_active = false;
let mut snooze_active = false;
let subscription_task = StoppableTask::new();
let subscription_tasks = [StoppableTask::new(), StoppableTask::new()];
// Create an unbounded smol channel, so we can have a printing
// queue the background task can submit messages to the shell.
@@ -532,7 +532,7 @@ pub async fn interactive(drk: &DrkPtr, endpoint: &Url, history_path: &str, ex: &
drk,
endpoint,
&mut subscription_active,
&subscription_task,
&subscription_tasks,
&shell_sender,
ex,
&mut output,
@@ -540,7 +540,7 @@ pub async fn interactive(drk: &DrkPtr, endpoint: &Url, history_path: &str, ex: &
.await
}
"unsubscribe" => {
handle_unsubscribe(&mut subscription_active, &subscription_task, &mut output)
handle_unsubscribe(&mut subscription_active, &subscription_tasks, &mut output)
.await
}
"snooze" => snooze_active = true,
@@ -608,9 +608,10 @@ pub async fn interactive(drk: &DrkPtr, endpoint: &Url, history_path: &str, ex: &
print_output(&output);
}
// Stop the subscription task if its active
// Stop the subscription tasks if they are active
if subscription_active {
subscription_task.stop().await;
subscription_tasks[0].stop().await;
subscription_tasks[1].stop().await;
}
// Write history file
@@ -2358,7 +2359,7 @@ async fn handle_subscribe(
drk: &DrkPtr,
endpoint: &Url,
subscription_active: &mut bool,
subscription_task: &StoppableTaskPtr,
subscription_tasks: &[StoppableTaskPtr; 2],
shell_sender: &Sender<Vec<String>>,
ex: &ExecutorPtr,
output: &mut Vec<String>,
@@ -2370,11 +2371,12 @@ async fn handle_subscribe(
// Start the subcristion task
let drk_ = drk.clone();
let endpoint_ = endpoint.clone();
let rpc_task_ = subscription_tasks[1].clone();
let shell_sender_ = shell_sender.clone();
let endpoint_ = endpoint.clone();
let ex_ = ex.clone();
subscription_task.clone().start(
async move { subscribe_blocks(&drk_, shell_sender_, endpoint_, &ex_).await },
subscription_tasks[0].clone().start(
async move { subscribe_blocks(&drk_, rpc_task_, shell_sender_, endpoint_, &ex_).await },
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
@@ -2391,14 +2393,15 @@ async fn handle_subscribe(
/// Auxiliary function to define the unsubscribe command handling.
async fn handle_unsubscribe(
subscription_active: &mut bool,
subscription_task: &StoppableTaskPtr,
subscription_tasks: &[StoppableTaskPtr; 2],
output: &mut Vec<String>,
) {
if !*subscription_active {
output.push(String::from("Subscription is already inactive!"));
return
}
subscription_task.stop().await;
subscription_tasks[0].stop().await;
subscription_tasks[1].stop().await;
*subscription_active = false;
}

View File

@@ -18,6 +18,7 @@
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Instant,
};
@@ -31,7 +32,7 @@ use darkfi::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
util::JsonValue,
},
system::{ExecutorPtr, Publisher, StoppableTask},
system::{ExecutorPtr, Publisher, StoppableTaskPtr},
tx::Transaction,
util::encoding::base64,
Error, Result,
@@ -582,6 +583,7 @@ impl Drk {
/// the sequence after the reorg.
pub async fn subscribe_blocks(
drk: &DrkPtr,
rpc_task: StoppableTaskPtr,
shell_sender: Sender<Vec<String>>,
endpoint: Url,
ex: &ExecutorPtr,
@@ -638,17 +640,18 @@ pub async fn subscribe_blocks(
let publisher = Publisher::new();
let subscription = publisher.clone().subscribe().await;
let _publisher = publisher.clone();
let _ex = ex.clone();
StoppableTask::new().start(
let rpc_client = Arc::new(RpcClient::new(endpoint, ex.clone()).await?);
let rpc_client_ = rpc_client.clone();
rpc_task.start(
// Weird hack to prevent lifetimes hell
async move {
let rpc_client = RpcClient::new(endpoint, _ex).await?;
let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
rpc_client.subscribe(req, _publisher).await
rpc_client_.subscribe(req, _publisher).await
},
|res| async move {
rpc_client.stop().await;
match res {
Ok(()) => { /* Do nothing */ }
Ok(()) | Err(Error::DetachedTaskStopped) | Err(Error::RpcServerStopped) => { /* Do nothing */ }
Err(e) => {
eprintln!("[subscribe_blocks] JSON-RPC server error: {e}");
publisher