diff --git a/Cargo.lock b/Cargo.lock
index e6f06c93d..7849b7515 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2467,6 +2467,7 @@ dependencies = [
"darkfi_deployooor_contract",
"darkfi_money_contract",
"easy-parallel",
+ "futures",
"lazy_static",
"libc",
"linenoise-rs",
diff --git a/bin/drk/Cargo.toml b/bin/drk/Cargo.toml
index 89c56b45d..cab5e6b49 100644
--- a/bin/drk/Cargo.toml
+++ b/bin/drk/Cargo.toml
@@ -20,6 +20,7 @@ darkfi-serial = "0.5.0"
# Misc
blake3 = "1.8.2"
bs58 = "0.5.1"
+futures = "0.3.31"
lazy_static = "1.5.0"
libc = "0.2"
linenoise-rs = "0.1.1"
diff --git a/bin/drk/src/interactive.rs b/bin/drk/src/interactive.rs
index 539aef794..911824c3e 100644
--- a/bin/drk/src/interactive.rs
+++ b/bin/drk/src/interactive.rs
@@ -16,15 +16,22 @@
* along with this program. If not, see .
*/
-use std::{io::ErrorKind, mem::zeroed, ptr::null_mut, str::FromStr};
+use std::{io::ErrorKind, str::FromStr};
-use libc::{fd_set, select, timeval, FD_SET, FD_ZERO};
+use futures::{select, FutureExt};
+use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK};
use linenoise_rs::{
linenoise_history_add, linenoise_history_load, linenoise_history_save,
linenoise_set_completion_callback, linenoise_set_hints_callback, LinenoiseState,
};
+use smol::channel::{unbounded, Receiver, Sender};
-use darkfi::{cli_desc, system::StoppableTask, util::path::expand_path};
+use darkfi::{
+ cli_desc,
+ system::{msleep, ExecutorPtr, StoppableTask, StoppableTaskPtr},
+ util::path::expand_path,
+ Error,
+};
use crate::{
cli_util::{generate_completions, kaching},
@@ -100,7 +107,7 @@ fn hints(buf: &str) -> Option<(String, i32, bool)> {
/// Auxiliary function to start provided Drk as an interactive shell.
/// Only sane/linenoise terminals are suported.
-pub async fn interactive(drk: &Drk, history_path: &str) {
+pub async fn interactive(drk: &Drk, history_path: &str, ex: &ExecutorPtr) {
// Expand the history file path
let history_path = match expand_path(history_path) {
Ok(p) => p,
@@ -127,87 +134,14 @@ pub async fn interactive(drk: &Drk, history_path: &str) {
let mut subscription_active = false;
let subscription_task = StoppableTask::new();
- // Create two bounded smol channels, so we can have 2 way
- // communication between the shell thread and the background task.
- let (_shell_sender, shell_receiver) = smol::channel::bounded::<()>(1);
- let (background_sender, _background_receiver) = smol::channel::bounded::<()>(1);
+ // Create an unbounded smol channel, so we can have a printing
+ // queue the background task can submit messages to the shell.
+ let (shell_sender, shell_receiver) = unbounded();
// Start the interactive shell
loop {
- // Generate the linoise state structure
- let mut state = match LinenoiseState::edit_start(-1, -1, "drk> ") {
- Ok(s) => s,
- Err(e) => {
- eprintln!("Error while generating linenoise state: {e}");
- break
- }
- };
-
- // Read until we get a line to process
- let mut line = None;
- loop {
- let retval = unsafe {
- // Setup read buffers
- let mut readfds: fd_set = zeroed();
- FD_ZERO(&mut readfds);
- FD_SET(state.get_fd(), &mut readfds);
-
- // Setup a 1 second timeout to check if background
- // process wants to print.
- let mut tv = timeval { tv_sec: 1, tv_usec: 0 };
-
- // Wait timeout or input
- select(state.get_fd() + 1, &mut readfds, null_mut(), null_mut(), &mut tv)
- };
-
- // Handle error
- if retval == -1 {
- eprintln!("Error while reading linenoise buffers");
- break
- }
-
- // Check if background process wants to print anything
- if shell_receiver.is_full() {
- // Consume the channel message
- if let Err(e) = shell_receiver.recv().await {
- eprintln!("Error while reading shell receiver channel: {e}");
- break
- }
-
- // Signal background task it can start printing
- let _ = state.hide();
- if let Err(e) = background_sender.send(()).await {
- eprintln!("Error while sending to background task channel: {e}");
- break
- }
-
- // Wait signal that it finished
- if let Err(e) = shell_receiver.recv().await {
- eprintln!("Error while reading shell receiver channel: {e}");
- break
- }
- let _ = state.show();
- }
-
- // Check if we have a line to process
- if retval <= 0 {
- continue
- }
-
- // Process linenoise feed
- match state.edit_feed() {
- Ok(Some(l)) => line = Some(l),
- Ok(None) => { /* Do nothing */ }
- Err(e) if e.kind() == ErrorKind::Interrupted => { /* Do nothing */ }
- Err(e) if e.kind() == ErrorKind::WouldBlock => {
- // Need more input, continue
- continue;
- }
- Err(e) => eprintln!("Error while reading linenoise feed: {e}"),
- }
- break
- }
- let _ = state.edit_stop();
+ // Wait for next line to process
+ let line = listen_for_line(&shell_receiver).await;
// Grab input or end if Ctrl-D or Ctrl-C was pressed
let Some(line) = line else { break };
@@ -233,7 +167,14 @@ pub async fn interactive(drk: &Drk, history_path: &str) {
"ping" => handle_ping(drk).await,
"completions" => handle_completions(&parts),
"subscribe" => {
- handle_subscribe(drk, &mut subscription_active, &subscription_task).await
+ handle_subscribe(
+ drk,
+ &mut subscription_active,
+ &subscription_task,
+ &shell_sender,
+ ex,
+ )
+ .await
}
"unsubscribe" => handle_unsubscribe(&mut subscription_active, &subscription_task).await,
"scan" => handle_scan(drk, &subscription_active, &parts).await,
@@ -250,6 +191,97 @@ pub async fn interactive(drk: &Drk, history_path: &str) {
let _ = linenoise_history_save(history_file);
}
+/// Auxiliary function to listen for linenoise input line and handle
+/// background task messages.
+async fn listen_for_line(shell_receiver: &Receiver>) -> Option {
+ // Generate the linoise state structure
+ let mut state = match LinenoiseState::edit_start(-1, -1, "drk> ") {
+ Ok(s) => s,
+ Err(e) => {
+ eprintln!("Error while generating linenoise state: {e}");
+ return None
+ }
+ };
+
+ // Set stdin to non-blocking mode
+ let fd = state.get_fd();
+ unsafe {
+ let flags = fcntl(fd, F_GETFL, 0);
+ fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ }
+
+ // Read until we get a line to process
+ let mut line = None;
+ loop {
+ // Future that polls stdin for input
+ let input_future = async {
+ loop {
+ match state.edit_feed() {
+ Ok(Some(l)) => {
+ line = Some(l);
+ break
+ }
+ Ok(None) => break,
+ Err(e) if e.kind() == ErrorKind::Interrupted => break,
+ Err(e) if e.kind() == ErrorKind::WouldBlock => {
+ // No data available, yield and retry
+ msleep(10).await;
+ continue
+ }
+ Err(e) => {
+ eprintln!("Error while reading linenoise feed: {e}");
+ break
+ }
+ }
+ }
+ };
+
+ // Future that polls the channel
+ let channel_future = async {
+ loop {
+ if !shell_receiver.is_empty() {
+ break
+ }
+ msleep(1000).await;
+ }
+ };
+
+ // Manage the futures
+ select! {
+ // When input is ready we break out the loop
+ _ = input_future.fuse() => break,
+ // Manage filled channel
+ _ = channel_future.fuse() => {
+ while !shell_receiver.is_empty() {
+ match shell_receiver.recv().await {
+ Ok(msg) => {
+ // Hide prompt, print output, show prompt again
+ let _ = state.hide();
+ for line in msg {
+ println!("{line}\r");
+ }
+ let _ = state.show();
+ }
+ Err(e) => {
+ eprintln!("Error while reading shell receiver channel: {e}");
+ break
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // Restore blocking mode
+ unsafe {
+ let flags = fcntl(fd, F_GETFL, 0);
+ fcntl(fd, F_SETFL, flags & !O_NONBLOCK);
+ }
+
+ let _ = state.edit_stop();
+ line
+}
+
/// Auxiliary function to define the ping command handling.
async fn handle_ping(drk: &Drk) {
if let Err(e) = drk.ping().await {
@@ -274,7 +306,9 @@ fn handle_completions(parts: &[&str]) {
async fn handle_subscribe(
drk: &Drk,
subscription_active: &mut bool,
- _subscription_task: &StoppableTask,
+ subscription_task: &StoppableTaskPtr,
+ shell_sender: &Sender>,
+ ex: &ExecutorPtr,
) {
if *subscription_active {
println!("Subscription is already active!")
@@ -286,13 +320,41 @@ async fn handle_subscribe(
}
println!("Finished scanning blockchain");
- // TODO: subscribe
+ // Start the subcristion task
+ // TODO: use actual subscribe not a dummy task
+ let shell_sender_ = shell_sender.clone();
+ subscription_task.clone().start(
+ async move {
+ loop {
+ msleep(750).await;
+ let line = String::from("This is a single line dummy message");
+ if shell_sender_.send(vec![line]).await.is_err() {
+ break;
+ }
+ msleep(750).await;
+ let line0 = String::from("This is the first line of a multiline dummy message");
+ let line1 = String::from("This is the second line of a multiline dummy message");
+ if shell_sender_.send(vec![line0, line1]).await.is_err() {
+ break;
+ }
+ }
+ Ok(())
+ },
+ |res| async {
+ match res {
+ Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
+ Err(e) => println!("Failed starting dnet subs task: {e}"),
+ }
+ },
+ Error::DetachedTaskStopped,
+ ex.clone(),
+ );
*subscription_active = true;
}
/// Auxiliary function to define the unsubscribe command handling.
-async fn handle_unsubscribe(subscription_active: &mut bool, subscription_task: &StoppableTask) {
+async fn handle_unsubscribe(subscription_active: &mut bool, subscription_task: &StoppableTaskPtr) {
if !*subscription_active {
println!("Subscription is already inactive!")
}
diff --git a/bin/drk/src/lib.rs b/bin/drk/src/lib.rs
index 546c55206..049265237 100644
--- a/bin/drk/src/lib.rs
+++ b/bin/drk/src/lib.rs
@@ -16,11 +16,11 @@
* along with this program. If not, see .
*/
-use std::{fs, sync::Arc};
+use std::fs::create_dir_all;
use url::Url;
-use darkfi::{rpc::client::RpcClient, util::path::expand_path, Error, Result};
+use darkfi::{rpc::client::RpcClient, system::ExecutorPtr, util::path::expand_path, Error, Result};
/// Error codes
pub mod error;
@@ -85,7 +85,7 @@ impl Drk {
wallet_path: String,
wallet_pass: String,
endpoint: Option,
- ex: Arc>,
+ ex: &ExecutorPtr,
fun: bool,
) -> Result {
// Initialize blockchain cache database
@@ -99,7 +99,7 @@ impl Drk {
let wallet_path = expand_path(&wallet_path)?;
if !wallet_path.exists() {
if let Some(parent) = wallet_path.parent() {
- fs::create_dir_all(parent)?;
+ create_dir_all(parent)?;
}
}
let Ok(wallet) = WalletDb::new(Some(wallet_path), Some(&wallet_pass)) else {
@@ -108,7 +108,7 @@ impl Drk {
// Initialize rpc client
let rpc_client = if let Some(endpoint) = endpoint {
- Some(RpcClient::new(endpoint, ex).await?)
+ Some(RpcClient::new(endpoint, ex.clone()).await?)
} else {
None
};
diff --git a/bin/drk/src/main.rs b/bin/drk/src/main.rs
index 7b59144e2..652b2e3a7 100644
--- a/bin/drk/src/main.rs
+++ b/bin/drk/src/main.rs
@@ -20,7 +20,6 @@ use std::{
io::{stdin, Read},
process::exit,
str::FromStr,
- sync::Arc,
};
use log::info;
@@ -32,6 +31,7 @@ use url::Url;
use darkfi::{
async_daemonize, cli_desc,
+ system::ExecutorPtr,
util::{
encoding::base64,
parse::{decode_base10, encode_base10},
@@ -625,7 +625,7 @@ async fn new_wallet(
wallet_path: String,
wallet_pass: String,
endpoint: Option,
- ex: Arc>,
+ ex: &ExecutorPtr,
fun: bool,
) -> Drk {
// Script kiddies protection
@@ -644,7 +644,7 @@ async fn new_wallet(
}
async_daemonize!(realmain);
-async fn realmain(args: Args, ex: Arc>) -> Result<()> {
+async fn realmain(args: Args, ex: ExecutorPtr) -> Result<()> {
// Grab blockchain network configuration
let blockchain_config = match args.network.as_str() {
"localnet" => parse_blockchain_config(args.config, "localnet").await?,
@@ -663,11 +663,11 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
- interactive(&drk, &blockchain_config.history_path).await;
+ interactive(&drk, &blockchain_config.history_path, &ex).await;
drk.stop_rpc_client().await
}
@@ -686,7 +686,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -729,7 +729,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -970,7 +970,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1006,7 +1006,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1024,7 +1024,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1105,7 +1105,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1139,7 +1139,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1168,7 +1168,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1188,7 +1188,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1241,7 +1241,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1307,7 +1307,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1329,7 +1329,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1347,7 +1347,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1365,7 +1365,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1417,7 +1417,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1447,7 +1447,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1529,7 +1529,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1574,7 +1574,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1601,7 +1601,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1809,7 +1809,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1876,7 +1876,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1906,7 +1906,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1966,7 +1966,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -1996,7 +1996,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2030,12 +2030,12 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint.clone()),
- ex.clone(),
+ &ex,
args.fun,
)
.await;
- if let Err(e) = drk.subscribe_blocks(blockchain_config.endpoint, ex).await {
+ if let Err(e) = drk.subscribe_blocks(blockchain_config.endpoint, &ex).await {
eprintln!("Block subscription failed: {e:?}");
exit(2);
}
@@ -2049,7 +2049,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2079,7 +2079,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2121,7 +2121,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2146,7 +2146,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2205,7 +2205,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2224,7 +2224,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2290,7 +2290,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2319,7 +2319,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2348,7 +2348,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2384,7 +2384,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2400,7 +2400,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2418,7 +2418,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2471,7 +2471,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2549,7 +2549,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2582,7 +2582,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2601,7 +2601,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2638,7 +2638,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
@@ -2667,7 +2667,7 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
- ex,
+ &ex,
args.fun,
)
.await;
diff --git a/bin/drk/src/rpc.rs b/bin/drk/src/rpc.rs
index 58ba1c88e..2f716adc0 100644
--- a/bin/drk/src/rpc.rs
+++ b/bin/drk/src/rpc.rs
@@ -18,7 +18,6 @@
use std::{
collections::{BTreeMap, HashMap},
- sync::Arc,
time::Instant,
};
@@ -31,7 +30,7 @@ use darkfi::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
util::JsonValue,
},
- system::{Publisher, StoppableTask},
+ system::{ExecutorPtr, Publisher, StoppableTask},
tx::Transaction,
util::encoding::base64,
Error, Result,
@@ -136,11 +135,7 @@ impl Drk {
/// to its previous height and then scan it. We assume that the blocks
/// up to that point are unchanged, since darkfid will just broadcast
/// the sequence after the reorg.
- pub async fn subscribe_blocks(
- &self,
- endpoint: Url,
- ex: Arc>,
- ) -> Result<()> {
+ pub async fn subscribe_blocks(&self, endpoint: Url, ex: &ExecutorPtr) -> Result<()> {
// Grab last confirmed block height
let (last_confirmed_height, _) = self.get_last_confirmed_block().await?;
@@ -204,7 +199,7 @@ impl Drk {
}
},
Error::RpcServerStopped,
- ex,
+ ex.clone(),
);
println!("Detached subscription to background");
println!("All is good. Waiting for block notifications...");