drk/interactive: scan and subscribe infra added

This commit is contained in:
skoupidi
2025-06-13 16:02:51 +03:00
parent 4325d8019b
commit 1de7bfec4a
3 changed files with 195 additions and 13 deletions

1
Cargo.lock generated
View File

@@ -2468,6 +2468,7 @@ dependencies = [
"darkfi_money_contract",
"easy-parallel",
"lazy_static",
"libc",
"linenoise-rs",
"log",
"num-bigint",

View File

@@ -21,6 +21,7 @@ darkfi-serial = "0.5.0"
blake3 = "1.8.2"
bs58 = "0.5.1"
lazy_static = "1.5.0"
libc = "0.2"
linenoise-rs = "0.1.1"
log = "0.4.27"
num-bigint = "0.4.6"

View File

@@ -16,9 +16,12 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{io::ErrorKind, mem::zeroed, ptr::null_mut, str::FromStr};
use libc::{fd_set, select, timeval, FD_SET, FD_ZERO};
use linenoise_rs::{
linenoise, linenoise_history_add, linenoise_history_load, linenoise_history_save,
linenoise_set_completion_callback, linenoise_set_hints_callback,
linenoise_history_add, linenoise_history_load, linenoise_history_save,
linenoise_set_completion_callback, linenoise_set_hints_callback, LinenoiseState,
};
use darkfi::{cli_desc, system::StoppableTask, util::path::expand_path};
@@ -42,6 +45,11 @@ fn help() {
println!("\tkaching: Fun");
println!("\tping: Send a ping request to the darkfid RPC endpoint");
println!("\tcompletions: Generate a SHELL completion script and print to stdout");
println!(
"\tsubscribe: Perform a scan and then subscribe to darkfid to listen for incoming blocks"
);
println!("\tunsubscribe: Stops the background subscription, if its active");
println!("\tscan: Scan the blockchain and parse relevant transactions");
}
/// Auxiliary function to define the interactive shell completions.
@@ -63,6 +71,21 @@ fn completion(buf: &str, lc: &mut Vec<String>) {
if buf.starts_with("c") {
lc.push("completions".to_string());
return
}
if buf.starts_with("su") {
lc.push("subscribe".to_string());
return
}
if buf.starts_with("u") {
lc.push("unsubscribe".to_string());
return
}
if buf.starts_with("sc") {
lc.push("scan".to_string());
}
}
@@ -70,17 +93,19 @@ fn completion(buf: &str, lc: &mut Vec<String>) {
fn hints(buf: &str) -> Option<(String, i32, bool)> {
match buf {
"completions " => Some(("{shell}".to_string(), 35, false)), // 35 = magenta
"scan " => Some(("--reset {height}".to_string(), 35, false)), // 35 = magenta
_ => None,
}
}
/// Auxiliary function to start provided Drk as an interactive shell.
/// Only sane/linenoise terminals are suported.
pub async fn interactive(drk: &Drk, history_path: &str) {
// Expand the history file path
let history_path = match expand_path(history_path) {
Ok(p) => p,
Err(e) => {
println!("Error while expanding history file path: {e}");
eprintln!("Error while expanding history file path: {e}");
return
}
};
@@ -99,23 +124,93 @@ pub async fn interactive(drk: &Drk, history_path: &str) {
let _ = linenoise_history_load(history_file);
// Create a detached task to use for block subscription
let subscription_active = false;
let mut subscription_active = false;
let subscription_task = StoppableTask::new();
// Create two bounded smol channels, so we can have 2 way
// communication between the shell thread and the background task.
let (_shell_sender, shell_receiver) = smol::channel::bounded::<()>(1);
let (background_sender, _background_receiver) = smol::channel::bounded::<()>(1);
// Start the interactive shell
loop {
// Grab input or end if Ctrl-D or Ctrl-C was pressed
let Some(line) = linenoise("drk> ") else {
// Stop the subscription task if its active
if subscription_active {
subscription_task.stop().await;
// Generate the linoise state structure
let mut state = match LinenoiseState::edit_start(-1, -1, "drk> ") {
Ok(s) => s,
Err(e) => {
eprintln!("Error while generating linenoise state: {e}");
break
}
};
// Read until we get a line to process
let mut line = None;
loop {
let retval = unsafe {
// Setup read buffers
let mut readfds: fd_set = zeroed();
FD_ZERO(&mut readfds);
FD_SET(state.get_fd(), &mut readfds);
// Setup a 1 second timeout to check if background
// process wants to print.
let mut tv = timeval { tv_sec: 1, tv_usec: 0 };
// Wait timeout or input
select(state.get_fd() + 1, &mut readfds, null_mut(), null_mut(), &mut tv)
};
// Handle error
if retval == -1 {
eprintln!("Error while reading linenoise buffers");
break
}
// Write history file
let _ = linenoise_history_save(history_file);
// Check if background process wants to print anything
if shell_receiver.is_full() {
// Consume the channel message
if let Err(e) = shell_receiver.recv().await {
eprintln!("Error while reading shell receiver channel: {e}");
break
}
return
};
// Signal background task it can start printing
let _ = state.hide();
if let Err(e) = background_sender.send(()).await {
eprintln!("Error while sending to background task channel: {e}");
break
}
// Wait signal that it finished
if let Err(e) = shell_receiver.recv().await {
eprintln!("Error while reading shell receiver channel: {e}");
break
}
let _ = state.show();
}
// Check if we have a line to process
if retval <= 0 {
continue
}
// Process linenoise feed
match state.edit_feed() {
Ok(Some(l)) => line = Some(l),
Ok(None) => { /* Do nothing */ }
Err(e) if e.kind() == ErrorKind::Interrupted => { /* Do nothing */ }
Err(e) if e.kind() == ErrorKind::WouldBlock => {
// Need more input, continue
continue;
}
Err(e) => eprintln!("Error while reading linenoise feed: {e}"),
}
break
}
let _ = state.edit_stop();
// Grab input or end if Ctrl-D or Ctrl-C was pressed
let Some(line) = line else { break };
// Check if line is empty
if line.is_empty() {
@@ -137,9 +232,22 @@ pub async fn interactive(drk: &Drk, history_path: &str) {
"kaching" => kaching().await,
"ping" => handle_ping(drk).await,
"completions" => handle_completions(&parts),
"subscribe" => {
handle_subscribe(drk, &mut subscription_active, &subscription_task).await
}
"unsubscribe" => handle_unsubscribe(&mut subscription_active, &subscription_task).await,
"scan" => handle_scan(drk, &subscription_active, &parts).await,
_ => println!("Unreconized command: {}", parts[0]),
}
}
// Stop the subscription task if its active
if subscription_active {
subscription_task.stop().await;
}
// Write history file
let _ = linenoise_history_save(history_file);
}
/// Auxiliary function to define the ping command handling.
@@ -161,3 +269,75 @@ fn handle_completions(parts: &[&str]) {
println!("Error while executing completions command: {e}")
}
}
/// Auxiliary function to define the subscribe command handling.
async fn handle_subscribe(
drk: &Drk,
subscription_active: &mut bool,
_subscription_task: &StoppableTask,
) {
if *subscription_active {
println!("Subscription is already active!")
}
if let Err(e) = drk.scan_blocks().await {
println!("Failed during scanning: {e:?}");
return
}
println!("Finished scanning blockchain");
// TODO: subscribe
*subscription_active = true;
}
/// Auxiliary function to define the unsubscribe command handling.
async fn handle_unsubscribe(subscription_active: &mut bool, subscription_task: &StoppableTask) {
if !*subscription_active {
println!("Subscription is already inactive!")
}
subscription_task.stop().await;
*subscription_active = false;
}
/// Auxiliary function to define the scan command handling.
async fn handle_scan(drk: &Drk, subscription_active: &bool, parts: &[&str]) {
if *subscription_active {
println!("Subscription is already active!");
return
}
// Check correct command structure
if parts.len() != 1 && parts.len() != 3 {
println!("Malformed `scan` command");
return
}
// Check if reset was requested
if parts.len() == 3 {
if parts[1] != "--reset" {
println!("Malformed `scan` command");
println!("Usage: scan --reset {{height}}");
return
}
let height = match u32::from_str(parts[2]) {
Ok(h) => h,
Err(e) => {
println!("Invalid reset height: {e:?}");
return
}
};
if let Err(e) = drk.reset_to_height(height) {
println!("Failed during wallet reset: {e:?}");
return
}
}
if let Err(e) = drk.scan_blocks().await {
println!("Failed during scanning: {e:?}");
return
}
println!("Finished scanning blockchain");
}