validator/pow: multithreaded mining fixed

This commit is contained in:
skoupidi
2025-11-24 15:34:40 +02:00
parent 245d142f16
commit a80800aac8
3 changed files with 178 additions and 100 deletions

26
Cargo.lock generated
View File

@@ -698,26 +698,6 @@ dependencies = [
"syn 2.0.110",
]
[[package]]
name = "bindgen"
version = "0.71.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3"
dependencies = [
"bitflags 2.10.0",
"cexpr",
"clang-sys",
"itertools 0.13.0",
"log",
"prettyplease",
"proc-macro2",
"quote",
"regex",
"rustc-hash 2.1.1",
"shlex",
"syn 2.0.110",
]
[[package]]
name = "bindgen"
version = "0.72.1"
@@ -728,6 +708,8 @@ dependencies = [
"cexpr",
"clang-sys",
"itertools 0.13.0",
"log",
"prettyplease",
"proc-macro2",
"quote",
"regex",
@@ -5693,9 +5675,9 @@ dependencies = [
[[package]]
name = "randomx"
version = "1.2.1"
source = "git+https://codeberg.org/darkrenaissance/RandomX#d67ab9ccdf83e990c4dfd727181454199c02fa2f"
source = "git+https://codeberg.org/darkrenaissance/RandomX#648279c8ab20c563698ccc1117efba7d40ccf29c"
dependencies = [
"bindgen 0.71.1",
"bindgen 0.72.1",
"bitflags 2.10.0",
"libc",
]

View File

@@ -17,6 +17,7 @@
*/
use std::{
collections::VecDeque,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
@@ -29,7 +30,7 @@ use darkfi_sdk::num_traits::{One, Zero};
use num_bigint::BigUint;
use randomx::{RandomXCache, RandomXDataset, RandomXFlags, RandomXVM};
use smol::channel::Receiver;
use tracing::debug;
use tracing::{debug, error};
use crate::{
blockchain::{
@@ -40,9 +41,8 @@ use crate::{
},
Blockchain, BlockchainOverlayPtr,
},
system::thread_priority::ThreadPriority,
util::{ringbuffer::RingBuffer, time::Timestamp},
validator::{randomx_factory::init_dataset_wrapper, utils::median, RandomXFactory},
validator::{utils::median, RandomXFactory},
Error, Result,
};
@@ -441,113 +441,224 @@ impl std::fmt::Display for PoWModule {
}
}
/// Mine provided block, based on provided PoW module next mine target.
pub fn mine_block(
target: &BigUint,
input: &HeaderHash,
miner_header: &mut Header,
threads: usize,
stop_signal: &Receiver<()>,
) -> Result<()> {
let miner_setup = Instant::now();
debug!(target: "validator::pow::mine_block", "[MINER] Mine target: 0x{target:064x}");
debug!(target: "validator::pow::mine_block", "[MINER] PoW input: {input}");
/// Auxiliary function to define `RandomXFlags` used in mining.
fn get_mining_flags() -> RandomXFlags {
// TODO: Try adding `| RandomXFlags::LARGEPAGES`.
let mut flags = RandomXFlags::get_recommended_flags() | RandomXFlags::FULLMEM;
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
if is_x86_feature_detected!("avx2") {
flags |= RandomXFlags::ARGON2_AVX2;
} else if is_x86_feature_detected!("ssse3") {
flags |= RandomXFlags::ARGON2_SSSE3;
}
flags
}
debug!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX cache...");
/// Auxiliary function to mine provided header using a single thread.
fn single_thread_mine(
target: &BigUint,
input: &HeaderHash,
header: &mut Header,
stop_signal: &Receiver<()>,
) -> Result<()> {
debug!(target: "validator::pow::single_thread_mine", "[MINER] Initializing RandomX cache and dataset...");
let setup_start = Instant::now();
let flags = get_mining_flags();
let cache = RandomXCache::new(flags, &input.inner()[..])?;
let dataset_item_count = RandomXDataset::count()?;
let dataset = RandomXDataset::new_init(flags, cache, 0, dataset_item_count)?;
debug!(target: "validator::pow::single_thread_mine", "[MINER] Setup time: {:?}", setup_start.elapsed());
debug!(target: "validator::pow::mine_block", "[MINER] Setup time: {:?}", miner_setup.elapsed());
// Check if stop signal is received
if stop_signal.is_full() {
debug!(target: "validator::pow::single_thread_mine", "[MINER] Stop signal received, exiting");
return Err(Error::MinerTaskStopped);
}
// Multithreaded mining setup
let mining_time = Instant::now();
let mut handles = vec![];
debug!(target: "validator::pow::single_thread_mine", "[MINER] Initializing RandomX VM...");
let vm_start = Instant::now();
let vm = RandomXVM::new(flags, None, Some(dataset))?;
debug!(target: "validator::pow::single_thread_mine", "[MINER] Initialized RandomX VM in {:?}", vm_start.elapsed());
debug!(target: "validator::pow::single_thread_mine", "[MINER] Mining started!");
let mining_start = Instant::now();
loop {
// Check if stop signal is received
if stop_signal.is_full() {
debug!(target: "validator::pow::single_thread_mine", "[MINER] Stop signal received, exiting");
return Err(Error::MinerTaskStopped);
}
let out_hash = vm.calculate_hash(header.hash().inner())?;
let out_hash = BigUint::from_bytes_le(&out_hash);
if &out_hash <= target {
debug!(target: "validator::pow::single_thread_mine", "[MINER] Found block header using nonce {}", header.nonce);
debug!(target: "validator::pow::single_thread_mine", "[MINER] Block header hash {}", header.hash());
debug!(target: "validator::pow::single_thread_mine", "[MINER] RandomX output: 0x{out_hash:064x}");
break;
}
header.nonce += 1;
}
debug!(target: "validator::pow::single_thread_mine", "[MINER] Completed mining in {:?}", mining_start.elapsed());
debug!(target: "validator::pow::single_thread_mine", "[MINER] Mined header: {header:?}");
Ok(())
}
/// Auxiliary function to mine provided header using a multiple threads.
fn multi_thread_mine(
target: &BigUint,
input: &HeaderHash,
header: &mut Header,
threads: usize,
stop_signal: &Receiver<()>,
) -> Result<()> {
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initializing RandomX cache and dataset...");
let setup_start = Instant::now();
let flags = get_mining_flags();
let cache = RandomXCache::new(flags, &input.inner()[..])?;
let dataset_item_count = RandomXDataset::count()?;
let dataset = RandomXDataset::new(flags, cache, dataset_item_count)?;
let mut subsets = VecDeque::with_capacity(threads);
// Multithreaded dataset init
let threads_u32 = threads as u32;
for t in 0..threads_u32 {
// Check if stop signal is received
if stop_signal.is_full() {
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Stop signal received, exiting");
return Err(Error::MinerTaskStopped);
}
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initializing RandomX dataset for thread #{t}...");
let ds_start = Instant::now();
let a = (dataset_item_count * t) / threads_u32;
let b = (dataset_item_count * (t + 1)) / threads_u32;
subsets.push_back(dataset.subset_init(a, b - a));
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initialized RandomX dataset for thread #{t} in {:?}",
ds_start.elapsed()
);
}
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Setup time: {:?}", setup_start.elapsed());
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initializing mining threads...");
let mut handles = Vec::with_capacity(threads);
let found_header = Arc::new(AtomicBool::new(false));
let found_nonce = Arc::new(AtomicU64::new(0));
let threads = threads as u64;
let dataset_item_count = RandomXDataset::count()?;
let threads_u64 = threads as u64;
let mining_start = Instant::now();
for t in 0..threads_u64 {
// Check if stop signal is received
if stop_signal.is_full() {
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Stop signal received, threads creation loop exiting");
break
}
if found_header.load(Ordering::SeqCst) {
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Block header found, threads creation loop exiting");
break
}
for t in 0..threads {
let target = target.clone();
let mut header = miner_header.clone();
let mut thread_header = header.clone();
thread_header.nonce = t;
let found_header = Arc::clone(&found_header);
let found_nonce = Arc::clone(&found_nonce);
let dataset = if threads > 1 {
let a = (dataset_item_count * (t as u32)) / (threads as u32);
let b = (dataset_item_count * (t as u32 + 1)) / (threads as u32);
init_dataset_wrapper(flags, cache.clone(), a, b - a, ThreadPriority::Normal)?
} else {
init_dataset_wrapper(
flags,
cache.clone(),
0,
dataset_item_count,
ThreadPriority::Normal,
)?
};
let dataset = subsets.pop_front().unwrap();
let stop_signal = stop_signal.clone();
handles.push(thread::spawn(move || {
debug!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX VM #{t}...");
let mut miner_nonce = t;
let vm = RandomXVM::new(flags, None, Some(dataset)).unwrap();
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initializing RandomX VM #{t}...");
let vm_start = Instant::now();
let vm = match RandomXVM::new(flags, None, Some(dataset)) {
Ok(vm) => vm,
Err(e) => {
error!(target: "validator::pow::multi_thread_mine", "[MINER] Initialized RandomX VM #{t} failed: {e}");
return
}
};
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Initialized RandomX VM #{t} in {:?}", vm_start.elapsed());
loop {
// Check if stop signal was received
if stop_signal.is_full() {
debug!(target: "validator::pow::mine_block", "[MINER] Stop signal received, thread #{t} exiting");
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Stop signal received, thread #{t} exiting");
break
}
header.nonce = miner_nonce;
if found_header.load(Ordering::SeqCst) {
debug!(target: "validator::pow::mine_block", "[MINER] Block header found, thread #{t} exiting");
break
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Block header found, thread #{t} exiting");
break;
}
let out_hash = vm.calculate_hash(header.hash().inner()).unwrap();
let out_hash = match vm.calculate_hash(thread_header.hash().inner()) {
Ok(hash) => hash,
Err(e) => {
error!(target: "validator::pow::multi_thread_mine", "[MINER] Calculating hash in thread #{t} failed: {e}");
break
}
};
let out_hash = BigUint::from_bytes_le(&out_hash);
if out_hash <= target {
found_header.store(true, Ordering::SeqCst);
found_nonce.store(miner_nonce, Ordering::SeqCst);
debug!(target: "validator::pow::mine_block", "[MINER] Thread #{t} found block header using nonce {miner_nonce}");
debug!(target: "validator::pow::mine_block", "[MINER] Block header hash {}", header.hash());
debug!(target: "validator::pow::mine_block", "[MINER] RandomX output: 0x{out_hash:064x}");
break
found_nonce.store(thread_header.nonce, Ordering::SeqCst);
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Thread #{t} found block header using nonce {}",
thread_header.nonce
);
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Block header hash {}", thread_header.hash());
debug!(target: "validator::pow::multi_thread_mine", "[MINER] RandomX output: 0x{out_hash:064x}");
break;
}
// This means thread 0 will use nonces, 0, 4, 8, ...
// and thread 1 will use nonces, 1, 5, 9, ...
miner_nonce += threads;
thread_header.nonce += threads_u64;
}
}));
}
// Wait for threads to finish mining
for handle in handles {
let _ = handle.join();
}
// Check if stop signal was received
// Check if stop signal is received
if stop_signal.is_full() {
return Err(Error::MinerTaskStopped)
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Stop signal received, exiting");
return Err(Error::MinerTaskStopped);
}
debug!(target: "validator::pow::mine_block", "[MINER] Mining time: {:?}", mining_time.elapsed());
// Set the valid mined nonce in the block header
miner_header.nonce = found_nonce.load(Ordering::SeqCst);
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Completed mining in {:?}", mining_start.elapsed());
header.nonce = found_nonce.load(Ordering::SeqCst);
debug!(target: "validator::pow::multi_thread_mine", "[MINER] Mined header: {header:?}");
Ok(())
}
/// Mine provided header, based on provided PoW module next mine target.
pub fn mine_block(
target: &BigUint,
input: &HeaderHash,
header: &mut Header,
threads: usize,
stop_signal: &Receiver<()>,
) -> Result<()> {
debug!(target: "validator::pow::mine_block", "[MINER] Mine target: 0x{target:064x}");
debug!(target: "validator::pow::mine_block", "[MINER] PoW input: {input}");
// Check if stop signal is received
if stop_signal.is_full() {
debug!(target: "validator::pow::mine_block", "[MINER] Stop signal received, exiting");
return Err(Error::MinerTaskStopped);
}
match threads {
0 => {
error!(target: "validator::pow::mine_block", "[MINER] Can't use 0 threads!");
Err(Error::MinerTaskStopped)
}
1 => single_thread_mine(target, input, header, stop_signal),
_ => multi_thread_mine(target, input, header, threads, stop_signal),
}
}
#[cfg(test)]
mod tests {
use std::{

View File

@@ -27,22 +27,7 @@ use std::{
use randomx::{RandomXCache, RandomXDataset, RandomXFlags, RandomXVM};
use tracing::{debug, warn};
use crate::{
system::thread_priority::{set_thread_priority, ThreadPriority},
Result,
};
/// Wrapper for creating a [`RandomXDataset`]
pub fn init_dataset_wrapper(
flags: RandomXFlags,
cache: RandomXCache,
start_item: u32,
item_count: u32,
priority: ThreadPriority,
) -> Result<RandomXDataset> {
set_thread_priority(priority);
Ok(RandomXDataset::new(flags, cache, start_item, item_count)?)
}
use crate::Result;
/// The RandomX virtual machine instance used to verify mining.
#[derive(Clone)]