darkfid2: use minerd to mine blocks, validator: cleaned up threads info as its not longer required

This commit is contained in:
aggstam
2024-01-12 15:03:42 +02:00
parent 09137d4633
commit 7c9b3549cf
14 changed files with 134 additions and 108 deletions

View File

@@ -20,8 +20,8 @@ database = "~/.local/darkfi/darkfid_blockchain_localnet"
# Finalization threshold, denominated by number of blocks
threshold = 3
# PoW miner number of threads to use
pow_threads = 4
# minerd JSON-RPC endpoint
minerd_endpoint = "tcp://127.0.0.1:28467"
# PoW block production target, in seconds
pow_target = 10
@@ -164,8 +164,8 @@ database = "~/.local/darkfi/darkfid_blockchain_testnet"
# Finalization threshold, denominated by number of blocks
threshold = 6
# PoW miner number of threads to use
pow_threads = 6
# minerd JSON-RPC endpoint
minerd_endpoint = "tcp://127.0.0.1:28467"
# PoW block production target, in seconds
pow_target = 90
@@ -323,8 +323,8 @@ database = "~/.local/darkfi/darkfid_blockchain_mainnet"
# Finalization threshold, denominated by number of blocks
threshold = 11
# PoW miner number of threads to use
pow_threads = 8
# minerd JSON-RPC endpoint
minerd_endpoint = "tcp://127.0.0.1:28467"
# PoW block production target, in seconds
pow_target = 90

View File

@@ -34,6 +34,9 @@ pub enum RpcError {
// Contract-related errors
ContractZkasDbNotFound = -32200,
// Misc errors
PingFailed = -32300,
}
fn to_tuple(e: RpcError) -> (i32, String) {
@@ -48,6 +51,8 @@ fn to_tuple(e: RpcError) -> (i32, String) {
RpcError::ParseError => "Parse error",
// Contract-related errors
RpcError::ContractZkasDbNotFound => "zkas database not found for given contract",
// Misc errors
RpcError::PingFailed => "Miner daemon ping error",
};
(e as i32, msg.to_string())

View File

@@ -33,6 +33,7 @@ use darkfi::{
cli_desc,
net::{settings::SettingsOpt, P2pPtr},
rpc::{
client::RpcClient,
jsonrpc::JsonSubscriber,
server::{listen_and_serve, RequestHandler},
},
@@ -125,9 +126,9 @@ pub struct BlockchainNetwork {
/// Finalization threshold, denominated by number of blocks
pub threshold: usize,
#[structopt(long, default_value = "4")]
/// PoW miner number of threads to use
pub pow_threads: usize,
#[structopt(long, default_value = "tcp://127.0.0.1:28467")]
/// minerd JSON-RPC endpoint
pub minerd_endpoint: Url,
#[structopt(long, default_value = "10")]
/// PoW block production target, in seconds
@@ -186,6 +187,8 @@ pub struct Darkfid {
subscribers: HashMap<&'static str, JsonSubscriber>,
/// JSON-RPC connection tracker
rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
/// JSON-RPC client to execute requests to the miner daemon
rpc_client: Option<RpcClient>,
}
impl Darkfid {
@@ -194,6 +197,7 @@ impl Darkfid {
consensus_p2p: Option<P2pPtr>,
validator: ValidatorPtr,
subscribers: HashMap<&'static str, JsonSubscriber>,
rpc_client: Option<RpcClient>,
) -> Self {
Self {
sync_p2p,
@@ -201,6 +205,7 @@ impl Darkfid {
validator,
subscribers,
rpc_connections: Mutex::new(HashSet::new()),
rpc_client,
}
}
}
@@ -252,7 +257,6 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
let config = ValidatorConfig::new(
time_keeper,
blockchain_config.threshold,
blockchain_config.pow_threads,
blockchain_config.pow_target,
pow_fixed_difficulty,
genesis_block,
@@ -279,26 +283,48 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
.await;
// Initialize consensus P2P network
let consensus_p2p = if blockchain_config.consensus {
Some(
spawn_consensus_p2p(
&blockchain_config.consensus_net.into(),
&validator,
&subscribers,
ex.clone(),
)
.await,
let (consensus_p2p, rpc_client) = if blockchain_config.consensus {
let Ok(rpc_client) = RpcClient::new(blockchain_config.minerd_endpoint, ex.clone()).await
else {
error!(target: "darkfid", "Failed to initialize miner daemon rpc client, check if minerd is running");
return Err(Error::RpcClientStopped)
};
(
Some(
spawn_consensus_p2p(
&blockchain_config.consensus_net.into(),
&validator,
&subscribers,
ex.clone(),
)
.await,
),
Some(rpc_client),
)
} else {
None
(None, None)
};
// Initialize node
let darkfid =
Darkfid::new(sync_p2p.clone(), consensus_p2p.clone(), validator.clone(), subscribers).await;
let darkfid = Darkfid::new(
sync_p2p.clone(),
consensus_p2p.clone(),
validator.clone(),
subscribers,
rpc_client,
)
.await;
let darkfid = Arc::new(darkfid);
info!(target: "darkfid", "Node initialized successfully!");
// Pinging minerd daemon to verify it listens
if blockchain_config.consensus {
if let Err(e) = darkfid.ping_miner_daemon().await {
error!(target: "darkfid", "Failed to ping miner daemon: {}", e);
return Err(Error::RpcClientStopped)
}
}
// JSON-RPC server
info!(target: "darkfid", "Starting JSON-RPC server");
// Here we create a task variable so we can manually close the
@@ -342,7 +368,7 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
darkfid.validator.purge_pending_txs().await?;
// Consensus protocol
let (consensus_task, consensus_sender) = if blockchain_config.consensus {
let consensus_task = if blockchain_config.consensus {
info!(target: "darkfid", "Starting consensus protocol task");
// Grab rewards recipient public key(address)
if blockchain_config.recipient.is_none() {
@@ -353,11 +379,10 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
Err(_) => return Err(Error::InvalidAddress),
};
let (sender, recvr) = smol::channel::bounded(1);
let task = StoppableTask::new();
task.clone().start(
// Weird hack to prevent lifetimes hell
async move { miner_task(&darkfid, &recipient, &recvr).await },
async move { miner_task(&darkfid, &recipient).await },
|res| async {
match res {
Ok(()) | Err(Error::MinerTaskStopped) => { /* Do nothing */ }
@@ -367,10 +392,10 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
Error::MinerTaskStopped,
ex.clone(),
);
(Some(task), Some(sender))
Some(task)
} else {
info!(target: "darkfid", "Not participating in consensus");
(None, None)
None
};
// Signal handling for graceful termination.
@@ -389,8 +414,6 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
consensus_p2p.unwrap().stop().await;
info!(target: "darkfid", "Stopping consensus task...");
// Send signal to spawned miner threads to stop
consensus_sender.unwrap().send(()).await?;
consensus_task.unwrap().stop().await;
}

View File

@@ -16,10 +16,10 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::collections::HashSet;
use std::{collections::HashSet, time::Instant};
use async_trait::async_trait;
use log::debug;
use log::{debug, error};
use smol::lock::MutexGuard;
use tinyjson::JsonValue;
@@ -30,9 +30,13 @@ use darkfi::{
},
system::StoppableTaskPtr,
util::time::Timestamp,
Error, Result,
};
use crate::Darkfid;
use crate::{
error::{server_error, RpcError},
Darkfid,
};
#[async_trait]
#[rustfmt::skip]
@@ -48,6 +52,7 @@ impl RequestHandler for Darkfid {
"clock" => return self.clock(req.id, req.params).await,
"sync_dnet_switch" => return self.sync_dnet_switch(req.id, req.params).await,
"consensus_dnet_switch" => return self.consensus_dnet_switch(req.id, req.params).await,
"ping_miner" => return self.ping_miner(req.id, req.params).await,
// ==================
// Blockchain methods
@@ -139,4 +144,36 @@ impl Darkfid {
JsonResponse::new(JsonValue::Boolean(true), id).into()
}
// RPCAPI:
// Pings configured miner daemon for livenes.
// Returns `true` on success.
//
// --> {"jsonrpc": "2.0", "method": "ping_miner", "params": [], "id": 1}
// <-- {"jsonrpc": "2.0", "result": "true", "id": 1}
async fn ping_miner(&self, id: u16, _params: JsonValue) -> JsonResult {
if let Err(e) = self.ping_miner_daemon().await {
error!(target: "darkfid::rpc::ping_miner", "Failed to ping miner daemon: {}", e);
return server_error(RpcError::PingFailed, id, None)
}
JsonResponse::new(JsonValue::Boolean(true), id).into()
}
pub async fn ping_miner_daemon(&self) -> Result<()> {
debug!(target: "darkfid::ping_miner_daemon", "Pinging miner daemon...");
self.miner_daemon_request("ping", JsonValue::Array(vec![])).await?;
Ok(())
}
pub async fn miner_daemon_request(&self, method: &str, params: JsonValue) -> Result<JsonValue> {
let Some(ref rpc_client) = self.rpc_client else { return Err(Error::RpcClientStopped) };
debug!(target: "darkfid::rpc::miner_daemon_request", "Executing request {} with params: {:?}", method, params);
let latency = Instant::now();
let req = JsonRequest::new(method, params);
let rep = rpc_client.request(req).await?;
let latency = latency.elapsed();
debug!(target: "darkfid::rpc::miner_daemon_request", "Got reply: {:?}", rep);
debug!(target: "darkfid::rpc::miner_daemon_request", "Latency: {:?}", latency);
Ok(rep)
}
}

View File

@@ -18,10 +18,11 @@
use darkfi::{
blockchain::BlockInfo,
rpc::util::JsonValue,
tx::{ContractCallLeaf, Transaction, TransactionBuilder},
util::encoding::base64,
validator::{
consensus::{Fork, Proposal},
pow::PoWModule,
utils::best_forks_indexes,
},
zk::{empty_witnesses, ProvingKey, ZkCircuit},
@@ -37,21 +38,17 @@ use darkfi_sdk::{
pasta::pallas,
ContractCall,
};
use darkfi_serial::Encodable;
use darkfi_serial::{deserialize, serialize, Encodable};
use log::info;
use num_bigint::BigUint;
use rand::rngs::OsRng;
use smol::channel::Receiver;
use crate::{proto::BlockInfoMessage, Darkfid};
// TODO: handle all ? so the task don't stop on errors
/// async task used for participating in the PoW consensus protocol
pub async fn miner_task(
node: &Darkfid,
recipient: &PublicKey,
stop_signal: &Receiver<()>,
) -> Result<()> {
pub async fn miner_task(node: &Darkfid, recipient: &PublicKey) -> Result<()> {
// TODO: For now we asume we have a single miner that produces block,
// until the PoW consensus and proper validations have been added.
// The miner workflow would be:
@@ -72,17 +69,13 @@ pub async fn miner_task(
info!(target: "darkfid::task::miner_task", "Starting miner task...");
// Start miner loop
miner_loop(node, recipient, stop_signal).await?;
miner_loop(node, recipient).await?;
Ok(())
}
/// Miner loop
async fn miner_loop(
node: &Darkfid,
recipient: &PublicKey,
stop_signal: &Receiver<()>,
) -> Result<()> {
async fn miner_loop(node: &Darkfid, recipient: &PublicKey) -> Result<()> {
// Grab zkas proving keys and bin for PoWReward transaction
info!(target: "darkfid::task::miner_task", "Generating zkas bin and proving keys...");
let blockchain = node.validator.blockchain.clone();
@@ -108,10 +101,17 @@ async fn miner_loop(
info!(target: "darkfid::task::miner_task", "Miner loop starts!");
// Miner loop
loop {
// Grab next block
let (mut next_block, module) =
// Grab next target and block
let (next_target, mut next_block) =
generate_next_block(node, &mut secret, recipient, &zkbin, &pk).await?;
module.mine_block(&mut next_block, stop_signal)?;
// Execute request to minerd and parse response
let target = JsonValue::String(next_target.to_string());
let block = JsonValue::String(base64::encode(&serialize(&next_block)));
let response =
node.miner_daemon_request("mine", JsonValue::Array(vec![target, block])).await?;
let nonce_bytes = base64::decode(response.get::<String>().unwrap()).unwrap();
next_block.header.nonce = deserialize::<pallas::Base>(&nonce_bytes)?;
// Sign the mined block
next_block.sign(&secret)?;
@@ -141,7 +141,7 @@ async fn generate_next_block(
recipient: &PublicKey,
zkbin: &ZkBinary,
pk: &ProvingKey,
) -> Result<(BlockInfo, PoWModule)> {
) -> Result<(BigUint, BlockInfo)> {
// Grab a lock over nodes' current forks
let forks = node.validator.consensus.forks.read().await;
@@ -161,13 +161,13 @@ async fn generate_next_block(
let tx = generate_pow_transaction(fork, secret, recipient, zkbin, pk)?;
// Generate next block proposal
let target = fork.module.next_mine_target()?;
let next_block = node.validator.consensus.generate_unsigned_block(fork, tx).await?;
let module = fork.module.clone();
// Drop forks lock
drop(forks);
Ok((next_block, module))
Ok((target, next_block))
}
/// Auxiliary function to generate a Money::PoWReward transaction

View File

@@ -31,7 +31,7 @@ fn forks() -> Result<()> {
// Create a temporary blockchain and a PoW module
let blockchain = Blockchain::new(&sled::Config::new().temporary(true).open()?)?;
let module = PoWModule::new(blockchain.clone(), 2, 90, None)?;
let module = PoWModule::new(blockchain.clone(), 90, None)?;
// Create a fork
let fork = Fork::new(&blockchain, module).await?;

View File

@@ -43,7 +43,6 @@ use crate::{
};
pub struct HarnessConfig {
pub pow_threads: usize,
pub pow_target: usize,
pub pow_fixed_difficulty: Option<BigUint>,
pub pos_testing_mode: bool,
@@ -87,7 +86,6 @@ impl Harness {
let validator_config = ValidatorConfig::new(
time_keeper,
3,
config.pow_threads,
config.pow_target,
config.pow_fixed_difficulty.clone(),
genesis_block,
@@ -148,7 +146,6 @@ impl Harness {
.validate_blockchain(
genesis_txs_total,
vec![],
self.config.pow_threads,
self.config.pow_target,
self.config.pow_fixed_difficulty.clone(),
)
@@ -156,7 +153,6 @@ impl Harness {
bob.validate_blockchain(
genesis_txs_total,
vec![],
self.config.pow_threads,
self.config.pow_target,
self.config.pow_fixed_difficulty.clone(),
)
@@ -269,7 +265,8 @@ pub async fn generate_node(
} else {
None
};
let node = Darkfid::new(sync_p2p.clone(), consensus_p2p.clone(), validator, subscribers).await;
let node =
Darkfid::new(sync_p2p.clone(), consensus_p2p.clone(), validator, subscribers, None).await;
sync_p2p.clone().start().await?;

View File

@@ -32,11 +32,9 @@ async fn sync_pos_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
init_logger();
// Initialize harness in testing mode
let pow_threads = 2;
let pow_target = 90;
let pow_fixed_difficulty = None;
let config = HarnessConfig {
pow_threads,
pow_target,
pow_fixed_difficulty: pow_fixed_difficulty.clone(),
pos_testing_mode: true,
@@ -76,13 +74,7 @@ async fn sync_pos_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
let alice = &th.alice.validator;
let charlie = &charlie.validator;
charlie
.validate_blockchain(
genesis_txs_total,
vec![],
pow_threads,
pow_target,
pow_fixed_difficulty,
)
.validate_blockchain(genesis_txs_total, vec![], pow_target, pow_fixed_difficulty)
.await?;
assert_eq!(alice.blockchain.len(), charlie.blockchain.len());
assert_eq!(alice.blockchain.slots.len(), charlie.blockchain.slots.len());

View File

@@ -163,7 +163,6 @@ impl Wallet {
let config = ValidatorConfig::new(
time_keeper,
3,
1,
90,
Some(BigUint::from(1_u8)),
genesis_block.clone(),

View File

@@ -69,17 +69,12 @@ impl Consensus {
blockchain: Blockchain,
time_keeper: TimeKeeper,
finalization_threshold: usize,
pow_threads: usize,
pow_target: usize,
pow_fixed_difficulty: Option<BigUint>,
pos_testing_mode: bool,
) -> Result<Self> {
let module = RwLock::new(PoWModule::new(
blockchain.clone(),
pow_threads,
pow_target,
pow_fixed_difficulty,
)?);
let module =
RwLock::new(PoWModule::new(blockchain.clone(), pow_target, pow_fixed_difficulty)?);
Ok(Self {
blockchain,
time_keeper,

View File

@@ -73,8 +73,6 @@ pub struct ValidatorConfig {
pub time_keeper: TimeKeeper,
/// Currently configured finalization security threshold
pub finalization_threshold: usize,
/// Currently configured PoW miner number of threads to use
pub pow_threads: usize,
/// Currently configured PoW target
pub pow_target: usize,
/// Optional fixed difficulty, for testing purposes
@@ -96,7 +94,6 @@ impl ValidatorConfig {
pub fn new(
time_keeper: TimeKeeper,
finalization_threshold: usize,
pow_threads: usize,
pow_target: usize,
pow_fixed_difficulty: Option<BigUint>,
genesis_block: BlockInfo,
@@ -108,7 +105,6 @@ impl ValidatorConfig {
Self {
time_keeper,
finalization_threshold,
pow_threads,
pow_target,
pow_fixed_difficulty,
genesis_block,
@@ -171,7 +167,6 @@ impl Validator {
blockchain.clone(),
config.time_keeper,
config.finalization_threshold,
config.pow_threads,
config.pow_target,
config.pow_fixed_difficulty,
pos_testing_mode,
@@ -571,7 +566,6 @@ impl Validator {
&self,
genesis_txs_total: u64,
faucet_pubkeys: Vec<PublicKey>,
pow_threads: usize,
pow_target: usize,
pow_fixed_difficulty: Option<BigUint>,
) -> Result<()> {
@@ -592,8 +586,7 @@ impl Validator {
// Create a time keeper and a PoW module to validate each block
let mut time_keeper = self.consensus.time_keeper.clone();
let mut module =
PoWModule::new(blockchain.clone(), pow_threads, pow_target, pow_fixed_difficulty)?;
let mut module = PoWModule::new(blockchain.clone(), pow_target, pow_fixed_difficulty)?;
// Deploy native wasm contracts
deploy_native_contracts(&overlay, &time_keeper, &faucet_pubkeys).await?;

View File

@@ -77,8 +77,6 @@ const BLOCK_FUTURE_TIME_LIMIT: u64 = 60 * 60 * 2;
/// This struct represents the information required by the PoW algorithm
#[derive(Clone)]
pub struct PoWModule {
/// Number of threads to use for hashing
pub threads: usize,
/// Target block time, in seconds
pub target: usize,
/// Optional fixed difficulty
@@ -97,7 +95,6 @@ pub struct PoWModule {
impl PoWModule {
pub fn new(
blockchain: Blockchain,
threads: usize,
target: usize,
fixed_difficulty: Option<BigUint>,
) -> Result<Self> {
@@ -117,14 +114,7 @@ impl PoWModule {
assert!(diff > &BigUint::zero());
}
Ok(Self {
threads,
target,
fixed_difficulty,
timestamps,
difficulties,
cummulative_difficulty,
})
Ok(Self { target, fixed_difficulty, timestamps, difficulties, cummulative_difficulty })
}
/// Compute the next mining difficulty, based on current ring buffers.
@@ -290,19 +280,19 @@ impl PoWModule {
pub fn mine_block(
&self,
miner_block: &mut BlockInfo,
threads: usize,
stop_signal: &Receiver<()>,
) -> Result<()> {
// Grab the next mine target
let target = self.next_mine_target()?;
mine_block(&target, miner_block, self.threads, stop_signal)
mine_block(&target, miner_block, threads, stop_signal)
}
}
impl std::fmt::Display for PoWModule {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "PoWModule:")?;
write!(f, "\tthreads: {}", self.threads)?;
write!(f, "\ttarget: {}", self.target)?;
write!(f, "\ttimestamps: {:?}", self.timestamps)?;
write!(f, "\tdifficulties: {:?}", self.difficulties)?;
@@ -419,8 +409,7 @@ mod tests {
fn test_wide_difficulty() -> Result<()> {
let sled_db = sled::Config::new().temporary(true).open()?;
let blockchain = Blockchain::new(&sled_db)?;
let mut module =
PoWModule::new(blockchain, DEFAULT_TEST_THREADS, DEFAULT_TEST_DIFFICULTY_TARGET, None)?;
let mut module = PoWModule::new(blockchain, DEFAULT_TEST_DIFFICULTY_TARGET, None)?;
let output = Command::new("./script/research/pow/gen_wide_data.py").output().unwrap();
let reader = Cursor::new(output.stdout);
@@ -453,15 +442,14 @@ mod tests {
// Default setup
let sled_db = sled::Config::new().temporary(true).open()?;
let blockchain = Blockchain::new(&sled_db)?;
let module =
PoWModule::new(blockchain, DEFAULT_TEST_THREADS, DEFAULT_TEST_DIFFICULTY_TARGET, None)?;
let module = PoWModule::new(blockchain, DEFAULT_TEST_DIFFICULTY_TARGET, None)?;
let (_, recvr) = smol::channel::bounded(1);
let genesis_block = BlockInfo::default();
// Mine next block
let mut next_block = BlockInfo::default();
next_block.header.previous = genesis_block.hash()?;
module.mine_block(&mut next_block, &recvr)?;
module.mine_block(&mut next_block, DEFAULT_TEST_THREADS, &recvr)?;
// Verify it
module.verify_current_block(&next_block)?;

View File

@@ -336,13 +336,11 @@ pub fn validate_pos_slot(
/// Be careful as this will try to load everything in memory.
pub fn validate_blockchain(
blockchain: &Blockchain,
pow_threads: usize,
pow_target: usize,
pow_fixed_difficulty: Option<BigUint>,
) -> Result<()> {
// Generate a PoW module
let mut module =
PoWModule::new(blockchain.clone(), pow_threads, pow_target, pow_fixed_difficulty)?;
let mut module = PoWModule::new(blockchain.clone(), pow_target, pow_fixed_difficulty)?;
// We use block order store here so we have all blocks in order
let blocks = blockchain.order.get_all()?;
for (index, block) in blocks[1..].iter().enumerate() {

View File

@@ -30,7 +30,6 @@ use darkfi_sdk::{
pasta::{group::ff::Field, pallas},
};
const POW_THREADS: usize = 1;
const POW_TARGET: usize = 10;
struct Node {
@@ -41,7 +40,7 @@ struct Node {
impl Node {
fn new() -> Result<Self> {
let blockchain = Blockchain::new(&sled::Config::new().temporary(true).open()?)?;
let module = PoWModule::new(blockchain.clone(), POW_THREADS, POW_TARGET, None)?;
let module = PoWModule::new(blockchain.clone(), POW_TARGET, None)?;
Ok(Self { blockchain, module })
}
}
@@ -64,8 +63,8 @@ impl Harness {
}
fn validate_chains(&self) -> Result<()> {
validate_blockchain(&self.alice.blockchain, POW_THREADS, POW_TARGET, None)?;
validate_blockchain(&self.bob.blockchain, POW_THREADS, POW_TARGET, None)?;
validate_blockchain(&self.alice.blockchain, POW_TARGET, None)?;
validate_blockchain(&self.bob.blockchain, POW_TARGET, None)?;
assert_eq!(self.alice.blockchain.len(), self.bob.blockchain.len());