diff --git a/Cargo.lock b/Cargo.lock index 1fd3fdd92..e6f06c93d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2468,6 +2468,7 @@ dependencies = [ "darkfi_money_contract", "easy-parallel", "lazy_static", + "libc", "linenoise-rs", "log", "num-bigint", diff --git a/bin/drk/Cargo.toml b/bin/drk/Cargo.toml index 7a8da072d..89c56b45d 100644 --- a/bin/drk/Cargo.toml +++ b/bin/drk/Cargo.toml @@ -21,6 +21,7 @@ darkfi-serial = "0.5.0" blake3 = "1.8.2" bs58 = "0.5.1" lazy_static = "1.5.0" +libc = "0.2" linenoise-rs = "0.1.1" log = "0.4.27" num-bigint = "0.4.6" diff --git a/bin/drk/src/interactive.rs b/bin/drk/src/interactive.rs index 48ea91eff..539aef794 100644 --- a/bin/drk/src/interactive.rs +++ b/bin/drk/src/interactive.rs @@ -16,9 +16,12 @@ * along with this program. If not, see . */ +use std::{io::ErrorKind, mem::zeroed, ptr::null_mut, str::FromStr}; + +use libc::{fd_set, select, timeval, FD_SET, FD_ZERO}; use linenoise_rs::{ - linenoise, linenoise_history_add, linenoise_history_load, linenoise_history_save, - linenoise_set_completion_callback, linenoise_set_hints_callback, + linenoise_history_add, linenoise_history_load, linenoise_history_save, + linenoise_set_completion_callback, linenoise_set_hints_callback, LinenoiseState, }; use darkfi::{cli_desc, system::StoppableTask, util::path::expand_path}; @@ -42,6 +45,11 @@ fn help() { println!("\tkaching: Fun"); println!("\tping: Send a ping request to the darkfid RPC endpoint"); println!("\tcompletions: Generate a SHELL completion script and print to stdout"); + println!( + "\tsubscribe: Perform a scan and then subscribe to darkfid to listen for incoming blocks" + ); + println!("\tunsubscribe: Stops the background subscription, if its active"); + println!("\tscan: Scan the blockchain and parse relevant transactions"); } /// Auxiliary function to define the interactive shell completions. @@ -63,6 +71,21 @@ fn completion(buf: &str, lc: &mut Vec) { if buf.starts_with("c") { lc.push("completions".to_string()); + return + } + + if buf.starts_with("su") { + lc.push("subscribe".to_string()); + return + } + + if buf.starts_with("u") { + lc.push("unsubscribe".to_string()); + return + } + + if buf.starts_with("sc") { + lc.push("scan".to_string()); } } @@ -70,17 +93,19 @@ fn completion(buf: &str, lc: &mut Vec) { fn hints(buf: &str) -> Option<(String, i32, bool)> { match buf { "completions " => Some(("{shell}".to_string(), 35, false)), // 35 = magenta + "scan " => Some(("--reset {height}".to_string(), 35, false)), // 35 = magenta _ => None, } } /// Auxiliary function to start provided Drk as an interactive shell. +/// Only sane/linenoise terminals are suported. pub async fn interactive(drk: &Drk, history_path: &str) { // Expand the history file path let history_path = match expand_path(history_path) { Ok(p) => p, Err(e) => { - println!("Error while expanding history file path: {e}"); + eprintln!("Error while expanding history file path: {e}"); return } }; @@ -99,23 +124,93 @@ pub async fn interactive(drk: &Drk, history_path: &str) { let _ = linenoise_history_load(history_file); // Create a detached task to use for block subscription - let subscription_active = false; + let mut subscription_active = false; let subscription_task = StoppableTask::new(); + // Create two bounded smol channels, so we can have 2 way + // communication between the shell thread and the background task. + let (_shell_sender, shell_receiver) = smol::channel::bounded::<()>(1); + let (background_sender, _background_receiver) = smol::channel::bounded::<()>(1); + // Start the interactive shell loop { - // Grab input or end if Ctrl-D or Ctrl-C was pressed - let Some(line) = linenoise("drk> ") else { - // Stop the subscription task if its active - if subscription_active { - subscription_task.stop().await; + // Generate the linoise state structure + let mut state = match LinenoiseState::edit_start(-1, -1, "drk> ") { + Ok(s) => s, + Err(e) => { + eprintln!("Error while generating linenoise state: {e}"); + break + } + }; + + // Read until we get a line to process + let mut line = None; + loop { + let retval = unsafe { + // Setup read buffers + let mut readfds: fd_set = zeroed(); + FD_ZERO(&mut readfds); + FD_SET(state.get_fd(), &mut readfds); + + // Setup a 1 second timeout to check if background + // process wants to print. + let mut tv = timeval { tv_sec: 1, tv_usec: 0 }; + + // Wait timeout or input + select(state.get_fd() + 1, &mut readfds, null_mut(), null_mut(), &mut tv) + }; + + // Handle error + if retval == -1 { + eprintln!("Error while reading linenoise buffers"); + break } - // Write history file - let _ = linenoise_history_save(history_file); + // Check if background process wants to print anything + if shell_receiver.is_full() { + // Consume the channel message + if let Err(e) = shell_receiver.recv().await { + eprintln!("Error while reading shell receiver channel: {e}"); + break + } - return - }; + // Signal background task it can start printing + let _ = state.hide(); + if let Err(e) = background_sender.send(()).await { + eprintln!("Error while sending to background task channel: {e}"); + break + } + + // Wait signal that it finished + if let Err(e) = shell_receiver.recv().await { + eprintln!("Error while reading shell receiver channel: {e}"); + break + } + let _ = state.show(); + } + + // Check if we have a line to process + if retval <= 0 { + continue + } + + // Process linenoise feed + match state.edit_feed() { + Ok(Some(l)) => line = Some(l), + Ok(None) => { /* Do nothing */ } + Err(e) if e.kind() == ErrorKind::Interrupted => { /* Do nothing */ } + Err(e) if e.kind() == ErrorKind::WouldBlock => { + // Need more input, continue + continue; + } + Err(e) => eprintln!("Error while reading linenoise feed: {e}"), + } + break + } + let _ = state.edit_stop(); + + // Grab input or end if Ctrl-D or Ctrl-C was pressed + let Some(line) = line else { break }; // Check if line is empty if line.is_empty() { @@ -137,9 +232,22 @@ pub async fn interactive(drk: &Drk, history_path: &str) { "kaching" => kaching().await, "ping" => handle_ping(drk).await, "completions" => handle_completions(&parts), + "subscribe" => { + handle_subscribe(drk, &mut subscription_active, &subscription_task).await + } + "unsubscribe" => handle_unsubscribe(&mut subscription_active, &subscription_task).await, + "scan" => handle_scan(drk, &subscription_active, &parts).await, _ => println!("Unreconized command: {}", parts[0]), } } + + // Stop the subscription task if its active + if subscription_active { + subscription_task.stop().await; + } + + // Write history file + let _ = linenoise_history_save(history_file); } /// Auxiliary function to define the ping command handling. @@ -161,3 +269,75 @@ fn handle_completions(parts: &[&str]) { println!("Error while executing completions command: {e}") } } + +/// Auxiliary function to define the subscribe command handling. +async fn handle_subscribe( + drk: &Drk, + subscription_active: &mut bool, + _subscription_task: &StoppableTask, +) { + if *subscription_active { + println!("Subscription is already active!") + } + + if let Err(e) = drk.scan_blocks().await { + println!("Failed during scanning: {e:?}"); + return + } + println!("Finished scanning blockchain"); + + // TODO: subscribe + + *subscription_active = true; +} + +/// Auxiliary function to define the unsubscribe command handling. +async fn handle_unsubscribe(subscription_active: &mut bool, subscription_task: &StoppableTask) { + if !*subscription_active { + println!("Subscription is already inactive!") + } + subscription_task.stop().await; + *subscription_active = false; +} + +/// Auxiliary function to define the scan command handling. +async fn handle_scan(drk: &Drk, subscription_active: &bool, parts: &[&str]) { + if *subscription_active { + println!("Subscription is already active!"); + return + } + + // Check correct command structure + if parts.len() != 1 && parts.len() != 3 { + println!("Malformed `scan` command"); + return + } + + // Check if reset was requested + if parts.len() == 3 { + if parts[1] != "--reset" { + println!("Malformed `scan` command"); + println!("Usage: scan --reset {{height}}"); + return + } + + let height = match u32::from_str(parts[2]) { + Ok(h) => h, + Err(e) => { + println!("Invalid reset height: {e:?}"); + return + } + }; + + if let Err(e) = drk.reset_to_height(height) { + println!("Failed during wallet reset: {e:?}"); + return + } + } + + if let Err(e) = drk.scan_blocks().await { + println!("Failed during scanning: {e:?}"); + return + } + println!("Finished scanning blockchain"); +}