From 3f033be9a706e8500903c4516935587d27f58564 Mon Sep 17 00:00:00 2001 From: skoupidi Date: Sat, 14 Jun 2025 14:52:53 +0300 Subject: [PATCH] drk/interactive: use a print queue instead of two way signal for background task --- Cargo.lock | 1 + bin/drk/Cargo.toml | 1 + bin/drk/src/interactive.rs | 234 +++++++++++++++++++++++-------------- bin/drk/src/lib.rs | 10 +- bin/drk/src/main.rs | 98 ++++++++-------- bin/drk/src/rpc.rs | 11 +- 6 files changed, 207 insertions(+), 148 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6f06c93d..7849b7515 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2467,6 +2467,7 @@ dependencies = [ "darkfi_deployooor_contract", "darkfi_money_contract", "easy-parallel", + "futures", "lazy_static", "libc", "linenoise-rs", diff --git a/bin/drk/Cargo.toml b/bin/drk/Cargo.toml index 89c56b45d..cab5e6b49 100644 --- a/bin/drk/Cargo.toml +++ b/bin/drk/Cargo.toml @@ -20,6 +20,7 @@ darkfi-serial = "0.5.0" # Misc blake3 = "1.8.2" bs58 = "0.5.1" +futures = "0.3.31" lazy_static = "1.5.0" libc = "0.2" linenoise-rs = "0.1.1" diff --git a/bin/drk/src/interactive.rs b/bin/drk/src/interactive.rs index 539aef794..911824c3e 100644 --- a/bin/drk/src/interactive.rs +++ b/bin/drk/src/interactive.rs @@ -16,15 +16,22 @@ * along with this program. If not, see . */ -use std::{io::ErrorKind, mem::zeroed, ptr::null_mut, str::FromStr}; +use std::{io::ErrorKind, str::FromStr}; -use libc::{fd_set, select, timeval, FD_SET, FD_ZERO}; +use futures::{select, FutureExt}; +use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK}; use linenoise_rs::{ linenoise_history_add, linenoise_history_load, linenoise_history_save, linenoise_set_completion_callback, linenoise_set_hints_callback, LinenoiseState, }; +use smol::channel::{unbounded, Receiver, Sender}; -use darkfi::{cli_desc, system::StoppableTask, util::path::expand_path}; +use darkfi::{ + cli_desc, + system::{msleep, ExecutorPtr, StoppableTask, StoppableTaskPtr}, + util::path::expand_path, + Error, +}; use crate::{ cli_util::{generate_completions, kaching}, @@ -100,7 +107,7 @@ fn hints(buf: &str) -> Option<(String, i32, bool)> { /// Auxiliary function to start provided Drk as an interactive shell. /// Only sane/linenoise terminals are suported. -pub async fn interactive(drk: &Drk, history_path: &str) { +pub async fn interactive(drk: &Drk, history_path: &str, ex: &ExecutorPtr) { // Expand the history file path let history_path = match expand_path(history_path) { Ok(p) => p, @@ -127,87 +134,14 @@ pub async fn interactive(drk: &Drk, history_path: &str) { let mut subscription_active = false; let subscription_task = StoppableTask::new(); - // Create two bounded smol channels, so we can have 2 way - // communication between the shell thread and the background task. - let (_shell_sender, shell_receiver) = smol::channel::bounded::<()>(1); - let (background_sender, _background_receiver) = smol::channel::bounded::<()>(1); + // Create an unbounded smol channel, so we can have a printing + // queue the background task can submit messages to the shell. + let (shell_sender, shell_receiver) = unbounded(); // Start the interactive shell loop { - // Generate the linoise state structure - let mut state = match LinenoiseState::edit_start(-1, -1, "drk> ") { - Ok(s) => s, - Err(e) => { - eprintln!("Error while generating linenoise state: {e}"); - break - } - }; - - // Read until we get a line to process - let mut line = None; - loop { - let retval = unsafe { - // Setup read buffers - let mut readfds: fd_set = zeroed(); - FD_ZERO(&mut readfds); - FD_SET(state.get_fd(), &mut readfds); - - // Setup a 1 second timeout to check if background - // process wants to print. - let mut tv = timeval { tv_sec: 1, tv_usec: 0 }; - - // Wait timeout or input - select(state.get_fd() + 1, &mut readfds, null_mut(), null_mut(), &mut tv) - }; - - // Handle error - if retval == -1 { - eprintln!("Error while reading linenoise buffers"); - break - } - - // Check if background process wants to print anything - if shell_receiver.is_full() { - // Consume the channel message - if let Err(e) = shell_receiver.recv().await { - eprintln!("Error while reading shell receiver channel: {e}"); - break - } - - // Signal background task it can start printing - let _ = state.hide(); - if let Err(e) = background_sender.send(()).await { - eprintln!("Error while sending to background task channel: {e}"); - break - } - - // Wait signal that it finished - if let Err(e) = shell_receiver.recv().await { - eprintln!("Error while reading shell receiver channel: {e}"); - break - } - let _ = state.show(); - } - - // Check if we have a line to process - if retval <= 0 { - continue - } - - // Process linenoise feed - match state.edit_feed() { - Ok(Some(l)) => line = Some(l), - Ok(None) => { /* Do nothing */ } - Err(e) if e.kind() == ErrorKind::Interrupted => { /* Do nothing */ } - Err(e) if e.kind() == ErrorKind::WouldBlock => { - // Need more input, continue - continue; - } - Err(e) => eprintln!("Error while reading linenoise feed: {e}"), - } - break - } - let _ = state.edit_stop(); + // Wait for next line to process + let line = listen_for_line(&shell_receiver).await; // Grab input or end if Ctrl-D or Ctrl-C was pressed let Some(line) = line else { break }; @@ -233,7 +167,14 @@ pub async fn interactive(drk: &Drk, history_path: &str) { "ping" => handle_ping(drk).await, "completions" => handle_completions(&parts), "subscribe" => { - handle_subscribe(drk, &mut subscription_active, &subscription_task).await + handle_subscribe( + drk, + &mut subscription_active, + &subscription_task, + &shell_sender, + ex, + ) + .await } "unsubscribe" => handle_unsubscribe(&mut subscription_active, &subscription_task).await, "scan" => handle_scan(drk, &subscription_active, &parts).await, @@ -250,6 +191,97 @@ pub async fn interactive(drk: &Drk, history_path: &str) { let _ = linenoise_history_save(history_file); } +/// Auxiliary function to listen for linenoise input line and handle +/// background task messages. +async fn listen_for_line(shell_receiver: &Receiver>) -> Option { + // Generate the linoise state structure + let mut state = match LinenoiseState::edit_start(-1, -1, "drk> ") { + Ok(s) => s, + Err(e) => { + eprintln!("Error while generating linenoise state: {e}"); + return None + } + }; + + // Set stdin to non-blocking mode + let fd = state.get_fd(); + unsafe { + let flags = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); + } + + // Read until we get a line to process + let mut line = None; + loop { + // Future that polls stdin for input + let input_future = async { + loop { + match state.edit_feed() { + Ok(Some(l)) => { + line = Some(l); + break + } + Ok(None) => break, + Err(e) if e.kind() == ErrorKind::Interrupted => break, + Err(e) if e.kind() == ErrorKind::WouldBlock => { + // No data available, yield and retry + msleep(10).await; + continue + } + Err(e) => { + eprintln!("Error while reading linenoise feed: {e}"); + break + } + } + } + }; + + // Future that polls the channel + let channel_future = async { + loop { + if !shell_receiver.is_empty() { + break + } + msleep(1000).await; + } + }; + + // Manage the futures + select! { + // When input is ready we break out the loop + _ = input_future.fuse() => break, + // Manage filled channel + _ = channel_future.fuse() => { + while !shell_receiver.is_empty() { + match shell_receiver.recv().await { + Ok(msg) => { + // Hide prompt, print output, show prompt again + let _ = state.hide(); + for line in msg { + println!("{line}\r"); + } + let _ = state.show(); + } + Err(e) => { + eprintln!("Error while reading shell receiver channel: {e}"); + break + } + } + } + } + } + } + + // Restore blocking mode + unsafe { + let flags = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, flags & !O_NONBLOCK); + } + + let _ = state.edit_stop(); + line +} + /// Auxiliary function to define the ping command handling. async fn handle_ping(drk: &Drk) { if let Err(e) = drk.ping().await { @@ -274,7 +306,9 @@ fn handle_completions(parts: &[&str]) { async fn handle_subscribe( drk: &Drk, subscription_active: &mut bool, - _subscription_task: &StoppableTask, + subscription_task: &StoppableTaskPtr, + shell_sender: &Sender>, + ex: &ExecutorPtr, ) { if *subscription_active { println!("Subscription is already active!") @@ -286,13 +320,41 @@ async fn handle_subscribe( } println!("Finished scanning blockchain"); - // TODO: subscribe + // Start the subcristion task + // TODO: use actual subscribe not a dummy task + let shell_sender_ = shell_sender.clone(); + subscription_task.clone().start( + async move { + loop { + msleep(750).await; + let line = String::from("This is a single line dummy message"); + if shell_sender_.send(vec![line]).await.is_err() { + break; + } + msleep(750).await; + let line0 = String::from("This is the first line of a multiline dummy message"); + let line1 = String::from("This is the second line of a multiline dummy message"); + if shell_sender_.send(vec![line0, line1]).await.is_err() { + break; + } + } + Ok(()) + }, + |res| async { + match res { + Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } + Err(e) => println!("Failed starting dnet subs task: {e}"), + } + }, + Error::DetachedTaskStopped, + ex.clone(), + ); *subscription_active = true; } /// Auxiliary function to define the unsubscribe command handling. -async fn handle_unsubscribe(subscription_active: &mut bool, subscription_task: &StoppableTask) { +async fn handle_unsubscribe(subscription_active: &mut bool, subscription_task: &StoppableTaskPtr) { if !*subscription_active { println!("Subscription is already inactive!") } diff --git a/bin/drk/src/lib.rs b/bin/drk/src/lib.rs index 546c55206..049265237 100644 --- a/bin/drk/src/lib.rs +++ b/bin/drk/src/lib.rs @@ -16,11 +16,11 @@ * along with this program. If not, see . */ -use std::{fs, sync::Arc}; +use std::fs::create_dir_all; use url::Url; -use darkfi::{rpc::client::RpcClient, util::path::expand_path, Error, Result}; +use darkfi::{rpc::client::RpcClient, system::ExecutorPtr, util::path::expand_path, Error, Result}; /// Error codes pub mod error; @@ -85,7 +85,7 @@ impl Drk { wallet_path: String, wallet_pass: String, endpoint: Option, - ex: Arc>, + ex: &ExecutorPtr, fun: bool, ) -> Result { // Initialize blockchain cache database @@ -99,7 +99,7 @@ impl Drk { let wallet_path = expand_path(&wallet_path)?; if !wallet_path.exists() { if let Some(parent) = wallet_path.parent() { - fs::create_dir_all(parent)?; + create_dir_all(parent)?; } } let Ok(wallet) = WalletDb::new(Some(wallet_path), Some(&wallet_pass)) else { @@ -108,7 +108,7 @@ impl Drk { // Initialize rpc client let rpc_client = if let Some(endpoint) = endpoint { - Some(RpcClient::new(endpoint, ex).await?) + Some(RpcClient::new(endpoint, ex.clone()).await?) } else { None }; diff --git a/bin/drk/src/main.rs b/bin/drk/src/main.rs index 7b59144e2..652b2e3a7 100644 --- a/bin/drk/src/main.rs +++ b/bin/drk/src/main.rs @@ -20,7 +20,6 @@ use std::{ io::{stdin, Read}, process::exit, str::FromStr, - sync::Arc, }; use log::info; @@ -32,6 +31,7 @@ use url::Url; use darkfi::{ async_daemonize, cli_desc, + system::ExecutorPtr, util::{ encoding::base64, parse::{decode_base10, encode_base10}, @@ -625,7 +625,7 @@ async fn new_wallet( wallet_path: String, wallet_pass: String, endpoint: Option, - ex: Arc>, + ex: &ExecutorPtr, fun: bool, ) -> Drk { // Script kiddies protection @@ -644,7 +644,7 @@ async fn new_wallet( } async_daemonize!(realmain); -async fn realmain(args: Args, ex: Arc>) -> Result<()> { +async fn realmain(args: Args, ex: ExecutorPtr) -> Result<()> { // Grab blockchain network configuration let blockchain_config = match args.network.as_str() { "localnet" => parse_blockchain_config(args.config, "localnet").await?, @@ -663,11 +663,11 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; - interactive(&drk, &blockchain_config.history_path).await; + interactive(&drk, &blockchain_config.history_path, &ex).await; drk.stop_rpc_client().await } @@ -686,7 +686,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -729,7 +729,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -970,7 +970,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1006,7 +1006,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1024,7 +1024,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -1105,7 +1105,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -1139,7 +1139,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -1168,7 +1168,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1188,7 +1188,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1241,7 +1241,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1307,7 +1307,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1329,7 +1329,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1347,7 +1347,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1365,7 +1365,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1417,7 +1417,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -1447,7 +1447,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -1529,7 +1529,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -1574,7 +1574,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1601,7 +1601,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -1809,7 +1809,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -1876,7 +1876,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -1906,7 +1906,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -1966,7 +1966,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -1996,7 +1996,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -2030,12 +2030,12 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint.clone()), - ex.clone(), + &ex, args.fun, ) .await; - if let Err(e) = drk.subscribe_blocks(blockchain_config.endpoint, ex).await { + if let Err(e) = drk.subscribe_blocks(blockchain_config.endpoint, &ex).await { eprintln!("Block subscription failed: {e:?}"); exit(2); } @@ -2049,7 +2049,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -2079,7 +2079,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -2121,7 +2121,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -2146,7 +2146,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2205,7 +2205,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2224,7 +2224,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2290,7 +2290,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2319,7 +2319,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2348,7 +2348,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2384,7 +2384,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2400,7 +2400,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2418,7 +2418,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2471,7 +2471,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -2549,7 +2549,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -2582,7 +2582,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2601,7 +2601,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, None, - ex, + &ex, args.fun, ) .await; @@ -2638,7 +2638,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; @@ -2667,7 +2667,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { blockchain_config.wallet_path, blockchain_config.wallet_pass, Some(blockchain_config.endpoint), - ex, + &ex, args.fun, ) .await; diff --git a/bin/drk/src/rpc.rs b/bin/drk/src/rpc.rs index 58ba1c88e..2f716adc0 100644 --- a/bin/drk/src/rpc.rs +++ b/bin/drk/src/rpc.rs @@ -18,7 +18,6 @@ use std::{ collections::{BTreeMap, HashMap}, - sync::Arc, time::Instant, }; @@ -31,7 +30,7 @@ use darkfi::{ jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult}, util::JsonValue, }, - system::{Publisher, StoppableTask}, + system::{ExecutorPtr, Publisher, StoppableTask}, tx::Transaction, util::encoding::base64, Error, Result, @@ -136,11 +135,7 @@ impl Drk { /// to its previous height and then scan it. We assume that the blocks /// up to that point are unchanged, since darkfid will just broadcast /// the sequence after the reorg. - pub async fn subscribe_blocks( - &self, - endpoint: Url, - ex: Arc>, - ) -> Result<()> { + pub async fn subscribe_blocks(&self, endpoint: Url, ex: &ExecutorPtr) -> Result<()> { // Grab last confirmed block height let (last_confirmed_height, _) = self.get_last_confirmed_block().await?; @@ -204,7 +199,7 @@ impl Drk { } }, Error::RpcServerStopped, - ex, + ex.clone(), ); println!("Detached subscription to background"); println!("All is good. Waiting for block notifications...");