darkfid,minerd: better mining abort signaling between the two daemons

This commit is contained in:
skoupidi
2024-03-11 15:06:29 +02:00
parent 66d5e760b9
commit 3a446cecfd
2 changed files with 98 additions and 28 deletions

View File

@@ -19,7 +19,7 @@
use darkfi::{
blockchain::BlockInfo,
rpc::{jsonrpc::JsonNotification, util::JsonValue},
system::Subscription,
system::{sleep, Subscription},
tx::{ContractCallLeaf, Transaction, TransactionBuilder},
util::encoding::base64,
validator::{
@@ -42,12 +42,13 @@ use darkfi_serial::{serialize, Encodable};
use log::info;
use num_bigint::BigUint;
use rand::rngs::OsRng;
use smol::channel::{Receiver, Sender};
use crate::{proto::ProposalMessage, Darkfid};
// TODO: handle all ? so the task don't stop on errors
/// async task used for participating in the PoW block production.
/// Async task used for participating in the PoW block production.
/// Miner initializes their setup and waits for next finalization,
/// by listenning for new proposals from the network, for optimal
/// conditions. After finalization occurs, they start the actual
@@ -108,6 +109,9 @@ pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool)
}
}
// Create channels so threads can signal each other
let (sender, stop_signal) = smol::channel::bounded(1);
info!(target: "darkfid::task::miner_task", "Miner initialized successfully!");
// Start miner loop
@@ -119,8 +123,8 @@ pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool)
// Start listenning for network proposals and mining next block for best fork.
smol::future::or(
listen_to_network(node, &extended_fork, &subscription),
mine_next_block(node, &extended_fork, &mut secret, recipient, &zkbin, &pk),
listen_to_network(node, &extended_fork, &subscription, &sender),
mine(node, &extended_fork, &mut secret, recipient, &zkbin, &pk, &stop_signal),
)
.await?;
@@ -136,11 +140,12 @@ pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool)
}
}
/// Auxiliary function to listen for incoming proposals and check if the best fork has changed
/// Async task to listen for incoming proposals and check if the best fork has changed
async fn listen_to_network(
node: &Darkfid,
extended_fork: &Fork,
subscription: &Subscription<JsonNotification>,
sender: &Sender<()>,
) -> Result<()> {
// Grab extended fork last proposal hash
let last_proposal_hash = extended_fork.last_proposal()?.hash;
@@ -157,14 +162,53 @@ async fn listen_to_network(
// Verify if proposals sequence has changed
if forks[index].last_proposal()?.hash != last_proposal_hash {
drop(forks);
return Ok(())
break
}
drop(forks);
}
// Signal miner to abort mining
sender.send(()).await?;
node.miner_daemon_request("abort", JsonValue::Array(vec![])).await?;
Ok(())
}
/// Auxiliary function to generate and mine provided fork index next block
/// Async task to generate and mine provided fork index next block,
/// while listening for a stop signal
async fn mine(
node: &Darkfid,
extended_fork: &Fork,
secret: &mut SecretKey,
recipient: &PublicKey,
zkbin: &ZkBinary,
pk: &ProvingKey,
stop_signal: &Receiver<()>,
) -> Result<()> {
smol::future::or(
wait_stop_signal(stop_signal),
mine_next_block(node, extended_fork, secret, recipient, zkbin, pk),
)
.await
}
/// Async task to wait for listener's stop signal.
async fn wait_stop_signal(stop_signal: &Receiver<()>) -> Result<()> {
// Clean stop signal channel
if stop_signal.is_full() {
stop_signal.recv().await?;
}
// Wait for listener signal
stop_signal.recv().await?;
// Take a nap to let listener notify minerd
sleep(10).await;
Ok(())
}
/// Async task to generate and mine provided fork index next block
async fn mine_next_block(
node: &Darkfid,
extended_fork: &Fork,

View File

@@ -48,6 +48,7 @@ impl RequestHandler for Minerd {
match req.method.as_str() {
"ping" => self.pong(req.id, req.params).await,
"abort" => self.abort(req.id, req.params).await,
"mine" => self.mine(req.id, req.params).await,
_ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
@@ -59,6 +60,19 @@ impl RequestHandler for Minerd {
}
impl Minerd {
// RPCAPI:
// Signals miner daemon to abort mining pending request.
// Returns `true` on success.
//
// --> {"jsonrpc": "2.0", "method": "abort", "params": [], "id": 42}
// <-- {"jsonrpc": "2.0", "result": "true", "id": 42}
async fn abort(&self, id: u16, _params: JsonValue) -> JsonResult {
if let Some(e) = self.abort_pending(id).await {
return e
};
JsonResponse::new(JsonValue::Boolean(true), id).into()
}
// RPCAPI:
// Mine provided block for requested mine target, and return the corresponding nonce value.
//
@@ -89,27 +103,9 @@ impl Minerd {
};
// Check if another request is being processed
if self.stop_signal.receiver_count() > 1 {
info!(target: "minerd::rpc", "Another request is in progress, sending stop signal...");
// Send stop signal to other worker
if self.sender.send(()).await.is_err() {
error!(target: "minerd::rpc", "Failed to stop previous request");
return server_error(RpcError::StopFailed, id, None)
}
// Wait for other worker to terminate
info!(target: "minerd::rpc", "Waiting for request to terminate...");
while self.stop_signal.receiver_count() > 1 {
sleep(1).await;
}
info!(target: "minerd::rpc", "Previous request terminated!");
// Consume channel item so its empty again
if self.stop_signal.recv().await.is_err() {
error!(target: "minerd::rpc", "Failed to cleanup stop signal channel");
return server_error(RpcError::StopFailed, id, None)
}
}
if let Some(e) = self.abort_pending(id).await {
return e
};
// Mine provided block
let Ok(block_hash) = block.hash() else {
@@ -125,4 +121,34 @@ impl Minerd {
// Return block nonce
JsonResponse::new(JsonValue::Number(block.header.nonce as f64), id).into()
}
/// Auxiliary function to abort pending request.
async fn abort_pending(&self, id: u16) -> Option<JsonResult> {
// Check if a pending request is being processed
if self.stop_signal.receiver_count() == 0 {
return None
}
info!(target: "minerd::rpc", "Pending request is in progress, sending stop signal...");
// Send stop signal to worker
if self.sender.send(()).await.is_err() {
error!(target: "minerd::rpc", "Failed to stop pending request");
return Some(server_error(RpcError::StopFailed, id, None))
}
// Wait for worker to terminate
info!(target: "minerd::rpc", "Waiting for request to terminate...");
while self.stop_signal.receiver_count() > 1 {
sleep(1).await;
}
info!(target: "minerd::rpc", "Pending request terminated!");
// Consume channel item so its empty again
if self.stop_signal.recv().await.is_err() {
error!(target: "minerd::rpc", "Failed to cleanup stop signal channel");
return Some(server_error(RpcError::StopFailed, id, None))
}
None
}
}