darkfid: massive overhaul of mining rpc so everything goes through the registry

skoupidi
2025-12-27 19:37:39 +02:00
parent 662da29b59
commit e81e01dd98
18 changed files with 935 additions and 737 deletions

View File

@@ -37,37 +37,55 @@ pub enum RpcError {
ContractStateNotFound = -32201,
ContractStateKeyNotFound = -32202,
// Miner errors
MinerMissingHeader = -32300,
MinerInvalidHeader = -32301,
MinerMissingRecipient = -32302,
MinerInvalidRecipient = -32303,
MinerInvalidRecipientPrefix = -32304,
MinerInvalidSpendHook = -32305,
MinerInvalidUserData = -32306,
MinerMissingNonce = -32307,
MinerInvalidNonce = -32308,
MinerMissingAddress = -32309,
MinerInvalidAddress = -32310,
MinerMissingAuxHash = -32311,
MinerInvalidAuxHash = -32312,
MinerMissingHeight = -32313,
MinerInvalidHeight = -32314,
MinerMissingPrevId = -32315,
MinerInvalidPrevId = -32316,
MinerMissingAuxBlob = -32317,
MinerInvalidAuxBlob = -32318,
MinerMissingBlob = -32319,
MinerInvalidBlob = -32320,
MinerMissingMerkleProof = -32321,
MinerInvalidMerkleProof = -32322,
MinerMissingPath = -32323,
MinerInvalidPath = -32324,
MinerMissingSeedHash = -32325,
MinerInvalidSeedHash = -32326,
MinerMerkleProofConstructionFailed = -32327,
MinerMoneroPowDataConstructionFailed = -32328,
MinerUnknownJob = -32329,
// Miner configuration errors
MinerInvalidWalletConfig = -32301,
MinerInvalidRecipient = -32302,
MinerInvalidRecipientPrefix = -32303,
MinerInvalidSpendHook = -32304,
MinerInvalidUserData = -32305,
// Stratum errors
MinerMissingLogin = -32306,
MinerInvalidLogin = -32307,
MinerMissingPassword = -32308,
MinerInvalidPassword = -32309,
MinerMissingAgent = -32310,
MinerInvalidAgent = -32311,
MinerMissingAlgo = -32312,
MinerInvalidAlgo = -32313,
MinerRandomXNotSupported = -32314,
MinerMissingClientId = -32315,
MinerInvalidClientId = -32316,
MinerUnknownClient = -32317,
MinerMissingJobId = -32318,
MinerInvalidJobId = -32319,
MinerUnknownJob = -32320,
MinerMissingNonce = -32321,
MinerInvalidNonce = -32322,
MinerMissingResult = -32323,
MinerInvalidResult = -32324,
// Merge mining errors
MinerMissingAddress = -32325,
MinerInvalidAddress = -32326,
MinerMissingAuxHash = -32327,
MinerInvalidAuxHash = -32328,
MinerMissingHeight = -32329,
MinerInvalidHeight = -32330,
MinerMissingPrevId = -32331,
MinerInvalidPrevId = -32332,
MinerMissingAuxBlob = -32333,
MinerInvalidAuxBlob = -32334,
MinerMissingBlob = -32335,
MinerInvalidBlob = -32336,
MinerMissingMerkleProof = -32337,
MinerInvalidMerkleProof = -32338,
MinerMissingPath = -32339,
MinerInvalidPath = -32340,
MinerMissingSeedHash = -32341,
MinerInvalidSeedHash = -32342,
MinerMerkleProofConstructionFailed = -32343,
MinerMoneroPowDataConstructionFailed = -32344,
}
fn to_tuple(e: RpcError) -> (i32, String) {
@@ -75,27 +93,50 @@ fn to_tuple(e: RpcError) -> (i32, String) {
// Transaction-related errors
RpcError::TxSimulationFail => "Failed simulating transaction state change",
RpcError::TxGasCalculationFail => "Failed to calculate transaction's gas",
// State-related errors
RpcError::NotSynced => "Blockchain is not synced",
RpcError::UnknownBlockHeight => "Did not find block height",
// Parsing errors
RpcError::ParseError => "Parse error",
// Contract-related errors
RpcError::ContractZkasDbNotFound => "zkas database not found for given contract",
RpcError::ContractStateNotFound => "Records not found for given contract state",
RpcError::ContractStateKeyNotFound => "Value not found for given contract state key",
// Miner errors
RpcError::MinerMissingHeader => "Request is missing the Header hash",
RpcError::MinerInvalidHeader => "Request Header hash is invalid",
RpcError::MinerMissingRecipient => "Request is missing the recipient wallet address",
// Miner configuration errors
RpcError::MinerInvalidWalletConfig => "Request wallet configuration is invalid",
RpcError::MinerInvalidRecipient => "Request recipient wallet address is invalid",
RpcError::MinerInvalidRecipientPrefix => {
"Request recipient wallet address prefix is invalid"
}
RpcError::MinerInvalidSpendHook => "Request spend hook is invalid",
RpcError::MinerInvalidUserData => "Request user data is invalid",
RpcError::MinerMissingNonce => "Request is missing the Header nonce",
RpcError::MinerInvalidNonce => "Request Header nonce is invalid",
// Stratum errors
RpcError::MinerMissingLogin => "Request is missing the login",
RpcError::MinerInvalidLogin => "Request login is invalid",
RpcError::MinerMissingPassword => "Request is missing the password",
RpcError::MinerInvalidPassword => "Request password is invalid",
RpcError::MinerMissingAgent => "Request is missing the agent",
RpcError::MinerInvalidAgent => "Request agent is invalid",
RpcError::MinerMissingAlgo => "Request is missing the algo",
RpcError::MinerInvalidAlgo => "Request algo is invalid",
RpcError::MinerRandomXNotSupported => "Request doesn't support rx/0",
RpcError::MinerMissingClientId => "Request is missing the client ID",
RpcError::MinerInvalidClientId => "Request client ID is invalid",
RpcError::MinerUnknownClient => "Request client is unknown",
RpcError::MinerMissingJobId => "Request is missing the job ID",
RpcError::MinerInvalidJobId => "Request job ID is invalid",
RpcError::MinerUnknownJob => "Request job is unknown",
RpcError::MinerMissingNonce => "Request is missing the nonce",
RpcError::MinerInvalidNonce => "Request nonce is invalid",
RpcError::MinerMissingResult => "Request is missing the result",
RpcError::MinerInvalidResult => "Request result is invalid",
// Merge mining errors
RpcError::MinerMissingAddress => {
"Request is missing the recipient wallet address configuration"
}
@@ -122,7 +163,6 @@ fn to_tuple(e: RpcError) -> (i32, String) {
"failed constructing aux chain Merkle proof"
}
RpcError::MinerMoneroPowDataConstructionFailed => "Failed constructing Monero PoW data",
RpcError::MinerUnknownJob => "Request job is unknown",
};
(e as i32, msg.to_string())
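For illustration, a minimal sketch (not part of the commit) of how the renumbered error variants above map to JSON-RPC error tuples through `to_tuple`, using values taken directly from the enum:
// Hypothetical caller; `RpcError` and `to_tuple` are defined in this file.
let (code, msg) = to_tuple(RpcError::MinerInvalidPassword);
assert_eq!(code, -32309);
assert_eq!(msg, "Request password is invalid");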

View File

@@ -57,18 +57,13 @@ use proto::{DarkfidP2pHandler, DarkfidP2pHandlerPtr};
/// Miners registry
mod registry;
use registry::{
model::{BlockTemplate, MiningJobs},
DarkfiMinersRegistry, DarkfiMinersRegistryPtr,
};
use registry::{DarkfiMinersRegistry, DarkfiMinersRegistryPtr};
/// Atomic pointer to the DarkFi node
pub type DarkfiNodePtr = Arc<DarkfiNode>;
/// Structure representing a DarkFi node
pub struct DarkfiNode {
/// Blockchain network
network: Network,
/// Validator(node) pointer
validator: ValidatorPtr,
/// P2P network protocols handler
@@ -85,7 +80,6 @@ pub struct DarkfiNode {
impl DarkfiNode {
pub async fn new(
network: Network,
validator: ValidatorPtr,
p2p_handler: DarkfidP2pHandlerPtr,
registry: DarkfiMinersRegistryPtr,
@@ -93,7 +87,6 @@ impl DarkfiNode {
subscribers: HashMap<&'static str, JsonSubscriber>,
) -> Result<DarkfiNodePtr> {
Ok(Arc::new(Self {
network,
validator,
p2p_handler,
registry,
@@ -140,7 +133,7 @@ impl Darkfid {
let p2p_handler = DarkfidP2pHandler::init(net_settings, ex).await?;
// Initialize the miners registry
let registry = DarkfiMinersRegistry::init(&validator)?;
let registry = DarkfiMinersRegistry::init(network, &validator)?;
// Grab blockchain network configured transactions batch size for garbage collection
let txs_batch_size = match txs_batch_size {
@@ -163,8 +156,7 @@ impl Darkfid {
// Initialize node
let node =
DarkfiNode::new(network, validator, p2p_handler, registry, txs_batch_size, subscribers)
.await?;
DarkfiNode::new(validator, p2p_handler, registry, txs_batch_size, subscribers).await?;
// Generate the background tasks
let dnet_task = StoppableTask::new();

View File

@@ -21,41 +21,58 @@ use std::{
sync::Arc,
};
use smol::lock::Mutex;
use smol::lock::{Mutex, RwLock};
use tinyjson::JsonValue;
use tracing::{error, info};
use darkfi::{
blockchain::BlockInfo,
rpc::{
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
settings::RpcSettings,
},
system::{ExecutorPtr, StoppableTask, StoppableTaskPtr},
validator::ValidatorPtr,
util::encoding::base64,
validator::{consensus::Proposal, ValidatorPtr},
Error, Result,
};
use darkfi_sdk::crypto::{keypair::Network, pasta_prelude::PrimeField};
use darkfi_serial::serialize_async;
use crate::{
proto::{DarkfidP2pHandlerPtr, ProposalMessage},
rpc::{rpc_stratum::StratumRpcHandler, rpc_xmr::MmRpcHandler},
DarkfiNode, DarkfiNodePtr,
};
/// Block related structures
pub mod model;
use model::{BlockTemplate, MiningJobs, MmBlockTemplate, PowRewardV1Zk};
use model::{
generate_next_block_template, BlockTemplate, MinerClient, MinerRewardsRecipientConfig,
PowRewardV1Zk,
};
/// Atomic pointer to the DarkFi node miners registry.
pub type DarkfiMinersRegistryPtr = Arc<DarkfiMinersRegistry>;
/// DarkFi node miners registry.
pub struct DarkfiMinersRegistry {
/// Blockchain network
pub network: Network,
/// PowRewardV1 ZK data
pub powrewardv1_zk: PowRewardV1Zk,
/// Native mining block templates
pub blocktemplates: Mutex<HashMap<Vec<u8>, BlockTemplate>>,
/// Active native mining jobs per connection ID
pub mining_jobs: Mutex<HashMap<[u8; 32], MiningJobs>>,
/// Merge mining block templates
pub mm_blocktemplates: Mutex<HashMap<Vec<u8>, MmBlockTemplate>>,
/// Mining block templates of each wallet config
pub block_templates: RwLock<HashMap<String, BlockTemplate>>,
/// Active mining jobs mapped to the wallet template they
/// represent. For native jobs the key (job ID) is the hex
/// encoded header hash, while for merge mining jobs it's
/// the header template hash.
pub jobs: RwLock<HashMap<String, String>>,
/// Active native clients mapped to their information.
pub clients: RwLock<HashMap<String, MinerClient>>,
/// Submission lock so submissions are processed one at a time
pub submit_lock: RwLock<()>,
/// Stratum JSON-RPC background task
stratum_rpc_task: StoppableTaskPtr,
/// Stratum JSON-RPC connection tracker
@@ -68,7 +85,7 @@ pub struct DarkfiMinersRegistry {
impl DarkfiMinersRegistry {
/// Initialize a DarkFi node miners registry.
pub fn init(validator: &ValidatorPtr) -> Result<DarkfiMinersRegistryPtr> {
pub fn init(network: Network, validator: &ValidatorPtr) -> Result<DarkfiMinersRegistryPtr> {
info!(
target: "darkfid::registry::mod::DarkfiMinersRegistry::init",
"Initializing a new DarkFi node miners registry..."
@@ -93,10 +110,12 @@ impl DarkfiMinersRegistry {
);
Ok(Arc::new(Self {
network,
powrewardv1_zk,
blocktemplates: Mutex::new(HashMap::new()),
mining_jobs: Mutex::new(HashMap::new()),
mm_blocktemplates: Mutex::new(HashMap::new()),
block_templates: RwLock::new(HashMap::new()),
jobs: RwLock::new(HashMap::new()),
clients: RwLock::new(HashMap::new()),
submit_lock: RwLock::new(()),
stratum_rpc_task,
stratum_rpc_connections,
mm_rpc_task,
@@ -190,4 +209,283 @@ impl DarkfiMinersRegistry {
info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::stop", "DarkFi node miners registry terminated successfully!");
}
/// Create a registry record for the provided wallet config. If the
/// record already exists, return its template; otherwise create its
/// current template based on the provided validator state.
async fn create_template(
&self,
validator: &ValidatorPtr,
wallet: &String,
config: &MinerRewardsRecipientConfig,
) -> Result<BlockTemplate> {
// Grab a lock over current templates
let mut block_templates = self.block_templates.write().await;
// Check if a template already exists for this wallet
if let Some(block_template) = block_templates.get(wallet) {
return Ok(block_template.clone())
}
// Grab validator best current fork
let mut extended_fork = validator.best_current_fork().await?;
// Generate the next block template
let result = generate_next_block_template(
&mut extended_fork,
config,
&self.powrewardv1_zk.zkbin,
&self.powrewardv1_zk.provingkey,
validator.verify_fees,
)
.await;
// Drop new trees opened by the forks' overlay
extended_fork.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
// Check result
let block_template = result?;
// Create the new registry record
block_templates.insert(wallet.clone(), block_template.clone());
// Print the new template wallet information
let recipient_str = format!("{}", config.recipient);
let spend_hook_str = match config.spend_hook {
Some(spend_hook) => format!("{spend_hook}"),
None => String::from("-"),
};
let user_data_str = match config.user_data {
Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
None => String::from("-"),
};
info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
"Created new block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
);
Ok(block_template)
}
/// Register a new miner and create its job.
pub async fn register_miner(
&self,
validator: &ValidatorPtr,
wallet: &String,
config: &MinerRewardsRecipientConfig,
) -> Result<(String, BlockTemplate, JsonSubscriber)> {
// Grab a lock over current jobs and clients
let mut jobs = self.jobs.write().await;
let mut clients = self.clients.write().await;
// Create wallet template
let block_template = self.create_template(validator, wallet, config).await?;
// Grab the hex encoded block hash and create the job record
let block_hash = hex::encode(block_template.block.header.hash().inner()).to_string();
jobs.insert(block_hash.clone(), wallet.clone());
// Create the client record
let (client_id, client) = MinerClient::new(wallet, config, &block_hash);
let publisher = client.publisher.clone();
clients.insert(client_id.clone(), client);
Ok((client_id, block_template, publisher))
}
/// Register a new merge miner and create its job.
pub async fn register_merge_miner(
&self,
validator: &ValidatorPtr,
wallet: &String,
config: &MinerRewardsRecipientConfig,
) -> Result<(String, f64)> {
// Grab a lock over current jobs
let mut jobs = self.jobs.write().await;
// Create wallet template
let block_template = self.create_template(validator, wallet, config).await?;
// Grab the block template hash and its difficulty, and then
// create the job record.
let block_template_hash = block_template.block.header.template_hash().as_string();
let difficulty = block_template.difficulty;
jobs.insert(block_template_hash.clone(), wallet.clone());
Ok((block_template_hash, difficulty))
}
/// Submit the provided block to the provided node and broadcast
/// it to the network.
pub async fn submit(
&self,
validator: &ValidatorPtr,
subscribers: &HashMap<&'static str, JsonSubscriber>,
p2p_handler: &DarkfidP2pHandlerPtr,
block: BlockInfo,
) -> Result<()> {
info!(
target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
"Proposing new block to network",
);
let proposal = Proposal::new(block);
validator.append_proposal(&proposal).await?;
let proposals_sub = subscribers.get("proposals").unwrap();
let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
proposals_sub.notify(vec![enc_prop].into()).await;
info!(
target: "darkfid::registry::mod::DarkfiMinersRegistry::submit",
"Broadcasting new block to network",
);
let message = ProposalMessage(proposal);
p2p_handler.p2p.broadcast(&message).await;
Ok(())
}
/// Refresh outdated jobs in the registry based on provided
/// validator state.
pub async fn refresh(&self, validator: &ValidatorPtr) -> Result<()> {
// Grab locks
let submit_lock = self.submit_lock.write().await;
let mut clients = self.clients.write().await;
let mut jobs = self.jobs.write().await;
let mut block_templates = self.block_templates.write().await;
// Find inactive clients
let mut dropped_clients = vec![];
let mut active_clients_jobs = vec![];
for (client_id, client) in clients.iter() {
if client.publisher.publisher.clear_inactive().await {
dropped_clients.push(client_id.clone());
continue
}
active_clients_jobs.push(client.job.clone());
}
// Drop inactive clients and their jobs
for client_id in dropped_clients {
// It's safe to unwrap here since the client key came from
// the previous loop.
let client = clients.remove(&client_id).unwrap();
let wallet = jobs.remove(&client.job).unwrap();
block_templates.remove(&wallet);
}
// Return if no clients exist. Merge miners will create a new
// template and job on next poll.
if clients.is_empty() {
*jobs = HashMap::new();
*block_templates = HashMap::new();
return Ok(())
}
// Find inactive jobs (not referenced by clients)
let mut dropped_jobs = vec![];
let mut active_wallets = vec![];
for (job, wallet) in jobs.iter() {
if !active_clients_jobs.contains(job) {
dropped_jobs.push(job.clone());
continue
}
active_wallets.push(wallet.clone());
}
// Drop inactive jobs
for job in dropped_jobs {
jobs.remove(&job);
}
// Return if no jobs exist. Merge miners will create a new
// template and job on next poll.
if jobs.is_empty() {
*block_templates = HashMap::new();
return Ok(())
}
// Find inactive wallet templates
let mut dropped_wallets = vec![];
for wallet in block_templates.keys() {
if !active_wallets.contains(wallet) {
dropped_wallets.push(wallet.clone());
}
}
// Drop inactive wallet templates
for wallet in dropped_wallets {
block_templates.remove(&wallet);
}
// Return if no wallet templates exist. Merge miners will
// create a new template and job on next poll.
if block_templates.is_empty() {
return Ok(())
}
// Grab validator best current fork
let extended_fork = validator.best_current_fork().await?;
// Iterate over active clients to refresh their jobs
for (_, client) in clients.iter_mut() {
// Clone the fork so each client generates over a new one
let mut extended_fork = extended_fork.full_clone()?;
// Drop its current job. It's safe to unwrap here since we
// know the job exists.
let wallet = jobs.remove(&client.job).unwrap();
// Drop its current template. It's safe to unwrap here since
// we know the template exists.
block_templates.remove(&wallet);
// Generate the next block template
let result = generate_next_block_template(
&mut extended_fork,
&client.config,
&self.powrewardv1_zk.zkbin,
&self.powrewardv1_zk.provingkey,
validator.verify_fees,
)
.await;
// Drop new trees opened by the forks' overlay
extended_fork.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
// Check result
let block_template = result?;
// Print the updated template wallet information
let recipient_str = format!("{}", client.config.recipient);
let spend_hook_str = match client.config.spend_hook {
Some(spend_hook) => format!("{spend_hook}"),
None => String::from("-"),
};
let user_data_str = match client.config.user_data {
Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
None => String::from("-"),
};
info!(target: "darkfid::registry::mod::DarkfiMinersRegistry::create_template",
"Updated block template for wallet: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}",
);
// Create the new job notification
let (job, notification) = block_template.job_notification();
// Create the new registry records
block_templates.insert(wallet.clone(), block_template);
jobs.insert(job.clone(), wallet);
// Update the client record
client.job = job;
// Push job notification to subscriber
client.publisher.notify(notification).await;
}
// Release all locks
drop(block_templates);
drop(jobs);
drop(submit_lock);
Ok(())
}
}
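To make the new registry-centric flow concrete, here is a rough usage sketch (assumptions: `network`, `validator`, `subscribers`, `p2p_handler`, a `wallet` string with its parsed `config`, and a mined `block` are already in scope) of how the RPC handlers drive the methods defined above:
// Initialization, as done in darkfid setup
let registry = DarkfiMinersRegistry::init(network, &validator)?;
// Stratum login: create or reuse the wallet template and register the client
let (client_id, block_template, publisher) =
    registry.register_miner(&validator, &wallet, &config).await?;
// Merge mining poll: same template, keyed by its header template hash
let (aux_hash, aux_diff) =
    registry.register_merge_miner(&validator, &wallet, &config).await?;
// Valid solution submitted: propose and broadcast through the registry
registry.submit(&validator, &subscribers, &p2p_handler, block).await?;
// Periodic maintenance: drop inactive clients/jobs and push fresh job notifications
registry.refresh(&validator).await?;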

View File

@@ -16,16 +16,20 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::collections::HashMap;
use std::{collections::HashMap, str::FromStr};
use num_bigint::BigUint;
use rand::rngs::OsRng;
use tinyjson::JsonValue;
use tracing::info;
use darkfi::{
blockchain::{BlockInfo, Header},
blockchain::{BlockInfo, Header, HeaderHash},
rpc::jsonrpc::JsonSubscriber,
tx::{ContractCallLeaf, Transaction, TransactionBuilder},
util::time::Timestamp,
util::{
encoding::base64,
time::{NanoTimestamp, Timestamp},
},
validator::{consensus::Fork, verification::apply_producer_transaction, ValidatorPtr},
zk::{empty_witnesses, ProvingKey, ZkCircuit},
zkas::ZkBinary,
@@ -36,15 +40,19 @@ use darkfi_money_contract::{
};
use darkfi_sdk::{
crypto::{
keypair::{Address, Keypair, SecretKey},
keypair::{Address, Keypair, Network, SecretKey},
pasta_prelude::PrimeField,
FuncId, MerkleTree, MONEY_CONTRACT_ID,
},
pasta::pallas,
ContractCall,
};
use darkfi_serial::Encodable;
use darkfi_serial::{deserialize_async, Encodable};
use crate::error::RpcError;
/// Auxiliary structure representing node miner rewards recipient configuration.
#[derive(Debug, Clone)]
pub struct MinerRewardsRecipientConfig {
/// Wallet mining address to receive mining rewards
pub recipient: Address,
@@ -55,47 +63,128 @@ pub struct MinerRewardsRecipientConfig {
pub user_data: Option<pallas::Base>,
}
/// Auxiliary structure representing a block template for native
/// mining.
impl MinerRewardsRecipientConfig {
pub fn new(
recipient: Address,
spend_hook: Option<FuncId>,
user_data: Option<pallas::Base>,
) -> Self {
Self { recipient, spend_hook, user_data }
}
pub async fn from_base64(
network: &Network,
encoded_address: &str,
) -> std::result::Result<Self, RpcError> {
let Some(address_bytes) = base64::decode(encoded_address) else {
return Err(RpcError::MinerInvalidWalletConfig)
};
let Ok((recipient, spend_hook, user_data)) =
deserialize_async::<(String, Option<String>, Option<String>)>(&address_bytes).await
else {
return Err(RpcError::MinerInvalidWalletConfig)
};
let Ok(recipient) = Address::from_str(&recipient) else {
return Err(RpcError::MinerInvalidRecipient)
};
if recipient.network() != *network {
return Err(RpcError::MinerInvalidRecipientPrefix)
}
let spend_hook = match spend_hook {
Some(s) => match FuncId::from_str(&s) {
Ok(s) => Some(s),
Err(_) => return Err(RpcError::MinerInvalidSpendHook),
},
None => None,
};
let user_data: Option<pallas::Base> = match user_data {
Some(u) => {
let Ok(bytes) = bs58::decode(&u).into_vec() else {
return Err(RpcError::MinerInvalidUserData)
};
let bytes: [u8; 32] = match bytes.try_into() {
Ok(b) => b,
Err(_) => return Err(RpcError::MinerInvalidUserData),
};
match pallas::Base::from_repr(bytes).into() {
Some(v) => Some(v),
None => return Err(RpcError::MinerInvalidUserData),
}
}
None => None,
};
Ok(Self { recipient, spend_hook, user_data })
}
}
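// Illustrative sketch (not part of the commit): how a miner could construct the
// base64 wallet configuration string that `from_base64` above expects, assuming
// `darkfi_serial::serialize_async` alongside the `base64` helper imported in this
// file, and using the testnet reference address from rpc_stratum.rs.
let wallet_config: (String, Option<String>, Option<String>) = (
    "fUfG4WhbHP5C2MhYW3FHctVqi2jfXHamoQeU8KiirKVtoMBEUejkwq9F".to_string(),
    None, // optional spend hook (FuncId as a string)
    None, // optional user data (bs58-encoded 32 bytes)
);
let login = base64::encode(&serialize_async(&wallet_config).await);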
/// Auxiliary structure representing a block template for mining.
#[derive(Debug, Clone)]
pub struct BlockTemplate {
/// Block that is being mined
pub block: BlockInfo,
/// RandomX init key
pub randomx_key: [u8; 32],
/// Block mining target
pub target: BigUint,
/// Ephemeral signing secret for this blocktemplate
pub secret: SecretKey,
}
/// Auxiliary structure representing a block template for merge mining.
#[derive(Debug, Clone)]
pub struct MmBlockTemplate {
/// Block that is being mined
pub block: BlockInfo,
/// RandomX current key
pub randomx_key: HeaderHash,
/// Compacted block mining target
pub target: Vec<u8>,
/// Block difficulty
pub difficulty: f64,
/// Ephemeral signing secret for this blocktemplate
pub secret: SecretKey,
/// Flag indicating if this template has been submitted
pub submitted: bool,
}
/// Storage for active mining jobs. These are stored per connection ID.
/// A new map will be made for each stratum login.
#[derive(Debug, Default)]
pub struct MiningJobs(HashMap<[u8; 32], BlockTemplate>);
impl MiningJobs {
pub fn insert(&mut self, job_id: [u8; 32], blocktemplate: BlockTemplate) {
self.0.insert(job_id, blocktemplate);
impl BlockTemplate {
fn new(
block: BlockInfo,
randomx_key: HeaderHash,
target: Vec<u8>,
difficulty: f64,
secret: SecretKey,
) -> Self {
Self { block, randomx_key, target, difficulty, secret, submitted: false }
}
pub fn get(&self, job_id: &[u8; 32]) -> Option<&BlockTemplate> {
self.0.get(job_id)
pub fn job_notification(&self) -> (String, JsonValue) {
let block_hash = hex::encode(self.block.header.hash().inner()).to_string();
let job = HashMap::from([
(
"blob".to_string(),
JsonValue::from(hex::encode(self.block.header.to_block_hashing_blob()).to_string()),
),
("job_id".to_string(), JsonValue::from(block_hash.clone())),
("height".to_string(), JsonValue::from(self.block.header.height as f64)),
("target".to_string(), JsonValue::from(hex::encode(&self.target))),
("algo".to_string(), JsonValue::from(String::from("rx/0"))),
(
"seed_hash".to_string(),
JsonValue::from(hex::encode(self.randomx_key.inner()).to_string()),
),
]);
(block_hash, JsonValue::from(job))
}
}
pub fn get_mut(&mut self, job_id: &[u8; 32]) -> Option<&mut BlockTemplate> {
self.0.get_mut(job_id)
/// Auxiliary structure representing a native miner client record.
#[derive(Debug, Clone)]
pub struct MinerClient {
/// Miner recipient configuration
pub config: MinerRewardsRecipientConfig,
/// Current mining job
pub job: String,
/// Connection publisher to push new jobs
pub publisher: JsonSubscriber,
}
impl MinerClient {
pub fn new(wallet: &str, config: &MinerRewardsRecipientConfig, job: &str) -> (String, Self) {
let mut hasher = blake3::Hasher::new();
hasher.update(wallet.as_bytes());
hasher.update(&NanoTimestamp::current_time().inner().to_le_bytes());
let client_id = hex::encode(hasher.finalize().as_bytes()).to_string();
let publisher = JsonSubscriber::new("job");
(client_id, Self { config: config.clone(), job: job.to_owned(), publisher })
}
}
@@ -125,25 +214,32 @@ impl PowRewardV1Zk {
}
}
/// Auxiliary function to generate next block in an atomic manner.
pub async fn generate_next_block(
/// Auxiliary function to generate next mining block template, in an
/// atomic manner.
pub async fn generate_next_block_template(
extended_fork: &mut Fork,
recipient_config: &MinerRewardsRecipientConfig,
zkbin: &ZkBinary,
pk: &ProvingKey,
block_target: u32,
verify_fees: bool,
) -> Result<(BigUint, BlockInfo, SecretKey)> {
) -> Result<BlockTemplate> {
// Grab forks' last block proposal(previous)
let last_proposal = extended_fork.last_proposal()?;
// Grab forks' next block height
let next_block_height = last_proposal.block.header.height + 1;
// Grab forks' next mine target and difficulty
let (target, difficulty) = extended_fork.module.next_mine_target_and_difficulty()?;
// The target should be compacted to 8 bytes little-endian.
let target = target.to_bytes_le()[..8].to_vec();
// Cast difficulty to f64. This should always work.
let difficulty = difficulty.to_string().parse()?;
// Grab forks' unproposed transactions
let (mut txs, _, fees, overlay) = extended_fork
.unproposed_txs(&extended_fork.blockchain, next_block_height, block_target, verify_fees)
.await?;
let (mut txs, _, fees) = extended_fork.unproposed_txs(next_block_height, verify_fees).await?;
// Create an ephemeral block signing keypair. Its secret key will
// be stored in the PowReward transaction's encrypted note for
@@ -161,11 +257,11 @@ pub async fn generate_next_block(
pk,
)?;
// Apply producer transaction in the overlay
// Apply producer transaction in the forks' overlay
let _ = apply_producer_transaction(
&overlay,
&extended_fork.overlay,
next_block_height,
block_target,
extended_fork.module.target,
&tx,
&mut MerkleTree::new(1),
)
@@ -173,8 +269,10 @@ pub async fn generate_next_block(
txs.push(tx);
// Grab the updated contracts states root
let diff = overlay.lock().unwrap().overlay.lock().unwrap().diff(&extended_fork.diffs)?;
overlay
let diff =
extended_fork.overlay.lock().unwrap().overlay.lock().unwrap().diff(&extended_fork.diffs)?;
extended_fork
.overlay
.lock()
.unwrap()
.contracts
@@ -183,12 +281,9 @@ pub async fn generate_next_block(
return Err(Error::ContractsStatesRootNotFoundError);
};
// Drop new trees opened by the unproposed transactions overlay
overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
// Generate the new header
let mut header =
Header::new(last_proposal.hash, next_block_height, Timestamp::current_time(), 0);
Header::new(last_proposal.hash, next_block_height, 0, Timestamp::current_time());
header.state_root = state_root;
// Generate the block
@@ -197,10 +292,13 @@ pub async fn generate_next_block(
// Add transactions to the block
next_block.append_txs(txs);
// Grab the next mine target
let target = extended_fork.module.next_mine_target()?;
Ok((target, next_block, block_signing_keypair.secret))
Ok(BlockTemplate::new(
next_block,
extended_fork.module.darkfi_rx_keys.0,
target,
difficulty,
block_signing_keypair.secret,
))
}
/// Auxiliary function to generate a Money::PoWReward transaction.

View File

@@ -16,10 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{
collections::{HashMap, HashSet},
str::FromStr,
};
use std::collections::{HashMap, HashSet};
use async_trait::async_trait;
use smol::lock::MutexGuard;
@@ -34,17 +31,9 @@ use darkfi::{
server::RequestHandler,
},
system::StoppableTaskPtr,
util::{encoding::base64, time::Timestamp},
validator::consensus::Proposal,
};
use darkfi_sdk::crypto::keypair::Address;
use darkfi_serial::serialize_async;
use crate::{
proto::ProposalMessage,
registry::model::{generate_next_block, MinerRewardsRecipientConfig},
BlockTemplate, DarkfiNode, MiningJobs,
};
use crate::{registry::model::MinerRewardsRecipientConfig, server_error, DarkfiNode, RpcError};
// https://github.com/xmrig/xmrig-proxy/blob/master/doc/STRATUM.md
// https://github.com/xmrig/xmrig-proxy/blob/master/doc/STRATUM_EXT.md
@@ -74,77 +63,92 @@ impl RequestHandler<StratumRpcHandler> for DarkfiNode {
}
}
// TODO: We often just return InvalidParams. These should be cleaned up
// and more verbose.
// TODO: The jobs storing method is not the most ideal. Think of a better one.
// `self.mining_jobs`
// Random testnet address for reference:
// fUfG4WhbHP5C2MhYW3FHctVqi2jfXHamoQeU8KiirKVtoMBEUejkwq9F
impl DarkfiNode {
// RPCAPI:
// Miner sends a `login` request after establishing connection
// in order to authorize.
// Register a new mining client to the registry and generate a new
// job.
//
// The server will return a job along with an id.
// ```
// "job": {
// "blob": 070780e6b9d...4d62fa6c77e76c3001",
// "job_id": "q7PLUPL25UV0z5Ij14IyMk8htXbj",
// "target": "b88d0600",
// "algo": "rx/0"
// }
// ```
// **Request:**
// * `login` : A base-64 encoded wallet address mining configuration
// * `pass` : Client password field; must be the default "x" value
// * `agent` : Client agent description
// * `algo` : Client supported mining algorithms
//
// --> {"jsonrpc":"2.0", "method": "login", "id": 1, "params": {"login": "receiving_address", "pass": "x", "agent": "XMRig", "algo": ["rx/0"]}}
// **Response:**
// * `id` : Registry client ID
// * `job` : The generated mining job
// * `status` : Response status
//
// The generated mining job consists of the following fields:
// * `blob` : The hex encoded block hashing blob of the job block
// * `job_id` : Registry mining job ID
// * `height` : The job block height
// * `target` : Current mining target
// * `algo` : The mining algorithm - RandomX
// * `seed_hash` : Current RandomX key
//
// --> {"jsonrpc":"2.0", "method": "login", "id": 1, "params": {"login": "MINING_CONFIG", "pass": "", "agent": "XMRig", "algo": ["rx/0"]}}
// <-- {"jsonrpc":"2.0", "id": 1, "result": {"id": "1be0b7b6-b15a-47be-a17d-46b2911cf7d0", "job": { ... }, "status": "OK"}}
pub async fn stratum_login(&self, id: u16, params: JsonValue) -> JsonResult {
// TODO: Fail when not synced
// Check if node is synced before responding
if !*self.validator.synced.read().await {
return server_error(RpcError::NotSynced, id, None)
}
// Parse request params
let Some(params) = params.get::<HashMap<String, JsonValue>>() else {
return JsonError::new(InvalidParams, None, id).into()
};
let Some(login) = params.get("login") else {
return JsonError::new(InvalidParams, Some("Missing 'login'".to_string()), id).into()
};
let Some(login) = login.get::<String>() else {
if params.len() != 4 {
return JsonError::new(InvalidParams, None, id).into()
}
// Parse login mining configuration
let Some(wallet) = params.get("login") else {
return server_error(RpcError::MinerMissingLogin, id, None)
};
let Some(wallet) = wallet.get::<String>() else {
return server_error(RpcError::MinerInvalidLogin, id, None)
};
let config =
match MinerRewardsRecipientConfig::from_base64(&self.registry.network, wallet).await {
Ok(c) => c,
Err(e) => return server_error(e, id, None),
};
// Parse password
let Some(pass) = params.get("pass") else {
return JsonError::new(InvalidParams, Some("Missing 'pass'".to_string()), id).into()
return server_error(RpcError::MinerMissingPassword, id, None)
};
let Some(_pass) = pass.get::<String>() else {
return JsonError::new(InvalidParams, None, id).into()
let Some(pass) = pass.get::<String>() else {
return server_error(RpcError::MinerInvalidPassword, id, None)
};
if pass != "x" {
return server_error(RpcError::MinerInvalidPassword, id, None)
}
// Parse agent
let Some(agent) = params.get("agent") else {
return JsonError::new(InvalidParams, Some("Missing 'agent'".to_string()), id).into()
return server_error(RpcError::MinerMissingAgent, id, None)
};
let Some(agent) = agent.get::<String>() else {
return JsonError::new(InvalidParams, None, id).into()
};
let Some(algo) = params.get("algo") else {
return JsonError::new(InvalidParams, Some("Missing 'algo'".to_string()), id).into()
};
let Some(algo) = algo.get::<Vec<JsonValue>>() else {
return JsonError::new(InvalidParams, None, id).into()
return server_error(RpcError::MinerInvalidAgent, id, None)
};
// Try to parse `login` as valid address. This will be the
// block reward recipient.
let Ok(address) = Address::from_str(login) else {
return JsonError::new(InvalidParams, Some("Invalid address".to_string()), id).into()
// Parse algo
let Some(algo) = params.get("algo") else {
return server_error(RpcError::MinerMissingAlgo, id, None)
};
let Some(algo) = algo.get::<Vec<JsonValue>>() else {
return server_error(RpcError::MinerInvalidAlgo, id, None)
};
if address.network() != self.network {
return JsonError::new(InvalidParams, Some("Invalid address prefix".to_string()), id)
.into()
}
// Iterate through `algo` to see if "rx/0" is supported.
// rx/0 is RandomX.
let mut found_rx0 = false;
for i in algo {
let Some(algo) = i.get::<String>() else {
return JsonError::new(InvalidParams, None, id).into()
return server_error(RpcError::MinerInvalidAlgo, id, None)
};
if algo == "rx/0" {
found_rx0 = true;
@@ -152,231 +156,184 @@ impl DarkfiNode {
}
}
if !found_rx0 {
return JsonError::new(InvalidParams, Some("rx/0 not supported".to_string()), id).into()
return server_error(RpcError::MinerRandomXNotSupported, id, None)
}
info!("[STRATUM] Got login from {} ({})", address, agent);
let conn_id = {
let mut hasher = blake3::Hasher::new();
hasher.update(&address.to_string().into_bytes());
hasher.update(&Timestamp::current_time().inner().to_le_bytes());
*hasher.finalize().as_bytes()
};
// Register the new miner
info!(target: "darkfid::rpc::rpc_stratum::stratum_login","[RPC-STRATUM] Got login from {wallet} ({agent})");
let (client_id, block_template, publisher) =
match self.registry.register_miner(&self.validator, wallet, &config).await {
Ok(p) => p,
Err(e) => {
error!(
target: "darkfid::rpc::rpc_stratum::stratum_login",
"[RPC-STRATUM] Failed to register miner: {e}",
);
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
};
// Now we should register this login, and create a blocktemplate and
// a job for them.
// TODO: We also have to spawn the notification task that will send
// JSONRPC notifications to this connection when a new job is available.
// We'll clear any existing jobs for this login.
let mut mining_jobs = self.registry.mining_jobs.lock().await;
mining_jobs.insert(conn_id, MiningJobs::default());
// Find applicable chain fork
let mut extended_fork = match self.validator.best_current_fork().await {
Ok(f) => f,
Err(e) => {
error!(
target: "darkfid::rpc_stratum::stratum_login",
"[STRATUM] Finding best fork index failed: {e}",
);
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
};
// Query the Validator for a new blocktemplate.
// We first need to construct `MinerRewardsRecipientConfig` from the
// address configuration provided to us through the RPC.
// TODO: Parse any spend hook from the login. We might also want to
// define a specific address format if it includes extra data.
// We could also include arbitrary information in the login password.
let recipient_config =
MinerRewardsRecipientConfig { recipient: address, spend_hook: None, user_data: None };
// Find next block target
let target = self.validator.consensus.module.read().await.target;
// Generate blocktemplate with all the information.
// This will return the mining target, the entire block, and the
// ephemeral secret used to sign the mined block.
let (target, block, secret) = match generate_next_block(
&mut extended_fork,
&recipient_config,
&self.registry.powrewardv1_zk.zkbin,
&self.registry.powrewardv1_zk.provingkey,
target,
self.validator.verify_fees,
)
.await
{
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::rpc_stratum::stratum_login",
"[STRATUM] Failed to generate next blocktemplate: {e}",
);
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
};
// Reference the RandomX dataset seed
// TODO: We can also send `next_seed_hash` when we know it.
let seed_hash = extended_fork.module.darkfi_rx_keys.0.inner();
// We will store this in our mining jobs map for reference when
// a miner solution is submitted.
let blocktemplate =
BlockTemplate { block, randomx_key: *seed_hash, target: target.clone(), secret };
// Construct everything needed for the Stratum response.
let blob = blocktemplate.block.header.to_blockhashing_blob();
let job_id = *blocktemplate.block.header.hash().inner();
let height = blocktemplate.block.header.height as f64;
// The target should be compacted to 8 bytes little-endian.
let target = &target.to_bytes_le()[..8];
// Store the job. unwrap should be fine because we created this above.
let jobs = mining_jobs.get_mut(&conn_id).unwrap();
jobs.insert(job_id, blocktemplate);
// Construct response
let job: HashMap<String, JsonValue> = HashMap::from([
("blob".to_string(), hex::encode(&blob).to_string().into()),
("job_id".to_string(), hex::encode(job_id).to_string().into()),
("height".to_string(), height.into()),
("target".to_string(), hex::encode(target).into()),
("algo".to_string(), "rx/0".to_string().into()),
("seed_hash".to_string(), hex::encode(seed_hash).into()),
]);
let result = HashMap::from([
("id".to_string(), hex::encode(conn_id).into()),
("job".to_string(), job.into()),
("status".to_string(), "OK".to_string().into()),
]);
// Ship it.
JsonResponse::new(result.into(), id).into()
// Now we have the new job, we ship it to RPC
let (job_id, job) = block_template.job_notification();
info!(
target: "darkfid::rpc::rpc_stratum::stratum_login",
"[RPC-STRATUM] Created new mining job for client {client_id}: {job_id}"
);
let response = JsonValue::from(HashMap::from([
("id".to_string(), JsonValue::from(client_id)),
("job".to_string(), job),
("status".to_string(), JsonValue::from(String::from("OK"))),
]));
(publisher, JsonResponse::new(response, id)).into()
}
// RPCAPI:
// Miner submits a job solution.
//
// **Request:**
// * `id` : Registry client ID
// * `job_id` : Registry mining job ID
// * `nonce` : The hex encoded solution header nonce.
// * `result` : RandomX calculated hash
//
// **Response:**
// * `status`: Block submit status
//
// --> {"jsonrpc":"2.0", "method": "submit", "id": 1, "params": {"id": "...", "job_id": "...", "nonce": "d0030040", "result": "e1364b8782719d7683e2ccd3d8f724bc59dfa780a9e960e7c0e0046acdb40100"}}
// <-- {"jsonrpc":"2.0", "id": 1, "result": {"status": "OK"}}
pub async fn stratum_submit(&self, id: u16, params: JsonValue) -> JsonResult {
// TODO: Maybe grab an exclusive lock to avoid the xmrig spam while
// we're doing the pow verification. xmrig spams us whenever it gets
// a solution, and this will end up in cloning a bunch of blocktemplates
// and is going to cause memory usage to go up significantly.
// Ideally we should block here until we finish each submit one-by-one
// and find a valid one. Then when we do find a valid one, we should
// clear the existing job(s) so this method will just return an error
// and not have to do all the block shenanigans.
// Additionally when a block is proposed successfully, the node should
// send a new job notification to xmrig so we should be fine.
// That notification part should also clear the existing jobs.
// Check if node is synced before responding
if !*self.validator.synced.read().await {
return server_error(RpcError::NotSynced, id, None)
}
// Grab registry submissions lock
let submit_lock = self.registry.submit_lock.write().await;
// Parse request params
let Some(params) = params.get::<HashMap<String, JsonValue>>() else {
return JsonError::new(InvalidParams, None, id).into()
};
let Some(conn_id) = params.get("id") else {
return JsonError::new(InvalidParams, Some("Missing 'id'".to_string()), id).into()
};
let Some(conn_id) = conn_id.get::<String>() else {
if params.len() != 4 {
return JsonError::new(InvalidParams, None, id).into()
}
// Parse client id
let Some(client_id) = params.get("id") else {
return server_error(RpcError::MinerMissingClientId, id, None)
};
let Some(client_id) = client_id.get::<String>() else {
return server_error(RpcError::MinerInvalidClientId, id, None)
};
// If we don't know about this client, we can just abort here
let clients = self.registry.clients.read().await;
let Some(client) = clients.get(client_id) else {
return server_error(RpcError::MinerUnknownClient, id, None)
};
// Parse job id
let Some(job_id) = params.get("job_id") else {
return JsonError::new(InvalidParams, Some("Missing 'job_id'".to_string()), id).into()
return server_error(RpcError::MinerMissingJobId, id, None)
};
let Some(job_id) = job_id.get::<String>() else {
return JsonError::new(InvalidParams, None, id).into()
return server_error(RpcError::MinerInvalidJobId, id, None)
};
// If we don't know about this job or it doesn't match the
// client one, we can just abort here
if &client.job != job_id {
return server_error(RpcError::MinerUnknownJob, id, None)
}
let jobs = self.registry.jobs.read().await;
let Some(wallet) = jobs.get(job_id) else {
return server_error(RpcError::MinerUnknownJob, id, None)
};
// If this job's wallet template doesn't exist, we can just
// abort here.
let mut block_templates = self.registry.block_templates.write().await;
let Some(block_template) = block_templates.get_mut(wallet) else {
return server_error(RpcError::MinerUnknownJob, id, None)
};
// If this template has already been submitted, reject this
// submission.
if block_template.submitted {
return JsonResponse::new(
JsonValue::from(HashMap::from([(
"status".to_string(),
JsonValue::from(String::from("rejected")),
)])),
id,
)
.into()
}
// Parse nonce
let Some(nonce) = params.get("nonce") else {
return JsonError::new(InvalidParams, Some("Missing 'nonce'".to_string()), id).into()
return server_error(RpcError::MinerMissingNonce, id, None)
};
let Some(nonce) = nonce.get::<String>() else {
return JsonError::new(InvalidParams, None, id).into()
return server_error(RpcError::MinerInvalidNonce, id, None)
};
// result is the RandomX calculated hash. Useful to verify/debug.
let Some(result) = params.get("result") else {
return JsonError::new(InvalidParams, Some("Missing 'result'".to_string()), id).into()
};
let Some(_result) = result.get::<String>() else {
return JsonError::new(InvalidParams, None, id).into()
};
let Ok(conn_id) = hex::decode(conn_id) else {
return JsonError::new(InvalidParams, None, id).into()
};
if conn_id.len() != 32 {
return JsonError::new(InvalidParams, None, id).into()
}
let Ok(job_id) = hex::decode(job_id) else {
return JsonError::new(InvalidParams, None, id).into()
};
if job_id.len() != 32 {
return JsonError::new(InvalidParams, None, id).into()
}
let conn_id: [u8; 32] = conn_id.try_into().unwrap();
let job_id: [u8; 32] = job_id.try_into().unwrap();
// We should be aware of this conn_id and job_id.
let mut mining_jobs = self.registry.mining_jobs.lock().await;
let Some(jobs) = mining_jobs.get_mut(&conn_id) else {
return JsonError::new(InvalidParams, None, id).into()
};
// Get the blocktemplate.
let Some(blocktemplate) = jobs.get_mut(&job_id) else {
return JsonError::new(InvalidParams, None, id).into()
};
// Parse the nonce into u32.
let Ok(nonce_bytes) = hex::decode(nonce) else {
return JsonError::new(InvalidParams, None, id).into()
return server_error(RpcError::MinerInvalidNonce, id, None)
};
if nonce_bytes.len() != 4 {
return JsonError::new(InvalidParams, None, id).into()
return server_error(RpcError::MinerInvalidNonce, id, None)
}
let nonce = u32::from_le_bytes(nonce_bytes.try_into().unwrap());
// We clone the block, update the nonce,
// sign it, and ship it into a proposal.
let mut block = blocktemplate.block.clone();
block.header.nonce = nonce;
block.sign(&blocktemplate.secret);
// Parse result
let Some(result) = params.get("result") else {
return server_error(RpcError::MinerMissingResult, id, None)
};
let Some(_result) = result.get::<String>() else {
return server_error(RpcError::MinerInvalidResult, id, None)
};
info!(
target: "darkfid::rpc_stratum::stratum_submit",
"[STRATUM] Proposing new block to network",
target: "darkfid::rpc::rpc_stratum::stratum_submit",
"[RPC-STRATUM] Got solution submission from client {client_id} for job: {job_id}",
);
let proposal = Proposal::new(block);
if let Err(e) = self.validator.append_proposal(&proposal).await {
// Update the block nonce and sign it
let mut block = block_template.block.clone();
block.header.nonce = nonce;
block.sign(&block_template.secret);
// Submit the new block through the registry
if let Err(e) =
self.registry.submit(&self.validator, &self.subscribers, &self.p2p_handler, block).await
{
error!(
target: "darkfid::rpc_stratum::stratum_submit",
"[STRATUM] Error proposing new block: {e}",
target: "darkfid::rpc::rpc_xmr::xmr_merge_submit_solution",
"[RPC-XMR] Error submitting new block: {e}",
);
return JsonError::new(InvalidParams, None, id).into()
return JsonResponse::new(
JsonValue::from(HashMap::from([(
"status".to_string(),
JsonValue::from(String::from("rejected")),
)])),
id,
)
.into()
}
// Proposal passed. We will now clear the jobs as it's assumed
// a new block needs to be mined.
mining_jobs.insert(conn_id, MiningJobs::default());
// Mark block as submitted
block_template.submitted = true;
// Broadcast to network
let proposals_sub = self.subscribers.get("proposals").unwrap();
let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
proposals_sub.notify(vec![enc_prop].into()).await;
info!(
target: "darkfid::rpc_stratum::stratum_submit",
"[STRATUM] Broadcasting new block to network",
);
let message = ProposalMessage(proposal);
self.p2p_handler.p2p.broadcast(&message).await;
// Release all locks
drop(block_templates);
drop(jobs);
drop(submit_lock);
JsonResponse::new(
HashMap::from([("status".to_string(), "OK".to_string().into())]).into(),
JsonValue::from(HashMap::from([(
"status".to_string(),
JsonValue::from(String::from("OK")),
)])),
id,
)
.into()
@@ -385,21 +342,47 @@ impl DarkfiNode {
// RPCAPI:
// Miner sends `keepalived` to prevent connection timeout.
//
// **Request:**
// * `id` : Registry client ID
//
// **Response:**
// * `status`: Response status
//
// --> {"jsonrpc":"2.0", "method": "keepalived", "id": 1, "params": {"id": "foo"}}
// <-- {"jsonrpc":"2.0", "id": 1, "result": {"status": "KEEPALIVED"}}
pub async fn stratum_keepalived(&self, id: u16, params: JsonValue) -> JsonResult {
// Check if node is synced before responding
if !*self.validator.synced.read().await {
return server_error(RpcError::NotSynced, id, None)
}
// Parse request params
let Some(params) = params.get::<HashMap<String, JsonValue>>() else {
return JsonError::new(InvalidParams, None, id).into()
};
let Some(_conn_id) = params.get("id") else {
return JsonError::new(InvalidParams, Some("Missing 'id'".to_string()), id).into()
if params.len() != 1 {
return JsonError::new(InvalidParams, None, id).into()
}
// Parse client id
let Some(client_id) = params.get("id") else {
return server_error(RpcError::MinerMissingClientId, id, None)
};
let Some(client_id) = client_id.get::<String>() else {
return server_error(RpcError::MinerInvalidClientId, id, None)
};
// TODO: This conn_id should likely exist. We should probably check
// that. Otherwise we might not want to reply at all.
// If we don't know about this client, we can just abort here
if !self.registry.clients.read().await.contains_key(client_id) {
return server_error(RpcError::MinerUnknownClient, id, None)
};
// Respond with keepalived message
JsonResponse::new(
JsonValue::from(HashMap::from([("status".into(), "KEEPALIVED".to_string().into())])),
JsonValue::from(HashMap::from([(
"status".to_string(),
JsonValue::from(String::from("KEEPALIVED")),
)])),
id,
)
.into()

View File

@@ -43,20 +43,9 @@ use darkfi::{
server::RequestHandler,
},
system::StoppableTaskPtr,
util::encoding::base64,
validator::consensus::Proposal,
};
use darkfi_sdk::{
crypto::{keypair::Address, pasta_prelude::PrimeField, FuncId},
pasta::pallas,
};
use darkfi_serial::{deserialize_async, serialize_async};
use crate::{
proto::ProposalMessage,
registry::model::{generate_next_block, MinerRewardsRecipientConfig, MmBlockTemplate},
server_error, DarkfiNode, RpcError,
};
use crate::{registry::model::MinerRewardsRecipientConfig, server_error, DarkfiNode, RpcError};
// https://github.com/SChernykh/p2pool/blob/master/docs/MERGE_MINING.MD
@@ -115,7 +104,7 @@ impl DarkfiNode {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::rpc::xmr_merge_mining_get_chain_id",
target: "darkfid::rpc::rpc_xmr::xmr_merge_mining_get_chain_id",
"[RPC-XMR] Error fetching genesis block hash: {e}"
);
return JsonError::new(ErrorCode::InternalError, None, id).into()
@@ -141,12 +130,12 @@ impl DarkfiNode {
// * `prev_id` : Hash of the previous Monero block
//
// **Response:**
// * `aux_blob`: The hex-encoded wallet address mining configuration blob
// * `aux_blob`: A hex-encoded blob of empty data
// * `aux_diff`: Mining difficulty (decimal number)
// * `aux_hash`: A 32-byte hex-encoded hash of merge mined block
//
// --> {"jsonrpc":"2.0", "method": "merge_mining_get_aux_block", "params": {"address": "MERGE_MINED_CHAIN_ADDRESS", "aux_hash": "f6952d6eef555ddd87aca66e56b91530222d6e318414816f3ba7cf5bf694bf0f", "height": 3000000, "prev_id":"ad505b0be8a49b89273e307106fa42133cbd804456724c5e7635bd953215d92a"}, "id": 1}
// <-- {"jsonrpc":"2.0", "result": {"aux_blob": "4c6f72656d20697073756d", "aux_diff": 123456, "aux_hash":"f6952d6eef555ddd87aca66e56b91530222d6e318414816f3ba7cf5bf694bf0f"}, "id": 1}
// <-- {"jsonrpc":"2.0", "result": {"aux_blob": "", "aux_diff": 123456, "aux_hash":"f6952d6eef555ddd87aca66e56b91530222d6e318414816f3ba7cf5bf694bf0f"}, "id": 1}
pub async fn xmr_merge_mining_get_aux_block(&self, id: u16, params: JsonValue) -> JsonResult {
// Check if node is synced before responding to p2pool
if !*self.validator.synced.read().await {
@@ -161,51 +150,6 @@ impl DarkfiNode {
return JsonError::new(InvalidParams, None, id).into()
}
// Parse address mining configuration
let Some(address) = params.get("address") else {
return server_error(RpcError::MinerMissingAddress, id, None)
};
let Some(address) = address.get::<String>() else {
return server_error(RpcError::MinerInvalidAddress, id, None)
};
let Some(address_bytes) = base64::decode(address) else {
return server_error(RpcError::MinerInvalidAddress, id, None)
};
let Ok((recipient, spend_hook, user_data)) =
deserialize_async::<(String, Option<String>, Option<String>)>(&address_bytes).await
else {
return server_error(RpcError::MinerInvalidAddress, id, None)
};
let Ok(recipient) = Address::from_str(&recipient) else {
return server_error(RpcError::MinerInvalidRecipient, id, None)
};
if recipient.network() != self.network {
return server_error(RpcError::MinerInvalidRecipientPrefix, id, None)
}
let spend_hook = match spend_hook {
Some(s) => match FuncId::from_str(&s) {
Ok(s) => Some(s),
Err(_) => return server_error(RpcError::MinerInvalidSpendHook, id, None),
},
None => None,
};
let user_data: Option<pallas::Base> = match user_data {
Some(u) => {
let Ok(bytes) = bs58::decode(&u).into_vec() else {
return server_error(RpcError::MinerInvalidUserData, id, None)
};
let bytes: [u8; 32] = match bytes.try_into() {
Ok(b) => b,
Err(_) => return server_error(RpcError::MinerInvalidUserData, id, None),
};
match pallas::Base::from_repr(bytes).into() {
Some(v) => Some(v),
None => return server_error(RpcError::MinerInvalidUserData, id, None),
}
}
None => None,
};
// Parse aux_hash
let Some(aux_hash) = params.get("aux_hash") else {
return server_error(RpcError::MinerMissingAuxHash, id, None)
@@ -213,10 +157,28 @@ impl DarkfiNode {
let Some(aux_hash) = aux_hash.get::<String>() else {
return server_error(RpcError::MinerInvalidAuxHash, id, None)
};
let Ok(aux_hash) = HeaderHash::from_str(aux_hash) else {
if HeaderHash::from_str(aux_hash).is_err() {
return server_error(RpcError::MinerInvalidAuxHash, id, None)
};
// Check if we already have this job
if self.registry.jobs.read().await.contains_key(&aux_hash.to_string()) {
return JsonResponse::new(JsonValue::from(HashMap::new()), id).into()
}
// Parse address mining configuration
let Some(wallet) = params.get("address") else {
return server_error(RpcError::MinerMissingAddress, id, None)
};
let Some(wallet) = wallet.get::<String>() else {
return server_error(RpcError::MinerInvalidAddress, id, None)
};
let config =
match MinerRewardsRecipientConfig::from_base64(&self.registry.network, wallet).await {
Ok(c) => c,
Err(e) => return server_error(e, id, None),
};
// Parse height
let Some(height) = params.get("height") else {
return server_error(RpcError::MinerMissingHeight, id, None)
@@ -238,126 +200,29 @@ impl DarkfiNode {
};
let prev_id = monero::Hash::from_slice(&prev_id);
// Now that method params format is correct, we can check if we
// already have a mining job for this wallet. If we already
// have it, we check if the fork it extends is still the best
// one. If both checks pass, we can just return an empty
// response if the request `aux_hash` matches the job one,
// otherwise return the job block template hash. In case the
// best fork has changed, we drop this job and generate a
// new one. If we don't know this wallet, we create a new job.
// We'll also obtain a lock here to avoid getting polled
// multiple times and potentially missing a job. The lock is
// released when this function exits.
let mut mm_blocktemplates = self.registry.mm_blocktemplates.lock().await;
let mut extended_fork = match self.validator.best_current_fork().await {
Ok(f) => f,
Err(e) => {
error!(
target: "darkfid::rpc_xmr::xmr_merge_mining_get_aux_block",
"[RPC-XMR] Finding best fork index failed: {e}",
);
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
};
if let Some(blocktemplate) = mm_blocktemplates.get(&address_bytes) {
let last_proposal = match extended_fork.last_proposal() {
// Register the new merge miner
let (job_id, difficulty) =
match self.registry.register_merge_miner(&self.validator, wallet, &config).await {
Ok(p) => p,
Err(e) => {
error!(
target: "darkfid::rpc_xmr::xmr_merge_mining_get_aux_block",
"[RPC-XMR] Retrieving best fork last proposal failed: {e}",
target: "darkfid::rpc::rpc_xmr::xmr_merge_mining_get_aux_block",
"[RPC-XMR] Failed to register merge miner: {e}",
);
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
};
if last_proposal.hash == blocktemplate.block.header.previous {
let blockhash = blocktemplate.block.header.template_hash();
return if blockhash != aux_hash {
JsonResponse::new(
JsonValue::from(HashMap::from([
("aux_blob".to_string(), JsonValue::from(hex::encode(address_bytes))),
("aux_diff".to_string(), JsonValue::from(blocktemplate.difficulty)),
("aux_hash".to_string(), JsonValue::from(blockhash.as_string())),
])),
id,
)
.into()
} else {
JsonResponse::new(JsonValue::from(HashMap::new()), id).into()
}
}
mm_blocktemplates.remove(&address_bytes);
}
// At this point, we should query the Validator for a new blocktemplate.
// We first need to construct `MinerRewardsRecipientConfig` from the
// address configuration provided to us through the RPC.
let recipient_str = format!("{recipient}");
let spend_hook_str = match spend_hook {
Some(spend_hook) => format!("{spend_hook}"),
None => String::from("-"),
};
let user_data_str = match user_data {
Some(user_data) => bs58::encode(user_data.to_repr()).into_string(),
None => String::from("-"),
};
let recipient_config = MinerRewardsRecipientConfig { recipient, spend_hook, user_data };
// Now let's try to construct the blocktemplate.
// Find the difficulty. Note we cast it to f64 here.
let difficulty: f64 = match extended_fork.module.next_difficulty() {
Ok(v) => {
// We will attempt to cast it to f64. This should always work.
v.to_string().parse().unwrap()
}
Err(e) => {
error!(
target: "darkfid::rpc_xmr::xmr_merge_mining_get_aux_block",
"[RPC-XMR] Finding next mining difficulty failed: {e}",
);
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
};
let (_, block, block_signing_secret) = match generate_next_block(
&mut extended_fork,
&recipient_config,
&self.registry.powrewardv1_zk.zkbin,
&self.registry.powrewardv1_zk.provingkey,
self.validator.consensus.module.read().await.target,
self.validator.verify_fees,
)
.await
{
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::rpc_xmr::xmr_merge_mining_get_aux_block",
"[RPC-XMR] Failed to generate next blocktemplate: {e}",
);
return JsonError::new(ErrorCode::InternalError, None, id).into()
}
};
// Now we have the blocktemplate. We'll mark it down in memory,
// and then ship it to RPC.
let blockhash = block.header.template_hash();
mm_blocktemplates.insert(
address_bytes.clone(),
MmBlockTemplate { block, difficulty, secret: block_signing_secret },
);
// Now that we have the new job, we ship it to RPC
info!(
target: "darkfid::rpc_xmr::xmr_merge_mining_get_aux_block",
"[RPC-XMR] Created new blocktemplate: address={recipient_str}, spend_hook={spend_hook_str}, user_data={user_data_str}, aux_hash={blockhash}, height={height}, prev_id={prev_id}"
target: "darkfid::rpc::rpc_xmr::xmr_merge_mining_get_aux_block",
"[RPC-XMR] Created new merge mining job: aux_hash={job_id}, height={height}, prev_id={prev_id}"
);
let response = JsonValue::from(HashMap::from([
("aux_blob".to_string(), JsonValue::from(hex::encode(address_bytes))),
("aux_blob".to_string(), JsonValue::from(hex::encode(vec![]))),
("aux_diff".to_string(), JsonValue::from(difficulty)),
("aux_hash".to_string(), JsonValue::from(blockhash.as_string())),
("aux_hash".to_string(), JsonValue::from(job_id)),
]));
JsonResponse::new(response, id).into()
}
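For reference, here is a minimal sketch of the per-wallet job caching described in the comments above, using hypothetical `Registry`, `Wallet`, `Hash32` and `Job` types rather than the darkfid ones: a cached job is reused while it still extends the best fork tip, and regenerated otherwise.

use std::collections::HashMap;

#[derive(Clone, PartialEq, Eq, Hash)]
struct Wallet(String);

#[derive(Clone, PartialEq, Eq)]
struct Hash32([u8; 32]);

#[derive(Clone)]
struct Job {
    id: Hash32,
    extends: Hash32, // tip of the fork this job's template builds on
    difficulty: f64,
}

struct Registry {
    jobs: HashMap<Wallet, Job>,
}

impl Registry {
    /// Return the cached job if it still extends `best_tip`,
    /// otherwise drop it and build a fresh one.
    fn get_or_create(
        &mut self,
        wallet: &Wallet,
        best_tip: &Hash32,
        build: impl FnOnce() -> Job,
    ) -> Job {
        if let Some(job) = self.jobs.get(wallet) {
            if &job.extends == best_tip {
                return job.clone()
            }
            // The best fork changed, so this template is stale.
            self.jobs.remove(wallet);
        }
        let job = build();
        self.jobs.insert(wallet.clone(), job.clone());
        job
    }
}

Keying the cache by wallet rather than by template hash is what lets a miner poll repeatedly without forcing a fresh template on every request.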
@@ -381,7 +246,7 @@ impl DarkfiNode {
// **Response:**
// * `status`: Block submit status
//
// --> {"jsonrpc":"2.0", "method": "merge_mining_submit_solution", "params": {"aux_blob": "4c6f72656d20697073756d", "aux_hash": "f6952d6eef555ddd87aca66e56b91530222d6e318414816f3ba7cf5bf694bf0f", "blob": "...", "merkle_proof": ["hash1", "hash2", "hash3"], "path": 3, "seed_hash": "22c3d47c595ae888b5d7fc304235f92f8854644d4fad38c5680a5d4a81009fcd"}, "id": 1}
// --> {"jsonrpc":"2.0", "method": "merge_mining_submit_solution", "params": {"aux_blob": "", "aux_hash": "f6952d6eef555ddd87aca66e56b91530222d6e318414816f3ba7cf5bf694bf0f", "blob": "...", "merkle_proof": ["hash1", "hash2", "hash3"], "path": 3, "seed_hash": "22c3d47c595ae888b5d7fc304235f92f8854644d4fad38c5680a5d4a81009fcd"}, "id": 1}
// <-- {"jsonrpc":"2.0", "result": {"status": "accepted"}, "id": 1}
pub async fn xmr_merge_mining_submit_solution(&self, id: u16, params: JsonValue) -> JsonResult {
// Check if node is synced before responding to p2pool
@@ -389,6 +254,9 @@ impl DarkfiNode {
return server_error(RpcError::NotSynced, id, None)
}
// Grab registry submissions lock
let submit_lock = self.registry.submit_lock.write().await;
// Parse request params
let Some(params) = params.get::<HashMap<String, JsonValue>>() else {
return JsonError::new(InvalidParams, None, id).into()
@@ -397,46 +265,6 @@ impl DarkfiNode {
return JsonError::new(InvalidParams, None, id).into()
}
// Parse address mining configuration from aux_blob
let Some(aux_blob) = params.get("aux_blob") else {
return server_error(RpcError::MinerMissingAuxBlob, id, None)
};
let Some(aux_blob) = aux_blob.get::<String>() else {
return server_error(RpcError::MinerInvalidAuxBlob, id, None)
};
let Ok(address_bytes) = hex::decode(aux_blob) else {
return server_error(RpcError::MinerInvalidAuxBlob, id, None)
};
let Ok((recipient, spend_hook, user_data)) =
deserialize_async::<(String, Option<String>, Option<String>)>(&address_bytes).await
else {
return server_error(RpcError::MinerInvalidAuxBlob, id, None)
};
let Ok(recipient) = Address::from_str(&recipient) else {
return server_error(RpcError::MinerInvalidRecipient, id, None)
};
if recipient.network() != self.network {
return server_error(RpcError::MinerInvalidRecipientPrefix, id, None)
}
if let Some(spend_hook) = spend_hook {
if FuncId::from_str(&spend_hook).is_err() {
return server_error(RpcError::MinerInvalidSpendHook, id, None)
}
};
if let Some(user_data) = user_data {
let Ok(bytes) = bs58::decode(&user_data).into_vec() else {
return server_error(RpcError::MinerInvalidUserData, id, None)
};
let bytes: [u8; 32] = match bytes.try_into() {
Ok(b) => b,
Err(_) => return server_error(RpcError::MinerInvalidUserData, id, None),
};
let _: pallas::Base = match pallas::Base::from_repr(bytes).into() {
Some(v) => v,
None => return server_error(RpcError::MinerInvalidUserData, id, None),
};
};
// Parse aux_hash
let Some(aux_hash) = params.get("aux_hash") else {
return server_error(RpcError::MinerMissingAuxHash, id, None)
@@ -444,14 +272,48 @@ impl DarkfiNode {
let Some(aux_hash) = aux_hash.get::<String>() else {
return server_error(RpcError::MinerInvalidAuxHash, id, None)
};
let Ok(aux_hash) = HeaderHash::from_str(aux_hash) else {
if HeaderHash::from_str(aux_hash).is_err() {
return server_error(RpcError::MinerInvalidAuxHash, id, None)
}
// If we don't know about this job, we can just abort here
let jobs = self.registry.jobs.read().await;
let Some(wallet) = jobs.get(aux_hash) else {
return server_error(RpcError::MinerUnknownJob, id, None)
};
// If we don't know about this job, we can just abort here.
let mut mm_blocktemplates = self.registry.mm_blocktemplates.lock().await;
if !mm_blocktemplates.contains_key(&address_bytes) {
// If this job's wallet has no block template, we can just
// abort here.
let mut block_templates = self.registry.block_templates.write().await;
let Some(block_template) = block_templates.get_mut(wallet) else {
return server_error(RpcError::MinerUnknownJob, id, None)
};
// If this template has already been submitted, reject this
// submission.
if block_template.submitted {
return JsonResponse::new(
JsonValue::from(HashMap::from([(
"status".to_string(),
JsonValue::from(String::from("rejected")),
)])),
id,
)
.into()
}
// Parse aux_blob
let Some(aux_blob) = params.get("aux_blob") else {
return server_error(RpcError::MinerMissingAuxBlob, id, None)
};
let Some(aux_blob) = aux_blob.get::<String>() else {
return server_error(RpcError::MinerInvalidAuxBlob, id, None)
};
let Ok(aux_blob) = hex::decode(aux_blob) else {
return server_error(RpcError::MinerInvalidAuxBlob, id, None)
};
if !aux_blob.is_empty() {
return server_error(RpcError::MinerInvalidAuxBlob, id, None)
}
// Parse blob
@@ -510,7 +372,7 @@ impl DarkfiNode {
};
info!(
target: "darkfid::rpc_xmr::xmr_merge_mining_submit_solution",
target: "darkfid::rpc::rpc_xmr::xmr_merge_mining_submit_solution",
"[RPC-XMR] Got solution submission: aux_hash={aux_hash}",
);
@@ -522,7 +384,7 @@ impl DarkfiNode {
Ok(v) => v,
Err(e) => {
error!(
target: "darkfid::rpc_xmr::xmr_merge_mining_submit_solution",
target: "darkfid::rpc::rpc_xmr::xmr_merge_mining_submit_solution",
"[RPC-XMR] Failed constructing MoneroPowData: {e}",
);
return server_error(RpcError::MinerMoneroPowDataConstructionFailed, id, None)
@@ -530,52 +392,40 @@ impl DarkfiNode {
};
// Append MoneroPowData to the DarkFi block and sign it
let blocktemplate = &mm_blocktemplates.get(&address_bytes).unwrap();
let mut block = blocktemplate.block.clone();
let mut block = block_template.block.clone();
block.header.pow_data = PowData::Monero(monero_pow_data);
block.sign(&blocktemplate.secret);
block.sign(&block_template.secret);
// At this point we should be able to remove the submitted job.
// We still won't release the lock in hope of proposing the block
// first.
mm_blocktemplates.remove(&address_bytes);
// Propose the new block
info!(
target: "darkfid::rpc_xmr::xmr_merge_mining_submit_solution",
"[RPC-XMR] Proposing new block to network",
);
let proposal = Proposal::new(block);
if let Err(e) = self.validator.append_proposal(&proposal).await {
// Submit the new block through the registry
if let Err(e) =
self.registry.submit(&self.validator, &self.subscribers, &self.p2p_handler, block).await
{
error!(
target: "darkfid::rpc_xmr::xmr_merge_submit_solution",
"[RPC-XMR] Error proposing new block: {e}",
target: "darkfid::rpc::rpc_xmr::xmr_merge_submit_solution",
"[RPC-XMR] Error submitting new block: {e}",
);
return JsonResponse::new(
JsonValue::from(HashMap::from([(
"status".to_string(),
JsonValue::from("rejected".to_string()),
JsonValue::from(String::from("rejected")),
)])),
id,
)
.into()
}
let proposals_sub = self.subscribers.get("proposals").unwrap();
let enc_prop = JsonValue::String(base64::encode(&serialize_async(&proposal).await));
proposals_sub.notify(vec![enc_prop].into()).await;
// Mark block as submitted
block_template.submitted = true;
info!(
target: "darkfid::rpc_xmr::xmr_merge_mining_submit_solution",
"[RPC-XMR] Broadcasting new block to network",
);
let message = ProposalMessage(proposal);
self.p2p_handler.p2p.broadcast(&message).await;
// Release all locks
drop(block_templates);
drop(jobs);
drop(submit_lock);
JsonResponse::new(
JsonValue::from(HashMap::from([(
"status".to_string(),
JsonValue::from("accepted".to_string()),
JsonValue::from(String::from("accepted")),
)])),
id,
)

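A minimal sketch of the double-submission guard used above, with hypothetical `Registry`, `Template` and `submit_block` stand-ins: submissions are serialized behind a registry-wide lock, a template that was already submitted is rejected, and the flag is only set once the block has actually been accepted.

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

struct Template {
    submitted: bool,
    // ... block data would live here
}

struct Registry {
    submit_lock: Mutex<()>,
    templates: RwLock<HashMap<String, Template>>,
}

enum Status { Accepted, Rejected }

fn submit(
    registry: &Registry,
    job_id: &str,
    submit_block: impl Fn() -> Result<(), String>,
) -> Status {
    // Serialize all submissions so two solutions for the same
    // template cannot race each other.
    let _guard = registry.submit_lock.lock().unwrap();
    let mut templates = registry.templates.write().unwrap();
    let Some(template) = templates.get_mut(job_id) else { return Status::Rejected };
    if template.submitted {
        return Status::Rejected
    }
    if submit_block().is_err() {
        return Status::Rejected
    }
    // Only mark the template as submitted once the block was accepted.
    template.submitted = true;
    Status::Accepted
}

Holding the submit lock for the whole call means a second solution for the same job observes `submitted == true` and gets rejected instead of producing a duplicate proposal.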
View File

@@ -192,8 +192,8 @@ async fn consensus_task(
continue
}
if let Err(e) = clean_blocktemplates(node).await {
error!(target: "darkfid", "Failed cleaning mining block templates: {e}")
if let Err(e) = node.registry.refresh(&node.validator).await {
error!(target: "darkfid", "Failed refreshing mining block templates: {e}")
}
let mut notif_blocks = Vec::with_capacity(confirmed.len());
@@ -219,80 +219,3 @@ async fn consensus_task(
);
}
}
/// Auxiliary function to drop mining block templates not referencing
/// active forks or last confirmed block.
async fn clean_blocktemplates(node: &DarkfiNodePtr) -> Result<()> {
// Grab a lock over node mining templates
let mut blocktemplates = node.registry.blocktemplates.lock().await;
let mut mm_blocktemplates = node.registry.mm_blocktemplates.lock().await;
// Early return if no mining block templates exist
if blocktemplates.is_empty() && mm_blocktemplates.is_empty() {
return Ok(())
}
// Grab a lock over node forks
let forks = node.validator.consensus.forks.read().await;
// Grab last confirmed block for checks
let (_, last_confirmed) = node.validator.blockchain.last()?;
// Loop through templates to find which can be dropped
let mut dropped_templates = vec![];
'outer: for (key, blocktemplate) in blocktemplates.iter() {
// Loop through all the forks
for fork in forks.iter() {
// Traverse fork proposals sequence in reverse
for p_hash in fork.proposals.iter().rev() {
// Check if job extends this fork
if &blocktemplate.block.header.previous == p_hash {
continue 'outer
}
}
}
// Check if it extends last confirmed block
if blocktemplate.block.header.previous == last_confirmed {
continue
}
// This template doesn't extend an active fork or the last confirmed block, so we drop it
dropped_templates.push(key.clone());
}
// Drop jobs not referencing active forks or last confirmed block
for key in dropped_templates {
blocktemplates.remove(&key);
}
// Loop through merge mining templates to find which can be dropped
let mut dropped_templates = vec![];
'outer: for (key, blocktemplate) in mm_blocktemplates.iter() {
// Loop through all the forks
for fork in forks.iter() {
// Traverse fork proposals sequence in reverse
for p_hash in fork.proposals.iter().rev() {
// Check if job extends this fork
if &blocktemplate.block.header.previous == p_hash {
continue 'outer
}
}
}
// Check if it extends last confirmed block
if blocktemplate.block.header.previous == last_confirmed {
continue
}
// This template doesn't extend an active fork or the last confirmed block, so we drop it
dropped_templates.push(key.clone());
}
// Drop jobs not referencing active forks or last confirmed block
for key in dropped_templates {
mm_blocktemplates.remove(&key);
}
Ok(())
}
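A minimal sketch of the pruning rule this function implemented (and which the registry's `refresh` now takes over), with illustrative types: a template is kept only if its `previous` hash matches a proposal on some active fork or the last confirmed block.

use std::collections::HashMap;

type Hash32 = [u8; 32];

struct Template {
    previous: Hash32,
}

fn prune_templates(
    templates: &mut HashMap<String, Template>,
    fork_proposals: &[Vec<Hash32>],
    last_confirmed: &Hash32,
) {
    templates.retain(|_, t| {
        t.previous == *last_confirmed ||
            fork_proposals.iter().any(|fork| fork.iter().any(|p| p == &t.previous))
    });
}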

View File

@@ -95,7 +95,7 @@ pub async fn garbage_collect_task(node: DarkfiNodePtr) -> Result<()> {
};
// Verify transaction
match verify_transactions(
let result = verify_transactions(
&overlay,
next_block_height,
node.validator.consensus.module.read().await.target,
@@ -103,8 +103,13 @@ pub async fn garbage_collect_task(node: DarkfiNodePtr) -> Result<()> {
&mut MerkleTree::new(1),
false,
)
.await
{
.await;
// Drop new trees opened by the forks' overlay
overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
// Check result
match result {
Ok(_) => valid = true,
Err(Error::TxVerifyFailed(TxVerifyFailed::ErroneousTxs(_))) => {
// Remove transaction from fork's mempool

View File

@@ -231,7 +231,7 @@ impl Harness {
let timestamp = previous.header.timestamp.checked_add(1.into())?;
// Generate header
let header = Header::new(previous.hash(), block_height, timestamp, last_nonce);
let header = Header::new(previous.hash(), block_height, last_nonce, timestamp);
// Generate the block
let mut block = BlockInfo::new_empty(header);
@@ -293,16 +293,10 @@ pub async fn generate_node(
subscribers.insert("dnet", JsonSubscriber::new("dnet.subscribe_events"));
let p2p_handler = DarkfidP2pHandler::init(settings, ex).await?;
let registry = DarkfiMinersRegistry::init(&validator)?;
let node = DarkfiNode::new(
Network::Mainnet,
validator.clone(),
p2p_handler.clone(),
registry,
50,
subscribers.clone(),
)
.await?;
let registry = DarkfiMinersRegistry::init(Network::Mainnet, &validator)?;
let node =
DarkfiNode::new(validator.clone(), p2p_handler.clone(), registry, 50, subscribers.clone())
.await?;
p2p_handler.start(ex, &validator, &subscribers).await?;

View File

@@ -91,14 +91,7 @@ async fn simulate_unproposed_txs(
let best_fork = &forks[best_fork_index(&forks)?];
// Retrieve unproposed transactions
let (tx, total_gas_used, _, _) = best_fork
.unproposed_txs(
&best_fork.clone().blockchain,
current_block_height,
validator.consensus.module.read().await.target,
false,
)
.await?;
let (tx, total_gas_used, _) = best_fork.unproposed_txs(current_block_height, false).await?;
Ok((tx.len() as u64, total_gas_used))
}

View File

@@ -27,5 +27,5 @@ sled-overlay = "0.1.14"
smol = "2.0.2"
[patch.crates-io]
halo2_proofs = {git="https://github.com/parazyd/halo2", branch="v031"}
halo2_gadgets = {git="https://github.com/parazyd/halo2", branch="v031"}
halo2_proofs = {git="https://github.com/parazyd/halo2", branch="v032"}
halo2_gadgets = {git="https://github.com/parazyd/halo2", branch="v032"}

View File

@@ -109,7 +109,7 @@ pub struct Header {
impl Header {
/// Generates a new header with default transactions and state root,
/// using DarkFi native Proof of Work data.
pub fn new(previous: HeaderHash, height: u32, timestamp: Timestamp, nonce: u32) -> Self {
pub fn new(previous: HeaderHash, height: u32, nonce: u32, timestamp: Timestamp) -> Self {
let version = block_version(height);
let transactions_root = MerkleTree::new(1).root(0).unwrap();
let state_root = *EMPTY_HASH;
@@ -118,8 +118,8 @@ impl Header {
version,
previous,
height,
timestamp,
nonce,
timestamp,
transactions_root,
state_root,
pow_data,
@@ -179,8 +179,8 @@ impl Header {
}
}
/// Create a blockhashing blob from this header
pub fn to_blockhashing_blob(&self) -> Vec<u8> {
/// Create a block hashing blob from this header.
pub fn to_block_hashing_blob(&self) -> Vec<u8> {
// For XMRig, we need to pad the blob so that our nonce ends
// up at byte offset 39.
let mut blob = vec![0x00, 0x00];
@@ -195,8 +195,8 @@ impl Default for Header {
Header::new(
HeaderHash::new(blake3::hash(b"Let there be dark!").into()),
0u32,
Timestamp::current_time(),
0u32,
Timestamp::current_time(),
)
}
}
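A minimal sketch, with illustrative field widths, of the padding trick used in `to_block_hashing_blob`: the blob is zero-padded so the 4-byte nonce lands at a fixed byte offset (39 in the header above), which is what miners like XMRig expect. The function below is not the darkfid implementation, just the general pattern.

/// Build a hashing blob where a 4-byte nonce starts exactly at `nonce_offset`.
fn build_blob(prefix: &[u8], nonce: u32, suffix: &[u8], nonce_offset: usize) -> Vec<u8> {
    let mut blob = Vec::new();
    blob.extend_from_slice(prefix);
    // Pad with zeroes until the nonce starts at the required offset.
    while blob.len() < nonce_offset {
        blob.push(0x00);
    }
    assert_eq!(blob.len(), nonce_offset, "prefix longer than the nonce offset");
    blob.extend_from_slice(&nonce.to_le_bytes());
    blob.extend_from_slice(suffix);
    blob
}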

View File

@@ -151,8 +151,8 @@ impl MoneroPowData {
(self.merkle_root == merkle_root) && self.coinbase_merkle_proof.check_coinbase_path()
}
/// Returns the blockhashing_blob for the Monero block
pub fn to_blockhashing_blob(&self) -> Vec<u8> {
/// Returns the block hashing blob for the Monero block.
pub fn to_block_hashing_blob(&self) -> Vec<u8> {
create_blockhashing_blob(&self.header, &self.merkle_root, u64::from(self.transaction_count))
}

View File

@@ -119,8 +119,8 @@ impl TestHarness {
let header = Header::new(
previous.hash(),
previous.header.height + 1,
timestamp,
previous.header.nonce,
timestamp,
);
// Generate the block

View File

@@ -115,4 +115,28 @@ impl<T: Clone> Publisher<T> {
}
}
}
/// Clear inactive subscriptions.
/// Returns a flag indicating whether no active subscriptions
/// remain after cleanup.
pub async fn clear_inactive(&self) -> bool {
// Grab a lock over current subscriptions
let mut subs = self.subs.lock().await;
// Find inactive subscriptions
let mut dropped = vec![];
for (sub, channel) in subs.iter() {
if channel.receiver_count() == 0 {
dropped.push(*sub);
}
}
// Drop inactive subscriptions
for sub in dropped {
subs.remove(&sub);
}
// Return a flag indicating whether no subscriptions remain
subs.is_empty()
}
}
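A minimal sketch of the cleanup rule in `clear_inactive`, with a hypothetical `Channel` type standing in for the real channel: subscriptions whose channel has no receivers are dropped, and the returned flag is true when no subscriptions remain.

use std::collections::HashMap;

struct Channel {
    receivers: usize,
}

impl Channel {
    fn receiver_count(&self) -> usize {
        self.receivers
    }
}

fn clear_inactive(subs: &mut HashMap<u64, Channel>) -> bool {
    // Drop channels nobody is listening on anymore.
    subs.retain(|_, channel| channel.receiver_count() > 0);
    // `true` means no active subscriptions remain.
    subs.is_empty()
}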

View File

@@ -840,23 +840,17 @@ impl Fork {
}
/// Auxiliary function to retrieve unproposed valid transactions,
/// along with their total gas used, total paid fees and the overlay
/// used to verify the transactions for further processing.
/// along with their total gas used and total paid fees.
///
/// Note: Always remember to purge new trees from the overlay if not needed.
pub async fn unproposed_txs(
&self,
blockchain: &Blockchain,
verifying_block_height: u32,
block_target: u32,
verify_fees: bool,
) -> Result<(Vec<Transaction>, u64, u64, BlockchainOverlayPtr)> {
// Clone forks' overlay
let overlay = self.overlay.lock().unwrap().full_clone()?;
) -> Result<(Vec<Transaction>, u64, u64)> {
// Check if our mempool is not empty
if self.mempool.is_empty() {
return Ok((vec![], 0, 0, overlay))
return Ok((vec![], 0, 0))
}
// Transactions Merkle tree
@@ -870,7 +864,7 @@ impl Fork {
let mut vks: HashMap<[u8; 32], HashMap<String, VerifyingKey>> = HashMap::new();
// Grab all current proposals transactions hashes
let proposals_txs = overlay.lock().unwrap().get_blocks_txs_hashes(&self.proposals)?;
let proposals_txs = self.overlay.lock().unwrap().get_blocks_txs_hashes(&self.proposals)?;
// Iterate through all pending transactions in the forks' mempool
let mut unproposed_txs = vec![];
@@ -882,7 +876,7 @@ impl Fork {
// Retrieve the actual unproposed transaction
let unproposed_tx =
blockchain.transactions.get_pending(&[*tx], true)?[0].clone().unwrap();
self.blockchain.transactions.get_pending(&[*tx], true)?[0].clone().unwrap();
// Update the verifying keys map
for call in &unproposed_tx.calls {
@@ -890,11 +884,11 @@ impl Fork {
}
// Verify the transaction against current state
overlay.lock().unwrap().checkpoint();
self.overlay.lock().unwrap().checkpoint();
let gas_data = match verify_transaction(
&overlay,
&self.overlay,
verifying_block_height,
block_target,
self.module.target,
&unproposed_tx,
&mut tree,
&mut vks,
@@ -905,7 +899,8 @@ impl Fork {
Ok(gas_values) => gas_values,
Err(e) => {
debug!(target: "validator::consensus::unproposed_txs", "Transaction verification failed: {e}");
overlay.lock().unwrap().revert_to_checkpoint()?;
self.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
self.overlay.lock().unwrap().revert_to_checkpoint()?;
continue
}
};
@@ -922,7 +917,8 @@ impl Fork {
target: "validator::consensus::unproposed_txs",
"Retrieving transaction {tx} would exceed configured unproposed transaction gas limit: {accumulated_gas_usage} - {BLOCK_GAS_LIMIT}"
);
overlay.lock().unwrap().revert_to_checkpoint()?;
self.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
self.overlay.lock().unwrap().revert_to_checkpoint()?;
break
}
@@ -934,7 +930,7 @@ impl Fork {
unproposed_txs.push(unproposed_tx);
}
Ok((unproposed_txs, total_gas_used, total_gas_paid, overlay))
Ok((unproposed_txs, total_gas_used, total_gas_paid))
}
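A minimal sketch of the selection loop in `unproposed_txs`, with toy `Overlay` and `Tx` types: each candidate transaction is applied against a checkpointed overlay, failed verifications are reverted and skipped, and selection stops once the accumulated gas would exceed the block gas limit.

const BLOCK_GAS_LIMIT: u64 = 1_000_000;

#[derive(Clone, Default)]
struct Overlay {
    state: Vec<u8>,
}

impl Overlay {
    fn checkpoint(&self) -> Overlay {
        self.clone()
    }
    fn revert_to(&mut self, checkpoint: Overlay) {
        *self = checkpoint;
    }
}

struct Tx {
    gas: u64,
    valid: bool,
}

fn select_txs(overlay: &mut Overlay, mempool: &[Tx]) -> (usize, u64) {
    let mut total_gas = 0u64;
    let mut selected = 0usize;
    for tx in mempool {
        let checkpoint = overlay.checkpoint();
        // Hypothetically apply the tx to the overlay state.
        overlay.state.extend_from_slice(&tx.gas.to_le_bytes());
        if !tx.valid {
            // Verification failed: undo this tx's state changes.
            overlay.revert_to(checkpoint);
            continue
        }
        if total_gas + tx.gas > BLOCK_GAS_LIMIT {
            // This tx would overflow the block gas limit, undo and stop.
            overlay.revert_to(checkpoint);
            break
        }
        total_gas += tx.gas;
        selected += 1;
    }
    (selected, total_gas)
}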
/// Auxiliary function to create a full clone using BlockchainOverlay::full_clone.

View File

@@ -319,7 +319,7 @@ impl PoWModule {
);
let verification_time = Instant::now();
let out_hash = vm.calculate_hash(&header.to_blockhashing_blob())?;
let out_hash = vm.calculate_hash(&header.to_block_hashing_blob())?;
(BigUint::from_bytes_le(&out_hash), verification_time)
}
Monero(powdata) => {
@@ -332,7 +332,7 @@ impl PoWModule {
);
let verification_time = Instant::now();
let out_hash = vm.calculate_hash(&powdata.to_blockhashing_blob())?;
let out_hash = vm.calculate_hash(&powdata.to_block_hashing_blob())?;
(BigUint::from_bytes_le(&out_hash), verification_time)
}
};
@@ -700,8 +700,8 @@ mod tests {
let header = Header::new(
previous.hash(),
previous.height + 1,
parts[0].parse::<u64>().unwrap().into(),
0,
parts[0].parse::<u64>().unwrap().into(),
);
let difficulty = BigUint::from_str_radix(&parts[1], 10).unwrap();

View File

@@ -1017,6 +1017,7 @@ pub async fn verify_transactions(
Err(e) => {
warn!(target: "validator::verification::verify_transactions", "Transaction verification failed: {e}");
erroneous_txs.push(tx.clone());
overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
overlay.lock().unwrap().revert_to_checkpoint()?;
continue
}
@@ -1036,6 +1037,7 @@ pub async fn verify_transactions(
tx.hash()
);
erroneous_txs.push(tx.clone());
overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
overlay.lock().unwrap().revert_to_checkpoint()?;
break
}