validator/pow: properly initialize the dataset with all available threads

This commit is contained in:
skoupidi
2025-12-10 17:28:49 +02:00
parent 06a3538261
commit 9d2683fc6f
2 changed files with 53 additions and 16 deletions

2
Cargo.lock generated
View File

@@ -5679,7 +5679,7 @@ dependencies = [
[[package]]
name = "randomx"
version = "1.2.1"
source = "git+https://codeberg.org/darkrenaissance/RandomX#a4d6c256d1a32a1ba4dcb62d918ca0e938571979"
source = "git+https://codeberg.org/darkrenaissance/RandomX#49e59631dbe3db2481c7b0fdf1ae228e822bf05c"
dependencies = [
"bindgen 0.72.1",
"bitflags 2.10.0",

View File

@@ -436,24 +436,70 @@ fn get_mining_flags() -> RandomXFlags {
RandomXFlags::get_recommended_flags() | RandomXFlags::FULLMEM
}
/// Auxiliary function to initialize a `RandomXDataset` using all
/// available threads.
fn init_dataset(
flags: RandomXFlags,
input: &HeaderHash,
stop_signal: &Receiver<()>,
) -> Result<RandomXDataset> {
// Allocate cache and dataset
let cache = RandomXCache::new(flags, &input.inner()[..])?;
let dataset_item_count = RandomXDataset::count()?;
let dataset = RandomXDataset::new(flags, cache, dataset_item_count)?;
// Multithreaded dataset init using all available threads
let threads = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
debug!(target: "validator::pow::init_dataset", "[MINER] Initializing RandomX dataset using {threads} threads...");
let mut handles = Vec::with_capacity(threads);
let threads_u32 = threads as u32;
let per_thread = dataset_item_count / threads_u32;
let remainder = dataset_item_count % threads_u32;
for t in 0..threads_u32 {
// Check if stop signal is received
if stop_signal.is_full() {
debug!(target: "validator::pow::init_dataset", "[MINER] Stop signal received, threads creation loop exiting");
break
}
let dataset = dataset.clone();
let start_item = t * per_thread;
let count = per_thread + if t == threads_u32 - 1 { remainder } else { 0 };
handles.push(thread::spawn(move || {
dataset.init(start_item, count);
}));
}
// Wait for threads to finish setup
for handle in handles {
let _ = handle.join();
}
// Check if stop signal is received
if stop_signal.is_full() {
debug!(target: "validator::pow::init_dataset", "[MINER] Stop signal received, exiting");
return Err(Error::MinerTaskStopped);
}
Ok(dataset)
}
/// Auxiliary function to generate mining VMs for provided RandomX key.
pub fn generate_mining_vms(
input: &HeaderHash,
threads: usize,
stop_signal: &Receiver<()>,
) -> Result<Vec<Arc<RandomXVM>>> {
// Initialize dataset
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Initializing RandomX cache and dataset...");
debug!(target: "validator::pow::generate_mining_vms", "[MINER] PoW input: {input}");
let setup_start = Instant::now();
let ds_start = Instant::now();
let flags = get_mining_flags();
let cache = RandomXCache::new(flags, &input.inner()[..])?;
let dataset_item_count = RandomXDataset::count()?;
let dataset = init_dataset(flags, input, stop_signal)?;
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Initialized RandomX cache and dataset: {:?}", ds_start.elapsed());
// Single thread mining VM
if threads == 1 {
let dataset = RandomXDataset::new_init(flags, cache, 0, dataset_item_count)?;
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Initialized RandomX cache and dataset: {:?}", ds_start.elapsed());
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Initializing RandomX VM...");
let vm_start = Instant::now();
let vm = Arc::new(RandomXVM::new(flags, None, Some(dataset))?);
@@ -463,8 +509,7 @@ pub fn generate_mining_vms(
}
// Multi thread mining VMs
let dataset = RandomXDataset::new(flags, cache, dataset_item_count)?;
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Initialized RandomX cache and dataset: {:?}", ds_start.elapsed());
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Initializing {threads} RandomX VMs...");
let mut vms = Vec::with_capacity(threads);
let threads_u32 = threads as u32;
for t in 0..threads_u32 {
@@ -474,17 +519,9 @@ pub fn generate_mining_vms(
return Err(Error::MinerTaskStopped);
}
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Initializing RandomX dataset for thread #{t}...");
let ds_start = Instant::now();
let a = (dataset_item_count * t) / threads_u32;
let b = (dataset_item_count * (t + 1)) / threads_u32;
let dataset = dataset.subset_init(a, b - a);
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Initialized RandomX dataset for thread #{t} in {:?}",
ds_start.elapsed()
);
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Initializing RandomX VM #{t}...");
let vm_start = Instant::now();
vms.push(Arc::new(RandomXVM::new(flags, None, Some(dataset))?));
vms.push(Arc::new(RandomXVM::new(flags, None, Some(dataset.clone()))?));
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Initialized RandomX VM #{t} in {:?}", vm_start.elapsed());
}
debug!(target: "validator::pow::generate_mining_vms", "[MINER] Setup time: {:?}", setup_start.elapsed());