drk/interactive: use a print queue instead of two way signal for background task

This commit is contained in:
skoupidi
2025-06-14 14:52:53 +03:00
parent 1de7bfec4a
commit 3f033be9a7
6 changed files with 207 additions and 148 deletions

1
Cargo.lock generated
View File

@@ -2467,6 +2467,7 @@ dependencies = [
"darkfi_deployooor_contract",
"darkfi_money_contract",
"easy-parallel",
"futures",
"lazy_static",
"libc",
"linenoise-rs",

View File

@@ -20,6 +20,7 @@ darkfi-serial = "0.5.0"
# Misc
blake3 = "1.8.2"
bs58 = "0.5.1"
futures = "0.3.31"
lazy_static = "1.5.0"
libc = "0.2"
linenoise-rs = "0.1.1"

View File

@@ -16,15 +16,22 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{io::ErrorKind, mem::zeroed, ptr::null_mut, str::FromStr};
use std::{io::ErrorKind, str::FromStr};
use libc::{fd_set, select, timeval, FD_SET, FD_ZERO};
use futures::{select, FutureExt};
use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK};
use linenoise_rs::{
linenoise_history_add, linenoise_history_load, linenoise_history_save,
linenoise_set_completion_callback, linenoise_set_hints_callback, LinenoiseState,
};
use smol::channel::{unbounded, Receiver, Sender};
use darkfi::{cli_desc, system::StoppableTask, util::path::expand_path};
use darkfi::{
cli_desc,
system::{msleep, ExecutorPtr, StoppableTask, StoppableTaskPtr},
util::path::expand_path,
Error,
};
use crate::{
cli_util::{generate_completions, kaching},
@@ -100,7 +107,7 @@ fn hints(buf: &str) -> Option<(String, i32, bool)> {
/// Auxiliary function to start provided Drk as an interactive shell.
/// Only sane/linenoise terminals are suported.
pub async fn interactive(drk: &Drk, history_path: &str) {
pub async fn interactive(drk: &Drk, history_path: &str, ex: &ExecutorPtr) {
// Expand the history file path
let history_path = match expand_path(history_path) {
Ok(p) => p,
@@ -127,87 +134,14 @@ pub async fn interactive(drk: &Drk, history_path: &str) {
let mut subscription_active = false;
let subscription_task = StoppableTask::new();
// Create two bounded smol channels, so we can have 2 way
// communication between the shell thread and the background task.
let (_shell_sender, shell_receiver) = smol::channel::bounded::<()>(1);
let (background_sender, _background_receiver) = smol::channel::bounded::<()>(1);
// Create an unbounded smol channel, so we can have a printing
// queue the background task can submit messages to the shell.
let (shell_sender, shell_receiver) = unbounded();
// Start the interactive shell
loop {
// Generate the linoise state structure
let mut state = match LinenoiseState::edit_start(-1, -1, "drk> ") {
Ok(s) => s,
Err(e) => {
eprintln!("Error while generating linenoise state: {e}");
break
}
};
// Read until we get a line to process
let mut line = None;
loop {
let retval = unsafe {
// Setup read buffers
let mut readfds: fd_set = zeroed();
FD_ZERO(&mut readfds);
FD_SET(state.get_fd(), &mut readfds);
// Setup a 1 second timeout to check if background
// process wants to print.
let mut tv = timeval { tv_sec: 1, tv_usec: 0 };
// Wait timeout or input
select(state.get_fd() + 1, &mut readfds, null_mut(), null_mut(), &mut tv)
};
// Handle error
if retval == -1 {
eprintln!("Error while reading linenoise buffers");
break
}
// Check if background process wants to print anything
if shell_receiver.is_full() {
// Consume the channel message
if let Err(e) = shell_receiver.recv().await {
eprintln!("Error while reading shell receiver channel: {e}");
break
}
// Signal background task it can start printing
let _ = state.hide();
if let Err(e) = background_sender.send(()).await {
eprintln!("Error while sending to background task channel: {e}");
break
}
// Wait signal that it finished
if let Err(e) = shell_receiver.recv().await {
eprintln!("Error while reading shell receiver channel: {e}");
break
}
let _ = state.show();
}
// Check if we have a line to process
if retval <= 0 {
continue
}
// Process linenoise feed
match state.edit_feed() {
Ok(Some(l)) => line = Some(l),
Ok(None) => { /* Do nothing */ }
Err(e) if e.kind() == ErrorKind::Interrupted => { /* Do nothing */ }
Err(e) if e.kind() == ErrorKind::WouldBlock => {
// Need more input, continue
continue;
}
Err(e) => eprintln!("Error while reading linenoise feed: {e}"),
}
break
}
let _ = state.edit_stop();
// Wait for next line to process
let line = listen_for_line(&shell_receiver).await;
// Grab input or end if Ctrl-D or Ctrl-C was pressed
let Some(line) = line else { break };
@@ -233,7 +167,14 @@ pub async fn interactive(drk: &Drk, history_path: &str) {
"ping" => handle_ping(drk).await,
"completions" => handle_completions(&parts),
"subscribe" => {
handle_subscribe(drk, &mut subscription_active, &subscription_task).await
handle_subscribe(
drk,
&mut subscription_active,
&subscription_task,
&shell_sender,
ex,
)
.await
}
"unsubscribe" => handle_unsubscribe(&mut subscription_active, &subscription_task).await,
"scan" => handle_scan(drk, &subscription_active, &parts).await,
@@ -250,6 +191,97 @@ pub async fn interactive(drk: &Drk, history_path: &str) {
let _ = linenoise_history_save(history_file);
}
/// Auxiliary function to listen for linenoise input line and handle
/// background task messages.
async fn listen_for_line(shell_receiver: &Receiver<Vec<String>>) -> Option<String> {
// Generate the linoise state structure
let mut state = match LinenoiseState::edit_start(-1, -1, "drk> ") {
Ok(s) => s,
Err(e) => {
eprintln!("Error while generating linenoise state: {e}");
return None
}
};
// Set stdin to non-blocking mode
let fd = state.get_fd();
unsafe {
let flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
// Read until we get a line to process
let mut line = None;
loop {
// Future that polls stdin for input
let input_future = async {
loop {
match state.edit_feed() {
Ok(Some(l)) => {
line = Some(l);
break
}
Ok(None) => break,
Err(e) if e.kind() == ErrorKind::Interrupted => break,
Err(e) if e.kind() == ErrorKind::WouldBlock => {
// No data available, yield and retry
msleep(10).await;
continue
}
Err(e) => {
eprintln!("Error while reading linenoise feed: {e}");
break
}
}
}
};
// Future that polls the channel
let channel_future = async {
loop {
if !shell_receiver.is_empty() {
break
}
msleep(1000).await;
}
};
// Manage the futures
select! {
// When input is ready we break out the loop
_ = input_future.fuse() => break,
// Manage filled channel
_ = channel_future.fuse() => {
while !shell_receiver.is_empty() {
match shell_receiver.recv().await {
Ok(msg) => {
// Hide prompt, print output, show prompt again
let _ = state.hide();
for line in msg {
println!("{line}\r");
}
let _ = state.show();
}
Err(e) => {
eprintln!("Error while reading shell receiver channel: {e}");
break
}
}
}
}
}
}
// Restore blocking mode
unsafe {
let flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags & !O_NONBLOCK);
}
let _ = state.edit_stop();
line
}
/// Auxiliary function to define the ping command handling.
async fn handle_ping(drk: &Drk) {
if let Err(e) = drk.ping().await {
@@ -274,7 +306,9 @@ fn handle_completions(parts: &[&str]) {
async fn handle_subscribe(
drk: &Drk,
subscription_active: &mut bool,
_subscription_task: &StoppableTask,
subscription_task: &StoppableTaskPtr,
shell_sender: &Sender<Vec<String>>,
ex: &ExecutorPtr,
) {
if *subscription_active {
println!("Subscription is already active!")
@@ -286,13 +320,41 @@ async fn handle_subscribe(
}
println!("Finished scanning blockchain");
// TODO: subscribe
// Start the subcristion task
// TODO: use actual subscribe not a dummy task
let shell_sender_ = shell_sender.clone();
subscription_task.clone().start(
async move {
loop {
msleep(750).await;
let line = String::from("This is a single line dummy message");
if shell_sender_.send(vec![line]).await.is_err() {
break;
}
msleep(750).await;
let line0 = String::from("This is the first line of a multiline dummy message");
let line1 = String::from("This is the second line of a multiline dummy message");
if shell_sender_.send(vec![line0, line1]).await.is_err() {
break;
}
}
Ok(())
},
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => println!("Failed starting dnet subs task: {e}"),
}
},
Error::DetachedTaskStopped,
ex.clone(),
);
*subscription_active = true;
}
/// Auxiliary function to define the unsubscribe command handling.
async fn handle_unsubscribe(subscription_active: &mut bool, subscription_task: &StoppableTask) {
async fn handle_unsubscribe(subscription_active: &mut bool, subscription_task: &StoppableTaskPtr) {
if !*subscription_active {
println!("Subscription is already inactive!")
}

View File

@@ -16,11 +16,11 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{fs, sync::Arc};
use std::fs::create_dir_all;
use url::Url;
use darkfi::{rpc::client::RpcClient, util::path::expand_path, Error, Result};
use darkfi::{rpc::client::RpcClient, system::ExecutorPtr, util::path::expand_path, Error, Result};
/// Error codes
pub mod error;
@@ -85,7 +85,7 @@ impl Drk {
wallet_path: String,
wallet_pass: String,
endpoint: Option<Url>,
ex: Arc<smol::Executor<'static>>,
ex: &ExecutorPtr,
fun: bool,
) -> Result<Self> {
// Initialize blockchain cache database
@@ -99,7 +99,7 @@ impl Drk {
let wallet_path = expand_path(&wallet_path)?;
if !wallet_path.exists() {
if let Some(parent) = wallet_path.parent() {
fs::create_dir_all(parent)?;
create_dir_all(parent)?;
}
}
let Ok(wallet) = WalletDb::new(Some(wallet_path), Some(&wallet_pass)) else {
@@ -108,7 +108,7 @@ impl Drk {
// Initialize rpc client
let rpc_client = if let Some(endpoint) = endpoint {
Some(RpcClient::new(endpoint, ex).await?)
Some(RpcClient::new(endpoint, ex.clone()).await?)
} else {
None
};

View File

@@ -20,7 +20,6 @@ use std::{
io::{stdin, Read},
process::exit,
str::FromStr,
sync::Arc,
};
use log::info;
@@ -32,6 +31,7 @@ use url::Url;
use darkfi::{
async_daemonize, cli_desc,
system::ExecutorPtr,
util::{
encoding::base64,
parse::{decode_base10, encode_base10},
@@ -625,7 +625,7 @@ async fn new_wallet(
wallet_path: String,
wallet_pass: String,
endpoint: Option<Url>,
ex: Arc<smol::Executor<'static>>,
ex: &ExecutorPtr,
fun: bool,
) -> Drk {
// Script kiddies protection
@@ -644,7 +644,7 @@ async fn new_wallet(
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
async fn realmain(args: Args, ex: ExecutorPtr) -> Result<()> {
// Grab blockchain network configuration
let blockchain_config = match args.network.as_str() {
"localnet" => parse_blockchain_config(args.config, "localnet").await?,
@@ -663,11 +663,11 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
interactive(&drk, &blockchain_config.history_path).await;
interactive(&drk, &blockchain_config.history_path, &ex).await;
drk.stop_rpc_client().await
}
@@ -686,7 +686,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -729,7 +729,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -970,7 +970,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1006,7 +1006,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1024,7 +1024,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -1105,7 +1105,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -1139,7 +1139,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -1168,7 +1168,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1188,7 +1188,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1241,7 +1241,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1307,7 +1307,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1329,7 +1329,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1347,7 +1347,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1365,7 +1365,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1417,7 +1417,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -1447,7 +1447,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -1529,7 +1529,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -1574,7 +1574,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1601,7 +1601,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -1809,7 +1809,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -1876,7 +1876,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -1906,7 +1906,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -1966,7 +1966,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -1996,7 +1996,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -2030,12 +2030,12 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint.clone()),
ex.clone(),
&ex,
args.fun,
)
.await;
if let Err(e) = drk.subscribe_blocks(blockchain_config.endpoint, ex).await {
if let Err(e) = drk.subscribe_blocks(blockchain_config.endpoint, &ex).await {
eprintln!("Block subscription failed: {e:?}");
exit(2);
}
@@ -2049,7 +2049,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -2079,7 +2079,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -2121,7 +2121,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -2146,7 +2146,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2205,7 +2205,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2224,7 +2224,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2290,7 +2290,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2319,7 +2319,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2348,7 +2348,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2384,7 +2384,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2400,7 +2400,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2418,7 +2418,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2471,7 +2471,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -2549,7 +2549,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -2582,7 +2582,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2601,7 +2601,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
None,
ex,
&ex,
args.fun,
)
.await;
@@ -2638,7 +2638,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;
@@ -2667,7 +2667,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
ex,
&ex,
args.fun,
)
.await;

View File

@@ -18,7 +18,6 @@
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Instant,
};
@@ -31,7 +30,7 @@ use darkfi::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
util::JsonValue,
},
system::{Publisher, StoppableTask},
system::{ExecutorPtr, Publisher, StoppableTask},
tx::Transaction,
util::encoding::base64,
Error, Result,
@@ -136,11 +135,7 @@ impl Drk {
/// to its previous height and then scan it. We assume that the blocks
/// up to that point are unchanged, since darkfid will just broadcast
/// the sequence after the reorg.
pub async fn subscribe_blocks(
&self,
endpoint: Url,
ex: Arc<smol::Executor<'static>>,
) -> Result<()> {
pub async fn subscribe_blocks(&self, endpoint: Url, ex: &ExecutorPtr) -> Result<()> {
// Grab last confirmed block height
let (last_confirmed_height, _) = self.get_last_confirmed_block().await?;
@@ -204,7 +199,7 @@ impl Drk {
}
},
Error::RpcServerStopped,
ex,
ex.clone(),
);
println!("Detached subscription to background");
println!("All is good. Waiting for block notifications...");