From e81e01dd98513587dd9575e56e882d34200d6c26 Mon Sep 17 00:00:00 2001 From: skoupidi Date: Sat, 27 Dec 2025 19:37:39 +0200 Subject: [PATCH] darkfid: massive overhaul of mining rpc so everything goes through the registry --- bin/darkfid/src/error.rs | 116 +++-- bin/darkfid/src/lib.rs | 14 +- bin/darkfid/src/registry/mod.rs | 324 +++++++++++- bin/darkfid/src/registry/model.rs | 204 ++++++-- bin/darkfid/src/rpc/rpc_stratum.rs | 479 +++++++++--------- bin/darkfid/src/rpc/rpc_xmr.rs | 336 ++++-------- bin/darkfid/src/task/consensus.rs | 81 +-- bin/darkfid/src/task/garbage_collect.rs | 11 +- bin/darkfid/src/tests/harness.rs | 16 +- bin/darkfid/src/tests/unproposed_txs.rs | 9 +- script/research/gg/Cargo.toml | 4 +- src/blockchain/header_store.rs | 10 +- src/blockchain/monero/mod.rs | 4 +- .../test-harness/src/money_pow_reward.rs | 2 +- src/system/publisher.rs | 24 + src/validator/consensus.rs | 30 +- src/validator/pow.rs | 6 +- src/validator/verification.rs | 2 + 18 files changed, 935 insertions(+), 737 deletions(-) diff --git a/bin/darkfid/src/error.rs b/bin/darkfid/src/error.rs index 0fde5096d..6c2a71743 100644 --- a/bin/darkfid/src/error.rs +++ b/bin/darkfid/src/error.rs @@ -37,37 +37,55 @@ pub enum RpcError { ContractStateNotFound = -32201, ContractStateKeyNotFound = -32202, - // Miner errors - MinerMissingHeader = -32300, - MinerInvalidHeader = -32301, - MinerMissingRecipient = -32302, - MinerInvalidRecipient = -32303, - MinerInvalidRecipientPrefix = -32304, - MinerInvalidSpendHook = -32305, - MinerInvalidUserData = -32306, - MinerMissingNonce = -32307, - MinerInvalidNonce = -32308, - MinerMissingAddress = -32309, - MinerInvalidAddress = -32310, - MinerMissingAuxHash = -32311, - MinerInvalidAuxHash = -32312, - MinerMissingHeight = -32313, - MinerInvalidHeight = -32314, - MinerMissingPrevId = -32315, - MinerInvalidPrevId = -32316, - MinerMissingAuxBlob = -32317, - MinerInvalidAuxBlob = -32318, - MinerMissingBlob = -32319, - MinerInvalidBlob = -32320, - MinerMissingMerkleProof = -32321, - MinerInvalidMerkleProof = -32322, - MinerMissingPath = -32323, - MinerInvalidPath = -32324, - MinerMissingSeedHash = -32325, - MinerInvalidSeedHash = -32326, - MinerMerkleProofConstructionFailed = -32327, - MinerMoneroPowDataConstructionFailed = -32328, - MinerUnknownJob = -32329, + // Miner configuration errors + MinerInvalidWalletConfig = -32301, + MinerInvalidRecipient = -32302, + MinerInvalidRecipientPrefix = -32303, + MinerInvalidSpendHook = -32304, + MinerInvalidUserData = -32305, + + // Stratum errors + MinerMissingLogin = -32306, + MinerInvalidLogin = -32307, + MinerMissingPassword = -32308, + MinerInvalidPassword = -32309, + MinerMissingAgent = -32310, + MinerInvalidAgent = -32311, + MinerMissingAlgo = -32312, + MinerInvalidAlgo = -32313, + MinerRandomXNotSupported = -32314, + MinerMissingClientId = -32315, + MinerInvalidClientId = -32316, + MinerUnknownClient = -32317, + MinerMissingJobId = -32318, + MinerInvalidJobId = -32319, + MinerUnknownJob = -32320, + MinerMissingNonce = -32321, + MinerInvalidNonce = -32322, + MinerMissingResult = -32323, + MinerInvalidResult = -32324, + + // Merge mining errors + MinerMissingAddress = -32325, + MinerInvalidAddress = -32326, + MinerMissingAuxHash = -32327, + MinerInvalidAuxHash = -32328, + MinerMissingHeight = -32329, + MinerInvalidHeight = -32330, + MinerMissingPrevId = -32331, + MinerInvalidPrevId = -32332, + MinerMissingAuxBlob = -32333, + MinerInvalidAuxBlob = -32334, + MinerMissingBlob = -32335, + MinerInvalidBlob = -32336, + MinerMissingMerkleProof = -32337, + MinerInvalidMerkleProof = -32338, + MinerMissingPath = -32339, + MinerInvalidPath = -32340, + MinerMissingSeedHash = -32341, + MinerInvalidSeedHash = -32342, + MinerMerkleProofConstructionFailed = -32343, + MinerMoneroPowDataConstructionFailed = -32344, } fn to_tuple(e: RpcError) -> (i32, String) { @@ -75,27 +93,50 @@ fn to_tuple(e: RpcError) -> (i32, String) { // Transaction-related errors RpcError::TxSimulationFail => "Failed simulating transaction state change", RpcError::TxGasCalculationFail => "Failed to calculate transaction's gas", + // State-related errors RpcError::NotSynced => "Blockchain is not synced", RpcError::UnknownBlockHeight => "Did not find block height", + // Parsing errors RpcError::ParseError => "Parse error", + // Contract-related errors RpcError::ContractZkasDbNotFound => "zkas database not found for given contract", RpcError::ContractStateNotFound => "Records not found for given contract state", RpcError::ContractStateKeyNotFound => "Value not found for given contract state key", - // Miner errors - RpcError::MinerMissingHeader => "Request is missing the Header hash", - RpcError::MinerInvalidHeader => "Request Header hash is invalid", - RpcError::MinerMissingRecipient => "Request is missing the recipient wallet address", + + // Miner configuration errors + RpcError::MinerInvalidWalletConfig => "Request wallet configuration is invalid", RpcError::MinerInvalidRecipient => "Request recipient wallet address is invalid", RpcError::MinerInvalidRecipientPrefix => { "Request recipient wallet address prefix is invalid" } RpcError::MinerInvalidSpendHook => "Request spend hook is invalid", RpcError::MinerInvalidUserData => "Request user data is invalid", - RpcError::MinerMissingNonce => "Request is missing the Header nonce", - RpcError::MinerInvalidNonce => "Request Header nonce is invalid", + + // Stratum errors + RpcError::MinerMissingLogin => "Request is missing the login", + RpcError::MinerInvalidLogin => "Request login is invalid", + RpcError::MinerMissingPassword => "Request is missing the password", + RpcError::MinerInvalidPassword => "Request password is invalid", + RpcError::MinerMissingAgent => "Request is missing the agent", + RpcError::MinerInvalidAgent => "Request agent is invalid", + RpcError::MinerMissingAlgo => "Request is missing the algo", + RpcError::MinerInvalidAlgo => "Request algo is invalid", + RpcError::MinerRandomXNotSupported => "Request doesn't support rx/0", + RpcError::MinerMissingClientId => "Request is missing the client ID", + RpcError::MinerInvalidClientId => "Request client ID is invalid", + RpcError::MinerUnknownClient => "Request client is unknown", + RpcError::MinerMissingJobId => "Request is missing the job ID", + RpcError::MinerInvalidJobId => "Request job ID is invalid", + RpcError::MinerUnknownJob => "Request job is unknown", + RpcError::MinerMissingNonce => "Request is missing the nonce", + RpcError::MinerInvalidNonce => "Request nonce is invalid", + RpcError::MinerMissingResult => "Request is missing the result", + RpcError::MinerInvalidResult => "Request nonce is result", + + // Merge mining errors RpcError::MinerMissingAddress => { "Request is missing the recipient wallet address configuration" } @@ -122,7 +163,6 @@ fn to_tuple(e: RpcError) -> (i32, String) { "failed constructing aux chain Merkle proof" } RpcError::MinerMoneroPowDataConstructionFailed => "Failed constructing Monero PoW data", - RpcError::MinerUnknownJob => "Request job is unknown", }; (e as i32, msg.to_string()) diff --git a/bin/darkfid/src/lib.rs b/bin/darkfid/src/lib.rs index 2f598af29..c158b5125 100644 --- a/bin/darkfid/src/lib.rs +++ b/bin/darkfid/src/lib.rs @@ -57,18 +57,13 @@ use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr}; /// Miners registry mod registry; -use registry::{ - model::{BlockTemplate, MiningJobs}, - DarkfiMinersRegistry, DarkfiMinersRegistryPtr, -}; +use registry::{DarkfiMinersRegistry, DarkfiMinersRegistryPtr}; /// Atomic pointer to the DarkFi node pub type DarkfiNodePtr = Arc; /// Structure representing a DarkFi node pub struct DarkfiNode { - /// Blockchain network - network: Network, /// Validator(node) pointer validator: ValidatorPtr, /// P2P network protocols handler @@ -85,7 +80,6 @@ pub struct DarkfiNode { impl DarkfiNode { pub async fn new( - network: Network, validator: ValidatorPtr, p2p_handler: DarkfidP2pHandlerPtr, registry: DarkfiMinersRegistryPtr, @@ -93,7 +87,6 @@ impl DarkfiNode { subscribers: HashMap<&'static str, JsonSubscriber>, ) -> Result { Ok(Arc::new(Self { - network, validator, p2p_handler, registry, @@ -140,7 +133,7 @@ impl Darkfid { let p2p_handler = DarkfidP2pHandler::init(net_settings, ex).await?; // Initialize the miners registry - let registry = DarkfiMinersRegistry::init(&validator)?; + let registry = DarkfiMinersRegistry::init(network, &validator)?; // Grab blockchain network configured transactions batch size for garbage collection let txs_batch_size = match txs_batch_size { @@ -163,8 +156,7 @@ impl Darkfid { // Initialize node let node = - DarkfiNode::new(network, validator, p2p_handler, registry, txs_batch_size, subscribers) - .await?; + DarkfiNode::new(validator, p2p_handler, registry, txs_batch_size, subscribers).await?; // Generate the background tasks let dnet_task = StoppableTask::new(); diff --git a/bin/darkfid/src/registry/mod.rs b/bin/darkfid/src/registry/mod.rs index fc376042f..dff8145e2 100644 --- a/bin/darkfid/src/registry/mod.rs +++ b/bin/darkfid/src/registry/mod.rs @@ -21,41 +21,58 @@ use std::{ sync::Arc, }; -use smol::lock::Mutex; +use smol::lock::{Mutex, RwLock}; +use tinyjson::JsonValue; use tracing::{error, info}; use darkfi::{ + blockchain::BlockInfo, rpc::{ + jsonrpc::JsonSubscriber, server::{listen_and_serve, RequestHandler}, settings::RpcSettings, }, system::{ExecutorPtr, StoppableTask, StoppableTaskPtr}, - validator::ValidatorPtr, + util::encoding::base64, + validator::{consensus::Proposal, ValidatorPtr}, Error, Result, }; +use darkfi_sdk::crypto::{keypair::Network, pasta_prelude::PrimeField}; +use darkfi_serial::serialize_async; use crate::{ + proto::{DarkfidP2pHandlerPtr, ProposalMessage}, rpc::{rpc_stratum::StratumRpcHandler, rpc_xmr::MmRpcHandler}, DarkfiNode, DarkfiNodePtr, }; /// Block related structures pub mod model; -use model::{BlockTemplate, MiningJobs, MmBlockTemplate, PowRewardV1Zk}; +use model::{ + generate_next_block_template, BlockTemplate, MinerClient, MinerRewardsRecipientConfig, + PowRewardV1Zk, +}; /// Atomic pointer to the DarkFi node miners registry. pub type DarkfiMinersRegistryPtr = Arc; /// DarkFi node miners registry. pub struct DarkfiMinersRegistry { + /// Blockchain network + pub network: Network, /// PowRewardV1 ZK data pub powrewardv1_zk: PowRewardV1Zk, - /// Native mining block templates - pub blocktemplates: Mutex, BlockTemplate>>, - /// Active native mining jobs per connection ID - pub mining_jobs: Mutex>, - /// Merge mining block templates - pub mm_blocktemplates: Mutex, MmBlockTemplate>>, + /// Mining block templates of each wallet config + pub block_templates: RwLock>, + /// Active mining jobs mapped to the wallet template they + /// represent. For native jobs the key(job id) is the hex + /// encoded header hash, while for merge mining jobs it's + /// the header template hash. + pub jobs: RwLock>, + /// Active native clients mapped to their information. + pub clients: RwLock>, + /// Submission lock so we can queue up submissions process + pub submit_lock: RwLock<()>, /// Stratum JSON-RPC background task stratum_rpc_task: StoppableTaskPtr, /// Stratum JSON-RPC connection tracker @@ -68,7 +85,7 @@ pub struct DarkfiMinersRegistry { impl DarkfiMinersRegistry { /// Initialize a DarkFi node miners registry. - pub fn init(validator: &ValidatorPtr) -> Result { + pub fn init(network: Network, validator: &ValidatorPtr) -> Result { info!( target: "darkfid::registry::mod::DarkfiMinersRegistry::init", "Initializing a new DarkFi node miners registry..." @@ -93,10 +110,12 @@ impl DarkfiMinersRegistry { ); Ok(Arc::new(Self { + network, powrewardv1_zk, - blocktemplates: Mutex::new(HashMap::new()), - mining_jobs: Mutex::new(HashMap::new()), - mm_blocktemplates: Mutex::new(HashMap::new()), + block_templates: RwLock::new(HashMap::new()), + jobs: RwLock::new(HashMap::new()), + clients: RwLock::new(HashMap::new()), + submit_lock: RwLock::new(()), stratum_rpc_task, stratum_rpc_connections, mm_rpc_task, @@ -190,4 +209,283 @@ impl DarkfiMinersRegistry { info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "DarkFi node miners registry terminated successfully!"); } + + /// Create a registry record for provided wallet config. If the + /// record already exists return its template, otherwise create its + /// current template based on provided validator state. + async fn create_template( + &self, + validator: &ValidatorPtr, + wallet: &String, + config: &MinerRewardsRecipientConfig, + ) -> Result { + // Grab a lock over current templates + let mut block_templates = self.block_templates.write().await; + + // Check if a template already exists for this wallet + if let Some(block_template) = block_templates.get(wallet) { + return Ok(block_template.clone()) + } + + // Grab validator best current fork + let mut extended_fork = validator.best_current_fork().await?; + + // Generate the next block template + let result = generate_next_block_template( + &mut extended_fork, + config, + &self.powrewardv1_zk.zkbin, + &self.powrewardv1_zk.provingkey, + validator.verify_fees, + ) + .await; + + // Drop new trees opened by the forks' overlay + extended_fork.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?; + + // Check result + let block_template = result?; + + // Create the new registry record + block_templates.insert(wallet.clone(), block_template.clone()); + + // Print the new template wallet information + let recipient_str = format!("{}", config.recipient); + let spend_hook_str = match config.spend_hook { + Some(spend_hook) => format!("{spend_hook}"), + None => String::from("-"), + }; + let user_data_str = match config.user_data { + Some(user_data) => bs58::encode(user_data.to_repr()).into_string(), + None => String::from("-"), + }; + info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template", + "Created new block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}", + ); + + Ok(block_template) + } + + /// Register a new miner and create its job. + pub async fn register_miner( + &self, + validator: &ValidatorPtr, + wallet: &String, + config: &MinerRewardsRecipientConfig, + ) -> Result<(String, BlockTemplate, JsonSubscriber)> { + // Grab a lock over current jobs and clients + let mut jobs = self.jobs.write().await; + let mut clients = self.clients.write().await; + + // Create wallet template + let block_template = self.create_template(validator, wallet, config).await?; + + // Grab the hex encoded block hash and create the job record + let block_hash = hex::encode(block_template.block.header.hash().inner()).to_string(); + jobs.insert(block_hash.clone(), wallet.clone()); + + // Create the client record + let (client_id, client) = MinerClient::new(wallet, config, &block_hash); + let publisher = client.publisher.clone(); + clients.insert(client_id.clone(), client); + + Ok((client_id, block_template, publisher)) + } + + /// Register a new merge miner and create its job. + pub async fn register_merge_miner( + &self, + validator: &ValidatorPtr, + wallet: &String, + config: &MinerRewardsRecipientConfig, + ) -> Result<(String, f64)> { + // Grab a lock over current jobs + let mut jobs = self.jobs.write().await; + + // Create wallet template + let block_template = self.create_template(validator, wallet, config).await?; + + // Grab the block template hash and its difficulty, and then + // create the job record. + let block_template_hash = block_template.block.header.template_hash().as_string(); + let difficulty = block_template.difficulty; + jobs.insert(block_template_hash.clone(), wallet.clone()); + + Ok((block_template_hash, difficulty)) + } + + /// Submit provided block to the provided node. + pub async fn submit( + &self, + validator: &ValidatorPtr, + subscribers: &HashMap<&'static str, JsonSubscriber>, + p2p_handler: &DarkfidP2pHandlerPtr, + block: BlockInfo, + ) -> Result<()> { + info!( + target: "darkfid::registry::mod::DarkfiMinersRegistry::submit", + "Proposing new block to network", + ); + let proposal = Proposal::new(block); + validator.append_proposal(&proposal).await?; + + let proposals_sub = subscribers.get("proposals").unwrap(); + let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await)); + proposals_sub.notify(vec![enc_prop].into()).await; + + info!( + target: "darkfid::registry::mod::DarkfiMinersRegistry::submit", + "Broadcasting new block to network", + ); + let message = ProposalMessage(proposal); + p2p_handler.p2p.broadcast(&message).await; + + Ok(()) + } + + /// Refresh outdated jobs in the registry based on provided + /// validator state. + pub async fn refresh(&self, validator: &ValidatorPtr) -> Result<()> { + // Grab locks + let submit_lock = self.submit_lock.write().await; + let mut clients = self.clients.write().await; + let mut jobs = self.jobs.write().await; + let mut block_templates = self.block_templates.write().await; + + // Find inactive clients + let mut dropped_clients = vec![]; + let mut active_clients_jobs = vec![]; + for (client_id, client) in clients.iter() { + if client.publisher.publisher.clear_inactive().await { + dropped_clients.push(client_id.clone()); + continue + } + active_clients_jobs.push(client.job.clone()); + } + + // Drop inactive clients and their jobs + for client_id in dropped_clients { + // Its safe to unwrap here since the client key is from the + // previous loop. + let client = clients.remove(&client_id).unwrap(); + let wallet = jobs.remove(&client.job).unwrap(); + block_templates.remove(&wallet); + } + + // Return if no clients exists. Merge miners will create a new + // template and job on next poll. + if clients.is_empty() { + *jobs = HashMap::new(); + *block_templates = HashMap::new(); + return Ok(()) + } + + // Find inactive jobs (not referenced by clients) + let mut dropped_jobs = vec![]; + let mut active_wallets = vec![]; + for (job, wallet) in jobs.iter() { + if !active_clients_jobs.contains(job) { + dropped_jobs.push(job.clone()); + continue + } + active_wallets.push(wallet.clone()); + } + + // Drop inactive jobs + for job in dropped_jobs { + jobs.remove(&job); + } + + // Return if no jobs exists. Merge miners will create a new + // template and job on next poll. + if jobs.is_empty() { + *block_templates = HashMap::new(); + return Ok(()) + } + + // Find inactive wallets templates + let mut dropped_wallets = vec![]; + for wallet in block_templates.keys() { + if !active_wallets.contains(wallet) { + dropped_wallets.push(wallet.clone()); + } + } + + // Drop inactive wallets templates + for wallet in dropped_wallets { + block_templates.remove(&wallet); + } + + // Return if no wallets templates exists. Merge miners will + // create a new template and job on next poll. + if block_templates.is_empty() { + return Ok(()) + } + + // Grab validator best current fork + let extended_fork = validator.best_current_fork().await?; + + // Iterate over active clients to refresh their jobs + for (_, client) in clients.iter_mut() { + // Clone the fork so each client generates over a new one + let mut extended_fork = extended_fork.full_clone()?; + + // Drop its current job. Its safe to unwrap here since we + // know the job exists. + let wallet = jobs.remove(&client.job).unwrap(); + // Drop its current template. Its safe to unwrap here since + // we know the template exists. + block_templates.remove(&wallet); + + // Generate the next block template + let result = generate_next_block_template( + &mut extended_fork, + &client.config, + &self.powrewardv1_zk.zkbin, + &self.powrewardv1_zk.provingkey, + validator.verify_fees, + ) + .await; + + // Drop new trees opened by the forks' overlay + extended_fork.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?; + + // Check result + let block_template = result?; + + // Print the updated template wallet information + let recipient_str = format!("{}", client.config.recipient); + let spend_hook_str = match client.config.spend_hook { + Some(spend_hook) => format!("{spend_hook}"), + None => String::from("-"), + }; + let user_data_str = match client.config.user_data { + Some(user_data) => bs58::encode(user_data.to_repr()).into_string(), + None => String::from("-"), + }; + info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template", + "Updated block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}", + ); + + // Create the new job notification + let (job, notification) = block_template.job_notification(); + + // Create the new registry records + block_templates.insert(wallet.clone(), block_template); + jobs.insert(job.clone(), wallet); + + // Update the client record + client.job = job; + + // Push job notification to subscriber + client.publisher.notify(notification).await; + } + + // Release all locks + drop(block_templates); + drop(jobs); + drop(submit_lock); + + Ok(()) + } } diff --git a/bin/darkfid/src/registry/model.rs b/bin/darkfid/src/registry/model.rs index bb2557b1a..043a935d7 100644 --- a/bin/darkfid/src/registry/model.rs +++ b/bin/darkfid/src/registry/model.rs @@ -16,16 +16,20 @@ * along with this program. If not, see . */ -use std::collections::HashMap; +use std::{collections::HashMap, str::FromStr}; -use num_bigint::BigUint; use rand::rngs::OsRng; +use tinyjson::JsonValue; use tracing::info; use darkfi::{ - blockchain::{BlockInfo, Header}, + blockchain::{BlockInfo, Header, HeaderHash}, + rpc::jsonrpc::JsonSubscriber, tx::{ContractCallLeaf, Transaction, TransactionBuilder}, - util::time::Timestamp, + util::{ + encoding::base64, + time::{NanoTimestamp, Timestamp}, + }, validator::{consensus::Fork, verification::apply_producer_transaction, ValidatorPtr}, zk::{empty_witnesses, ProvingKey, ZkCircuit}, zkas::ZkBinary, @@ -36,15 +40,19 @@ use darkfi_money_contract::{ }; use darkfi_sdk::{ crypto::{ - keypair::{Address, Keypair, SecretKey}, + keypair::{Address, Keypair, Network, SecretKey}, + pasta_prelude::PrimeField, FuncId, MerkleTree, MONEY_CONTRACT_ID, }, pasta::pallas, ContractCall, }; -use darkfi_serial::Encodable; +use darkfi_serial::{deserialize_async, Encodable}; + +use crate::error::RpcError; /// Auxiliary structure representing node miner rewards recipient configuration. +#[derive(Debug, Clone)] pub struct MinerRewardsRecipientConfig { /// Wallet mining address to receive mining rewards pub recipient: Address, @@ -55,47 +63,128 @@ pub struct MinerRewardsRecipientConfig { pub user_data: Option, } -/// Auxiliary structure representing a block template for native -/// mining. +impl MinerRewardsRecipientConfig { + pub fn new( + recipient: Address, + spend_hook: Option, + user_data: Option, + ) -> Self { + Self { recipient, spend_hook, user_data } + } + + pub async fn from_base64( + network: &Network, + encoded_address: &str, + ) -> std::result::Result { + let Some(address_bytes) = base64::decode(encoded_address) else { + return Err(RpcError::MinerInvalidWalletConfig) + }; + let Ok((recipient, spend_hook, user_data)) = + deserialize_async::<(String, Option, Option)>(&address_bytes).await + else { + return Err(RpcError::MinerInvalidWalletConfig) + }; + let Ok(recipient) = Address::from_str(&recipient) else { + return Err(RpcError::MinerInvalidRecipient) + }; + if recipient.network() != *network { + return Err(RpcError::MinerInvalidRecipientPrefix) + } + let spend_hook = match spend_hook { + Some(s) => match FuncId::from_str(&s) { + Ok(s) => Some(s), + Err(_) => return Err(RpcError::MinerInvalidSpendHook), + }, + None => None, + }; + let user_data: Option = match user_data { + Some(u) => { + let Ok(bytes) = bs58::decode(&u).into_vec() else { + return Err(RpcError::MinerInvalidUserData) + }; + let bytes: [u8; 32] = match bytes.try_into() { + Ok(b) => b, + Err(_) => return Err(RpcError::MinerInvalidUserData), + }; + match pallas::Base::from_repr(bytes).into() { + Some(v) => Some(v), + None => return Err(RpcError::MinerInvalidUserData), + } + } + None => None, + }; + + Ok(Self { recipient, spend_hook, user_data }) + } +} + +/// Auxiliary structure representing a block template for mining. #[derive(Debug, Clone)] pub struct BlockTemplate { /// Block that is being mined pub block: BlockInfo, - /// RandomX init key - pub randomx_key: [u8; 32], - /// Block mining target - pub target: BigUint, - /// Ephemeral signing secret for this blocktemplate - pub secret: SecretKey, -} - -/// Auxiliary structure representing a block template for merge mining. -#[derive(Debug, Clone)] -pub struct MmBlockTemplate { - /// Block that is being mined - pub block: BlockInfo, + /// RandomX current key + pub randomx_key: HeaderHash, + /// Compacted block mining target + pub target: Vec, /// Block difficulty pub difficulty: f64, /// Ephemeral signing secret for this blocktemplate pub secret: SecretKey, + /// Flag indicating if this template has been submitted + pub submitted: bool, } -/// Storage for active mining jobs. These are stored per connection ID. -/// A new map will be made for each stratum login. -#[derive(Debug, Default)] -pub struct MiningJobs(HashMap<[u8; 32], BlockTemplate>); - -impl MiningJobs { - pub fn insert(&mut self, job_id: [u8; 32], blocktemplate: BlockTemplate) { - self.0.insert(job_id, blocktemplate); +impl BlockTemplate { + fn new( + block: BlockInfo, + randomx_key: HeaderHash, + target: Vec, + difficulty: f64, + secret: SecretKey, + ) -> Self { + Self { block, randomx_key, target, difficulty, secret, submitted: false } } - pub fn get(&self, job_id: &[u8; 32]) -> Option<&BlockTemplate> { - self.0.get(job_id) + pub fn job_notification(&self) -> (String, JsonValue) { + let block_hash = hex::encode(self.block.header.hash().inner()).to_string(); + let job = HashMap::from([ + ( + "blob".to_string(), + JsonValue::from(hex::encode(self.block.header.to_block_hashing_blob()).to_string()), + ), + ("job_id".to_string(), JsonValue::from(block_hash.clone())), + ("height".to_string(), JsonValue::from(self.block.header.height as f64)), + ("target".to_string(), JsonValue::from(hex::encode(&self.target))), + ("algo".to_string(), JsonValue::from(String::from("rx/0"))), + ( + "seed_hash".to_string(), + JsonValue::from(hex::encode(self.randomx_key.inner()).to_string()), + ), + ]); + (block_hash, JsonValue::from(job)) } +} - pub fn get_mut(&mut self, job_id: &[u8; 32]) -> Option<&mut BlockTemplate> { - self.0.get_mut(job_id) +/// Auxiliary structure representing a native miner client record. +#[derive(Debug, Clone)] +pub struct MinerClient { + /// Miner recipient configuration + pub config: MinerRewardsRecipientConfig, + /// Current mining job + pub job: String, + /// Connection publisher to push new jobs + pub publisher: JsonSubscriber, +} + +impl MinerClient { + pub fn new(wallet: &str, config: &MinerRewardsRecipientConfig, job: &str) -> (String, Self) { + let mut hasher = blake3::Hasher::new(); + hasher.update(wallet.as_bytes()); + hasher.update(&NanoTimestamp::current_time().inner().to_le_bytes()); + let client_id = hex::encode(hasher.finalize().as_bytes()).to_string(); + let publisher = JsonSubscriber::new("job"); + (client_id, Self { config: config.clone(), job: job.to_owned(), publisher }) } } @@ -125,25 +214,32 @@ impl PowRewardV1Zk { } } -/// Auxiliary function to generate next block in an atomic manner. -pub async fn generate_next_block( +/// Auxiliary function to generate next mining block template, in an +/// atomic manner. +pub async fn generate_next_block_template( extended_fork: &mut Fork, recipient_config: &MinerRewardsRecipientConfig, zkbin: &ZkBinary, pk: &ProvingKey, - block_target: u32, verify_fees: bool, -) -> Result<(BigUint, BlockInfo, SecretKey)> { +) -> Result { // Grab forks' last block proposal(previous) let last_proposal = extended_fork.last_proposal()?; // Grab forks' next block height let next_block_height = last_proposal.block.header.height + 1; + // Grab forks' next mine target and difficulty + let (target, difficulty) = extended_fork.module.next_mine_target_and_difficulty()?; + + // The target should be compacted to 8 bytes little-endian. + let target = target.to_bytes_le()[..8].to_vec(); + + // Cast difficulty to f64. This should always work. + let difficulty = difficulty.to_string().parse()?; + // Grab forks' unproposed transactions - let (mut txs, _, fees, overlay) = extended_fork - .unproposed_txs(&extended_fork.blockchain, next_block_height, block_target, verify_fees) - .await?; + let (mut txs, _, fees) = extended_fork.unproposed_txs(next_block_height, verify_fees).await?; // Create an ephemeral block signing keypair. Its secret key will // be stored in the PowReward transaction's encrypted note for @@ -161,11 +257,11 @@ pub async fn generate_next_block( pk, )?; - // Apply producer transaction in the overlay + // Apply producer transaction in the forks' overlay let _ = apply_producer_transaction( - &overlay, + &extended_fork.overlay, next_block_height, - block_target, + extended_fork.module.target, &tx, &mut MerkleTree::new(1), ) @@ -173,8 +269,10 @@ pub async fn generate_next_block( txs.push(tx); // Grab the updated contracts states root - let diff = overlay.lock().unwrap().overlay.lock().unwrap().diff(&extended_fork.diffs)?; - overlay + let diff = + extended_fork.overlay.lock().unwrap().overlay.lock().unwrap().diff(&extended_fork.diffs)?; + extended_fork + .overlay .lock() .unwrap() .contracts @@ -183,12 +281,9 @@ pub async fn generate_next_block( return Err(Error::ContractsStatesRootNotFoundError); }; - // Drop new trees opened by the unproposed transactions overlay - overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?; - // Generate the new header let mut header = - Header::new(last_proposal.hash, next_block_height, Timestamp::current_time(), 0); + Header::new(last_proposal.hash, next_block_height, 0, Timestamp::current_time()); header.state_root = state_root; // Generate the block @@ -197,10 +292,13 @@ pub async fn generate_next_block( // Add transactions to the block next_block.append_txs(txs); - // Grab the next mine target - let target = extended_fork.module.next_mine_target()?; - - Ok((target, next_block, block_signing_keypair.secret)) + Ok(BlockTemplate::new( + next_block, + extended_fork.module.darkfi_rx_keys.0, + target, + difficulty, + block_signing_keypair.secret, + )) } /// Auxiliary function to generate a Money::PoWReward transaction. diff --git a/bin/darkfid/src/rpc/rpc_stratum.rs b/bin/darkfid/src/rpc/rpc_stratum.rs index 8b4d836e4..9e53f4315 100644 --- a/bin/darkfid/src/rpc/rpc_stratum.rs +++ b/bin/darkfid/src/rpc/rpc_stratum.rs @@ -16,10 +16,7 @@ * along with this program. If not, see . */ -use std::{ - collections::{HashMap, HashSet}, - str::FromStr, -}; +use std::collections::{HashMap, HashSet}; use async_trait::async_trait; use smol::lock::MutexGuard; @@ -34,17 +31,9 @@ use darkfi::{ server::RequestHandler, }, system::StoppableTaskPtr, - util::{encoding::base64, time::Timestamp}, - validator::consensus::Proposal, }; -use darkfi_sdk::crypto::keypair::Address; -use darkfi_serial::serialize_async; -use crate::{ - proto::ProposalMessage, - registry::model::{generate_next_block, MinerRewardsRecipientConfig}, - BlockTemplate, DarkfiNode, MiningJobs, -}; +use crate::{registry::model::MinerRewardsRecipientConfig, server_error, DarkfiNode, RpcError}; // https://github.com/xmrig/xmrig-proxy/blob/master/doc/STRATUM.md // https://github.com/xmrig/xmrig-proxy/blob/master/doc/STRATUM_EXT.md @@ -74,77 +63,92 @@ impl RequestHandler for DarkfiNode { } } -// TODO: We often just return InvalidParams. These should be cleaned up -// and more verbose. -// TODO: The jobs storing method is not the most ideal. Think of a better one. -// `self.mining_jobs` - -// Random testnet address for reference: -// fUfG4WhbHP5C2MhYW3FHctVqi2jfXHamoQeU8KiirKVtoMBEUejkwq9F - impl DarkfiNode { // RPCAPI: - // Miner sends a `login` request after establishing connection - // in order to authorize. + // Register a new mining client to the registry and generate a new + // job. // - // The server will return a job along with an id. - // ``` - // "job": { - // "blob": 070780e6b9d...4d62fa6c77e76c3001", - // "job_id": "q7PLUPL25UV0z5Ij14IyMk8htXbj", - // "target": "b88d0600", - // "algo": "rx/0" - // } - // ``` + // **Request:** + // * `login` : A base-64 encoded wallet address mining configuration + // * `pass` : Unused client password field. Expects default "x" value. + // * `agent` : Client agent description + // * `algo` : Client supported mining algorithms // - // --> {"jsonrpc":"2.0", "method": "login", "id": 1, "params": {"login": "receiving_address", "pass": "x", "agent": "XMRig", "algo": ["rx/0"]}} + // **Response:** + // * `id` : Registry client ID + // * `job` : The generated mining job + // * `status` : Response status + // + // The generated mining job consists of the following fields: + // * `blob` : The hex encoded block hashing blob of the job block + // * `job_id` : Registry mining job ID + // * `height` : The job block height + // * `target` : Current mining target + // * `algo` : The mining algorithm - RandomX + // * `seed_hash` : Current RandomX key + // + // --> {"jsonrpc":"2.0", "method": "login", "id": 1, "params": {"login": "MINING_CONFIG", "pass": "", "agent": "XMRig", "algo": ["rx/0"]}} // <-- {"jsonrpc":"2.0", "id": 1, "result": {"id": "1be0b7b6-b15a-47be-a17d-46b2911cf7d0", "job": { ... }, "status": "OK"}} pub async fn stratum_login(&self, id: u16, params: JsonValue) -> JsonResult { - // TODO: Fail when not synced + // Check if node is synced before responding + if !*self.validator.synced.read().await { + return server_error(RpcError::NotSynced, id, None) + } + + // Parse request params let Some(params) = params.get::>() else { return JsonError::new(InvalidParams, None, id).into() }; - let Some(login) = params.get("login") else { - return JsonError::new(InvalidParams, Some("Missing 'login'".to_string()), id).into() - }; - let Some(login) = login.get::() else { + if params.len() != 4 { return JsonError::new(InvalidParams, None, id).into() + } + + // Parse login mining configuration + let Some(wallet) = params.get("login") else { + return server_error(RpcError::MinerMissingLogin, id, None) }; + let Some(wallet) = wallet.get::() else { + return server_error(RpcError::MinerInvalidLogin, id, None) + }; + let config = + match MinerRewardsRecipientConfig::from_base64(&self.registry.network, wallet).await { + Ok(c) => c, + Err(e) => return server_error(e, id, None), + }; + + // Parse password let Some(pass) = params.get("pass") else { - return JsonError::new(InvalidParams, Some("Missing 'pass'".to_string()), id).into() + return server_error(RpcError::MinerMissingPassword, id, None) }; - let Some(_pass) = pass.get::() else { - return JsonError::new(InvalidParams, None, id).into() + let Some(pass) = pass.get::() else { + return server_error(RpcError::MinerInvalidPassword, id, None) }; + if pass != "x" { + return server_error(RpcError::MinerInvalidPassword, id, None) + } + + // Parse agent let Some(agent) = params.get("agent") else { - return JsonError::new(InvalidParams, Some("Missing 'agent'".to_string()), id).into() + return server_error(RpcError::MinerMissingAgent, id, None) }; let Some(agent) = agent.get::() else { - return JsonError::new(InvalidParams, None, id).into() - }; - let Some(algo) = params.get("algo") else { - return JsonError::new(InvalidParams, Some("Missing 'algo'".to_string()), id).into() - }; - let Some(algo) = algo.get::>() else { - return JsonError::new(InvalidParams, None, id).into() + return server_error(RpcError::MinerInvalidAgent, id, None) }; - // Try to parse `login` as valid address. This will be the - // block reward recipient. - let Ok(address) = Address::from_str(login) else { - return JsonError::new(InvalidParams, Some("Invalid address".to_string()), id).into() + // Parge algo + let Some(algo) = params.get("algo") else { + return server_error(RpcError::MinerMissingAlgo, id, None) + }; + let Some(algo) = algo.get::>() else { + return server_error(RpcError::MinerInvalidAlgo, id, None) }; - if address.network() != self.network { - return JsonError::new(InvalidParams, Some("Invalid address prefix".to_string()), id) - .into() - } // Iterate through `algo` to see if "rx/0" is supported. // rx/0 is RandomX. let mut found_rx0 = false; for i in algo { let Some(algo) = i.get::() else { - return JsonError::new(InvalidParams, None, id).into() + return server_error(RpcError::MinerInvalidAlgo, id, None) }; if algo == "rx/0" { found_rx0 = true; @@ -152,231 +156,184 @@ impl DarkfiNode { } } if !found_rx0 { - return JsonError::new(InvalidParams, Some("rx/0 not supported".to_string()), id).into() + return server_error(RpcError::MinerRandomXNotSupported, id, None) } - info!("[STRATUM] Got login from {} ({})", address, agent); - let conn_id = { - let mut hasher = blake3::Hasher::new(); - hasher.update(&address.to_string().into_bytes()); - hasher.update(&Timestamp::current_time().inner().to_le_bytes()); - *hasher.finalize().as_bytes() - }; + // Register the new miner + info!(target: "darkfid::rpc::rpc_stratum::stratum_login","[RPC-STRATUM] Got login from {wallet} ({agent})"); + let (client_id, block_template, publisher) = + match self.registry.register_miner(&self.validator, wallet, &config).await { + Ok(p) => p, + Err(e) => { + error!( + target: "darkfid::rpc::rpc_stratum::stratum_login", + "[RPC-STRATUM] Failed to register miner: {e}", + ); + return JsonError::new(ErrorCode::InternalError, None, id).into() + } + }; - // Now we should register this login, and create a blocktemplate and - // a job for them. - // TODO: We also have to spawn the notification task that will send - // JSONRPC notifications to this connection when a new job is available. - - // We'll clear any existing jobs for this login. - let mut mining_jobs = self.registry.mining_jobs.lock().await; - mining_jobs.insert(conn_id, MiningJobs::default()); - - // Find applicable chain fork - let mut extended_fork = match self.validator.best_current_fork().await { - Ok(f) => f, - Err(e) => { - error!( - target: "darkfid::rpc_stratum::stratum_login", - "[STRATUM] Finding best fork index failed: {e}", - ); - return JsonError::new(ErrorCode::InternalError, None, id).into() - } - }; - - // Query the Validator for a new blocktemplate. - // We first need to construct `MinerRewardsRecipientConfig` from the - // address configuration provided to us through the RPC. - // TODO: Parse any spend hook from the login. We might also want to - // define a specific address format if it includes extra data. - // We could also include arbitrary information in the login password. - let recipient_config = - MinerRewardsRecipientConfig { recipient: address, spend_hook: None, user_data: None }; - - // Find next block target - let target = self.validator.consensus.module.read().await.target; - - // Generate blocktemplate with all the information. - // This will return the mining target, the entire block, and the - // ephemeral secret used to sign the mined block. - let (target, block, secret) = match generate_next_block( - &mut extended_fork, - &recipient_config, - &self.registry.powrewardv1_zk.zkbin, - &self.registry.powrewardv1_zk.provingkey, - target, - self.validator.verify_fees, - ) - .await - { - Ok(v) => v, - Err(e) => { - error!( - target: "darkfid::rpc_stratum::stratum_login", - "[STRATUM] Failed to generate next blocktemplate: {e}", - ); - return JsonError::new(ErrorCode::InternalError, None, id).into() - } - }; - - // Reference the RandomX dataset seed - // TODO: We can also send `next_seed_hash` when we know it. - let seed_hash = extended_fork.module.darkfi_rx_keys.0.inner(); - - // We will store this in our mining jobs map for reference when - // a miner solution is submitted. - let blocktemplate = - BlockTemplate { block, randomx_key: *seed_hash, target: target.clone(), secret }; - - // Construct everything needed for the Stratum response. - let blob = blocktemplate.block.header.to_blockhashing_blob(); - let job_id = *blocktemplate.block.header.hash().inner(); - let height = blocktemplate.block.header.height as f64; - // The target should be compacted to 8 bytes little-endian. - let target = &target.to_bytes_le()[..8]; - - // Store the job. unwrap should be fine because we created this above. - let jobs = mining_jobs.get_mut(&conn_id).unwrap(); - jobs.insert(job_id, blocktemplate); - - // Construct response - let job: HashMap = HashMap::from([ - ("blob".to_string(), hex::encode(&blob).to_string().into()), - ("job_id".to_string(), hex::encode(job_id).to_string().into()), - ("height".to_string(), height.into()), - ("target".to_string(), hex::encode(target).into()), - ("algo".to_string(), "rx/0".to_string().into()), - ("seed_hash".to_string(), hex::encode(seed_hash).into()), - ]); - - let result = HashMap::from([ - ("id".to_string(), hex::encode(conn_id).into()), - ("job".to_string(), job.into()), - ("status".to_string(), "OK".to_string().into()), - ]); - - // Ship it. - JsonResponse::new(result.into(), id).into() + // Now we have the new job, we ship it to RPC + let (job_id, job) = block_template.job_notification(); + info!( + target: "darkfid::rpc::rpc_stratum::stratum_login", + "[RPC-STRATUM] Created new mining job for client {client_id}: {job_id}" + ); + let response = JsonValue::from(HashMap::from([ + ("id".to_string(), JsonValue::from(client_id)), + ("job".to_string(), job), + ("status".to_string(), JsonValue::from(String::from("OK"))), + ])); + (publisher, JsonResponse::new(response, id)).into() } // RPCAPI: // Miner submits a job solution. // + // **Request:** + // * `id` : Registry client ID + // * `job_id` : Registry mining job ID + // * `nonce` : The hex encoded solution header nonce. + // * `result` : RandomX calculated hash + // + // **Response:** + // * `status`: Block submit status + // // --> {"jsonrpc":"2.0", "method": "submit", "id": 1, "params": {"id": "...", "job_id": "...", "nonce": "d0030040", "result": "e1364b8782719d7683e2ccd3d8f724bc59dfa780a9e960e7c0e0046acdb40100"}} // <-- {"jsonrpc":"2.0", "id": 1, "result": {"status": "OK"}} pub async fn stratum_submit(&self, id: u16, params: JsonValue) -> JsonResult { - // TODO: Maybe grab an exclusive lock to avoid the xmrig spam while - // we're doing the pow verification. xmrig spams us whenever it gets - // a solution, and this will end up in cloning a bunch of blocktemplates - // and is going to cause memory usage to go up significantly. - // Ideally we should block here until we finish each submit one-by-one - // and find a valid one. Then when we do find a valid one, we should - // clear the existing job(s) so this method will just return an error - // and not have to do all the block shenanigans. - // Additionally when a block is proposed successfully, the node should - // send a new job notification to xmrig so we should be fine. - // That notification part should also clear the existing jobs. + // Check if node is synced before responding + if !*self.validator.synced.read().await { + return server_error(RpcError::NotSynced, id, None) + } + + // Grab registry submissions lock + let submit_lock = self.registry.submit_lock.write().await; + + // Parse request params let Some(params) = params.get::>() else { return JsonError::new(InvalidParams, None, id).into() }; - let Some(conn_id) = params.get("id") else { - return JsonError::new(InvalidParams, Some("Missing 'id'".to_string()), id).into() - }; - let Some(conn_id) = conn_id.get::() else { + if params.len() != 4 { return JsonError::new(InvalidParams, None, id).into() + } + + // Parse client id + let Some(client_id) = params.get("id") else { + return server_error(RpcError::MinerMissingClientId, id, None) }; + let Some(client_id) = client_id.get::() else { + return server_error(RpcError::MinerInvalidClientId, id, None) + }; + + // If we don't know about this client, we can just abort here + let clients = self.registry.clients.read().await; + let Some(client) = clients.get(client_id) else { + return server_error(RpcError::MinerUnknownClient, id, None) + }; + + // Parse job id let Some(job_id) = params.get("job_id") else { - return JsonError::new(InvalidParams, Some("Missing 'job_id'".to_string()), id).into() + return server_error(RpcError::MinerMissingJobId, id, None) }; let Some(job_id) = job_id.get::() else { - return JsonError::new(InvalidParams, None, id).into() + return server_error(RpcError::MinerInvalidJobId, id, None) }; + + // If we don't know about this job or it doesn't match the + // client one, we can just abort here + if &client.job != job_id { + return server_error(RpcError::MinerUnknownJob, id, None) + } + let jobs = self.registry.jobs.read().await; + let Some(wallet) = jobs.get(job_id) else { + return server_error(RpcError::MinerUnknownJob, id, None) + }; + + // If this job wallet template doesn't exist, we can just + // abort here. + let mut block_templates = self.registry.block_templates.write().await; + let Some(block_template) = block_templates.get_mut(wallet) else { + return server_error(RpcError::MinerUnknownJob, id, None) + }; + + // If this template has been already submitted, reject this + // submission. + if block_template.submitted { + return JsonResponse::new( + JsonValue::from(HashMap::from([( + "status".to_string(), + JsonValue::from(String::from("rejected")), + )])), + id, + ) + .into() + } + + // Parse nonce let Some(nonce) = params.get("nonce") else { - return JsonError::new(InvalidParams, Some("Missing 'nonce'".to_string()), id).into() + return server_error(RpcError::MinerMissingNonce, id, None) }; let Some(nonce) = nonce.get::() else { - return JsonError::new(InvalidParams, None, id).into() + return server_error(RpcError::MinerInvalidNonce, id, None) }; - // result is the RandomX calculated hash. Useful to verify/debug. - let Some(result) = params.get("result") else { - return JsonError::new(InvalidParams, Some("Missing 'result'".to_string()), id).into() - }; - let Some(_result) = result.get::() else { - return JsonError::new(InvalidParams, None, id).into() - }; - - let Ok(conn_id) = hex::decode(conn_id) else { - return JsonError::new(InvalidParams, None, id).into() - }; - if conn_id.len() != 32 { - return JsonError::new(InvalidParams, None, id).into() - } - let Ok(job_id) = hex::decode(job_id) else { - return JsonError::new(InvalidParams, None, id).into() - }; - if job_id.len() != 32 { - return JsonError::new(InvalidParams, None, id).into() - } - - let conn_id: [u8; 32] = conn_id.try_into().unwrap(); - let job_id: [u8; 32] = job_id.try_into().unwrap(); - - // We should be aware of this conn_id and job_id. - let mut mining_jobs = self.registry.mining_jobs.lock().await; - let Some(jobs) = mining_jobs.get_mut(&conn_id) else { - return JsonError::new(InvalidParams, None, id).into() - }; - // Get the blocktemplate. - let Some(blocktemplate) = jobs.get_mut(&job_id) else { - return JsonError::new(InvalidParams, None, id).into() - }; - - // Parse the nonce into u32. let Ok(nonce_bytes) = hex::decode(nonce) else { - return JsonError::new(InvalidParams, None, id).into() + return server_error(RpcError::MinerInvalidNonce, id, None) }; if nonce_bytes.len() != 4 { - return JsonError::new(InvalidParams, None, id).into() + return server_error(RpcError::MinerInvalidNonce, id, None) } let nonce = u32::from_le_bytes(nonce_bytes.try_into().unwrap()); - // We clone the block, update the nonce, - // sign it, and ship it into a proposal. - let mut block = blocktemplate.block.clone(); - block.header.nonce = nonce; - block.sign(&blocktemplate.secret); + // Parse result + let Some(result) = params.get("result") else { + return server_error(RpcError::MinerMissingResult, id, None) + }; + let Some(_result) = result.get::() else { + return server_error(RpcError::MinerInvalidResult, id, None) + }; info!( - target: "darkfid::rpc_stratum::stratum_submit", - "[STRATUM] Proposing new block to network", + target: "darkfid::rpc::rpc_stratum::stratum_submit", + "[RPC-STRATUM] Got solution submission from client {client_id} for job: {job_id}", ); - let proposal = Proposal::new(block); - if let Err(e) = self.validator.append_proposal(&proposal).await { + + // Update the block nonce and sign it + let mut block = block_template.block.clone(); + block.header.nonce = nonce; + block.sign(&block_template.secret); + + // Submit the new block through the registry + if let Err(e) = + self.registry.submit(&self.validator, &self.subscribers, &self.p2p_handler, block).await + { error!( - target: "darkfid::rpc_stratum::stratum_submit", - "[STRATUM] Error proposing new block: {e}", + target: "darkfid::rpc::rpc_xmr::xmr_merge_submit_solution", + "[RPC-XMR] Error submitting new block: {e}", ); - return JsonError::new(InvalidParams, None, id).into() + return JsonResponse::new( + JsonValue::from(HashMap::from([( + "status".to_string(), + JsonValue::from(String::from("rejected")), + )])), + id, + ) + .into() } - // Proposal passed. We will now clear the jobs as it's assumed - // a new block needs to be mined. - mining_jobs.insert(conn_id, MiningJobs::default()); + // Mark block as submitted + block_template.submitted = true; - // Broadcast to network - let proposals_sub = self.subscribers.get("proposals").unwrap(); - let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await)); - proposals_sub.notify(vec![enc_prop].into()).await; - - info!( - target: "darkfid::rpc_stratum::stratum_submit", - "[STRATUM] Broadcasting new block to network", - ); - let message = ProposalMessage(proposal); - self.p2p_handler.p2p.broadcast(&message).await; + // Release all locks + drop(block_templates); + drop(jobs); + drop(submit_lock); JsonResponse::new( - HashMap::from([("status".to_string(), "OK".to_string().into())]).into(), + JsonValue::from(HashMap::from([( + "status".to_string(), + JsonValue::from(String::from("OK")), + )])), id, ) .into() @@ -385,21 +342,47 @@ impl DarkfiNode { // RPCAPI: // Miner sends `keepalived` to prevent connection timeout. // + // **Request:** + // * `id` : Registry client ID + // + // **Response:** + // * `status`: Response status + // // --> {"jsonrpc":"2.0", "method": "keepalived", "id": 1, "params": {"id": "foo"}} // <-- {"jsonrpc":"2.0", "id": 1, "result": {"status": "KEEPALIVED"}} pub async fn stratum_keepalived(&self, id: u16, params: JsonValue) -> JsonResult { + // Check if node is synced before responding + if !*self.validator.synced.read().await { + return server_error(RpcError::NotSynced, id, None) + } + + // Parse request params let Some(params) = params.get::>() else { return JsonError::new(InvalidParams, None, id).into() }; - let Some(_conn_id) = params.get("id") else { - return JsonError::new(InvalidParams, Some("Missing 'id'".to_string()), id).into() + if params.len() != 1 { + return JsonError::new(InvalidParams, None, id).into() + } + + // Parse client id + let Some(client_id) = params.get("id") else { + return server_error(RpcError::MinerMissingClientId, id, None) + }; + let Some(client_id) = client_id.get::() else { + return server_error(RpcError::MinerInvalidClientId, id, None) }; - // TODO: This conn_id should likely exist. We should probably check - // that. Otherwise we might not want to reply at all. + // If we don't know about this client, we can just abort here + if !self.registry.clients.read().await.contains_key(client_id) { + return server_error(RpcError::MinerUnknownClient, id, None) + }; + // Respond with keepalived message JsonResponse::new( - JsonValue::from(HashMap::from([("status".into(), "KEEPALIVED".to_string().into())])), + JsonValue::from(HashMap::from([( + "status".to_string(), + JsonValue::from(String::from("KEEPALIVED")), + )])), id, ) .into() diff --git a/bin/darkfid/src/rpc/rpc_xmr.rs b/bin/darkfid/src/rpc/rpc_xmr.rs index 45a37ee52..fece60bff 100644 --- a/bin/darkfid/src/rpc/rpc_xmr.rs +++ b/bin/darkfid/src/rpc/rpc_xmr.rs @@ -43,20 +43,9 @@ use darkfi::{ server::RequestHandler, }, system::StoppableTaskPtr, - util::encoding::base64, - validator::consensus::Proposal, }; -use darkfi_sdk::{ - crypto::{keypair::Address, pasta_prelude::PrimeField, FuncId}, - pasta::pallas, -}; -use darkfi_serial::{deserialize_async, serialize_async}; -use crate::{ - proto::ProposalMessage, - registry::model::{generate_next_block, MinerRewardsRecipientConfig, MmBlockTemplate}, - server_error, DarkfiNode, RpcError, -}; +use crate::{registry::model::MinerRewardsRecipientConfig, server_error, DarkfiNode, RpcError}; // https://github.com/SChernykh/p2pool/blob/master/docs/MERGE_MINING.MD @@ -115,7 +104,7 @@ impl DarkfiNode { Ok(v) => v, Err(e) => { error!( - target: "darkfid::rpc::xmr_merge_mining_get_chain_id", + target: "darkfid::rpc::rpc_xmr::xmr_merge_mining_get_chain_id", "[RPC-XMR] Error fetching genesis block hash: {e}" ); return JsonError::new(ErrorCode::InternalError, None, id).into() @@ -141,12 +130,12 @@ impl DarkfiNode { // * `prev_id` : Hash of the previous Monero block // // **Response:** - // * `aux_blob`: The hex-encoded wallet address mining configuration blob + // * `aux_blob`: A hex-encoded blob of empty data // * `aux_diff`: Mining difficulty (decimal number) // * `aux_hash`: A 32-byte hex-encoded hash of merge mined block // // --> {"jsonrpc":"2.0", "method": "merge_mining_get_aux_block", "params": {"address": "MERGE_MINED_CHAIN_ADDRESS", "aux_hash": "f6952d6eef555ddd87aca66e56b91530222d6e318414816f3ba7cf5bf694bf0f", "height": 3000000, "prev_id":"ad505b0be8a49b89273e307106fa42133cbd804456724c5e7635bd953215d92a"}, "id": 1} - // <-- {"jsonrpc":"2.0", "result": {"aux_blob": "4c6f72656d20697073756d", "aux_diff": 123456, "aux_hash":"f6952d6eef555ddd87aca66e56b91530222d6e318414816f3ba7cf5bf694bf0f"}, "id": 1} + // <-- {"jsonrpc":"2.0", "result": {"aux_blob": "", "aux_diff": 123456, "aux_hash":"f6952d6eef555ddd87aca66e56b91530222d6e318414816f3ba7cf5bf694bf0f"}, "id": 1} pub async fn xmr_merge_mining_get_aux_block(&self, id: u16, params: JsonValue) -> JsonResult { // Check if node is synced before responding to p2pool if !*self.validator.synced.read().await { @@ -161,51 +150,6 @@ impl DarkfiNode { return JsonError::new(InvalidParams, None, id).into() } - // Parse address mining configuration - let Some(address) = params.get("address") else { - return server_error(RpcError::MinerMissingAddress, id, None) - }; - let Some(address) = address.get::() else { - return server_error(RpcError::MinerInvalidAddress, id, None) - }; - let Some(address_bytes) = base64::decode(address) else { - return server_error(RpcError::MinerInvalidAddress, id, None) - }; - let Ok((recipient, spend_hook, user_data)) = - deserialize_async::<(String, Option, Option)>(&address_bytes).await - else { - return server_error(RpcError::MinerInvalidAddress, id, None) - }; - let Ok(recipient) = Address::from_str(&recipient) else { - return server_error(RpcError::MinerInvalidRecipient, id, None) - }; - if recipient.network() != self.network { - return server_error(RpcError::MinerInvalidRecipientPrefix, id, None) - } - let spend_hook = match spend_hook { - Some(s) => match FuncId::from_str(&s) { - Ok(s) => Some(s), - Err(_) => return server_error(RpcError::MinerInvalidSpendHook, id, None), - }, - None => None, - }; - let user_data: Option = match user_data { - Some(u) => { - let Ok(bytes) = bs58::decode(&u).into_vec() else { - return server_error(RpcError::MinerInvalidUserData, id, None) - }; - let bytes: [u8; 32] = match bytes.try_into() { - Ok(b) => b, - Err(_) => return server_error(RpcError::MinerInvalidUserData, id, None), - }; - match pallas::Base::from_repr(bytes).into() { - Some(v) => Some(v), - None => return server_error(RpcError::MinerInvalidUserData, id, None), - } - } - None => None, - }; - // Parse aux_hash let Some(aux_hash) = params.get("aux_hash") else { return server_error(RpcError::MinerMissingAuxHash, id, None) @@ -213,10 +157,28 @@ impl DarkfiNode { let Some(aux_hash) = aux_hash.get::() else { return server_error(RpcError::MinerInvalidAuxHash, id, None) }; - let Ok(aux_hash) = HeaderHash::from_str(aux_hash) else { + if HeaderHash::from_str(aux_hash).is_err() { return server_error(RpcError::MinerInvalidAuxHash, id, None) }; + // Check if we already have this job + if self.registry.jobs.read().await.contains_key(&aux_hash.to_string()) { + return JsonResponse::new(JsonValue::from(HashMap::new()), id).into() + } + + // Parse address mining configuration + let Some(wallet) = params.get("address") else { + return server_error(RpcError::MinerMissingAddress, id, None) + }; + let Some(wallet) = wallet.get::() else { + return server_error(RpcError::MinerInvalidAddress, id, None) + }; + let config = + match MinerRewardsRecipientConfig::from_base64(&self.registry.network, wallet).await { + Ok(c) => c, + Err(e) => return server_error(e, id, None), + }; + // Parse height let Some(height) = params.get("height") else { return server_error(RpcError::MinerMissingHeight, id, None) @@ -238,126 +200,29 @@ impl DarkfiNode { }; let prev_id = monero::Hash::from_slice(&prev_id); - // Now that method params format is correct, we can check if we - // already have a mining job for this wallet. If we already - // have it, we check if the fork it extends is still the best - // one. If both checks pass, we can just return an empty - // response if the request `aux_hash` matches the job one, - // otherwise return the job block template hash. In case the - // best fork has changed, we drop this job and generate a - // new one. If we don't know this wallet, we create a new job. - // We'll also obtain a lock here to avoid getting polled - // multiple times and potentially missing a job. The lock is - // released when this function exits. - let mut mm_blocktemplates = self.registry.mm_blocktemplates.lock().await; - let mut extended_fork = match self.validator.best_current_fork().await { - Ok(f) => f, - Err(e) => { - error!( - target: "darkfid::rpc_xmr::xmr_merge_mining_get_aux_block", - "[RPC-XMR] Finding best fork index failed: {e}", - ); - return JsonError::new(ErrorCode::InternalError, None, id).into() - } - }; - if let Some(blocktemplate) = mm_blocktemplates.get(&address_bytes) { - let last_proposal = match extended_fork.last_proposal() { + // Register the new merge miner + let (job_id, difficulty) = + match self.registry.register_merge_miner(&self.validator, wallet, &config).await { Ok(p) => p, Err(e) => { error!( - target: "darkfid::rpc_xmr::xmr_merge_mining_get_aux_block", - "[RPC-XMR] Retrieving best fork last proposal failed: {e}", + target: "darkfid::rpc::rpc_xmr::xmr_merge_mining_get_aux_block", + "[RPC-XMR] Failed to register merge miner: {e}", ); return JsonError::new(ErrorCode::InternalError, None, id).into() } }; - if last_proposal.hash == blocktemplate.block.header.previous { - let blockhash = blocktemplate.block.header.template_hash(); - return if blockhash != aux_hash { - JsonResponse::new( - JsonValue::from(HashMap::from([ - ("aux_blob".to_string(), JsonValue::from(hex::encode(address_bytes))), - ("aux_diff".to_string(), JsonValue::from(blocktemplate.difficulty)), - ("aux_hash".to_string(), JsonValue::from(blockhash.as_string())), - ])), - id, - ) - .into() - } else { - JsonResponse::new(JsonValue::from(HashMap::new()), id).into() - } - } - mm_blocktemplates.remove(&address_bytes); - } - // At this point, we should query the Validator for a new blocktemplate. - // We first need to construct `MinerRewardsRecipientConfig` from the - // address configuration provided to us through the RPC. - let recipient_str = format!("{recipient}"); - let spend_hook_str = match spend_hook { - Some(spend_hook) => format!("{spend_hook}"), - None => String::from("-"), - }; - let user_data_str = match user_data { - Some(user_data) => bs58::encode(user_data.to_repr()).into_string(), - None => String::from("-"), - }; - let recipient_config = MinerRewardsRecipientConfig { recipient, spend_hook, user_data }; - - // Now let's try to construct the blocktemplate. - // Find the difficulty. Note we cast it to f64 here. - let difficulty: f64 = match extended_fork.module.next_difficulty() { - Ok(v) => { - // We will attempt to cast it to f64. This should always work. - v.to_string().parse().unwrap() - } - Err(e) => { - error!( - target: "darkfid::rpc_xmr::xmr_merge_mining_get_aux_block", - "[RPC-XMR] Finding next mining difficulty failed: {e}", - ); - return JsonError::new(ErrorCode::InternalError, None, id).into() - } - }; - - let (_, block, block_signing_secret) = match generate_next_block( - &mut extended_fork, - &recipient_config, - &self.registry.powrewardv1_zk.zkbin, - &self.registry.powrewardv1_zk.provingkey, - self.validator.consensus.module.read().await.target, - self.validator.verify_fees, - ) - .await - { - Ok(v) => v, - Err(e) => { - error!( - target: "darkfid::rpc_xmr::xmr_merge_mining_get_aux_block", - "[RPC-XMR] Failed to generate next blocktemplate: {e}", - ); - return JsonError::new(ErrorCode::InternalError, None, id).into() - } - }; - - // Now we have the blocktemplate. We'll mark it down in memory, - // and then ship it to RPC. - let blockhash = block.header.template_hash(); - mm_blocktemplates.insert( - address_bytes.clone(), - MmBlockTemplate { block, difficulty, secret: block_signing_secret }, - ); + // Now we have the new job, we ship it to RPC info!( - target: "darkfid::rpc_xmr::xmr_merge_mining_get_aux_block", - "[RPC-XMR] Created new blocktemplate: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}, aux_hash={blockhash}, height={height}, prev_id={prev_id}" + target: "darkfid::rpc::rpc_xmr::xmr_merge_mining_get_aux_block", + "[RPC-XMR] Created new merge mining job: aux_hash={job_id}, height={height}, prev_id={prev_id}" ); - let response = JsonValue::from(HashMap::from([ - ("aux_blob".to_string(), JsonValue::from(hex::encode(address_bytes))), + ("aux_blob".to_string(), JsonValue::from(hex::encode(vec![]))), ("aux_diff".to_string(), JsonValue::from(difficulty)), - ("aux_hash".to_string(), JsonValue::from(blockhash.as_string())), + ("aux_hash".to_string(), JsonValue::from(job_id)), ])); - JsonResponse::new(response, id).into() } @@ -381,7 +246,7 @@ impl DarkfiNode { // **Response:** // * `status`: Block submit status // - // --> {"jsonrpc":"2.0", "method": "merge_mining_submit_solution", "params": {"aux_blob": "4c6f72656d20697073756d", "aux_hash": "f6952d6eef555ddd87aca66e56b91530222d6e318414816f3ba7cf5bf694bf0f", "blob": "...", "merkle_proof": ["hash1", "hash2", "hash3"], "path": 3, "seed_hash": "22c3d47c595ae888b5d7fc304235f92f8854644d4fad38c5680a5d4a81009fcd"}, "id": 1} + // --> {"jsonrpc":"2.0", "method": "merge_mining_submit_solution", "params": {"aux_blob": "", "aux_hash": "f6952d6eef555ddd87aca66e56b91530222d6e318414816f3ba7cf5bf694bf0f", "blob": "...", "merkle_proof": ["hash1", "hash2", "hash3"], "path": 3, "seed_hash": "22c3d47c595ae888b5d7fc304235f92f8854644d4fad38c5680a5d4a81009fcd"}, "id": 1} // <-- {"jsonrpc":"2.0", "result": {"status": "accepted"}, "id": 1} pub async fn xmr_merge_mining_submit_solution(&self, id: u16, params: JsonValue) -> JsonResult { // Check if node is synced before responding to p2pool @@ -389,6 +254,9 @@ impl DarkfiNode { return server_error(RpcError::NotSynced, id, None) } + // Grab registry submissions lock + let submit_lock = self.registry.submit_lock.write().await; + // Parse request params let Some(params) = params.get::>() else { return JsonError::new(InvalidParams, None, id).into() @@ -397,46 +265,6 @@ impl DarkfiNode { return JsonError::new(InvalidParams, None, id).into() } - // Parse address mining configuration from aux_blob - let Some(aux_blob) = params.get("aux_blob") else { - return server_error(RpcError::MinerMissingAuxBlob, id, None) - }; - let Some(aux_blob) = aux_blob.get::() else { - return server_error(RpcError::MinerInvalidAuxBlob, id, None) - }; - let Ok(address_bytes) = hex::decode(aux_blob) else { - return server_error(RpcError::MinerInvalidAuxBlob, id, None) - }; - let Ok((recipient, spend_hook, user_data)) = - deserialize_async::<(String, Option, Option)>(&address_bytes).await - else { - return server_error(RpcError::MinerInvalidAuxBlob, id, None) - }; - let Ok(recipient) = Address::from_str(&recipient) else { - return server_error(RpcError::MinerInvalidRecipient, id, None) - }; - if recipient.network() != self.network { - return server_error(RpcError::MinerInvalidRecipientPrefix, id, None) - } - if let Some(spend_hook) = spend_hook { - if FuncId::from_str(&spend_hook).is_err() { - return server_error(RpcError::MinerInvalidSpendHook, id, None) - } - }; - if let Some(user_data) = user_data { - let Ok(bytes) = bs58::decode(&user_data).into_vec() else { - return server_error(RpcError::MinerInvalidUserData, id, None) - }; - let bytes: [u8; 32] = match bytes.try_into() { - Ok(b) => b, - Err(_) => return server_error(RpcError::MinerInvalidUserData, id, None), - }; - let _: pallas::Base = match pallas::Base::from_repr(bytes).into() { - Some(v) => v, - None => return server_error(RpcError::MinerInvalidUserData, id, None), - }; - }; - // Parse aux_hash let Some(aux_hash) = params.get("aux_hash") else { return server_error(RpcError::MinerMissingAuxHash, id, None) @@ -444,14 +272,48 @@ impl DarkfiNode { let Some(aux_hash) = aux_hash.get::() else { return server_error(RpcError::MinerInvalidAuxHash, id, None) }; - let Ok(aux_hash) = HeaderHash::from_str(aux_hash) else { + if HeaderHash::from_str(aux_hash).is_err() { return server_error(RpcError::MinerInvalidAuxHash, id, None) + } + + // If we don't know about this job, we can just abort here + let jobs = self.registry.jobs.read().await; + let Some(wallet) = jobs.get(aux_hash) else { + return server_error(RpcError::MinerUnknownJob, id, None) }; - // If we don't know about this job, we can just abort here. - let mut mm_blocktemplates = self.registry.mm_blocktemplates.lock().await; - if !mm_blocktemplates.contains_key(&address_bytes) { + // If this job wallet template doesn't exist, we can just + // abort here. + let mut block_templates = self.registry.block_templates.write().await; + let Some(block_template) = block_templates.get_mut(wallet) else { return server_error(RpcError::MinerUnknownJob, id, None) + }; + + // If this template has been already submitted, reject this + // submission. + if block_template.submitted { + return JsonResponse::new( + JsonValue::from(HashMap::from([( + "status".to_string(), + JsonValue::from(String::from("rejected")), + )])), + id, + ) + .into() + } + + // Parse aux_blob + let Some(aux_blob) = params.get("aux_blob") else { + return server_error(RpcError::MinerMissingAuxBlob, id, None) + }; + let Some(aux_blob) = aux_blob.get::() else { + return server_error(RpcError::MinerInvalidAuxBlob, id, None) + }; + let Ok(aux_blob) = hex::decode(aux_blob) else { + return server_error(RpcError::MinerInvalidAuxBlob, id, None) + }; + if !aux_blob.is_empty() { + return server_error(RpcError::MinerInvalidAuxBlob, id, None) } // Parse blob @@ -510,7 +372,7 @@ impl DarkfiNode { }; info!( - target: "darkfid::rpc_xmr::xmr_merge_mining_submit_solution", + target: "darkfid::rpc::rpc_xmr::xmr_merge_mining_submit_solution", "[RPC-XMR] Got solution submission: aux_hash={aux_hash}", ); @@ -522,7 +384,7 @@ impl DarkfiNode { Ok(v) => v, Err(e) => { error!( - target: "darkfid::rpc_xmr::xmr_merge_mining_submit_solution", + target: "darkfid::rpc::rpc_xmr::xmr_merge_mining_submit_solution", "[RPC-XMR] Failed constructing MoneroPowData: {e}", ); return server_error(RpcError::MinerMoneroPowDataConstructionFailed, id, None) @@ -530,52 +392,40 @@ impl DarkfiNode { }; // Append MoneroPowData to the DarkFi block and sign it - let blocktemplate = &mm_blocktemplates.get(&address_bytes).unwrap(); - let mut block = blocktemplate.block.clone(); + let mut block = block_template.block.clone(); block.header.pow_data = PowData::Monero(monero_pow_data); - block.sign(&blocktemplate.secret); + block.sign(&block_template.secret); - // At this point we should be able to remove the submitted job. - // We still won't release the lock in hope of proposing the block - // first. - mm_blocktemplates.remove(&address_bytes); - - // Propose the new block - info!( - target: "darkfid::rpc_xmr::xmr_merge_mining_submit_solution", - "[RPC-XMR] Proposing new block to network", - ); - let proposal = Proposal::new(block); - if let Err(e) = self.validator.append_proposal(&proposal).await { + // Submit the new block through the registry + if let Err(e) = + self.registry.submit(&self.validator, &self.subscribers, &self.p2p_handler, block).await + { error!( - target: "darkfid::rpc_xmr::xmr_merge_submit_solution", - "[RPC-XMR] Error proposing new block: {e}", + target: "darkfid::rpc::rpc_xmr::xmr_merge_submit_solution", + "[RPC-XMR] Error submitting new block: {e}", ); return JsonResponse::new( JsonValue::from(HashMap::from([( "status".to_string(), - JsonValue::from("rejected".to_string()), + JsonValue::from(String::from("rejected")), )])), id, ) .into() } - let proposals_sub = self.subscribers.get("proposals").unwrap(); - let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await)); - proposals_sub.notify(vec![enc_prop].into()).await; + // Mark block as submitted + block_template.submitted = true; - info!( - target: "darkfid::rpc_xmr::xmr_merge_mining_submit_solution", - "[RPC-XMR] Broadcasting new block to network", - ); - let message = ProposalMessage(proposal); - self.p2p_handler.p2p.broadcast(&message).await; + // Release all locks + drop(block_templates); + drop(jobs); + drop(submit_lock); JsonResponse::new( JsonValue::from(HashMap::from([( "status".to_string(), - JsonValue::from("accepted".to_string()), + JsonValue::from(String::from("accepted")), )])), id, ) diff --git a/bin/darkfid/src/task/consensus.rs b/bin/darkfid/src/task/consensus.rs index 38ebfbcb3..2fc2569bd 100644 --- a/bin/darkfid/src/task/consensus.rs +++ b/bin/darkfid/src/task/consensus.rs @@ -192,8 +192,8 @@ async fn consensus_task( continue } - if let Err(e) = clean_blocktemplates(node).await { - error!(target: "darkfid", "Failed cleaning mining block templates: {e}") + if let Err(e) = node.registry.refresh(&node.validator).await { + error!(target: "darkfid", "Failed refreshing mining block templates: {e}") } let mut notif_blocks = Vec::with_capacity(confirmed.len()); @@ -219,80 +219,3 @@ async fn consensus_task( ); } } - -/// Auxiliary function to drop mining block templates not referencing -/// active forks or last confirmed block. -async fn clean_blocktemplates(node: &DarkfiNodePtr) -> Result<()> { - // Grab a lock over node mining templates - let mut blocktemplates = node.registry.blocktemplates.lock().await; - let mut mm_blocktemplates = node.registry.mm_blocktemplates.lock().await; - - // Early return if no mining block templates exist - if blocktemplates.is_empty() && mm_blocktemplates.is_empty() { - return Ok(()) - } - - // Grab a lock over node forks - let forks = node.validator.consensus.forks.read().await; - - // Grab last confirmed block for checks - let (_, last_confirmed) = node.validator.blockchain.last()?; - - // Loop through templates to find which can be dropped - let mut dropped_templates = vec![]; - 'outer: for (key, blocktemplate) in blocktemplates.iter() { - // Loop through all the forks - for fork in forks.iter() { - // Traverse fork proposals sequence in reverse - for p_hash in fork.proposals.iter().rev() { - // Check if job extends this fork - if &blocktemplate.block.header.previous == p_hash { - continue 'outer - } - } - } - - // Check if it extends last confirmed block - if blocktemplate.block.header.previous == last_confirmed { - continue - } - - // This job doesn't reference something so we drop it - dropped_templates.push(key.clone()); - } - - // Drop jobs not referencing active forks or last confirmed block - for key in dropped_templates { - blocktemplates.remove(&key); - } - - // Loop through merge mining templates to find which can be dropped - let mut dropped_templates = vec![]; - 'outer: for (key, blocktemplate) in mm_blocktemplates.iter() { - // Loop through all the forks - for fork in forks.iter() { - // Traverse fork proposals sequence in reverse - for p_hash in fork.proposals.iter().rev() { - // Check if job extends this fork - if &blocktemplate.block.header.previous == p_hash { - continue 'outer - } - } - } - - // Check if it extends last confirmed block - if blocktemplate.block.header.previous == last_confirmed { - continue - } - - // This job doesn't reference something so we drop it - dropped_templates.push(key.clone()); - } - - // Drop jobs not referencing active forks or last confirmed block - for key in dropped_templates { - mm_blocktemplates.remove(&key); - } - - Ok(()) -} diff --git a/bin/darkfid/src/task/garbage_collect.rs b/bin/darkfid/src/task/garbage_collect.rs index a3a627595..facf579f8 100644 --- a/bin/darkfid/src/task/garbage_collect.rs +++ b/bin/darkfid/src/task/garbage_collect.rs @@ -95,7 +95,7 @@ pub async fn garbage_collect_task(node: DarkfiNodePtr) -> Result<()> { }; // Verify transaction - match verify_transactions( + let result = verify_transactions( &overlay, next_block_height, node.validator.consensus.module.read().await.target, @@ -103,8 +103,13 @@ pub async fn garbage_collect_task(node: DarkfiNodePtr) -> Result<()> { &mut MerkleTree::new(1), false, ) - .await - { + .await; + + // Drop new trees opened by the forks' overlay + overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?; + + // Check result + match result { Ok(_) => valid = true, Err(Error::TxVerifyFailed(TxVerifyFailed::ErroneousTxs(_))) => { // Remove transaction from fork's mempool diff --git a/bin/darkfid/src/tests/harness.rs b/bin/darkfid/src/tests/harness.rs index 67cca4225..88b6cca0a 100644 --- a/bin/darkfid/src/tests/harness.rs +++ b/bin/darkfid/src/tests/harness.rs @@ -231,7 +231,7 @@ impl Harness { let timestamp = previous.header.timestamp.checked_add(1.into())?; // Generate header - let header = Header::new(previous.hash(), block_height, timestamp, last_nonce); + let header = Header::new(previous.hash(), block_height, last_nonce, timestamp); // Generate the block let mut block = BlockInfo::new_empty(header); @@ -293,16 +293,10 @@ pub async fn generate_node( subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events")); let p2p_handler = DarkfidP2pHandler::init(settings, ex).await?; - let registry = DarkfiMinersRegistry::init(&validator)?; - let node = DarkfiNode::new( - Network::Mainnet, - validator.clone(), - p2p_handler.clone(), - registry, - 50, - subscribers.clone(), - ) - .await?; + let registry = DarkfiMinersRegistry::init(Network::Mainnet, &validator)?; + let node = + DarkfiNode::new(validator.clone(), p2p_handler.clone(), registry, 50, subscribers.clone()) + .await?; p2p_handler.start(ex, &validator, &subscribers).await?; diff --git a/bin/darkfid/src/tests/unproposed_txs.rs b/bin/darkfid/src/tests/unproposed_txs.rs index 2deb3d005..eaa490c41 100644 --- a/bin/darkfid/src/tests/unproposed_txs.rs +++ b/bin/darkfid/src/tests/unproposed_txs.rs @@ -91,14 +91,7 @@ async fn simulate_unproposed_txs( let best_fork = &forks[best_fork_index(&forks)?]; // Retrieve unproposed transactions - let (tx, total_gas_used, _, _) = best_fork - .unproposed_txs( - &best_fork.clone().blockchain, - current_block_height, - validator.consensus.module.read().await.target, - false, - ) - .await?; + let (tx, total_gas_used, _) = best_fork.unproposed_txs(current_block_height, false).await?; Ok((tx.len() as u64, total_gas_used)) } diff --git a/script/research/gg/Cargo.toml b/script/research/gg/Cargo.toml index eeaba5404..78983f819 100644 --- a/script/research/gg/Cargo.toml +++ b/script/research/gg/Cargo.toml @@ -27,5 +27,5 @@ sled-overlay = "0.1.14" smol = "2.0.2" [patch.crates-io] -halo2_proofs = {git="https://github.com/parazyd/halo2", branch="v031"} -halo2_gadgets = {git="https://github.com/parazyd/halo2", branch="v031"} +halo2_proofs = {git="https://github.com/parazyd/halo2", branch="v032"} +halo2_gadgets = {git="https://github.com/parazyd/halo2", branch="v032"} diff --git a/src/blockchain/header_store.rs b/src/blockchain/header_store.rs index 5ef8d86cf..7dc80c8c8 100644 --- a/src/blockchain/header_store.rs +++ b/src/blockchain/header_store.rs @@ -109,7 +109,7 @@ pub struct Header { impl Header { /// Generates a new header with default transactions and state root, /// using DarkFi native Proof of Work data. - pub fn new(previous: HeaderHash, height: u32, timestamp: Timestamp, nonce: u32) -> Self { + pub fn new(previous: HeaderHash, height: u32, nonce: u32, timestamp: Timestamp) -> Self { let version = block_version(height); let transactions_root = MerkleTree::new(1).root(0).unwrap(); let state_root = *EMPTY_HASH; @@ -118,8 +118,8 @@ impl Header { version, previous, height, - timestamp, nonce, + timestamp, transactions_root, state_root, pow_data, @@ -179,8 +179,8 @@ impl Header { } } - /// Create a blockhashing blob from this header - pub fn to_blockhashing_blob(&self) -> Vec { + /// Create a block hashing blob from this header. + pub fn to_block_hashing_blob(&self) -> Vec { // For XMRig, we need to pad the blob so that our nonce ends // up at byte offset 39. let mut blob = vec![0x00, 0x00]; @@ -195,8 +195,8 @@ impl Default for Header { Header::new( HeaderHash::new(blake3::hash(b"Let there be dark!").into()), 0u32, - Timestamp::current_time(), 0u32, + Timestamp::current_time(), ) } } diff --git a/src/blockchain/monero/mod.rs b/src/blockchain/monero/mod.rs index 188eaac65..f2483a0b3 100644 --- a/src/blockchain/monero/mod.rs +++ b/src/blockchain/monero/mod.rs @@ -151,8 +151,8 @@ impl MoneroPowData { (self.merkle_root == merkle_root) && self.coinbase_merkle_proof.check_coinbase_path() } - /// Returns the blockhashing_blob for the Monero block - pub fn to_blockhashing_blob(&self) -> Vec { + /// Returns the block hashing blob for the Monero block. + pub fn to_block_hashing_blob(&self) -> Vec { create_blockhashing_blob(&self.header, &self.merkle_root, u64::from(self.transaction_count)) } diff --git a/src/contract/test-harness/src/money_pow_reward.rs b/src/contract/test-harness/src/money_pow_reward.rs index a739fcd09..9f107e02a 100644 --- a/src/contract/test-harness/src/money_pow_reward.rs +++ b/src/contract/test-harness/src/money_pow_reward.rs @@ -119,8 +119,8 @@ impl TestHarness { let header = Header::new( previous.hash(), previous.header.height + 1, - timestamp, previous.header.nonce, + timestamp, ); // Generate the block diff --git a/src/system/publisher.rs b/src/system/publisher.rs index 993d49a33..b2fce7a61 100644 --- a/src/system/publisher.rs +++ b/src/system/publisher.rs @@ -115,4 +115,28 @@ impl Publisher { } } } + + /// Clear inactive subscribtions. + /// Returns a flag indicating if we have active subscriptions + /// after cleanup. + pub async fn clear_inactive(&self) -> bool { + // Grab a lock over current jobs + let mut subs = self.subs.lock().await; + + // Find inactive subscriptions + let mut dropped = vec![]; + for (sub, channel) in subs.iter() { + if channel.receiver_count() == 0 { + dropped.push(*sub); + } + } + + // Drop inactive subscriptions + for sub in dropped { + subs.remove(&sub); + } + + // Return flag indicating if we still have subscriptions + subs.is_empty() + } } diff --git a/src/validator/consensus.rs b/src/validator/consensus.rs index 7631abe7c..29ecae4b3 100644 --- a/src/validator/consensus.rs +++ b/src/validator/consensus.rs @@ -840,23 +840,17 @@ impl Fork { } /// Auxiliary function to retrieve unproposed valid transactions, - /// along with their total gas used, total paid fees and the overlay - /// used to verify the transactions for further processing. + /// along with their total gas used and total paid fees. /// /// Note: Always remember to purge new trees from the overlay if not needed. pub async fn unproposed_txs( &self, - blockchain: &Blockchain, verifying_block_height: u32, - block_target: u32, verify_fees: bool, - ) -> Result<(Vec, u64, u64, BlockchainOverlayPtr)> { - // Clone forks' overlay - let overlay = self.overlay.lock().unwrap().full_clone()?; - + ) -> Result<(Vec, u64, u64)> { // Check if our mempool is not empty if self.mempool.is_empty() { - return Ok((vec![], 0, 0, overlay)) + return Ok((vec![], 0, 0)) } // Transactions Merkle tree @@ -870,7 +864,7 @@ impl Fork { let mut vks: HashMap<[u8; 32], HashMap> = HashMap::new(); // Grab all current proposals transactions hashes - let proposals_txs = overlay.lock().unwrap().get_blocks_txs_hashes(&self.proposals)?; + let proposals_txs = self.overlay.lock().unwrap().get_blocks_txs_hashes(&self.proposals)?; // Iterate through all pending transactions in the forks' mempool let mut unproposed_txs = vec![]; @@ -882,7 +876,7 @@ impl Fork { // Retrieve the actual unproposed transaction let unproposed_tx = - blockchain.transactions.get_pending(&[*tx], true)?[0].clone().unwrap(); + self.blockchain.transactions.get_pending(&[*tx], true)?[0].clone().unwrap(); // Update the verifying keys map for call in &unproposed_tx.calls { @@ -890,11 +884,11 @@ impl Fork { } // Verify the transaction against current state - overlay.lock().unwrap().checkpoint(); + self.overlay.lock().unwrap().checkpoint(); let gas_data = match verify_transaction( - &overlay, + &self.overlay, verifying_block_height, - block_target, + self.module.target, &unproposed_tx, &mut tree, &mut vks, @@ -905,7 +899,8 @@ impl Fork { Ok(gas_values) => gas_values, Err(e) => { debug!(target: "validator::consensus::unproposed_txs", "Transaction verification failed: {e}"); - overlay.lock().unwrap().revert_to_checkpoint()?; + self.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?; + self.overlay.lock().unwrap().revert_to_checkpoint()?; continue } }; @@ -922,7 +917,8 @@ impl Fork { target: "validator::consensus::unproposed_txs", "Retrieving transaction {tx} would exceed configured unproposed transaction gas limit: {accumulated_gas_usage} - {BLOCK_GAS_LIMIT}" ); - overlay.lock().unwrap().revert_to_checkpoint()?; + self.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?; + self.overlay.lock().unwrap().revert_to_checkpoint()?; break } @@ -934,7 +930,7 @@ impl Fork { unproposed_txs.push(unproposed_tx); } - Ok((unproposed_txs, total_gas_used, total_gas_paid, overlay)) + Ok((unproposed_txs, total_gas_used, total_gas_paid)) } /// Auxiliary function to create a full clone using BlockchainOverlay::full_clone. diff --git a/src/validator/pow.rs b/src/validator/pow.rs index ae72f4aa2..c0221da59 100644 --- a/src/validator/pow.rs +++ b/src/validator/pow.rs @@ -319,7 +319,7 @@ impl PoWModule { ); let verification_time = Instant::now(); - let out_hash = vm.calculate_hash(&header.to_blockhashing_blob())?; + let out_hash = vm.calculate_hash(&header.to_block_hashing_blob())?; (BigUint::from_bytes_le(&out_hash), verification_time) } Monero(powdata) => { @@ -332,7 +332,7 @@ impl PoWModule { ); let verification_time = Instant::now(); - let out_hash = vm.calculate_hash(&powdata.to_blockhashing_blob())?; + let out_hash = vm.calculate_hash(&powdata.to_block_hashing_blob())?; (BigUint::from_bytes_le(&out_hash), verification_time) } }; @@ -700,8 +700,8 @@ mod tests { let header = Header::new( previous.hash(), previous.height + 1, - parts[0].parse::().unwrap().into(), 0, + parts[0].parse::().unwrap().into(), ); let difficulty = BigUint::from_str_radix(&parts[1], 10).unwrap(); diff --git a/src/validator/verification.rs b/src/validator/verification.rs index 552acf6cf..7c7cb2eff 100644 --- a/src/validator/verification.rs +++ b/src/validator/verification.rs @@ -1017,6 +1017,7 @@ pub async fn verify_transactions( Err(e) => { warn!(target: "validator::verification::verify_transactions", "Transaction verification failed: {e}"); erroneous_txs.push(tx.clone()); + overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?; overlay.lock().unwrap().revert_to_checkpoint()?; continue } @@ -1036,6 +1037,7 @@ pub async fn verify_transactions( tx.hash() ); erroneous_txs.push(tx.clone()); + overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?; overlay.lock().unwrap().revert_to_checkpoint()?; break }