darkfid/task/miner: properly listen for network proposals

This commit is contained in:
skoupidi
2024-02-13 17:44:17 +02:00
parent f1f05b726d
commit 847e4749eb
2 changed files with 104 additions and 56 deletions

View File

@@ -341,7 +341,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
let task = StoppableTask::new();
task.clone().start(
// Weird hack to prevent lifetimes hell
async move { miner_task(&darkfid, &recipient).await },
async move { miner_task(&darkfid, &recipient, blockchain_config.skip_sync).await },
|res| async {
match res {
Ok(()) | Err(Error::MinerTaskStopped) => { /* Do nothing */ }

View File

@@ -18,7 +18,8 @@
use darkfi::{
blockchain::BlockInfo,
rpc::util::JsonValue,
rpc::{jsonrpc::JsonNotification, util::JsonValue},
system::Subscription,
tx::{ContractCallLeaf, Transaction, TransactionBuilder},
util::encoding::base64,
validator::{
@@ -46,35 +47,20 @@ use crate::{proto::ProposalMessage, Darkfid};
// TODO: handle all ? so the task don't stop on errors
/// async task used for participating in the PoW block production
pub async fn miner_task(node: &Darkfid, recipient: &PublicKey) -> Result<()> {
// TODO: For now we asume we have a single miner that produces block,
// until the PoW consensus and proper validations have been added.
// The miner workflow would be:
// First we wait for next finalization, for optimal conditions.
// After that we ask all our connected peers for their blocks,
// and append them to our consensus state, creating their forks.
// Then we evaluate each fork and find the best one, so we can
// mine its next.
// We start running 2 tasks, one listenning for blocks(proposals)
// from other miners, and one mining the best fork next block.
// These two tasks run in parallel. If we receive a block from
// another miner, we evaluate it and if it produces a higher
// ranking fork that the one we currectly mine, we stop, check
// if we can finalize any fork, and then start mining that fork
// next block. If we manage to mine the block next, we broadcast
// it and then execute the finalization check and start mining
// next best fork block.
/// async task used for participating in the PoW block production.
/// Miner initializes their setup and waits for next finalization,
/// by listenning for new proposals from the network, for optimal
/// conditions. After finalization occurs, they start the actual
/// miner loop, where they first grab the best ranking fork to extend,
/// and start mining procedure for its next block. Additionally, they
/// listen to the network for new proposals, and check if these
/// proposals produce a new best ranking fork. If they do, the stop
/// mining. These two tasks run in parallel, and after one of them
/// finishes, node triggers finallization check.
pub async fn miner_task(node: &Darkfid, recipient: &PublicKey, skip_sync: bool) -> Result<()> {
// Initialize miner configuration
info!(target: "darkfid::task::miner_task", "Starting miner task...");
// Start miner loop
miner_loop(node, recipient).await?;
Ok(())
}
/// Miner loop
async fn miner_loop(node: &Darkfid, recipient: &PublicKey) -> Result<()> {
// Grab zkas proving keys and bin for PoWReward transaction
info!(target: "darkfid::task::miner_task", "Generating zkas bin and proving keys...");
let (zkbin, _) = node.validator.blockchain.contracts.get_zkas(
@@ -99,34 +85,40 @@ async fn miner_loop(node: &Darkfid, recipient: &PublicKey) -> Result<()> {
// Grab blocks subscriber
let block_sub = node.subscribers.get("blocks").unwrap();
info!(target: "darkfid::task::miner_task", "Miner loop starts!");
// Miner loop
// Grab proposals subscriber and subscribe to it
let proposals_sub = node.subscribers.get("proposals").unwrap();
let subscription = proposals_sub.sub.clone().subscribe().await;
// Listen for blocks until next finalization, for optimal conditions
if !skip_sync {
loop {
subscription.receive().await;
// Check if we can finalize anything and broadcast them
let finalized = node.validator.finalization().await?;
if !finalized.is_empty() {
let mut notif_blocks = Vec::with_capacity(finalized.len());
for block in finalized {
notif_blocks
.push(JsonValue::String(bs58::encode(&serialize(&block)).into_string()));
}
block_sub.notify(JsonValue::Array(notif_blocks)).await;
break;
}
}
}
// Start miner loop
loop {
// Grab next target and block
let (next_target, mut next_block) =
generate_next_block(node, &mut secret, recipient, &zkbin, &pk).await?;
// Grab best current fork index
let fork_index = best_forks_indexes(&node.validator.consensus.forks.read().await)?[0];
// Execute request to minerd and parse response
let target = JsonValue::String(next_target.to_string());
let block = JsonValue::String(base64::encode(&serialize(&next_block)));
let response =
node.miner_daemon_request("mine", JsonValue::Array(vec![target, block])).await?;
next_block.header.nonce = *response.get::<f64>().unwrap() as u64;
// Sign the mined block
next_block.sign(&secret)?;
// Verify it
node.validator.consensus.module.read().await.verify_current_block(&next_block)?;
// Append the mined block as a proposal
let proposal = Proposal::new(next_block)?;
node.validator.consensus.append_proposal(&proposal).await?;
// Broadcast proposal to the network
let message = ProposalMessage(proposal);
node.miners_p2p.as_ref().unwrap().broadcast(&message).await;
node.sync_p2p.broadcast(&message).await;
// Start listenning for network proposals and mining next block for best fork.
smol::future::or(
listen_to_network(node, fork_index, &subscription),
mine_next_block(node, fork_index, &mut secret, recipient, &zkbin, &pk),
)
.await?;
// Check if we can finalize anything and broadcast them
let finalized = node.validator.finalization().await?;
@@ -141,9 +133,66 @@ async fn miner_loop(node: &Darkfid, recipient: &PublicKey) -> Result<()> {
}
}
/// Auxiliary function to listen for incoming proposals and check if the best fork has changed
async fn listen_to_network(
node: &Darkfid,
fork_index: usize,
subscription: &Subscription<JsonNotification>,
) -> Result<()> {
loop {
// Wait until a new proposal has been received
subscription.receive().await;
// Grab best current fork indexes
let fork_indexes = best_forks_indexes(&node.validator.consensus.forks.read().await)?;
if !fork_indexes.contains(&fork_index) {
return Ok(())
}
}
}
/// Auxiliary function to generate and mine provided fork index next block
async fn mine_next_block(
node: &Darkfid,
fork_index: usize,
secret: &mut SecretKey,
recipient: &PublicKey,
zkbin: &ZkBinary,
pk: &ProvingKey,
) -> Result<()> {
// Grab next target and block
let (next_target, mut next_block) =
generate_next_block(node, fork_index, secret, recipient, zkbin, pk).await?;
// Execute request to minerd and parse response
let target = JsonValue::String(next_target.to_string());
let block = JsonValue::String(base64::encode(&serialize(&next_block)));
let response = node.miner_daemon_request("mine", JsonValue::Array(vec![target, block])).await?;
next_block.header.nonce = *response.get::<f64>().unwrap() as u64;
// Sign the mined block
next_block.sign(secret)?;
// Verify it
node.validator.consensus.module.read().await.verify_current_block(&next_block)?;
// Append the mined block as a proposal
let proposal = Proposal::new(next_block)?;
node.validator.consensus.append_proposal(&proposal).await?;
// Broadcast proposal to the network
let message = ProposalMessage(proposal);
node.miners_p2p.as_ref().unwrap().broadcast(&message).await;
node.sync_p2p.broadcast(&message).await;
Ok(())
}
/// Auxiliary function to generate next block in an atomic manner
async fn generate_next_block(
node: &Darkfid,
fork_index: usize,
secret: &mut SecretKey,
recipient: &PublicKey,
zkbin: &ZkBinary,
@@ -153,7 +202,6 @@ async fn generate_next_block(
let forks = node.validator.consensus.forks.read().await;
// Grab best current fork
let fork_index = best_forks_indexes(&forks)?[0];
let fork = &forks[fork_index];
// Generate new signing key for next block