From a80800aac86ab9780a9c00cb9d8fbf58fb3e0bdd Mon Sep 17 00:00:00 2001 From: skoupidi Date: Mon, 24 Nov 2025 15:34:40 +0200 Subject: [PATCH] validator/pow: multithreaded mining fixed --- Cargo.lock | 26 +--- src/validator/pow.rs | 235 +++++++++++++++++++++++-------- src/validator/randomx_factory.rs | 17 +-- 3 files changed, 178 insertions(+), 100 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8371cfb0..e480df0c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -698,26 +698,6 @@ dependencies = [ "syn 2.0.110", ] -[[package]] -name = "bindgen" -version = "0.71.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" -dependencies = [ - "bitflags 2.10.0", - "cexpr", - "clang-sys", - "itertools 0.13.0", - "log", - "prettyplease", - "proc-macro2", - "quote", - "regex", - "rustc-hash 2.1.1", - "shlex", - "syn 2.0.110", -] - [[package]] name = "bindgen" version = "0.72.1" @@ -728,6 +708,8 @@ dependencies = [ "cexpr", "clang-sys", "itertools 0.13.0", + "log", + "prettyplease", "proc-macro2", "quote", "regex", @@ -5693,9 +5675,9 @@ dependencies = [ [[package]] name = "randomx" version = "1.2.1" -source = "git+https://codeberg.org/darkrenaissance/RandomX#d67ab9ccdf83e990c4dfd727181454199c02fa2f" +source = "git+https://codeberg.org/darkrenaissance/RandomX#648279c8ab20c563698ccc1117efba7d40ccf29c" dependencies = [ - "bindgen 0.71.1", + "bindgen 0.72.1", "bitflags 2.10.0", "libc", ] diff --git a/src/validator/pow.rs b/src/validator/pow.rs index 82570a461..5fea6012c 100644 --- a/src/validator/pow.rs +++ b/src/validator/pow.rs @@ -17,6 +17,7 @@ */ use std::{ + collections::VecDeque, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, @@ -29,7 +30,7 @@ use darkfi_sdk::num_traits::{One, Zero}; use num_bigint::BigUint; use randomx::{RandomXCache, RandomXDataset, RandomXFlags, RandomXVM}; use smol::channel::Receiver; -use tracing::debug; +use tracing::{debug, error}; use crate::{ blockchain::{ @@ -40,9 +41,8 @@ use crate::{ }, Blockchain, BlockchainOverlayPtr, }, - system::thread_priority::ThreadPriority, util::{ringbuffer::RingBuffer, time::Timestamp}, - validator::{randomx_factory::init_dataset_wrapper, utils::median, RandomXFactory}, + validator::{utils::median, RandomXFactory}, Error, Result, }; @@ -441,113 +441,224 @@ impl std::fmt::Display for PoWModule { } } -/// Mine provided block, based on provided PoW module next mine target. -pub fn mine_block( - target: &BigUint, - input: &HeaderHash, - miner_header: &mut Header, - threads: usize, - stop_signal: &Receiver<()>, -) -> Result<()> { - let miner_setup = Instant::now(); - - debug!(target: "validator::pow::mine_block", "[MINER] Mine target: 0x{target:064x}"); - debug!(target: "validator::pow::mine_block", "[MINER] PoW input: {input}"); - +/// Auxiliary function to define `RandomXFlags` used in mining. +fn get_mining_flags() -> RandomXFlags { + // TODO: Try adding `| RandomXFlags::LARGEPAGES`. let mut flags = RandomXFlags::get_recommended_flags() | RandomXFlags::FULLMEM; - #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] if is_x86_feature_detected!("avx2") { flags |= RandomXFlags::ARGON2_AVX2; } else if is_x86_feature_detected!("ssse3") { flags |= RandomXFlags::ARGON2_SSSE3; } + flags +} - debug!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX cache..."); +/// Auxiliary function to mine provided header using a single thread. +fn single_thread_mine( + target: &BigUint, + input: &HeaderHash, + header: &mut Header, + stop_signal: &Receiver<()>, +) -> Result<()> { + debug!(target: "validator::pow::single_thread_mine", "[MINER] Initializing RandomX cache and dataset..."); + let setup_start = Instant::now(); + let flags = get_mining_flags(); let cache = RandomXCache::new(flags, &input.inner()[..])?; + let dataset_item_count = RandomXDataset::count()?; + let dataset = RandomXDataset::new_init(flags, cache, 0, dataset_item_count)?; + debug!(target: "validator::pow::single_thread_mine", "[MINER] Setup time: {:?}", setup_start.elapsed()); - debug!(target: "validator::pow::mine_block", "[MINER] Setup time: {:?}", miner_setup.elapsed()); + // Check if stop signal is received + if stop_signal.is_full() { + debug!(target: "validator::pow::single_thread_mine", "[MINER] Stop signal received, exiting"); + return Err(Error::MinerTaskStopped); + } - // Multithreaded mining setup - let mining_time = Instant::now(); - let mut handles = vec![]; + debug!(target: "validator::pow::single_thread_mine", "[MINER] Initializing RandomX VM..."); + let vm_start = Instant::now(); + let vm = RandomXVM::new(flags, None, Some(dataset))?; + debug!(target: "validator::pow::single_thread_mine", "[MINER] Initialized RandomX VM in {:?}", vm_start.elapsed()); + + debug!(target: "validator::pow::single_thread_mine", "[MINER] Mining started!"); + let mining_start = Instant::now(); + loop { + // Check if stop signal is received + if stop_signal.is_full() { + debug!(target: "validator::pow::single_thread_mine", "[MINER] Stop signal received, exiting"); + return Err(Error::MinerTaskStopped); + } + + let out_hash = vm.calculate_hash(header.hash().inner())?; + let out_hash = BigUint::from_bytes_le(&out_hash); + if &out_hash <= target { + debug!(target: "validator::pow::single_thread_mine", "[MINER] Found block header using nonce {}", header.nonce); + debug!(target: "validator::pow::single_thread_mine", "[MINER] Block header hash {}", header.hash()); + debug!(target: "validator::pow::single_thread_mine", "[MINER] RandomX output: 0x{out_hash:064x}"); + break; + } + + header.nonce += 1; + } + debug!(target: "validator::pow::single_thread_mine", "[MINER] Completed mining in {:?}", mining_start.elapsed()); + debug!(target: "validator::pow::single_thread_mine", "[MINER] Mined header: {header:?}"); + Ok(()) +} + +/// Auxiliary function to mine provided header using a multiple threads. +fn multi_thread_mine( + target: &BigUint, + input: &HeaderHash, + header: &mut Header, + threads: usize, + stop_signal: &Receiver<()>, +) -> Result<()> { + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initializing RandomX cache and dataset..."); + let setup_start = Instant::now(); + let flags = get_mining_flags(); + let cache = RandomXCache::new(flags, &input.inner()[..])?; + let dataset_item_count = RandomXDataset::count()?; + let dataset = RandomXDataset::new(flags, cache, dataset_item_count)?; + let mut subsets = VecDeque::with_capacity(threads); + + // Multithreaded dataset init + let threads_u32 = threads as u32; + for t in 0..threads_u32 { + // Check if stop signal is received + if stop_signal.is_full() { + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Stop signal received, exiting"); + return Err(Error::MinerTaskStopped); + } + + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initializing RandomX dataset for thread #{t}..."); + let ds_start = Instant::now(); + let a = (dataset_item_count * t) / threads_u32; + let b = (dataset_item_count * (t + 1)) / threads_u32; + subsets.push_back(dataset.subset_init(a, b - a)); + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initialized RandomX dataset for thread #{t} in {:?}", + ds_start.elapsed() + ); + } + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Setup time: {:?}", setup_start.elapsed()); + + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initializing mining threads..."); + let mut handles = Vec::with_capacity(threads); let found_header = Arc::new(AtomicBool::new(false)); let found_nonce = Arc::new(AtomicU64::new(0)); - let threads = threads as u64; - let dataset_item_count = RandomXDataset::count()?; + let threads_u64 = threads as u64; + let mining_start = Instant::now(); + for t in 0..threads_u64 { + // Check if stop signal is received + if stop_signal.is_full() { + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Stop signal received, threads creation loop exiting"); + break + } + + if found_header.load(Ordering::SeqCst) { + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Block header found, threads creation loop exiting"); + break + } - for t in 0..threads { let target = target.clone(); - let mut header = miner_header.clone(); + let mut thread_header = header.clone(); + thread_header.nonce = t; let found_header = Arc::clone(&found_header); let found_nonce = Arc::clone(&found_nonce); - - let dataset = if threads > 1 { - let a = (dataset_item_count * (t as u32)) / (threads as u32); - let b = (dataset_item_count * (t as u32 + 1)) / (threads as u32); - init_dataset_wrapper(flags, cache.clone(), a, b - a, ThreadPriority::Normal)? - } else { - init_dataset_wrapper( - flags, - cache.clone(), - 0, - dataset_item_count, - ThreadPriority::Normal, - )? - }; - + let dataset = subsets.pop_front().unwrap(); let stop_signal = stop_signal.clone(); handles.push(thread::spawn(move || { - debug!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX VM #{t}..."); - let mut miner_nonce = t; - let vm = RandomXVM::new(flags, None, Some(dataset)).unwrap(); + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initializing RandomX VM #{t}..."); + let vm_start = Instant::now(); + let vm = match RandomXVM::new(flags, None, Some(dataset)) { + Ok(vm) => vm, + Err(e) => { + error!(target: "validator::pow::multi_thread_mine", "[MINER] Initialized RandomX VM #{t} failed: {e}"); + return + } + }; + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initialized RandomX VM #{t} in {:?}", vm_start.elapsed()); loop { // Check if stop signal was received if stop_signal.is_full() { - debug!(target: "validator::pow::mine_block", "[MINER] Stop signal received, thread #{t} exiting"); + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Stop signal received, thread #{t} exiting"); break } - header.nonce = miner_nonce; if found_header.load(Ordering::SeqCst) { - debug!(target: "validator::pow::mine_block", "[MINER] Block header found, thread #{t} exiting"); - break + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Block header found, thread #{t} exiting"); + break; } - let out_hash = vm.calculate_hash(header.hash().inner()).unwrap(); + let out_hash = match vm.calculate_hash(thread_header.hash().inner()) { + Ok(hash) => hash, + Err(e) => { + error!(target: "validator::pow::multi_thread_mine", "[MINER] Calculating hash in thread #{t} failed: {e}"); + break + } + }; let out_hash = BigUint::from_bytes_le(&out_hash); if out_hash <= target { found_header.store(true, Ordering::SeqCst); - found_nonce.store(miner_nonce, Ordering::SeqCst); - debug!(target: "validator::pow::mine_block", "[MINER] Thread #{t} found block header using nonce {miner_nonce}"); - debug!(target: "validator::pow::mine_block", "[MINER] Block header hash {}", header.hash()); - debug!(target: "validator::pow::mine_block", "[MINER] RandomX output: 0x{out_hash:064x}"); - break + found_nonce.store(thread_header.nonce, Ordering::SeqCst); + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Thread #{t} found block header using nonce {}", + thread_header.nonce + ); + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Block header hash {}", thread_header.hash()); + debug!(target: "validator::pow::multi_thread_mine", "[MINER] RandomX output: 0x{out_hash:064x}"); + break; } // This means thread 0 will use nonces, 0, 4, 8, ... // and thread 1 will use nonces, 1, 5, 9, ... - miner_nonce += threads; + thread_header.nonce += threads_u64; } })); } + // Wait for threads to finish mining for handle in handles { let _ = handle.join(); } - // Check if stop signal was received + + // Check if stop signal is received if stop_signal.is_full() { - return Err(Error::MinerTaskStopped) + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Stop signal received, exiting"); + return Err(Error::MinerTaskStopped); } - debug!(target: "validator::pow::mine_block", "[MINER] Mining time: {:?}", mining_time.elapsed()); - - // Set the valid mined nonce in the block header - miner_header.nonce = found_nonce.load(Ordering::SeqCst); - + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Completed mining in {:?}", mining_start.elapsed()); + header.nonce = found_nonce.load(Ordering::SeqCst); + debug!(target: "validator::pow::multi_thread_mine", "[MINER] Mined header: {header:?}"); Ok(()) } +/// Mine provided header, based on provided PoW module next mine target. +pub fn mine_block( + target: &BigUint, + input: &HeaderHash, + header: &mut Header, + threads: usize, + stop_signal: &Receiver<()>, +) -> Result<()> { + debug!(target: "validator::pow::mine_block", "[MINER] Mine target: 0x{target:064x}"); + debug!(target: "validator::pow::mine_block", "[MINER] PoW input: {input}"); + + // Check if stop signal is received + if stop_signal.is_full() { + debug!(target: "validator::pow::mine_block", "[MINER] Stop signal received, exiting"); + return Err(Error::MinerTaskStopped); + } + + match threads { + 0 => { + error!(target: "validator::pow::mine_block", "[MINER] Can't use 0 threads!"); + Err(Error::MinerTaskStopped) + } + 1 => single_thread_mine(target, input, header, stop_signal), + _ => multi_thread_mine(target, input, header, threads, stop_signal), + } +} + #[cfg(test)] mod tests { use std::{ diff --git a/src/validator/randomx_factory.rs b/src/validator/randomx_factory.rs index 14071cc98..46c86496a 100644 --- a/src/validator/randomx_factory.rs +++ b/src/validator/randomx_factory.rs @@ -27,22 +27,7 @@ use std::{ use randomx::{RandomXCache, RandomXDataset, RandomXFlags, RandomXVM}; use tracing::{debug, warn}; -use crate::{ - system::thread_priority::{set_thread_priority, ThreadPriority}, - Result, -}; - -/// Wrapper for creating a [`RandomXDataset`] -pub fn init_dataset_wrapper( - flags: RandomXFlags, - cache: RandomXCache, - start_item: u32, - item_count: u32, - priority: ThreadPriority, -) -> Result { - set_thread_priority(priority); - Ok(RandomXDataset::new(flags, cache, start_item, item_count)?) -} +use crate::Result; /// The RandomX virtual machine instance used to verify mining. #[derive(Clone)]