From b26106ffe760b5dbceb8f4fc035fdd6956027b8f Mon Sep 17 00:00:00 2001 From: aggstam Date: Thu, 28 Sep 2023 16:49:53 +0300 Subject: [PATCH] validator/pow: stoppable mining --- bin/darkfid2/src/main.rs | 11 ++- bin/darkfid2/src/task/miner.rs | 23 ++--- src/validator/pow.rs | 165 ++++++++++++++++++--------------- 3 files changed, 111 insertions(+), 88 deletions(-) diff --git a/bin/darkfid2/src/main.rs b/bin/darkfid2/src/main.rs index e6f69342f..2b2225fef 100644 --- a/bin/darkfid2/src/main.rs +++ b/bin/darkfid2/src/main.rs @@ -241,12 +241,13 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { darkfid.validator.write().await.purge_pending_txs().await?; // Consensus protocol - let consensus_task = if args.consensus { + let (consensus_task, consensus_sender) = if args.consensus { info!(target: "darkfid", "Starting consensus protocol task"); + let (sender, recvr) = smol::channel::bounded(1); let task = StoppableTask::new(); task.clone().start( // Weird hack to prevent lifetimes hell - async move { miner_task(&darkfid).await }, + async move { miner_task(&darkfid, &recvr).await }, |res| async { match res { Ok(()) | Err(Error::MinerTaskStopped) => { /* Do nothing */ } @@ -256,10 +257,10 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { Error::MinerTaskStopped, ex.clone(), ); - Some(task) + (Some(task), Some(sender)) } else { info!(target: "darkfid", "Not participating in consensus"); - None + (None, None) }; // Signal handling for graceful termination. @@ -278,6 +279,8 @@ async fn realmain(args: Args, ex: Arc>) -> Result<()> { consensus_p2p.unwrap().stop().await; info!(target: "darkfid", "Stopping consensus task..."); + // Send signal to spawned miner threads to stop + consensus_sender.unwrap().send(()).await?; consensus_task.unwrap().stop().await; } diff --git a/bin/darkfid2/src/task/miner.rs b/bin/darkfid2/src/task/miner.rs index 872e7f82f..b4443183b 100644 --- a/bin/darkfid2/src/task/miner.rs +++ b/bin/darkfid2/src/task/miner.rs @@ -17,18 +17,15 @@ */ use darkfi::{ - blockchain::BlockInfo, - system::sleep, - util::time::Timestamp, - validator::pow::{mine_block, PoWModule}, - Result, + blockchain::BlockInfo, system::sleep, util::time::Timestamp, validator::pow::PoWModule, Result, }; use log::info; +use smol::channel::Receiver; use crate::{proto::BlockInfoMessage, Darkfid}; /// async task used for participating in the PoW consensus protocol -pub async fn miner_task(node: &Darkfid) -> Result<()> { +pub async fn miner_task(node: &Darkfid, stop_signal: &Receiver<()>) -> Result<()> { // TODO: For now we asume we have a single miner that produces block, // until the PoW consensus and proper validations have been added. // The miner workflow would be: @@ -52,6 +49,14 @@ pub async fn miner_task(node: &Darkfid) -> Result<()> { // We sleep so our miner can grab their pickaxe sleep(10).await; + // Start miner loop + miner_loop(node, stop_signal).await?; + + Ok(()) +} + +/// Miner loop +async fn miner_loop(node: &Darkfid, stop_signal: &Receiver<()>) -> Result<()> { // TODO: add miner threads arg // Generate a PoW module let mut module = PoWModule::new(node.validator.read().await.blockchain.clone(), None, Some(90)); @@ -73,7 +78,7 @@ pub async fn miner_task(node: &Darkfid) -> Result<()> { next_block.header.previous = last.hash()?; next_block.header.height = last.header.height + 1; next_block.header.timestamp = Timestamp::current_time(); - mine_block(module.clone(), &mut next_block); + module.mine_block(&mut next_block, stop_signal)?; // Verify it module.verify_block(&next_block)?; @@ -90,9 +95,5 @@ pub async fn miner_task(node: &Darkfid) -> Result<()> { // Update PoW module module.append(timestamp, &difficulty); - - // TODO: remove this once mining is not blocking - // Lazy way to enable stopping this task - sleep(10).await; } } diff --git a/src/validator/pow.rs b/src/validator/pow.rs index 5153e3565..e017e2e08 100644 --- a/src/validator/pow.rs +++ b/src/validator/pow.rs @@ -32,11 +32,12 @@ use darkfi_sdk::{ use log::info; use num_bigint::BigUint; use randomx::{RandomXCache, RandomXDataset, RandomXFlags, RandomXVM}; +use smol::channel::Receiver; use crate::{ blockchain::{BlockInfo, Blockchain}, util::{ringbuffer::RingBuffer, time::Timestamp}, - Result, + Error, Result, }; // TODO: replace asserts with error returns @@ -232,6 +233,93 @@ impl PoWModule { self.cummulative_difficulty += difficulty; self.difficulties.push(self.cummulative_difficulty.clone()); } + + /// Mine provided block, based on provided PoW module next mine target and difficulty + pub fn mine_block( + &self, + miner_block: &mut BlockInfo, + stop_signal: &Receiver<()>, + ) -> Result<()> { + let miner_setup = Instant::now(); + + // Grab the next mine target + let target = self.next_mine_target(); + info!(target: "validator::pow::mine_block", "[MINER] Mine target: 0x{:064x}", target); + + // Get the PoW input. The key changes with every mined block. + let input = miner_block.header.previous; + info!(target: "validator::pow::mine_block", "[MINER] PoW input: {}", input.to_hex()); + let flags = RandomXFlags::default() | RandomXFlags::FULLMEM; + info!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX dataset..."); + let dataset = Arc::new(RandomXDataset::new(flags, input.as_bytes(), self.threads).unwrap()); + info!(target: "validator::pow::mine_block", "[MINER] Setup time: {:?}", miner_setup.elapsed()); + + // Multithreaded mining setup + let mining_time = Instant::now(); + let mut handles = vec![]; + let found_block = Arc::new(AtomicBool::new(false)); + let found_nonce = Arc::new(AtomicU32::new(0)); + let threads = self.threads as u32; + for t in 0..threads { + let target = target.clone(); + let mut block = miner_block.clone(); + let found_block = Arc::clone(&found_block); + let found_nonce = Arc::clone(&found_nonce); + let dataset = Arc::clone(&dataset); + let stop_signal = stop_signal.clone(); + + handles.push(thread::spawn(move || { + info!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX VM #{}...", t); + let mut miner_nonce = t; + let vm = RandomXVM::new_fast(flags, &dataset).unwrap(); + loop { + // Check if stop signal was received + if stop_signal.is_full() { + info!(target: "validator::pow::mine_block", "[MINER] Stop signal received, thread #{} exiting", t); + break + } + + block.header.nonce = pallas::Base::from(miner_nonce as u64); + if found_block.load(Ordering::SeqCst) { + info!(target: "validator::pow::mine_block", "[MINER] Block found, thread #{} exiting", t); + break + } + + let out_hash = vm.hash(block.hash().unwrap().as_bytes()); + let out_hash = BigUint::from_bytes_be(&out_hash); + if out_hash <= target { + found_block.store(true, Ordering::SeqCst); + found_nonce.store(miner_nonce, Ordering::SeqCst); + info!(target: "validator::pow::mine_block", "[MINER] Thread #{} found block using nonce {}", + t, miner_nonce + ); + info!(target: "validator::pow::mine_block", "[MINER] Block hash {}", block.hash().unwrap().to_hex()); + info!(target: "validator::pow::mine_block", "[MINER] RandomX output: 0x{:064x}", out_hash); + break + } + + // This means thread 0 will use nonces, 0, 4, 8, ... + // and thread 1 will use nonces, 1, 5, 9, ... + miner_nonce += threads; + } + })); + } + + for handle in handles { + let _ = handle.join(); + } + // Check if stop signal was received + if stop_signal.is_full() { + return Err(Error::MinerTaskStopped) + } + + info!(target: "validator::pow::mine_block", "[MINER] Mining time: {:?}", mining_time.elapsed()); + + // Set the valid mined nonce in the block + miner_block.header.nonce = pallas::Base::from(found_nonce.load(Ordering::SeqCst) as u64); + + Ok(()) + } } impl std::fmt::Display for PoWModule { @@ -245,76 +333,6 @@ impl std::fmt::Display for PoWModule { } } -// TODO: Move this inside PoWModule(if possible) and use stoppable task so -// we can stop it externally -/// Mine provided block, based on provided PoW module next mine target and difficulty -pub fn mine_block(module: PoWModule, miner_block: &mut BlockInfo) { - let miner_setup = Instant::now(); - - // Grab the next mine target - let target = module.next_mine_target(); - info!(target: "validator::pow::mine_block", "[MINER] Mine target: 0x{:064x}", target); - - // Get the PoW input. The key changes with every mined block. - let input = miner_block.header.previous; - info!(target: "validator::pow::mine_block", "[MINER] PoW input: {}", input.to_hex()); - let flags = RandomXFlags::default() | RandomXFlags::FULLMEM; - info!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX dataset..."); - let dataset = Arc::new(RandomXDataset::new(flags, input.as_bytes(), module.threads).unwrap()); - info!(target: "validator::pow::mine_block", "[MINER] Setup time: {:?}", miner_setup.elapsed()); - - // Multithreaded mining setup - let mining_time = Instant::now(); - let mut handles = vec![]; - let found_block = Arc::new(AtomicBool::new(false)); - let found_nonce = Arc::new(AtomicU32::new(0)); - for t in 0..module.threads { - let target = target.clone(); - let mut block = miner_block.clone(); - let found_block = Arc::clone(&found_block); - let found_nonce = Arc::clone(&found_nonce); - let dataset = Arc::clone(&dataset); - - handles.push(thread::spawn(move || { - info!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX VM #{}...", t); - let mut miner_nonce = t as u32; - let vm = RandomXVM::new_fast(flags, &dataset).unwrap(); - loop { - block.header.nonce = pallas::Base::from(miner_nonce as u64); - if found_block.load(Ordering::SeqCst) { - info!(target: "validator::pow::mine_block", "[MINER] Block found, thread #{} exiting", t); - break - } - - let out_hash = vm.hash(block.hash().unwrap().as_bytes()); - let out_hash = BigUint::from_bytes_be(&out_hash); - if out_hash <= target { - found_block.store(true, Ordering::SeqCst); - found_nonce.store(miner_nonce, Ordering::SeqCst); - info!(target: "validator::pow::mine_block", "[MINER] Thread #{} found block using nonce {}", - t, miner_nonce - ); - info!(target: "validator::pow::mine_block", "[MINER] Block hash {}", block.hash().unwrap().to_hex()); - info!(target: "validator::pow::mine_block", "[MINER] RandomX output: 0x{:064x}", out_hash); - break - } - - // This means thread 0 will use nonces, 0, 4, 8, ... - // and thread 1 will use nonces, 1, 5, 9, ... - miner_nonce += module.threads as u32; - } - })); - } - - for handle in handles { - let _ = handle.join(); - } - info!(target: "validator::pow::mine_block", "[MINER] Mining time: {:?}", mining_time.elapsed()); - - // Set the valid mined nonce in the block - miner_block.header.nonce = pallas::Base::from(found_nonce.load(Ordering::SeqCst) as u64); -} - // TODO: move these to utils or something fn get_mid(a: u64, b: u64) -> u64 { (a / 2) + (b / 2) + ((a - 2 * (a / 2)) + (b - 2 * (b / 2))) / 2 @@ -352,7 +370,7 @@ mod tests { Result, }; - use super::{mine_block, PoWModule}; + use super::PoWModule; const DEFAULT_TEST_DIFFICULTY_TARGET: usize = 120; @@ -394,12 +412,13 @@ mod tests { let sled_db = sled::Config::new().temporary(true).open()?; let blockchain = Blockchain::new(&sled_db)?; let module = PoWModule::new(blockchain, None, Some(DEFAULT_TEST_DIFFICULTY_TARGET)); + let (_, recvr) = smol::channel::bounded(1); let genesis_block = BlockInfo::default(); // Mine next block let mut next_block = BlockInfo::default(); next_block.header.previous = genesis_block.hash()?; - mine_block(module.clone(), &mut next_block); + module.mine_block(&mut next_block, &recvr)?; // Verify it module.verify_block(&next_block)?;