drk: moved subscribe to interactive shell

This commit is contained in:
skoupidi
2025-06-16 15:50:02 +03:00
parent bba4f31387
commit 45176ddc9c
7 changed files with 232 additions and 261 deletions

View File

@@ -246,14 +246,6 @@ pub fn generate_completions(shell: &str) -> Result<()> {
let broadcast =
SubCommand::with_name("broadcast").about("Read a transaction from stdin and broadcast it");
// Subscribe
let subscribe = SubCommand::with_name("subscribe").about(
"This subscription will listen for incoming blocks from darkfid and look \
through their transactions to see if there's any that interest us. \
With `drk` we look at transactions calling the money contract so we can \
find coins sent to us and fill our wallet with the necessary metadata.",
);
// DAO
let proposer_limit = Arg::with_name("proposer-limit")
.help("The minimum amount of governance tokens needed to open a proposal for this DAO");
@@ -516,7 +508,6 @@ pub fn generate_completions(shell: &str) -> Result<()> {
attach_fee,
inspect,
broadcast,
subscribe,
dao,
scan,
explorer,

View File

@@ -67,7 +67,7 @@ use darkfi_sdk::{
ContractCall,
};
use darkfi_serial::{
async_trait, deserialize_async, serialize_async, AsyncEncodable, SerialDecodable,
async_trait, deserialize_async, serialize, serialize_async, AsyncEncodable, SerialDecodable,
SerialEncodable,
};
@@ -1390,10 +1390,10 @@ impl Drk {
);
// Execute the query
if let Err(e) = self.wallet.exec_sql(
&query,
rusqlite::params![Some(*exec_height), Some(serialize_async(tx_hash).await), key],
) {
if let Err(e) = self
.wallet
.exec_sql(&query, rusqlite::params![Some(*exec_height), Some(serialize(tx_hash)), key])
{
return Err(Error::DatabaseError(format!(
"[apply_dao_exec_data] Update DAO proposal failed: {e:?}"
)))
@@ -1491,9 +1491,9 @@ impl Drk {
// Create its params
let params = rusqlite::params![
serialize_async(leaf_position).await,
serialize(leaf_position),
Some(*mint_height),
serialize_async(tx_hash).await,
serialize(tx_hash),
call_index,
key,
];
@@ -1567,8 +1567,8 @@ impl Drk {
let params = rusqlite::params![
key,
serialize_async(&proposal.proposal.dao_bulla).await,
serialize_async(&proposal.proposal).await,
serialize(&proposal.proposal.dao_bulla),
serialize(&proposal.proposal),
data,
leaf_position,
money_snapshot_tree,
@@ -1611,15 +1611,15 @@ impl Drk {
// Create its params
let params = rusqlite::params![
serialize_async(&vote.proposal).await,
serialize(&vote.proposal),
vote.vote_option as u64,
serialize_async(&vote.yes_vote_blind).await,
serialize_async(&vote.all_vote_value).await,
serialize_async(&vote.all_vote_blind).await,
serialize(&vote.yes_vote_blind),
serialize(&vote.all_vote_value),
serialize(&vote.all_vote_blind),
vote.block_height,
serialize_async(&vote.tx_hash).await,
serialize(&vote.tx_hash),
vote.call_index,
serialize_async(&vote.nullifiers).await,
serialize(&vote.nullifiers),
];
// Execute the query

View File

@@ -25,6 +25,7 @@ use linenoise_rs::{
linenoise_set_completion_callback, linenoise_set_hints_callback, LinenoiseState,
};
use smol::channel::{unbounded, Receiver, Sender};
use url::Url;
use darkfi::{
cli_desc,
@@ -35,6 +36,7 @@ use darkfi::{
use crate::{
cli_util::{generate_completions, kaching},
rpc::subscribe_blocks,
DrkPtr,
};
@@ -134,7 +136,7 @@ fn hints(buf: &str) -> Option<(String, i32, bool)> {
/// Auxiliary function to start provided Drk as an interactive shell.
/// Only sane/linenoise terminals are suported.
pub async fn interactive(drk: &DrkPtr, history_path: &str, ex: &ExecutorPtr) {
pub async fn interactive(drk: &DrkPtr, endpoint: &Url, history_path: &str, ex: &ExecutorPtr) {
// Expand the history file path
let history_path = match expand_path(history_path) {
Ok(p) => p,
@@ -197,6 +199,7 @@ pub async fn interactive(drk: &DrkPtr, history_path: &str, ex: &ExecutorPtr) {
"subscribe" => {
handle_subscribe(
drk,
endpoint,
&mut subscription_active,
&subscription_task,
&shell_sender,
@@ -297,7 +300,7 @@ async fn listen_for_line(
// Hide prompt, print output, show prompt again
let _ = state.hide();
for line in msg {
println!("{line}\r");
println!("{}\r", line.replace("\n", "\n\r"));
}
let _ = state.show();
}
@@ -344,13 +347,15 @@ fn handle_completions(parts: &[&str]) {
/// Auxiliary function to define the subscribe command handling.
async fn handle_subscribe(
drk: &DrkPtr,
endpoint: &Url,
subscription_active: &mut bool,
subscription_task: &StoppableTaskPtr,
shell_sender: &Sender<Vec<String>>,
ex: &ExecutorPtr,
) {
if *subscription_active {
println!("Subscription is already active!")
println!("Subscription is already active!");
return
}
if let Err(e) = drk.read().await.scan_blocks().await {
@@ -360,29 +365,16 @@ async fn handle_subscribe(
println!("Finished scanning blockchain");
// Start the subcristion task
// TODO: use actual subscribe not a dummy task
let drk_ = drk.clone();
let endpoint_ = endpoint.clone();
let shell_sender_ = shell_sender.clone();
let ex_ = ex.clone();
subscription_task.clone().start(
async move {
loop {
msleep(750).await;
let line = String::from("This is a single line dummy message");
if shell_sender_.send(vec![line]).await.is_err() {
break;
}
msleep(750).await;
let line0 = String::from("This is the first line of a multiline dummy message");
let line1 = String::from("This is the second line of a multiline dummy message");
if shell_sender_.send(vec![line0, line1]).await.is_err() {
break;
}
}
Ok(())
},
async move { subscribe_blocks(&drk_, shell_sender_, endpoint_, &ex_).await },
|res| async {
match res {
Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
Err(e) => println!("Failed starting dnet subs task: {e}"),
Err(e) => println!("Failed starting subscription task: {e}"),
}
},
Error::DetachedTaskStopped,
@@ -395,7 +387,8 @@ async fn handle_subscribe(
/// Auxiliary function to define the unsubscribe command handling.
async fn handle_unsubscribe(subscription_active: &mut bool, subscription_task: &StoppableTaskPtr) {
if !*subscription_active {
println!("Subscription is already inactive!")
println!("Subscription is already inactive!");
return
}
subscription_task.stop().await;
*subscription_active = false;

View File

@@ -207,12 +207,6 @@ enum Subcmd {
/// Read a transaction from stdin and broadcast it
Broadcast,
/// This subscription will listen for incoming blocks from darkfid and look
/// through their transactions to see if there's any that interest us.
/// With `drk` we look at transactions calling the money contract so we can
/// find coins sent to us and fill our wallet with the necessary metadata.
Subscribe,
/// DAO functionalities
Dao {
#[structopt(subcommand)]
@@ -662,13 +656,14 @@ async fn realmain(args: Args, ex: ExecutorPtr) -> Result<()> {
blockchain_config.cache_path,
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint),
Some(blockchain_config.endpoint.clone()),
&ex,
args.fun,
)
.await
.into_ptr();
interactive(&drk, &blockchain_config.history_path, &ex).await;
interactive(&drk, &blockchain_config.endpoint, &blockchain_config.history_path, &ex)
.await;
drk.read().await.stop_rpc_client().await?;
Ok(())
}
@@ -2026,25 +2021,6 @@ async fn realmain(args: Args, ex: ExecutorPtr) -> Result<()> {
drk.stop_rpc_client().await
}
Subcmd::Subscribe => {
let drk = new_wallet(
blockchain_config.cache_path,
blockchain_config.wallet_path,
blockchain_config.wallet_pass,
Some(blockchain_config.endpoint.clone()),
&ex,
args.fun,
)
.await;
if let Err(e) = drk.subscribe_blocks(blockchain_config.endpoint, &ex).await {
eprintln!("Block subscription failed: {e:?}");
exit(2);
}
drk.stop_rpc_client().await
}
Subcmd::Scan { reset } => {
let drk = new_wallet(
blockchain_config.cache_path,

View File

@@ -55,7 +55,7 @@ use darkfi_sdk::{
pasta::pallas,
ContractCall,
};
use darkfi_serial::{deserialize_async, serialize_async, AsyncEncodable};
use darkfi_serial::{deserialize_async, serialize, serialize_async, AsyncEncodable};
use crate::{
cache::CacheSmt,
@@ -851,16 +851,16 @@ impl Drk {
// Execute the query
let params = rusqlite::params![
key,
serialize_async(&coin.note.value).await,
serialize_async(&coin.note.token_id).await,
serialize_async(&coin.note.spend_hook).await,
serialize_async(&coin.note.user_data).await,
serialize_async(&coin.note.coin_blind).await,
serialize_async(&coin.note.value_blind).await,
serialize_async(&coin.note.token_blind).await,
serialize_async(&coin.secret).await,
serialize_async(&coin.leaf_position).await,
serialize_async(&coin.note.memo).await,
serialize(&coin.note.value),
serialize(&coin.note.token_id),
serialize(&coin.note.spend_hook),
serialize(&coin.note.user_data),
serialize(&coin.note.coin_blind),
serialize(&coin.note.value_blind),
serialize(&coin.note.token_blind),
serialize(&coin.secret),
serialize(&coin.leaf_position),
serialize(&coin.note.memo),
creation_height,
0, // <-- is_spent
spent_height,

View File

@@ -21,6 +21,7 @@ use std::{
time::Instant,
};
use smol::channel::Sender;
use url::Url;
use darkfi::{
@@ -53,7 +54,7 @@ use crate::{
dao::{SLED_MERKLE_TREES_DAO_DAOS, SLED_MERKLE_TREES_DAO_PROPOSALS},
error::{WalletDbError, WalletDbResult},
money::SLED_MERKLE_TREES_MONEY,
Drk,
Drk, DrkPtr,
};
/// Auxiliary structure holding various in memory caches to use during scan
@@ -142,177 +143,6 @@ impl Drk {
})
}
/// Subscribes to darkfid's JSON-RPC notification endpoint that serves
/// new confirmed blocks. Upon receiving them, all the transactions are
/// scanned and we check if any of them call the money contract, and if
/// the payments are intended for us. If so, we decrypt them and append
/// the metadata to our wallet. If a reorg block is received, we revert
/// to its previous height and then scan it. We assume that the blocks
/// up to that point are unchanged, since darkfid will just broadcast
/// the sequence after the reorg.
pub async fn subscribe_blocks(&self, endpoint: Url, ex: &ExecutorPtr) -> Result<()> {
// Grab last confirmed block height
let (last_confirmed_height, _) = self.get_last_confirmed_block().await?;
// Handle genesis(0) block
if last_confirmed_height == 0 {
if let Err(e) = self.scan_blocks().await {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Scanning from genesis block failed: {e:?}"
)))
}
}
// Grab last confirmed block again
let (last_confirmed_height, last_confirmed_hash) = self.get_last_confirmed_block().await?;
// Grab last scanned block
let (mut last_scanned_height, last_scanned_hash) = match self.get_last_scanned_block() {
Ok(last) => last,
Err(e) => {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Retrieving last scanned block failed: {e:?}"
)))
}
};
// Check if other blocks have been created
if last_confirmed_height != last_scanned_height || last_confirmed_hash != last_scanned_hash
{
eprintln!("Warning: Last scanned block is not the last confirmed block.");
eprintln!("You should first fully scan the blockchain, and then subscribe");
return Err(Error::DatabaseError(
"[subscribe_blocks] Blockchain not fully scanned".to_string(),
))
}
println!("Subscribing to receive notifications of incoming blocks");
let publisher = Publisher::new();
let subscription = publisher.clone().subscribe().await;
let _publisher = publisher.clone();
let _ex = ex.clone();
StoppableTask::new().start(
// Weird hack to prevent lifetimes hell
async move {
let rpc_client = RpcClient::new(endpoint, _ex).await?;
let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
rpc_client.subscribe(req, _publisher).await
},
|res| async move {
match res {
Ok(()) => { /* Do nothing */ }
Err(e) => {
eprintln!("[subscribe_blocks] JSON-RPC server error: {e:?}");
publisher
.notify(JsonResult::Error(JsonError::new(
ErrorCode::InternalError,
None,
0,
)))
.await;
}
}
},
Error::RpcServerStopped,
ex.clone(),
);
println!("Detached subscription to background");
println!("All is good. Waiting for block notifications...");
let e = loop {
match subscription.receive().await {
JsonResult::Notification(n) => {
println!("Got Block notification from darkfid subscription");
if n.method != "blockchain.subscribe_blocks" {
break Error::UnexpectedJsonRpc(format!(
"Got foreign notification from darkfid: {}",
n.method
))
}
// Verify parameters
if !n.params.is_array() {
break Error::UnexpectedJsonRpc(
"Received notification params are not an array".to_string(),
)
}
let params = n.params.get::<Vec<JsonValue>>().unwrap();
if params.is_empty() {
break Error::UnexpectedJsonRpc(
"Notification parameters are empty".to_string(),
)
}
for param in params {
let param = param.get::<String>().unwrap();
let bytes = base64::decode(param).unwrap();
let block: BlockInfo = deserialize_async(&bytes).await?;
println!("Deserialized successfully. Scanning block...");
// Check if a reorg block was received, to reset to its previous
if block.header.height <= last_scanned_height {
let reset_height = block.header.height.saturating_sub(1);
if let Err(e) = self.reset_to_height(reset_height) {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Wallet state reset failed: {e:?}"
)))
}
// Scan genesis again if needed
if reset_height == 0 {
let genesis = match self.get_block_by_height(reset_height).await {
Ok(b) => b,
Err(e) => {
return Err(Error::Custom(format!(
"[subscribe_blocks] RPC client request failed: {e:?}"
)))
}
};
let mut scan_cache = self.scan_cache().await?;
if let Err(e) = self.scan_block(&mut scan_cache, &genesis).await {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Scanning block failed: {e:?}"
)))
};
for msg in scan_cache.flush_messages() {
println!("{msg}");
}
}
}
let mut scan_cache = self.scan_cache().await?;
if let Err(e) = self.scan_block(&mut scan_cache, &block).await {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Scanning block failed: {e:?}"
)))
}
for msg in scan_cache.flush_messages() {
println!("{msg}");
}
// Set new last scanned block height
last_scanned_height = block.header.height;
}
}
JsonResult::Error(e) => {
// Some error happened in the transmission
break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
}
x => {
// And this is weird
break Error::UnexpectedJsonRpc(format!(
"Got unexpected data from JSON-RPC: {x:?}"
))
}
}
};
Err(e)
}
/// `scan_block` will go over over transactions in a block and handle their calls
/// based on the called contract.
async fn scan_block(&self, scan_cache: &mut ScanCache, block: &BlockInfo) -> Result<()> {
@@ -724,3 +554,186 @@ impl Drk {
Ok(())
}
}
/// Subscribes to darkfid's JSON-RPC notification endpoint that serves
/// new confirmed blocks. Upon receiving them, all the transactions are
/// scanned and we check if any of them call the money contract, and if
/// the payments are intended for us. If so, we decrypt them and append
/// the metadata to our wallet. If a reorg block is received, we revert
/// to its previous height and then scan it. We assume that the blocks
/// up to that point are unchanged, since darkfid will just broadcast
/// the sequence after the reorg.
pub async fn subscribe_blocks(
drk: &DrkPtr,
shell_sender: Sender<Vec<String>>,
endpoint: Url,
ex: &ExecutorPtr,
) -> Result<()> {
// Grab last confirmed block height
let lock = drk.read().await;
let (last_confirmed_height, _) = lock.get_last_confirmed_block().await?;
// Handle genesis(0) block
if last_confirmed_height == 0 {
if let Err(e) = lock.scan_blocks().await {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Scanning from genesis block failed: {e:?}"
)))
}
}
// Grab last confirmed block again
let (last_confirmed_height, last_confirmed_hash) = lock.get_last_confirmed_block().await?;
// Grab last scanned block
let (mut last_scanned_height, last_scanned_hash) = match lock.get_last_scanned_block() {
Ok(last) => last,
Err(e) => {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Retrieving last scanned block failed: {e:?}"
)))
}
};
// Check if other blocks have been created
if last_confirmed_height != last_scanned_height || last_confirmed_hash != last_scanned_hash {
shell_sender
.send(vec![
String::from("Warning: Last scanned block is not the last confirmed block."),
String::from("You should first fully scan the blockchain, and then subscribe"),
])
.await?;
return Err(Error::DatabaseError(
"[subscribe_blocks] Blockchain not fully scanned".to_string(),
))
}
let mut shell_message =
vec![String::from("Subscribing to receive notifications of incoming blocks")];
let publisher = Publisher::new();
let subscription = publisher.clone().subscribe().await;
let _publisher = publisher.clone();
let _ex = ex.clone();
StoppableTask::new().start(
// Weird hack to prevent lifetimes hell
async move {
let rpc_client = RpcClient::new(endpoint, _ex).await?;
let req = JsonRequest::new("blockchain.subscribe_blocks", JsonValue::Array(vec![]));
rpc_client.subscribe(req, _publisher).await
},
|res| async move {
match res {
Ok(()) => { /* Do nothing */ }
Err(e) => {
eprintln!("[subscribe_blocks] JSON-RPC server error: {e:?}");
publisher
.notify(JsonResult::Error(JsonError::new(
ErrorCode::InternalError,
None,
0,
)))
.await;
}
}
},
Error::RpcServerStopped,
ex.clone(),
);
shell_message.push(String::from("Detached subscription to background"));
shell_message.push(String::from("All is good. Waiting for block notifications..."));
shell_sender.send(shell_message).await?;
drop(lock);
let e = loop {
match subscription.receive().await {
JsonResult::Notification(n) => {
let mut shell_message =
vec![String::from("Got Block notification from darkfid subscription")];
if n.method != "blockchain.subscribe_blocks" {
break Error::UnexpectedJsonRpc(format!(
"Got foreign notification from darkfid: {}",
n.method
))
}
// Verify parameters
if !n.params.is_array() {
break Error::UnexpectedJsonRpc(
"Received notification params are not an array".to_string(),
)
}
let params = n.params.get::<Vec<JsonValue>>().unwrap();
if params.is_empty() {
break Error::UnexpectedJsonRpc("Notification parameters are empty".to_string())
}
for param in params {
let param = param.get::<String>().unwrap();
let bytes = base64::decode(param).unwrap();
let block: BlockInfo = deserialize_async(&bytes).await?;
shell_message
.push(String::from("Deserialized successfully. Scanning block..."));
// Check if a reorg block was received, to reset to its previous
let lock = drk.read().await;
if block.header.height <= last_scanned_height {
let reset_height = block.header.height.saturating_sub(1);
if let Err(e) = lock.reset_to_height(reset_height) {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Wallet state reset failed: {e:?}"
)))
}
// Scan genesis again if needed
if reset_height == 0 {
let genesis = match lock.get_block_by_height(reset_height).await {
Ok(b) => b,
Err(e) => {
return Err(Error::Custom(format!(
"[subscribe_blocks] RPC client request failed: {e:?}"
)))
}
};
let mut scan_cache = lock.scan_cache().await?;
if let Err(e) = lock.scan_block(&mut scan_cache, &genesis).await {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Scanning block failed: {e:?}"
)))
};
for msg in scan_cache.flush_messages() {
shell_message.push(msg);
}
}
}
let mut scan_cache = lock.scan_cache().await?;
if let Err(e) = lock.scan_block(&mut scan_cache, &block).await {
return Err(Error::DatabaseError(format!(
"[subscribe_blocks] Scanning block failed: {e:?}"
)))
}
for msg in scan_cache.flush_messages() {
shell_message.push(msg);
}
shell_sender.send(shell_message.clone()).await?;
// Set new last scanned block height
last_scanned_height = block.header.height;
}
}
JsonResult::Error(e) => {
// Some error happened in the transmission
break Error::UnexpectedJsonRpc(format!("Got error from JSON-RPC: {e:?}"))
}
x => {
// And this is weird
break Error::UnexpectedJsonRpc(format!("Got unexpected data from JSON-RPC: {x:?}"))
}
}
};
Err(e)
}

View File

@@ -19,7 +19,7 @@
use rusqlite::types::Value;
use darkfi::{tx::Transaction, Error, Result};
use darkfi_serial::{deserialize_async, serialize_async};
use darkfi_serial::{deserialize_async, serialize};
use crate::{
convert_named_params,
@@ -56,10 +56,8 @@ impl Drk {
// Execute the query
let tx_hash = tx.hash().to_string();
self.wallet.exec_sql(
&query,
rusqlite::params![tx_hash, status, block_height, &serialize_async(tx).await],
)?;
self.wallet
.exec_sql(&query, rusqlite::params![tx_hash, status, block_height, &serialize(tx)])?;
Ok(tx_hash)
}