From 3a446cecfdfdfdecd0020cf7dba5d82528083b1e Mon Sep 17 00:00:00 2001 From: skoupidi Date: Mon, 11 Mar 2024 15:06:29 +0200 Subject: [PATCH] darkfid,minerd: better mining abort signaling between the two daemons --- bin/darkfid/src/task/miner.rs | 58 ++++++++++++++++++++++++++---- bin/minerd/src/rpc.rs | 68 ++++++++++++++++++++++++----------- 2 files changed, 98 insertions(+), 28 deletions(-) diff --git a/bin/darkfid/src/task/miner.rs b/bin/darkfid/src/task/miner.rs index 7107d5d12..2760ced1a 100644 --- a/bin/darkfid/src/task/miner.rs +++ b/bin/darkfid/src/task/miner.rs @@ -19,7 +19,7 @@ use darkfi::{ blockchain::BlockInfo, rpc::{jsonrpc::JsonNotification, util::JsonValue}, - system::Subscription, + system::{sleep, Subscription}, tx::{ContractCallLeaf, Transaction, TransactionBuilder}, util::encoding::base64, validator::{ @@ -42,12 +42,13 @@ use darkfi_serial::{serialize, Encodable}; use log::info; use num_bigint::BigUint; use rand::rngs::OsRng; +use smol::channel::{Receiver, Sender}; use crate::{proto::ProposalMessage, Darkfid}; // TODO: handle all ? so the task don't stop on errors -/// async task used for participating in the PoW block production. +/// Async task used for participating in the PoW block production. /// Miner initializes their setup and waits for next finalization, /// by listenning for new proposals from the network, for optimal /// conditions. After finalization occurs, they start the actual @@ -108,6 +109,9 @@ pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool) } } + // Create channels so threads can signal each other + let (sender, stop_signal) = smol::channel::bounded(1); + info!(target: "darkfid::task::miner_task", "Miner initialized successfully!"); // Start miner loop @@ -119,8 +123,8 @@ pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool) // Start listenning for network proposals and mining next block for best fork. smol::future::or( - listen_to_network(node, &extended_fork, &subscription), - mine_next_block(node, &extended_fork, &mut secret, recipient, &zkbin, &pk), + listen_to_network(node, &extended_fork, &subscription, &sender), + mine(node, &extended_fork, &mut secret, recipient, &zkbin, &pk, &stop_signal), ) .await?; @@ -136,11 +140,12 @@ pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool) } } -/// Auxiliary function to listen for incoming proposals and check if the best fork has changed +/// Async task to listen for incoming proposals and check if the best fork has changed async fn listen_to_network( node: &Darkfid, extended_fork: &Fork, subscription: &Subscription, + sender: &Sender<()>, ) -> Result<()> { // Grab extended fork last proposal hash let last_proposal_hash = extended_fork.last_proposal()?.hash; @@ -157,14 +162,53 @@ async fn listen_to_network( // Verify if proposals sequence has changed if forks[index].last_proposal()?.hash != last_proposal_hash { drop(forks); - return Ok(()) + break } drop(forks); } + + // Signal miner to abort mining + sender.send(()).await?; + node.miner_daemon_request("abort", JsonValue::Array(vec![])).await?; + + Ok(()) } -/// Auxiliary function to generate and mine provided fork index next block +/// Async task to generate and mine provided fork index next block, +/// while listening for a stop signal +async fn mine( + node: &Darkfid, + extended_fork: &Fork, + secret: &mut SecretKey, + recipient: &PublicKey, + zkbin: &ZkBinary, + pk: &ProvingKey, + stop_signal: &Receiver<()>, +) -> Result<()> { + smol::future::or( + wait_stop_signal(stop_signal), + mine_next_block(node, extended_fork, secret, recipient, zkbin, pk), + ) + .await +} + +/// Async task to wait for listener's stop signal. +async fn wait_stop_signal(stop_signal: &Receiver<()>) -> Result<()> { + // Clean stop signal channel + if stop_signal.is_full() { + stop_signal.recv().await?; + } + + // Wait for listener signal + stop_signal.recv().await?; + // Take a nap to let listener notify minerd + sleep(10).await; + + Ok(()) +} + +/// Async task to generate and mine provided fork index next block async fn mine_next_block( node: &Darkfid, extended_fork: &Fork, diff --git a/bin/minerd/src/rpc.rs b/bin/minerd/src/rpc.rs index 8fec63c7a..37a0dd769 100644 --- a/bin/minerd/src/rpc.rs +++ b/bin/minerd/src/rpc.rs @@ -48,6 +48,7 @@ impl RequestHandler for Minerd { match req.method.as_str() { "ping" => self.pong(req.id, req.params).await, + "abort" => self.abort(req.id, req.params).await, "mine" => self.mine(req.id, req.params).await, _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), } @@ -59,6 +60,19 @@ impl RequestHandler for Minerd { } impl Minerd { + // RPCAPI: + // Signals miner daemon to abort mining pending request. + // Returns `true` on success. + // + // --> {"jsonrpc": "2.0", "method": "abort", "params": [], "id": 42} + // <-- {"jsonrpc": "2.0", "result": "true", "id": 42} + async fn abort(&self, id: u16, _params: JsonValue) -> JsonResult { + if let Some(e) = self.abort_pending(id).await { + return e + }; + JsonResponse::new(JsonValue::Boolean(true), id).into() + } + // RPCAPI: // Mine provided block for requested mine target, and return the corresponding nonce value. // @@ -89,27 +103,9 @@ impl Minerd { }; // Check if another request is being processed - if self.stop_signal.receiver_count() > 1 { - info!(target: "minerd::rpc", "Another request is in progress, sending stop signal..."); - // Send stop signal to other worker - if self.sender.send(()).await.is_err() { - error!(target: "minerd::rpc", "Failed to stop previous request"); - return server_error(RpcError::StopFailed, id, None) - } - - // Wait for other worker to terminate - info!(target: "minerd::rpc", "Waiting for request to terminate..."); - while self.stop_signal.receiver_count() > 1 { - sleep(1).await; - } - info!(target: "minerd::rpc", "Previous request terminated!"); - - // Consume channel item so its empty again - if self.stop_signal.recv().await.is_err() { - error!(target: "minerd::rpc", "Failed to cleanup stop signal channel"); - return server_error(RpcError::StopFailed, id, None) - } - } + if let Some(e) = self.abort_pending(id).await { + return e + }; // Mine provided block let Ok(block_hash) = block.hash() else { @@ -125,4 +121,34 @@ impl Minerd { // Return block nonce JsonResponse::new(JsonValue::Number(block.header.nonce as f64), id).into() } + + /// Auxiliary function to abort pending request. + async fn abort_pending(&self, id: u16) -> Option { + // Check if a pending request is being processed + if self.stop_signal.receiver_count() == 0 { + return None + } + + info!(target: "minerd::rpc", "Pending request is in progress, sending stop signal..."); + // Send stop signal to worker + if self.sender.send(()).await.is_err() { + error!(target: "minerd::rpc", "Failed to stop pending request"); + return Some(server_error(RpcError::StopFailed, id, None)) + } + + // Wait for worker to terminate + info!(target: "minerd::rpc", "Waiting for request to terminate..."); + while self.stop_signal.receiver_count() > 1 { + sleep(1).await; + } + info!(target: "minerd::rpc", "Pending request terminated!"); + + // Consume channel item so its empty again + if self.stop_signal.recv().await.is_err() { + error!(target: "minerd::rpc", "Failed to cleanup stop signal channel"); + return Some(server_error(RpcError::StopFailed, id, None)) + } + + None + } }