diff --git a/bin/darkfid/src/main.rs b/bin/darkfid/src/main.rs index 87cbf4662..49007ec51 100644 --- a/bin/darkfid/src/main.rs +++ b/bin/darkfid/src/main.rs @@ -222,6 +222,9 @@ impl RequestHandler for Darkfid { Some("blockchain.subscribe_blocks") => { return self.blockchain_subscribe_blocks(req.id, params).await } + Some("blockchain.subscribe_err_txs") => { + return self.blockchain_subscribe_err_txs(req.id, params).await + } Some("blockchain.lookup_zkas") => { return self.blockchain_lookup_zkas(req.id, params).await } diff --git a/bin/darkfid/src/rpc_blockchain.rs b/bin/darkfid/src/rpc_blockchain.rs index 4e6fe9dcf..485b0e781 100644 --- a/bin/darkfid/src/rpc_blockchain.rs +++ b/bin/darkfid/src/rpc_blockchain.rs @@ -144,6 +144,24 @@ impl Darkfid { JsonSubscriber::new(blocks_subscriber).into() } + // RPCAPI: + // Initializes a subscription to erroneous transactions notifications. + // Once a subscription is established, `darkfid` will send JSON-RPC notifications of + // erroneous transactions to the subscriber. + // + // --> {"jsonrpc": "2.0", "method": "blockchain.subscribe_err_txs", "params": [], "id": 1} + // <-- {"jsonrpc": "2.0", "method": "blockchain.subscribe_err_txs", "params": [`tx_hash`]} + pub async fn blockchain_subscribe_err_txs(&self, id: Value, params: &[Value]) -> JsonResult { + if !params.is_empty() { + return JsonError::new(InvalidParams, None, id).into() + } + + let err_txs_subscriber = + self.validator_state.read().await.subscribers.get("err_txs").unwrap().clone(); + + JsonSubscriber::new(err_txs_subscriber).into() + } + // RPCAPI: // Performs a lookup of zkas bincodes for a given contract ID and returns all of // them, including their namespace. diff --git a/bin/drk/src/main.rs b/bin/drk/src/main.rs index b2442b379..7899d7649 100644 --- a/bin/drk/src/main.rs +++ b/bin/drk/src/main.rs @@ -189,13 +189,9 @@ enum Subcmd { /// Read a transaction from stdin and broadcast it Broadcast, - /// Subscribe to incoming blocks from darkfid - /// - /// This subscription will listen for incoming blocks from darkfid and look - /// through their transactions to see if there's any that interest us. - /// With `drk` we look at transactions calling the money contract so we can - /// find coins sent to us and fill our wallet with the necessary metadata. - Subscribe, + /// Subscribe to incoming notifications from darkfid + #[command(subcommand)] + Subscribe(SubscribeSubcmd), /// DAO functionalities #[command(subcommand)] @@ -420,6 +416,19 @@ enum TokenSubcmd { }, } +#[derive(Subcommand)] +enum SubscribeSubcmd { + /// This subscription will listen for incoming blocks from darkfid and look + /// through their transactions to see if there's any that interest us. + /// With `drk` we look at transactions calling the money contract so we can + /// find coins sent to us and fill our wallet with the necessary metadata. + Blocks, + + /// This subscription will listen for erroneous transactions that got + /// removed from darkfid mempool. + Transactions, +} + pub struct Drk { pub rpc_client: RpcClient, } @@ -799,15 +808,27 @@ async fn main() -> Result<()> { Ok(()) } - Subcmd::Subscribe => { - let drk = Drk::new(args.endpoint.clone()).await?; + Subcmd::Subscribe(cmd) => match cmd { + SubscribeSubcmd::Blocks => { + let drk = Drk::new(args.endpoint.clone()).await?; - drk.subscribe_blocks(args.endpoint) - .await - .with_context(|| "Block subscription failed")?; + drk.subscribe_blocks(args.endpoint.clone()) + .await + .with_context(|| "Block subscription failed")?; - Ok(()) - } + Ok(()) + } + + SubscribeSubcmd::Transactions => { + let drk = Drk::new(args.endpoint.clone()).await?; + + drk.subscribe_err_txs(args.endpoint) + .await + .with_context(|| "Erroneous transactions subscription failed")?; + + Ok(()) + } + }, Subcmd::Scan { reset, list, checkpoint } => { let drk = Drk::new(args.endpoint).await?; diff --git a/bin/drk/src/rpc_blockchain.rs b/bin/drk/src/rpc_blockchain.rs index 6a7a20640..0c48b1248 100644 --- a/bin/drk/src/rpc_blockchain.rs +++ b/bin/drk/src/rpc_blockchain.rs @@ -299,4 +299,58 @@ impl Drk { Ok(()) } + + /// Subscribes to darkfid's JSON-RPC notification endpoint that serves + /// erroneous transactions rejections. + pub async fn subscribe_err_txs(&self, endpoint: Url) -> Result<()> { + eprintln!("Subscribing to receive notifications of erroneous transactions"); + let subscriber = Subscriber::new(); + let subscription = subscriber.clone().subscribe().await; + + let rpc_client = RpcClient::new(endpoint).await?; + + let req = JsonRequest::new("blockchain.subscribe_err_txs", json!([])); + task::spawn(async move { rpc_client.subscribe(req, subscriber).await.unwrap() }); + eprintln!("Detached subscription to background"); + eprintln!("All is good. Waiting for erroneous transactions notifications..."); + + let e = loop { + match subscription.receive().await { + JsonResult::Notification(n) => { + eprintln!("Got erroneous transaction notification from darkfid subscription"); + if n.method != "blockchain.subscribe_err_txs" { + break anyhow!("Got foreign notification from darkfid: {}", n.method) + } + + let Some(params) = n.params.as_array() else { + break anyhow!("Received notification params are not an array") + }; + + if params.len() != 1 { + break anyhow!("Notification parameters are not len 1") + } + + let params = n.params.as_array().unwrap()[0].as_str().unwrap(); + let bytes = bs58::decode(params).into_vec()?; + + let txs_hash: String = deserialize(&bytes)?; + eprintln!("==================================="); + eprintln!("Erroneous transaction: {}", txs_hash); + eprintln!("==================================="); + } + + JsonResult::Error(e) => { + // Some error happened in the transmission + break anyhow!("Got error from JSON-RPC: {:?}", e) + } + + x => { + // And this is weird + break anyhow!("Got unexpected data from JSON-RPC: {:?}", x) + } + } + }; + + Err(e) + } } diff --git a/contrib/localnet/darkfid-single-node/faucetd/wallet.db b/contrib/localnet/darkfid-single-node/faucetd/wallet.db index 75149d118..38a3f738e 100644 Binary files a/contrib/localnet/darkfid-single-node/faucetd/wallet.db and b/contrib/localnet/darkfid-single-node/faucetd/wallet.db differ diff --git a/contrib/localnet/darkfid/README.md b/contrib/localnet/darkfid/README.md index 6a187b794..315b5c94d 100644 --- a/contrib/localnet/darkfid/README.md +++ b/contrib/localnet/darkfid/README.md @@ -35,8 +35,8 @@ work, we also need to subscribe to their RPC endpoints so we can scan incoming blocks and add them to our wallet. ``` -$ ./drk -e tcp://127.0.0.1:8440 subscribe -$ ./drk -e tcp://127.0.0.1:8540 subscribe +$ ./drk -e tcp://127.0.0.1:8440 subscribe blocks +$ ./drk -e tcp://127.0.0.1:8540 subscribe blocks ``` And now we can execute our airdrop calls: diff --git a/doc/src/testnet/airdrop.md b/doc/src/testnet/airdrop.md index 248d64d16..a7db17f51 100644 --- a/doc/src/testnet/airdrop.md +++ b/doc/src/testnet/airdrop.md @@ -19,7 +19,7 @@ On success, you should see a transaction ID. If successful, the airdrop transactions will how be in the consensus' mempool, waiting for inclusion in the next block. Depending on the network, finalization of the blocks could take some time. You'll have to wait -for this to happen. If your `drk subscribe` is running, then after +for this to happen. If your `drk subscribe blocks` is running, then after some time your balance should be in your wallet. ![pablo-waiting0](pablo0.jpg) @@ -101,6 +101,6 @@ $ ./drk broadcast < mint_tx ``` Now the transaction should be published to the network. If you have -an active block subscription (which you can do with `drk subscribe`), +an active block subscription (which you can do with `drk subscribe blocks`), then when the transaction is finalized, your wallet should have your new tokens listed when you request to see the balance. diff --git a/doc/src/testnet/node.md b/doc/src/testnet/node.md index 57110b7d9..8f5c2b2c8 100644 --- a/doc/src/testnet/node.md +++ b/doc/src/testnet/node.md @@ -71,7 +71,7 @@ and then to subscribe to new blocks: ``` $ ./drk scan -$ ./drk subscribe +$ ./drk subscribe blocks ``` Now you can leave the subscriber running. In case you stop it, just diff --git a/src/consensus/validator.rs b/src/consensus/validator.rs index ff5f384fc..698718e17 100644 --- a/src/consensus/validator.rs +++ b/src/consensus/validator.rs @@ -182,7 +182,9 @@ impl ValidatorState { // Here we initialize various subscribers that can export live consensus/blockchain data. let mut subscribers = HashMap::new(); let block_subscriber = Subscriber::new(); + let err_txs_subscriber = Subscriber::new(); subscribers.insert("blocks", block_subscriber); + subscribers.insert("err_txs", err_txs_subscriber); let state = Arc::new(RwLock::new(ValidatorState { lead_proving_key, @@ -311,6 +313,17 @@ impl ValidatorState { } info!(target: "consensus::validator", "purge_pending_txs(): Removing {} erroneous transactions...", erroneous_txs.len()); self.blockchain.pending_txs.remove(&erroneous_txs)?; + + // TODO: Don't hardcode this: + let err_txs_subscriber = self.subscribers.get("err_txs").unwrap(); + for err_tx in erroneous_txs { + let tx_hash = blake3::hash(&serialize(&err_tx)).to_hex().as_str().to_string(); + let params = json!([bs58::encode(&serialize(&tx_hash)).into_string()]); + let notif = JsonNotification::new("blockchain.subscribe_err_txs", params); + info!(target: "consensus::validator", "purge_pending_txs(): Sending notification about erroneous transaction"); + err_txs_subscriber.notify(notif).await; + } + Ok(()) } @@ -831,7 +844,7 @@ impl ValidatorState { // ========================== /// Validate and append to canonical state received blocks. - pub async fn receive_blocks(&mut self, blocks: &[BlockInfo]) -> Result<()> { + async fn receive_blocks(&mut self, blocks: &[BlockInfo]) -> Result<()> { // Verify state transitions for all blocks and their respective transactions. info!(target: "consensus::validator", "receive_blocks(): Starting state transition validations"); diff --git a/src/runtime/import/db.rs b/src/runtime/import/db.rs index 08add1c82..895efc020 100644 --- a/src/runtime/import/db.rs +++ b/src/runtime/import/db.rs @@ -186,7 +186,7 @@ pub(crate) fn db_lookup(ctx: FunctionEnvMut, ptr: WasmPtr, len: u32) -> let tree_handle = match contracts.lookup(&cid, &db_name) { Ok(v) => v, - Err(e) => return DB_LOOKUP_FAILED, + Err(_) => return DB_LOOKUP_FAILED, }; // TODO: Make sure we don't duplicate the DbHandle in the vec.