mmproxy: Rewrite daemon for solomining support.

The Stratum stuff works for pools only.
This commit is contained in:
parazyd
2023-12-01 12:25:05 +01:00
parent 6da3a4b637
commit 047a1e99da
7 changed files with 742 additions and 987 deletions

496
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -18,7 +18,10 @@ log = "0.4.20"
# Monero
monero = {version = "0.19.0", features = ["full"]}
# HTTP RPC
surf = "2.3.2"
tide = "0.17.0-beta.1"
# Encoding
bs58 = "0.5.0"

View File

@@ -1,3 +1,6 @@
# darkfi-mmproxy daemon listen URL
#listen = "http://127.0.0.1:3333"
[monerod]
# Monero network to use (mainnet/testnet)
network = "mainnet"

View File

@@ -1,31 +0,0 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use darkfi::rpc::jsonrpc::ErrorCode;
/// Custom RPC error implementations
pub enum RpcError {
InvalidWorkerLogin = -32110,
UnsupportedMiningAlgo = -32111,
}
impl From<RpcError> for ErrorCode {
fn from(x: RpcError) -> ErrorCode {
ErrorCode::ServerError(x as i32)
}
}

View File

@@ -16,40 +16,23 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use std::sync::Arc;
use darkfi::{
async_daemonize, cli_desc,
rpc::{
jsonrpc::{ErrorCode, JsonError, JsonRequest, JsonResult},
server::{listen_and_serve, RequestHandler},
},
system::{StoppableTask, StoppableTaskPtr},
Error, Result,
};
use darkfi_serial::async_trait;
use log::{error, info};
use darkfi::{async_daemonize, cli_desc, rpc::util::JsonValue, Error, Result};
use log::{debug, error, info};
use serde::Deserialize;
use smol::{
lock::{Mutex, MutexGuard, RwLock},
net::TcpStream,
stream::StreamExt,
Executor,
};
use smol::{net::TcpStream, stream::StreamExt, Executor};
use structopt::StructOpt;
use structopt_toml::StructOptToml;
use surf::StatusCode;
use url::Url;
use uuid::Uuid;
mod error;
mod stratum;
const CONFIG_FILE: &str = "darkfi_mmproxy.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfi_mmproxy.toml");
/// Monero RPC functions
mod monerod;
#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "darkfi-mmproxy", about = cli_desc!())]
@@ -62,13 +45,9 @@ struct Args {
/// Configuration file to use
config: Option<String>,
#[structopt(long, default_value = "tcp://127.0.0.1:3333")]
/// mmproxy JSON-RPC server listen URL
rpc_listen: Url,
#[structopt(long)]
/// List of worker logins
workers: Vec<String>,
#[structopt(long, default_value = "http://127.0.0.1:3333")]
// mmproxy daemon listen URL
listen: Url,
#[structopt(long)]
/// Set log file output
@@ -82,35 +61,31 @@ struct Args {
#[structopt()]
struct MonerodArgs {
#[structopt(long, default_value = "mainnet")]
/// Mining reward wallet address
/// Monero network type (mainnet/testnet)
network: String,
#[structopt(long, default_value = "http://127.0.0.1:28081/json_rpc")]
#[structopt(long, default_value = "http://127.0.0.1:18081")]
/// monerod JSON-RPC server listen URL
rpc: Url,
}
/// Mining proxy state
struct MiningProxy {
/// monerod network type
monerod_network: monero::Network,
/// monerod RPC address
monerod_rpc: Url,
/// Workers UUIDs
workers: Arc<RwLock<HashMap<Uuid, stratum::Worker>>>,
/// JSON-RPC connection tracker
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
/// Main async executor reference
executor: Arc<Executor<'static>>,
}
impl MiningProxy {
async fn new(monerod: MonerodArgs, executor: Arc<Executor<'static>>) -> Result<Self> {
let monerod_network = match monerod.network.as_str() {
/// Instantiate `MiningProxy` state
async fn new(monerod: MonerodArgs) -> Result<Self> {
let monerod_network = match monerod.network.to_lowercase().as_str() {
"mainnet" => monero::Network::Mainnet,
"testnet" => monero::Network::Testnet,
_ => {
error!("Invalid Monero network \"{}\"", monerod.network);
return Err(Error::Custom("Invalid Monero network".to_string()))
return Err(Error::Custom(format!("Invalid Monero network \"{}\"", monerod.network)))
}
};
@@ -120,60 +95,84 @@ impl MiningProxy {
return Err(e.into())
}
let workers = Arc::new(RwLock::new(HashMap::new()));
let rpc_connections = Mutex::new(HashSet::new());
Ok(Self { monerod_network, monerod_rpc: monerod.rpc, workers, rpc_connections, executor })
}
}
#[async_trait]
#[rustfmt::skip]
impl RequestHandler for MiningProxy {
async fn handle_request(&self, req: JsonRequest) -> JsonResult {
match req.method.as_str() {
"ping" => self.pong(req.id, req.params).await,
// Stratum methods
"login" => self.stratum_login(req.id, req.params).await,
"submit" => self.stratum_submit(req.id, req.params).await,
"keepalived" => self.stratum_keepalived(req.id, req.params).await,
_ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
}
}
async fn connections_mut(&self) -> MutexGuard<'_, HashSet<StoppableTaskPtr>> {
self.rpc_connections.lock().await
Ok(Self { monerod_network, monerod_rpc: monerod.rpc })
}
}
async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
info!("Starting DarkFi x Monero merge mining proxy...");
info!("Starting DarkFi x Monero merge mining proxy");
let mmproxy = Arc::new(MiningProxy::new(args.monerod, ex.clone()).await?);
let mmproxy = Arc::new(MiningProxy::new(args.monerod).await?);
let mut app = tide::with_state(mmproxy);
info!("Starting JSON-RPC server");
let rpc_task = StoppableTask::new();
rpc_task.clone().start(
listen_and_serve(args.rpc_listen, mmproxy.clone(), None, ex.clone()),
|res| async move {
match res {
Ok(()) | Err(Error::RpcServerStopped) => mmproxy.stop_connections().await,
Err(e) => error!("Failed stopping JSON-RPC server: {}", e),
// monerod `/getheight` endpoint proxy
app.at("/getheight").get(|req: tide::Request<Arc<MiningProxy>>| async move {
let mmproxy = req.state();
let return_data = mmproxy.monerod_get_height().await?;
let return_data = return_data.stringify()?;
debug!(target: "monerod::getheight", "<-- {}", return_data);
Ok(return_data)
});
// monerod `/getinfo` endpoint proxy
app.at("/getinfo").get(|req: tide::Request<Arc<MiningProxy>>| async move {
let mmproxy = req.state();
let return_data = mmproxy.monerod_get_info().await?;
let return_data = return_data.stringify()?;
debug!(target: "monerod::getinfo", "<-- {}", return_data);
Ok(return_data)
});
// monerod `/json_rpc` endpoint proxy
app.at("/json_rpc").post(|mut req: tide::Request<Arc<MiningProxy>>| async move {
let json_str: JsonValue = match req.body_string().await {
Ok(v) => v.parse()?,
Err(e) => return Err(e),
};
let JsonValue::Object(ref request) = json_str else {
return Err(surf::Error::new(
StatusCode::BadRequest,
Error::Custom("Invalid JSONRPC request".to_string()),
))
};
if !request.contains_key("method") || !request["method"].is_string() {
return Err(surf::Error::new(
StatusCode::BadRequest,
Error::Custom("Invalid JSONRPC request".to_string()),
))
}
let mmproxy = req.state();
let method = request["method"].get::<String>().unwrap();
// For XMRig we only have to handle 2 methods:
let return_data = match method.as_str() {
"getblocktemplate" => mmproxy.monerod_getblocktemplate(&json_str).await?,
"submitblock" => mmproxy.monerod_submit_block(&json_str).await?,
_ => {
return Err(surf::Error::new(
StatusCode::BadRequest,
Error::Custom("Invalid JSONRPC request".to_string()),
))
}
},
Error::RpcServerStopped,
ex.clone(),
);
};
info!("Merge mining proxy ready, waiting for connections...");
let return_data = return_data.stringify()?;
let log_tgt = format!("monerod::{}", method);
debug!(target: &log_tgt, "<-- {}", return_data);
Ok(return_data)
});
ex.spawn(async move { app.listen(args.listen).await.unwrap() }).detach();
info!("Merge mining proxy ready, waiting for connections");
// Signal handling for graceful termination.
let (signals_handler, signals_task) = SignalHandler::new(ex)?;
signals_handler.wait_termination(signals_task).await?;
info!("Caught termination signal, cleaning up and exiting...");
info!("Caught termination signal, cleaning up and exiting");
Ok(())
}

View File

@@ -0,0 +1,237 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{collections::HashMap, str::FromStr};
use darkfi::{
rpc::{
jsonrpc::{JsonRequest, JsonResponse},
util::JsonValue,
},
Error, Result,
};
use log::{debug, error};
use monero::blockdata::transaction::{ExtraField, RawExtraField, SubField::MergeMining};
use super::MiningProxy;
impl MiningProxy {
/// Perform a JSON-RPC GET request to monerod's endpoint with the given method
async fn monero_get_request(&self, method: &str) -> Result<JsonValue> {
let endpoint = format!("{}{}", self.monerod_rpc, method);
debug!(target: "monerod::monero_get_request", "--> {}", endpoint);
let mut rep = match surf::get(&endpoint).await {
Ok(v) => v,
Err(e) => {
error!(
target: "monerod::monero_get_request",
"Failed sending GET request to monerod: {}", e,
);
return Err(Error::Custom(format!("Failed sending GET request to monerod: {}", e)))
}
};
let json_str: JsonValue = match rep.body_string().await {
Ok(v) => match v.parse() {
Ok(v) => v,
Err(e) => {
error!(
target: "monerod::monero_get_request",
"Failed parsing JSON body string from monerod GET request response: {}", e,
);
return Err(Error::Custom(format!(
"Failed parsing JSON body string from monerod GET request response: {}",
e
)))
}
},
Err(e) => {
error!(
target: "monerod::monero_get_request",
"Failed parsing body string from monerod GET request response: {}", e,
);
return Err(Error::Custom(format!(
"Failed parsing body string from monerod GET request response: {}",
e
)))
}
};
Ok(json_str)
}
/// Perform a JSON-RPC POST request to monerod's endpoint with the given method
/// and JSON-RPC request
pub async fn monero_post_request(&self, req: JsonRequest) -> Result<JsonValue> {
let endpoint = format!("{}json_rpc", self.monerod_rpc);
debug!(target: "monerod::monero_post_request", "--> {}", endpoint);
let client = surf::Client::new();
let mut response = match client
.get(endpoint)
.header("Content-Type", "application/json")
.body(req.stringify()?)
.send()
.await
{
Ok(v) => v,
Err(e) => {
error!(
target: "monerod::monero_post_request",
"Failed sending monerod RPC POST request: {}", e,
);
return Err(Error::Custom(format!("Failed sending monerod RPC POST request: {}", e)))
}
};
let response_bytes = match response.body_bytes().await {
Ok(v) => v,
Err(e) => {
error!(
target: "monerod::monero_post_request",
"Failed decoding monerod RPC POST response body: {}", e,
);
return Err(Error::Custom(format!(
"Failed decoding monerod RPC POST response body: {}",
e,
)))
}
};
let response_string = match String::from_utf8(response_bytes) {
Ok(v) => v,
Err(e) => {
error!(
target: "monerod::monero_post_request",
"Failed decoding UTF8 string from monerod RPC POST response body: {}", e,
);
return Err(Error::Custom(format!(
"Failed decoding UTF8 string from monerod RPC POST response body: {}",
e,
)))
}
};
let response_json: JsonValue = match response_string.parse() {
Ok(v) => v,
Err(e) => {
error!(
target: "monerod::monero_post_request",
"Failed parsing JSON string from monerod RPC POST response body: {}", e,
);
return Err(Error::Custom(format!(
"Failed parsing JSON string from monerod RPC POST response body: {}",
e
)))
}
};
Ok(response_json)
}
/// Proxy the `getheight` RPC request
pub async fn monerod_get_height(&self) -> Result<JsonValue> {
let rep = self.monero_get_request("getheight").await?;
Ok(rep)
}
/// Proxy the `getinfo` RPC request
pub async fn monerod_get_info(&self) -> Result<JsonValue> {
let rep = self.monero_get_request("getinfo").await?;
Ok(rep)
}
/// Proxy the `submitblock` RPC request
pub async fn monerod_submit_block(&self, req: &JsonValue) -> Result<JsonValue> {
let request = JsonRequest::try_from(req)?;
let response = self.monero_post_request(request).await?;
Ok(response)
}
/// Perform the `getblocktemplate` request and modify it with the necessary
/// merge mining data.
pub async fn monerod_getblocktemplate(&self, req: &JsonValue) -> Result<JsonValue> {
let mut request = JsonRequest::try_from(req)?;
if !request.params.is_object() {
return Err(Error::Custom("Invalid request".to_string()))
}
let params: &mut HashMap<String, JsonValue> = request.params.get_mut().unwrap();
if !params.contains_key("wallet_address") || !params.contains_key("reserve_size") {
return Err(Error::Custom("Invalid request".to_string()))
}
let Some(wallet_address) = params["wallet_address"].get::<String>() else {
return Err(Error::Custom("Invalid request".to_string()))
};
let Ok(wallet_address) = monero::Address::from_str(wallet_address) else {
return Err(Error::Custom("Invalid request".to_string()))
};
if wallet_address.network != self.monerod_network {
return Err(Error::Custom("Monero network address mismatch".to_string()))
}
if wallet_address.addr_type != monero::AddressType::Standard {
return Err(Error::Custom("Non-standard Monero address".to_string()))
}
// Create the Merge Mining data
let mm_tag = MergeMining(Some(monero::VarInt(32)), monero::Hash([0_u8; 32]));
// Construct `tx_extra` from all the extra fields we have to add to
// the coinbase transaction in the block we're mining.
let tx_extra: RawExtraField = ExtraField(vec![mm_tag]).into();
// Modify the params `reserve_size` to fit our Merge Mining data
*params.get_mut("reserve_size").unwrap() = (tx_extra.0.len() as f64).into();
// Perform the `getblocktemplate` call:
let gbt_response = self.monero_post_request(request).await?;
let mut gbt_response = JsonResponse::try_from(&gbt_response)?;
let gbt_result: &mut HashMap<String, JsonValue> = gbt_response.result.get_mut().unwrap();
// Now we have to modify the block template:
let mut block_template = monero::consensus::deserialize::<monero::Block>(
&hex::decode(gbt_result["blocktemplate_blob"].get::<String>().unwrap()).unwrap(),
)
.unwrap();
// Update coinbase tx with our extra field
block_template.miner_tx.prefix.extra = tx_extra;
// Update `blocktemplate_blob` with the modified block:
gbt_result.insert(
"blocktemplate_blob".to_string(),
hex::encode(monero::consensus::serialize(&block_template)).into(),
);
// Update `blockhashing_blob` in order to perform correct PoW:
gbt_result.insert(
"blockhashing_blob".to_string(),
hex::encode(block_template.serialize_hashable()).into(),
);
// Return the modified JSON response
Ok((&gbt_response).into())
}
}

View File

@@ -1,792 +0,0 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2023 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{collections::HashMap, io, str::FromStr, sync::Arc, time::Duration};
use darkfi::{
rpc::{
jsonrpc::{
ErrorCode::{InternalError, InvalidParams, ServerError},
JsonError, JsonRequest, JsonResponse, JsonResult, JsonSubscriber,
},
util::JsonValue,
},
system::{sleep, timeout::timeout, StoppableTask, StoppableTaskPtr},
Error, Result,
};
use log::{debug, error, info, warn};
use monero::blockdata::transaction::{ExtraField, RawExtraField, SubField::MergeMining};
use num_bigint::BigUint;
use smol::{channel, lock::RwLock};
use url::Url;
use uuid::Uuid;
use super::{error::RpcError, MiningProxy};
/// Algo string representing Monero's RandomX
pub const RANDOMX_ALGO: &str = "rx/0";
/// A mining job instance
#[derive(Clone)]
struct MiningJob {
/// Current job ID for the worker
pub job_id: blake3::Hash,
/// Full block being mined
pub block: monero::Block,
/// Difficulty target,
pub target: String,
/// Block height
pub height: f64,
/// RandomX seed hash
pub seed_hash: String,
}
/// Single worker connected to the mining proxy
pub struct Worker {
/// Wallet address
addr: monero::Address,
/// Miner useragent
_agent: String,
/// JSON-RPC notification subscriber, used to send new job notifications
job_sub: JsonSubscriber,
/// Background keepalive task reference
_ka_task: StoppableTaskPtr,
/// Keepalive sender channel, pinged from Stratum keepalived
ka_send: channel::Sender<()>,
/// Background mining job task reference
_job_task: StoppableTaskPtr,
/// Block submit trigger sender channel, pinged from Stratum submit
submit_send: channel::Sender<()>,
/// Current mining job
mining_job: MiningJob,
}
impl Worker {
async fn notify_job(&mut self, mining_job: MiningJob) -> Result<()> {
// Update the mining job
self.mining_job = mining_job.clone();
// Build notification params
let params: JsonValue = JsonValue::Object(HashMap::from([
("blob".to_string(), hex::encode(mining_job.block.serialize_hashable()).into()),
("job_id".to_string(), mining_job.job_id.to_string().into()),
("target".to_string(), mining_job.target.into()),
("height".to_string(), mining_job.height.into()),
("seed_hash".to_string(), mining_job.seed_hash.into()),
("algo".to_string(), RANDOMX_ALGO.to_string().into()),
]));
info!(
target: "worker::notify_job",
"[STRATUM] Sending mining job notification to worker",
);
self.job_sub.notify(params).await;
Ok(())
}
}
/// Send a HTTP JSON-RPC request to the given monerod RPC endpoint
async fn monerod_request(endpoint: &Url, req: JsonRequest) -> Result<JsonValue> {
let client = surf::Client::new();
let mut response = match client
.get(endpoint)
.header("Content-Type", "application/json")
.body(req.stringify().unwrap())
.send()
.await
{
Ok(v) => v,
Err(e) => {
error!(
target: "stratum::monerod_request",
"[STRATUM] Failed sending RPC request to monerod: {}", e,
);
return Err(io::Error::new(io::ErrorKind::Other, e).into())
}
};
let response_bytes = match response.body_bytes().await {
Ok(v) => v,
Err(e) => {
error!(
target: "stratum::monerod_request",
"[STRATUM] Failed reading monerod RPC response: {}", e,
);
return Err(io::Error::new(io::ErrorKind::Other, e).into())
}
};
let response_string = match String::from_utf8(response_bytes) {
Ok(v) => v,
Err(e) => {
error!(
target: "stratum::monerod_request",
"[STRATUM] Failed parsing monerod RPC response: {}", e,
);
return Err(io::Error::new(io::ErrorKind::Other, e).into())
}
};
let response_json: JsonValue = match response_string.parse() {
Ok(v) => v,
Err(e) => {
error!(
target: "stratum::monerod_request",
"[STRATUM] Failed parsing monerod RPC response JSON: {}", e,
);
return Err(io::Error::new(io::ErrorKind::Other, e).into())
}
};
Ok(response_json)
}
/// Perform getblocktemplate from monerod and inject it with the
/// necessary merge mining data.
/// Returns data necessary to create a mining job
async fn getblocktemplate(endpoint: &Url, wallet_address: &monero::Address) -> Result<MiningJob> {
// Create the Merge Mining Tag: (`depth`, `merkle_root`)
let mm_tag = MergeMining(Some(monero::VarInt(32)), monero::Hash([0_u8; 32]));
// Construct `tx_extra` from all the extra fields we have to
// add to the coinbase transaction in the block we're mining
let tx_extra: RawExtraField = ExtraField(vec![mm_tag]).into();
// Create the monerod JSON-RPC request. `reserve_size` is the space
// we need to create for the `tx_extra` field created above.
let req = JsonRequest::new(
"get_block_template",
HashMap::from([
("wallet_address".to_string(), wallet_address.to_string().into()),
("reserve_size".to_string(), (tx_extra.0.len() as f64).into()),
])
.into(),
);
// Get block template from monerod
info!(target: "stratum::getblocktemplate", "[STRATUM] Sending getblocktemplate to monero");
let rep = match monerod_request(endpoint, req).await {
Ok(v) => v,
Err(e) => {
error!(
target: "stratum::getblocktemplate",
"[STRATUM] Failed sending getblocktemplate to monerod: {}", e,
);
return Err(io::Error::new(io::ErrorKind::Other, e).into())
}
};
// Now we have to modify the block template:
// * Update the coinbase tx with our tx_extra field
// * Update the `blockhashing_blob` in order to perform correct PoW
// Deserialize the block template
let mut block_template = monero::consensus::deserialize::<monero::Block>(
&hex::decode(rep["result"]["blocktemplate_blob"].get::<String>().unwrap()).unwrap(),
)
.unwrap();
// Modify the coinbase tx with our additional merge mining data
block_template.miner_tx.prefix.extra = tx_extra;
// Decode the difficulty and calculate the mining target
let mut difficulty_hex = rep["result"]["wide_difficulty"]
.get::<String>()
.unwrap()
.strip_prefix("0x")
.unwrap()
.to_string();
// Needed because hex::decode doesn't accept odd-length
if difficulty_hex.len() % 2 != 0 {
difficulty_hex = format!("0{}", difficulty_hex);
}
let difficulty_raw = hex::decode(&difficulty_hex).unwrap();
let difficulty = BigUint::from_radix_be(&difficulty_raw, 16).unwrap();
// Calculate the target. XMRig expects the 64 least significant bits.
let target_raw = BigUint::from_bytes_be(&[0xFF; 32]) / &difficulty;
// This iterator is ordered least significant first
let target_lsb: u64 = target_raw.iter_u64_digits().take(1).next().unwrap();
let target = hex::encode(target_lsb.to_be_bytes());
assert!(target.len() == 16);
info!(target: "stratum::getblocktemplate", "[STRATUM] Difficulty: {}", difficulty_hex);
info!(target: "stratum::getblocktemplate", "[STRATUM] Target: {}", target);
// Get the remaining metadata
let height = *rep["result"]["height"].get::<f64>().unwrap();
let seed_hash = rep["result"]["seed_hash"].get::<String>().unwrap().to_string();
// Create a deterministic job id
let mut hasher = blake3::Hasher::new();
hasher.update(&wallet_address.as_bytes());
hasher.update(&height.to_le_bytes());
hasher.update(seed_hash.as_bytes());
let job_id = hasher.finalize();
// Return the necessary data
Ok(MiningJob { job_id, block: block_template, target, height, seed_hash })
}
impl MiningProxy {
/// Background task listening for keepalives from a worker.
/// If timeout is reached, the worker will be dropped.
async fn keepalive_task(
workers: Arc<RwLock<HashMap<Uuid, Worker>>>,
uuid: Uuid,
ka_recv: channel::Receiver<()>,
) -> Result<()> {
debug!(target: "stratum::keepalive_task", "Spawned keepalive_task for worker {}", uuid);
const TIMEOUT: Duration = Duration::from_secs(65);
loop {
let Ok(r) = timeout(TIMEOUT, ka_recv.recv()).await else {
// Timeout, remove worker
warn!(
target: "stratum::keepalive_task",
"keepalive_task for worker {} timed out", uuid,
);
workers.write().await.remove(&uuid);
break
};
match r {
Ok(()) => {
debug!(
target: "stratum::keepalive_task",
"keepalive_task for worker {} got ping", uuid,
);
continue
}
Err(e) => {
error!(
target: "stratum::keepalive_task",
"keepalive_task for worker {} channel recv error: {}", uuid, e,
);
warn!(
target: "stratum::keepalive_task",
"Dropping worker {}", uuid,
);
workers.write().await.remove(&uuid);
break
}
}
}
Ok(())
}
/// Background task used to notify a worker about new mining jobs.
/// `keepalive_task` iis able to remove workers from the worker pool,
/// so this task can easily exit if the worker is not found.
async fn job_task(
workers: Arc<RwLock<HashMap<Uuid, Worker>>>,
uuid: Uuid,
endpoint: Url,
submit_recv: channel::Receiver<()>,
) -> Result<()> {
debug!(target: "stratum::job_task", "Spawned job_task for worker {}", uuid);
const POLL_INTERVAL: Duration = Duration::from_secs(60);
// Comfy wait for settling the Stratum login RPC call
sleep(5).await;
// In this loop, we'll be getting the block template for mining.
// At the beginning of the loop, we'll perform a getblocktemplate,
// and then inject our Merge Mining stuff, and forward it to the
// miner. After the notification, we'll either poll or wait for a
// trigger for a submitted block and reiterate the loop again in
// order to get the next mining job.
loop {
// Get the workers lock and the worker reference
debug!(target: "stratum::job_task", "Acquiring workers write lock...");
let mut workers_ptr = workers.write().await;
debug!(target: "stratum::job_task", "Acquired workers write lock");
let Some(worker) = workers_ptr.get_mut(&uuid) else {
info!(
target: "stratum::job_task",
"[STRATUM] Worker {} disconnected, exiting job_task", uuid,
);
break
};
// Get the next mining job
let mining_job = match getblocktemplate(&endpoint, &worker.addr).await {
Ok(v) => v,
Err(e) => {
error!(
target: "stratum::job_task",
"[STRATUM] Failed fetching getblocktemplate for worker {}: {}", uuid, e,
);
warn!(
target: "stratum::job_task",
"[STRATUM] Exiting job_task for worker {}", uuid,
);
break
}
};
// In case it's the same job, we'll wait and try again
if worker.mining_job.job_id == mining_job.job_id {
// Drop the workers lock before sleeping.
drop(workers_ptr);
match timeout(POLL_INTERVAL, submit_recv.recv()).await {
Ok(_) => continue,
Err(_) => continue,
}
}
// Notify the worker about the new job
if let Err(e) = worker.notify_job(mining_job).await {
error!(
target: "stratum::job_task",
"[STRATUM] Failed sending job to worker {}: {}", uuid, e,
);
warn!(
target: "stratum::job_task",
"[STRATUM] Exiting job_task for worker {}", uuid,
);
break
}
// Drop the workers lock before sleeping.
drop(workers_ptr);
// Now poll or wait for a trigger for a new job.
match timeout(POLL_INTERVAL, submit_recv.recv()).await {
Ok(_) => continue,
Err(_) => continue,
}
}
Ok(())
}
/// Stratum login method
///
/// `darkfi-mmproxy` will check that the worker provided a valid
/// address as the username, and will enforce `RANDOMX_ALGO` to
/// be supported. Upon success, we will fetch the block template
/// from monerod, inject it with our necessary merge mining info,
/// and forward it to the worker.
/// Additionally, we will spawn background tasks for new job and
/// keepalive notifications for this worker.
pub async fn stratum_login(&self, id: u16, params: JsonValue) -> JsonResult {
let Some(params) = params.get::<HashMap<String, JsonValue>>() else {
return JsonError::new(InvalidParams, None, id).into()
};
if !params.contains_key("login") ||
!params.contains_key("pass") ||
!params.contains_key("agent") ||
!params.contains_key("algo")
{
return JsonError::new(InvalidParams, None, id).into()
}
let Some(login) = params["login"].get::<String>() else {
return JsonError::new(InvalidParams, Some("Invalid \"login\" object".to_string()), id)
.into()
};
let Some(_pass) = params["pass"].get::<String>() else {
return JsonError::new(InvalidParams, Some("Invalid \"pass\" object".to_string()), id)
.into()
};
let Some(agent) = params["agent"].get::<String>() else {
return JsonError::new(InvalidParams, Some("Invalid \"agent\" object".to_string()), id)
.into()
};
let Some(algos) = params["algo"].get::<Vec<JsonValue>>() else {
return JsonError::new(InvalidParams, Some("Invalid \"algo\" object".to_string()), id)
.into()
};
// We'll only support `RANDOMX_ALGO`
let mut found_randomx_algo = false;
for algo in algos.iter() {
if !algo.is_string() {
return JsonError::new(InvalidParams, Some("Algo is not a string".to_string()), id)
.into()
}
if algo.get::<String>().unwrap() == RANDOMX_ALGO {
found_randomx_algo = true;
break
}
}
if !found_randomx_algo {
return JsonError::new(
RpcError::UnsupportedMiningAlgo.into(),
Some("Unsupported mining algos".to_string()),
id,
)
.into()
}
// Check valid login. We will parse the username as a Monero
// address, and validate that it corresponds to the network
// we're mining on.
let addr = match monero::Address::from_str(login) {
Ok(v) => v,
Err(e) => {
return JsonError::new(
RpcError::InvalidWorkerLogin.into(),
Some(format!("Invalid Monero address login: {}", e)),
id,
)
.into()
}
};
if addr.network != self.monerod_network {
return JsonError::new(
RpcError::InvalidWorkerLogin.into(),
Some(format!(
"Invalid Monero address network, expected \"{:?}\"",
self.monerod_network
)),
id,
)
.into()
}
if addr.addr_type != monero::AddressType::Standard {
return JsonError::new(
RpcError::InvalidWorkerLogin.into(),
Some(format!(
"Invalid Monero address type, expected \"{}\"",
monero::AddressType::Standard
)),
id,
)
.into()
}
// Now we have a valid address for mining.
// Create a new UUID for the worker, and initialize the `Worker`
// struct that will live throughout the miner's lifetime.
let worker_uuid = Uuid::new_v4();
// Create job subscriber
let job_sub = JsonSubscriber::new("job");
// Create keepalive channel
let (ka_send, ka_recv) = channel::unbounded();
// Create submit trigger channel
let (submit_send, submit_recv) = channel::unbounded();
// Create background keepalive task
let ka_task = StoppableTask::new();
// Create background job task
let job_task = StoppableTask::new();
// Get the current mining job for the worker
let mining_job = match getblocktemplate(&self.monerod_rpc, &addr).await {
Ok(v) => v,
Err(e) => {
error!(
target: "stratum::login",
"[STRATUM] Failed fetching block template for worker: {}", e,
);
return JsonError::new(InternalError, None, id).into()
}
};
// Create worker
let worker = Worker {
addr,
_agent: agent.clone(),
job_sub: job_sub.clone(),
_ka_task: ka_task.clone(),
ka_send,
_job_task: job_task.clone(),
submit_send,
mining_job: mining_job.clone(),
};
// Insert the worker into connections map
self.workers.write().await.insert(worker_uuid, worker);
// Spawn keepalive background task
ka_task.start(
Self::keepalive_task(self.workers.clone(), worker_uuid, ka_recv),
move |_| async move { debug!("keepalive_task for {} exited", worker_uuid) },
Error::DetachedTaskStopped,
self.executor.clone(),
);
// Spawn job notification background task
job_task.start(
Self::job_task(
self.workers.clone(),
worker_uuid,
self.monerod_rpc.clone(),
submit_recv,
),
move |_| async move { debug!("job_task for {} exited", worker_uuid) },
Error::DetachedTaskStopped,
self.executor.clone(),
);
info!("[STRATUM] Added worker {}", worker_uuid);
// Finally, we return the job notification subscriber, along with the
// initial job response as noted in:
// https://github.com/xmrig/xmrig-proxy/blob/master/doc/STRATUM.md#example-success-reply
let blob = hex::encode(mining_job.block.serialize_hashable());
let response = JsonResponse::new(
HashMap::from([
("status".to_string(), "OK".to_string().into()),
("id".to_string(), worker_uuid.to_string().into()),
(
"extensions".to_string(),
vec!["algo".to_string().into(), "keepalive".to_string().into()].into(),
),
(
"job".to_string(),
HashMap::from([
("blob".to_string(), blob.into()),
("job_id".to_string(), mining_job.job_id.to_string().into()),
("target".to_string(), mining_job.target.to_string().into()),
("height".to_string(), mining_job.height.into()),
("seed_hash".to_string(), mining_job.seed_hash.to_string().into()),
("algo".to_string(), RANDOMX_ALGO.to_string().into()),
])
.into(),
),
])
.into(),
id,
);
JsonResult::SubscriberWithReply(job_sub, response)
}
/// Stratum submit method
///
/// The miner submits the request after a share was found.
pub async fn stratum_submit(&self, id: u16, params: JsonValue) -> JsonResult {
let Some(params) = params.get::<HashMap<String, JsonValue>>() else {
return JsonError::new(InvalidParams, None, id).into()
};
if !params.contains_key("id") ||
!params.contains_key("job_id") ||
!params.contains_key("nonce") ||
!params.contains_key("result") ||
!params.contains_key("algo")
{
return JsonError::new(InvalidParams, None, id).into()
}
// Validate all the parameters
let Some(worker_uuid) = params["id"].get::<String>() else {
error!(target: "stratum::submit", "[STRATUM] Missing \"id\" field for stratum::submit");
return JsonError::new(InvalidParams, Some("Missing \"id\" field".to_string()), id)
.into()
};
let Ok(worker_uuid) = Uuid::try_from(worker_uuid.as_str()) else {
error!(target: "stratum::submit", "[STRATUM] Invalid \"id\" field for stratum::submit");
return JsonError::new(InvalidParams, Some("Invalid \"id\" field".to_string()), id)
.into()
};
let Some(job_id) = params["job_id"].get::<String>() else {
error!(target: "stratum::submit", "[STRATUM] Missing \"job_id\" field for stratum::submit");
return JsonError::new(InvalidParams, Some("Missing \"job_id\" field".to_string()), id)
.into()
};
let Ok(job_id) = blake3::Hash::from_str(job_id) else {
error!(target: "stratum::submit", "[STRATUM] Invalid \"job_id\" field for stratum::submit");
return JsonError::new(InvalidParams, Some("Invalid \"job_id\" field".to_string()), id)
.into()
};
let Some(nonce) = params["nonce"].get::<String>() else {
error!(target: "stratum::submit", "[STRATUM] Missing \"nonce\" field for stratum::submit");
return JsonError::new(InvalidParams, Some("Missing \"nonce\" field".to_string()), id)
.into()
};
let Ok(nonce) = u32::from_str_radix(nonce, 16) else {
error!(target: "stratum::submit", "[STRATUM] Invalid \"nonce\" field for stratum::submit");
return JsonError::new(InvalidParams, Some("Invalid \"nonce\" field".to_string()), id)
.into()
};
let Some(_result) = params["result"].get::<String>() else {
error!(target: "stratum::submit", "[STRATUM] Missing \"result\" field for stratum::submit");
return JsonError::new(InvalidParams, Some("Invalid \"result\" field".to_string()), id)
.into()
};
let Some(algo) = params["algo"].get::<String>() else {
error!(target: "stratum::submit", "[STRATUM] Missing \"algo\" field for stratum::submit");
return JsonError::new(InvalidParams, Some("Missing \"algo\" field".to_string()), id)
.into()
};
if algo != RANDOMX_ALGO {
error!(target: "stratum::submit", "[STRATUM] Invalid \"algo\" field for stratum::submit");
return JsonError::new(InvalidParams, Some("Invalid \"algo\" field".to_string()), id)
.into()
}
// Get the worker reference and confirm this is submitted for the current job
debug!(target: "stratum::submit", "Acquiring workers read lock...");
let workers_ptr = self.workers.read().await;
debug!(target: "stratum::submit", "Acquired workers read lock");
let Some(worker) = workers_ptr.get(&worker_uuid) else {
error!(target: "stratum::submit", "[STRATUM] Unknown worker UUID for stratum::submit");
return JsonError::new(InvalidParams, Some("Unknown worker UUID".to_string()), id).into()
};
if worker.mining_job.job_id != job_id {
error!(target: "stratum::submit", "[STRATUM] Job ID mismatch for stratum::submit");
return JsonError::new(InvalidParams, Some("Job ID mismatch".to_string()), id).into()
}
// Get the block template from the worker reference and update the nonce
let mut block_template = worker.mining_job.block.clone();
block_template.header.nonce = nonce;
// Submit the block to monerod
let block = monero::consensus::serialize_hex(&block_template);
let params: JsonValue = vec![block.into()].into();
let req = JsonRequest::new("submit_block", params);
let resp = match monerod_request(&self.monerod_rpc, req).await {
Ok(v) => v,
Err(e) => {
error!(
target: "stratum::submit",
"[STRATUM] Failed submitting block to monerod: {}", e,
);
return JsonError::new(
InternalError,
Some("Failed submitting block".to_string()),
id,
)
.into()
}
};
// Ping the job_task to reiterate.
// We don't release the lock after this, so that we can hopefully first
// return the result of the `submit` call, and then unlock job_task for
// the new notification.
let _ = worker.submit_send.send(()).await;
match JsonResult::try_from_value(&resp) {
Ok(JsonResult::Response(r)) => {
info!(
target: "stratum::submit",
"[STRATUM] Sucessfully submitted block to monerod: {:?}", r,
);
let result = HashMap::from([("status".to_string(), "OK".to_string().into())]);
JsonResponse::new(result.into(), id).into()
}
Ok(JsonResult::Error(e)) => {
error!(
target: "stratum::submit",
"[STRATUM] Failed submitting block to monerod: {:?}", e,
);
JsonError::new(ServerError(e.error.code), Some(e.error.message), id).into()
}
Ok(x) => {
error!(
target: "stratum::submit",
"[STRATUM] Unexpected RPC reply from monerod: {:?}", x,
);
JsonError::new(InternalError, Some("Failed submitting block".to_string()), id)
.into()
}
Err(e) => {
error!(
target: "stratum::submit",
"[STRATUM] Unexpected RPC reply from monerod: {}", e,
);
JsonError::new(InternalError, Some("Failed submitting block".to_string()), id)
.into()
}
}
}
/// Nonstandard, but widely supported protocol extension.
/// The miner sends `keepalived` to prevent connection timeout.
/// `darkfi-mmproxy` makes having keepalived mandatory.
pub async fn stratum_keepalived(&self, id: u16, params: JsonValue) -> JsonResult {
let Some(params) = params.get::<HashMap<String, JsonValue>>() else {
return JsonError::new(InvalidParams, None, id).into()
};
if !params.contains_key("id") {
return JsonError::new(InvalidParams, Some("Missing \"id\" field".to_string()), id)
.into()
};
let Some(worker_uuid) = params["id"].get::<String>() else {
return JsonError::new(InvalidParams, Some("Invalid \"id\" field".to_string()), id)
.into()
};
let Ok(worker_uuid) = Uuid::try_from(worker_uuid.as_str()) else {
return JsonError::new(InvalidParams, Some("Invalid \"id\" field".to_string()), id)
.into()
};
// Get the worker reference
let workers_ptr = self.workers.read().await;
let Some(worker) = workers_ptr.get(&worker_uuid) else {
return JsonError::new(InvalidParams, Some("Invalid \"id\" field".to_string()), id)
.into()
};
// Ping the keepalive task
if let Err(e) = worker.ka_send.send(()).await {
error!(
target: "stratum::keepalived",
"[STRATUM] Keepalive task ping error for {}: {}", worker_uuid, e,
);
return JsonError::new(InternalError, None, id).into()
}
JsonResponse::new(
JsonValue::Object(HashMap::from([(
"status".to_string(),
"KEEPALIVED".to_string().into(),
)])),
id,
)
.into()
}
}